首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >基于apache beam的滚动窗特征工程

基于apache beam的滚动窗特征工程
EN

Stack Overflow用户
提问于 2020-06-30 18:23:49
回答 1查看 131关注 0票数 0

我已经能够读入以下数据,将客户事务表示为带有光束的csv (Python SDK)。

代码语言:javascript
复制
timestamp,customer_id,amount
2018-02-08 12:04:36.899422,1,45.92615814813004
2019-04-05 07:40:17.873746,1,47.360044568200514
2019-07-27 04:37:48.060949,1,23.325754816230106
2017-05-18 15:46:41.654809,2,25.47369262400646
2018-08-08 03:59:05.791552,2,34.859367944028875
2019-01-02 02:44:35.208450,2,5.2753275435507705
2020-03-06 09:45:29.866731,2,35.656304542140404
2020-05-28 20:19:08.593375,2,23.23715711587539

csv的读入方式如下:

代码语言:javascript
复制
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.textio import ReadFromText

import datetime

class Split(beam.DoFn):

    def process(self, element):
        timestamp, customer_id, amount = element.split(",")

        return [{
            'timestamp': timestamp,
            'customer': int(customer_id),
            'amount': float(amount)
        }]


options = PipelineOptions()
    
with beam.Pipeline(options=options) as p:

    rows = (
        p |
        ReadFromText('../data/sample_trxns.csv', skip_header_lines=1) |
        beam.ParDo(Split())
    )
    

    class UnixTime(beam.DoFn):
    
        def process(self, element):
            """
            Returns a list of tuples containing customer and amount
            """
            
            unix_time = datetime.datetime.strptime(
                element['timestamp'],
                "%Y-%m-%d %H:%M:%S.%f"
                ).timestamp()
  
                            
            return [{
                'timestamp': unix_time,
                'customer': element['customer'],
                'amount': element['amount']
            }]
             
    class AddTimestampDoFn(beam.DoFn):
      def process(self, element):
        unix_timestamp = element['timestamp']
        # Wrap and emit the current entry and new timestamp in a
        # TimestampedValue.
        yield beam.window.TimestampedValue(element, unix_timestamp)
   
    
    timed_rows = (
        rows |
        beam.ParDo(UnixTime()) |
        beam.ParDo(AddTimestampDoFn())
        )

然而,使用Beam我无法推导出滚动窗口功能,例如‘客户过去1000天的平均交易额’,以及最小、最大和总和的等效滚动窗口功能(不包括每次计算中的当前行)。这演示了使用pandas.Series.rolling函数计算并打印得到的pandas数据帧的特征值的期望值:

代码语言:javascript
复制
                            customer_id     amount  mean_trxn_amount_last_1000_days
timestamp                                                                          
2018-02-08 12:04:36.899422            1  45.926158                              NaN
2019-04-05 07:40:17.873746            1  47.360045                        45.926158
2019-07-27 04:37:48.060949            1  23.325755                        46.643101
2017-05-18 15:46:41.654809            2  25.473693                              NaN
2018-08-08 03:59:05.791552            2  34.859368                        25.473693
2019-01-02 02:44:35.208450            2   5.275328                        30.166530
2020-03-06 09:45:29.866731            2  35.656305                        20.067348
2020-05-28 20:19:08.593375            2  23.237157                        25.263667

我没有找到任何关于Beam中类似功能的文档-这样的功能是否可用?如果不是,我是不是误解了Beam提供的功能的预期范围,或者这种功能可能在未来可用?谢谢。

EN

回答 1

Stack Overflow用户

发布于 2020-07-12 07:59:36

您可以使用windowing,因为您已经在示例代码中提取了时间戳。

固定窗口:“窗口的最简单形式是使用固定的时间窗口:给定一个可能不断更新的带时间戳的PCollection,每个窗口可能捕获(例如)落入30秒间隔内的所有带有时间戳的元素。”

滑动窗口:“滑动时间窗口也表示数据流中的时间间隔;但是,滑动时间窗口可以重叠。例如,每个窗口可能会捕获60秒的数据,但每30秒就会启动一个新窗口。滑动窗口开始的频率称为周期。因此,我们的示例具有60秒的窗口持续时间和30秒的周期。”

应用窗口,然后使用Min/Max/Sum等的内置函数。或者创建您自己的组合器。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62655301

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档