I'd like to know the best way to test the length of the output produced by a Beam pipeline.
I have some test code like this:
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline

test_data = [
    {'kind': 'storage#object', 'name': 'file1.doc', 'contentType': 'application/octet-stream', 'bucket': 'bucket123'},
    {'kind': 'storage#object', 'name': 'file2.pdf', 'contentType': 'application/pdf', 'bucket': 'bucket234'},
    {'kind': 'storage#object', 'name': 'file3.msg', 'contentType': 'message/rfc822', 'bucket': 'bucket345'},
]
with TestPipeline() as p:
    output = (p
              | beam.Create(test_data)
              | beam.ParDo(DoFn_To_Test()).with_outputs('ok', 'error')
              )
I want to test that every element in the test_data list goes to "output.ok". I think the way to do that is to count them like this:
with TestPipeline() as p:
    output = (p
              | beam.Create(test_data)
              | beam.ParDo(DoFn_To_Test()).with_outputs('ok', 'error')
              )
    okay_count = (output.ok
                  | beam.Map(lambda x: ('dummy_key', x))
                  | beam.GroupByKey()  # This gets ('dummy_key', [element1, element2, ...])
                  | beam.Map(lambda x: len(x[1]))  # Drop the key and get the length of the list
                  )
    # And finally assert the count is correct:
    assert_that(okay_count, equal_to([len(test_data)]))
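This works, but I feel it isn't the best way to do it, and I'm sure there are other ways.
Best option (so far)
This is the best option suggested so far: use beam.combiners.Count.Globally()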
with TestPipeline() as p:
    output = (p
              | beam.Create(test_data)
              | beam.ParDo(DoFn_To_Test()).with_outputs('ok', 'error')
              )
    # Count only the 'ok' tagged output, since that is the branch under test
    okay_count = output.ok | beam.combiners.Count.Globally()
    assert_that(okay_count, equal_to([len(test_data)]))
Answered 2020-09-25 16:16:02
You answered your own question within the question. Writing it here as an answer:
with TestPipeline() as p:
    output = (p
              | beam.Create(test_data)
              | beam.ParDo(DoFn_To_Test()).with_outputs('ok', 'error')
              )
    # Count only the 'ok' tagged output, since that is the branch under test
    okay_count = output.ok | beam.combiners.Count.Globally()
    assert_that(okay_count, equal_to([len(test_data)]))
https://stackoverflow.com/questions/64017676