我设置了一个新的Kinesis数据流,并向其中写入了一些数据。然后我意识到我还没有设置Kinesis Firehose,所以我也设置了它。我希望Firehose收集之前已写入流的数据并将其转储到S3,但我发现Firehose仅在附加后才转储写入流的数据。
当没有附加使用者时,写入Kinesis数据流的数据会发生什么情况?流配置了24小时的保留期,所以我假设数据仍然在某个地方(至少现在是这样)。它还在那里吗,有什么方法可以让我找到它吗?
发布于 2020-04-07 15:03:21
是的,数据将保留在Kinesis流中,直到保留期到期。
不幸的是,它不能通过Kinesis Firehose访问。当Firehose初始化它的shard迭代器时,它使用模式LATEST在它开始运行的那一刻开始获取数据。似乎没有任何方法可以将Firehose配置为拾取之前发送到Kinesis的记录。
然而!完全可以使用任何可以在TRIM_HORIZON或AT_TIMESTAMP模式下初始化分片迭代器的进程从Kinesis流中提取数据。这可以使用awscli来完成,但我最终使用boto3编写了一个Python脚本来捕获分片中的数据。它看起来像这样。
response = client.get_shard_iterator(
StreamName=StreamName,
ShardId=ShardId,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=datetime(2020, 4, 7, 1, 0, 0),
)
while True:
response = client.get_records(
ShardIterator=ShardIterator,
Limit=150
)
if response['MillisBehindLatest'] == 0:
break
for record in response['Records']:
sys.stdout.buffer.write(record['Data'])
ShardIterator = response['NextShardIterator']还要注意:如果迭代器在没有数据可用时启动,get_records()可能会在多次迭代中返回一个空的零记录集[],直到迭代器赶上有数据时为止。每次我使用TRIM_HORIZON时都会发生这种情况,这就是为什么我改用AT_TIMESTAMP,这样我就可以直接跳到数据开始的地方。
https://stackoverflow.com/questions/61072541
复制相似问题