前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >milvus Upsert api写s3的流程

milvus Upsert api写s3的流程

原创
作者头像
melodyshu
发布2024-03-11 14:29:12
1340
发布2024-03-11 14:29:12
举报
文章被收录于专栏:milvus数据库

Upsert api写s3的流程

milvus版本:v2.3.2

实现:先insert再delete,并限制不能修改主键列。

整体架构:

Upsert 的数据流向

upsert写入s3的流程

upsert先insert,再delete。从proxy的execute()方法可以看出。

代码语言:go
复制
func (it *upsertTask) Execute(ctx context.Context) (err error) {
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")
	defer sp.End()
	log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))

	tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))
	// 拿到stream,类型为msgstream.mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)
	if err != nil {
		return err
	}
    // 创建msgPack
	msgPack := &msgstream.MsgPack{
		BeginTs: it.BeginTs(),
		EndTs:   it.EndTs(),
	}
    // 添加insertMsgPack
	err = it.insertExecute(ctx, msgPack)
	if err != nil {
		log.Warn("Fail to insertExecute", zap.Error(err))
		return err
	}
    // 添加deleteMsgPack
	err = it.deleteExecute(ctx, msgPack)
	if err != nil {
		log.Warn("Fail to deleteExecute", zap.Error(err))
		return err
	}

	tr.RecordSpan()
    // 发送数据至mq
	err = stream.Produce(msgPack)
	if err != nil {
		it.result.Status = merr.Status(err)
		return err
	}
	sendMsgDur := tr.RecordSpan()
	metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
	totalDur := tr.ElapseSpan()
	log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),
		zap.Duration("total duration", totalDur))
	return nil
}

将insertmsg和deletemsg加入msgPack,然后datanode进行消费。

insert和delete流程分别可以参考对应写入s3的流程。产生insertlog和deletelog。

代码语言:go
复制
// Operate handles input messages, implementing flowgrpah.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
	......
    // 遍历msMsg
	for _, msg := range msMsg.TsMessages() {
		switch msg.Type() {
		case commonpb.MsgType_DropCollection:
			......

		case commonpb.MsgType_DropPartition:
			......
        // 处理insert消息
		case commonpb.MsgType_Insert:
			......
        // 处理delete消息
		case commonpb.MsgType_Delete:
			......
		}
	}

	......
}

s3文件

upsert结合了insert和delete操作。因此在s3对应的文件也即insert和delete对应的文件。

主要涉及delta_log和stats_log。

insert:

代码语言:shell
复制
files/insert_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collectionID}/{partitionID}/{segmentID}/{fieldID}/{logidx}(not flushed)

delete:

代码语言:shell
复制
files/delta_log/{collID}/{partID}/{segmentID}/{logID}
files/stats_log/{collID}/{partID}/{segmentID}/{fieldID}/1(flushed)
files/stats_log/{collID}/{partID}/{segmentID}/{fieldID}/{logID}(not flushed)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Upsert api写s3的流程
    • Upsert 的数据流向
      • upsert写入s3的流程
        • s3文件
        相关产品与服务
        向量数据库
        腾讯云向量数据库(Tencent Cloud VectorDB)是一款全托管的自研企业级分布式数据库服务,专用于存储、检索、分析多维向量数据。该数据库支持多种索引类型和相似度计算方法,单索引支持千亿级向量规模,可支持百万级 QPS 及毫秒级查询延迟。腾讯云向量数据库不仅能为大模型提供外部知识库,提高大模型回答的准确性,还可广泛应用于推荐系统、自然语言处理等 AI 领域。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档