我已经能够读入以下数据,将客户事务表示为带有光束的csv (Python SDK)。
timestamp,customer_id,amount
2018-02-08 12:04:36.899422,1,45.92615814813004
2019-04-05 07:40:17.873746,1,47.360044568200514
2019-07-27 04:37:48.060949,1,23.325754816230106
2017-05-18 15:46:41.654809,2,25.47369262400646
2018-08-08 03:59:05.791552,2,34.859367944028875
2019-01-02 02:44:35.208450,2,5.2753275435507705
2020-03-06 09:45:29.866731,2,35.656304542140404
2020-05-28 20:19:08.593375,2,23.23715711587539csv的读入方式如下:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.textio import ReadFromText
import datetime
class Split(beam.DoFn):
def process(self, element):
timestamp, customer_id, amount = element.split(",")
return [{
'timestamp': timestamp,
'customer': int(customer_id),
'amount': float(amount)
}]
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
rows = (
p |
ReadFromText('../data/sample_trxns.csv', skip_header_lines=1) |
beam.ParDo(Split())
)
class UnixTime(beam.DoFn):
def process(self, element):
"""
Returns a list of tuples containing customer and amount
"""
unix_time = datetime.datetime.strptime(
element['timestamp'],
"%Y-%m-%d %H:%M:%S.%f"
).timestamp()
return [{
'timestamp': unix_time,
'customer': element['customer'],
'amount': element['amount']
}]
class AddTimestampDoFn(beam.DoFn):
def process(self, element):
unix_timestamp = element['timestamp']
# Wrap and emit the current entry and new timestamp in a
# TimestampedValue.
yield beam.window.TimestampedValue(element, unix_timestamp)
timed_rows = (
rows |
beam.ParDo(UnixTime()) |
beam.ParDo(AddTimestampDoFn())
)然而,使用Beam我无法推导出滚动窗口功能,例如‘客户过去1000天的平均交易额’,以及最小、最大和总和的等效滚动窗口功能(不包括每次计算中的当前行)。这演示了使用pandas.Series.rolling函数计算并打印得到的pandas数据帧的特征值的期望值:
customer_id amount mean_trxn_amount_last_1000_days
timestamp
2018-02-08 12:04:36.899422 1 45.926158 NaN
2019-04-05 07:40:17.873746 1 47.360045 45.926158
2019-07-27 04:37:48.060949 1 23.325755 46.643101
2017-05-18 15:46:41.654809 2 25.473693 NaN
2018-08-08 03:59:05.791552 2 34.859368 25.473693
2019-01-02 02:44:35.208450 2 5.275328 30.166530
2020-03-06 09:45:29.866731 2 35.656305 20.067348
2020-05-28 20:19:08.593375 2 23.237157 25.263667我没有找到任何关于Beam中类似功能的文档-这样的功能是否可用?如果不是,我是不是误解了Beam提供的功能的预期范围,或者这种功能可能在未来可用?谢谢。
发布于 2020-07-12 07:59:36
您可以使用windowing,因为您已经在示例代码中提取了时间戳。
固定窗口:“窗口的最简单形式是使用固定的时间窗口:给定一个可能不断更新的带时间戳的PCollection,每个窗口可能捕获(例如)落入30秒间隔内的所有带有时间戳的元素。”
滑动窗口:“滑动时间窗口也表示数据流中的时间间隔;但是,滑动时间窗口可以重叠。例如,每个窗口可能会捕获60秒的数据,但每30秒就会启动一个新窗口。滑动窗口开始的频率称为周期。因此,我们的示例具有60秒的窗口持续时间和30秒的周期。”
应用窗口,然后使用Min/Max/Sum等的内置函数。或者创建您自己的组合器。
https://stackoverflow.com/questions/62655301
复制相似问题