首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >当没有附加消费者时,Kinesis数据会发生什么情况?

当没有附加消费者时,Kinesis数据会发生什么情况?
EN

Stack Overflow用户
提问于 2020-04-07 11:50:09
回答 1查看 147关注 0票数 0

我设置了一个新的Kinesis数据流,并向其中写入了一些数据。然后我意识到我还没有设置Kinesis Firehose,所以我也设置了它。我希望Firehose收集之前已写入流的数据并将其转储到S3,但我发现Firehose仅在附加后才转储写入流的数据。

当没有附加使用者时,写入Kinesis数据流的数据会发生什么情况?流配置了24小时的保留期,所以我假设数据仍然在某个地方(至少现在是这样)。它还在那里吗,有什么方法可以让我找到它吗?

EN

回答 1

Stack Overflow用户

发布于 2020-04-07 15:03:21

是的,数据将保留在Kinesis流中,直到保留期到期。

不幸的是,它不能通过Kinesis Firehose访问。当Firehose初始化它的shard迭代器时,它使用模式LATEST在它开始运行的那一刻开始获取数据。似乎没有任何方法可以将Firehose配置为拾取之前发送到Kinesis的记录。

然而!完全可以使用任何可以在TRIM_HORIZONAT_TIMESTAMP模式下初始化分片迭代器的进程从Kinesis流中提取数据。这可以使用awscli来完成,但我最终使用boto3编写了一个Python脚本来捕获分片中的数据。它看起来像这样。

代码语言:javascript
运行
复制
response = client.get_shard_iterator(
    StreamName=StreamName,
    ShardId=ShardId,
    ShardIteratorType='AT_TIMESTAMP',
    Timestamp=datetime(2020, 4, 7, 1, 0, 0),
)

while True:
    response = client.get_records(
        ShardIterator=ShardIterator,
        Limit=150
    )
    if response['MillisBehindLatest'] == 0:
        break
    for record in response['Records']:
        sys.stdout.buffer.write(record['Data'])
    ShardIterator = response['NextShardIterator']

还要注意:如果迭代器在没有数据可用时启动,get_records()可能会在多次迭代中返回一个空的零记录集[],直到迭代器赶上有数据时为止。每次我使用TRIM_HORIZON时都会发生这种情况,这就是为什么我改用AT_TIMESTAMP,这样我就可以直接跳到数据开始的地方。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61072541

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档