前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Librdkafka的操作处理队列

Librdkafka的操作处理队列

作者头像
扫帚的影子
发布2018-09-05 16:49:23
1.6K0
发布2018-09-05 16:49:23
举报
文章被收录于专栏:分布式系统进阶
  • Librdkafka将与kafka broker的交互,内部实现的一些操作,都封装成operator结构, 然后放入操作处理队列里, 统一处理;
  • 这个队列其实也是一个线程间通讯的管道;
  • 围绕这个队列的操作,是rdkafka的精华所在, 如果搞过windows编程的话,这个相当于event loop, 如果搞过golang的话,这个跟channel有点像;
  • 包括如下结构介绍:
    1. rd_kafka_q_s
    2. d_kafka_queue_s

rd_kafka_q_s
  • 所在文件: src/rdkafka_queue.h(c)
  • 定义:
代码语言:javascript
复制
struct rd_kafka_q_s {
    mtx_t  rkq_lock; // 对队列操作的加锁用
    cnd_t  rkq_cond; // 队列中放入新的元素时, 用条件变量唤醒相应的等待线程

    struct rd_kafka_q_s *rkq_fwdq; /* Forwarded/Routed queue.
                    * Used in place of this queue
                    * for all operations. */

        // 放入队列的操作都存在这个tailq队列里
    struct rd_kafka_op_tailq rkq_q;  /* TAILQ_HEAD(, rd_kafka_op_s) */
    int           rkq_qlen;      /* Number of entries in queue */
        int64_t       rkq_qsize;     /* Size of all entries in queue */

        int           rkq_refcnt; //引用计数 
        int           rkq_flags; // 当前队列的状态,以下三个可选值
#define RD_KAFKA_Q_F_ALLOCATED  0x1  /* Allocated: rd_free on destroy */
#define RD_KAFKA_Q_F_READY      0x2  /* Queue is ready to be used.
                                      * Flag is cleared on destroy */
#define RD_KAFKA_Q_F_FWD_APP    0x4  /* Queue is being forwarded by a call
                                      * to rd_kafka_queue_forward. */

        rd_kafka_t   *rkq_rk; //此队列关联的rd_kafka_t句柄

        //队列中放入新的元素时, 用向fd写入数据的方式唤醒相应的等待线程
    struct rd_kafka_q_io *rkq_qio;   /* FD-based application signalling */

         // 队列中的操作被执行中所执行的回调函数
        /* Op serve callback (optional).
         * Mainly used for forwarded queues to use the original queue's
         * serve function from the forwarded position.
         * Shall return 1 if op was handled, else 0. */
        rd_kafka_q_serve_cb_t *rkq_serve;
        void *rkq_opaque;

       // 当前队列的名字,主要是调试用
#if ENABLE_DEVEL
    char rkq_name[64];       /* Debugging: queue name (FUNC:LINE) */
#else
    const char *rkq_name;    /* Debugging: queue name (FUNC) */
#endif
};
  • 清除队列中所有的op元素 rd_kafka_q_purge0:
代码语言:javascript
复制
int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock) {
    rd_kafka_op_t *rko, *next;
    TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        rd_kafka_q_t *fwdq;
        int cnt = 0;

        if (do_lock)
                mtx_lock(&rkq->rkq_lock);

        // 如果rkq_fwdq有值不为空, 就清除rkq_fwdq队列
        // rd_kafka_q_fwd_get这方法会将rkq_fwdq的引用计数加1
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                if (do_lock)
                        mtx_unlock(&rkq->rkq_lock);
                cnt = rd_kafka_q_purge(fwdq);
                rd_kafka_q_destroy(fwdq);
                return cnt;
        }

    /* Move ops queue to tmpq to avoid lock-order issue
     * by locks taken from rd_kafka_op_destroy(). */
       // 将rkq->rkq_q这个tailq的队列挂到tmpq这个临时tailq上面, 减少lock的时间
    TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link);

    /* Zero out queue */
        rd_kafka_q_reset(rkq);

        if (do_lock)
                mtx_unlock(&rkq->rkq_lock);

    /* Destroy the ops */
        // destroy 队列中的每一个op
    next = TAILQ_FIRST(&tmpq);
    while ((rko = next)) {
        next = TAILQ_NEXT(next, rko_link);
        rd_kafka_op_destroy(rko);
                cnt++;
    }

        return cnt;
}
  • 引用计数为0时销毁当前队列, rd_kafka_q_destroy0:
代码语言:javascript
复制
static RD_INLINE RD_UNUSED
void rd_kafka_q_destroy0 (rd_kafka_q_t *rkq, int disable) {
        int do_delete = 0;

        if (disable) {
                /* To avoid recursive locking (from ops being purged
                 * that reference this queue somehow),
                 * we disable the queue and purge it with individual
                 * locking. */
                rd_kafka_q_disable0(rkq, 1/*lock*/);
                // 清除队列中所有的op元素
                rd_kafka_q_purge0(rkq, 1/*lock*/);
        }

        mtx_lock(&rkq->rkq_lock);
        rd_kafka_assert(NULL, rkq->rkq_refcnt > 0);
        do_delete = !--rkq->rkq_refcnt;
        mtx_unlock(&rkq->rkq_lock);

        // 根据引用计数来确定是否调用rd_kafka_q_destroy_final
        if (unlikely(do_delete))
                rd_kafka_q_destroy_final(rkq);
}
  • 设置或者消除forward queue:
代码语言:javascript
复制
void rd_kafka_q_fwd_set0 (rd_kafka_q_t *srcq, rd_kafka_q_t *destq,
                          int do_lock, int fwd_app) {

        if (do_lock)
                mtx_lock(&srcq->rkq_lock);
        if (fwd_app)
                srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP;

        // 先destroy原有的rkq_fwdq
    if (srcq->rkq_fwdq) {
        rd_kafka_q_destroy(srcq->rkq_fwdq);
        srcq->rkq_fwdq = NULL;
    }
    if (destq) {
                //被forward的destq的引用计数加1
        rd_kafka_q_keep(destq);

        /* If rkq has ops in queue, append them to fwdq's queue.
         * This is an irreversible operation. */
                if (srcq->rkq_qlen > 0) {
            rd_dassert(destq->rkq_flags & RD_KAFKA_Q_F_READY);
                        // 拼接destq和原有的srcq,  下面详细分析
            rd_kafka_q_concat(destq, srcq); 
        }

                //重新赋值rkq_fwdq
        srcq->rkq_fwdq = destq;
    }
        if (do_lock)
                mtx_unlock(&srcq->rkq_lock);
}
  • 两队列拼接 rd_kafka_q_concat0:
代码语言:javascript
复制
int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {
    int r = 0;

    while (srcq->rkq_fwdq) /* Resolve source queue */
        srcq = srcq->rkq_fwdq;
    if (unlikely(srcq->rkq_qlen == 0))
        return 0; /* Don't do anything if source queue is empty */

    if (do_lock)
        mtx_lock(&rkq->rkq_lock);
    if (!rkq->rkq_fwdq) {
                rd_kafka_op_t *rko;

                rd_dassert(TAILQ_EMPTY(&srcq->rkq_q) ||
                           srcq->rkq_qlen > 0);
        if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
                        if (do_lock)
                                mtx_unlock(&rkq->rkq_lock);
            return -1;
        }
                /* First insert any prioritized ops from srcq
                 * in the right position in rkq. */
               // 先将srcq->rkq_q中rko的优先级>0的转移到rkq->rkq_q上, 并且按优先级插入适当位置
                while ((rko = TAILQ_FIRST(&srcq->rkq_q)) && rko->rko_prio > 0) {
                        TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
                        TAILQ_INSERT_SORTED(&rkq->rkq_q, rko,
                                            rd_kafka_op_t *, rko_link,
                                            rd_kafka_op_cmp_prio);
                }

                // 将srcq->rkq_q中剩下的优先级=0的拼到rkq->rkq_q队尾
        TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
        if (rkq->rkq_qlen == 0)
            rd_kafka_q_io_event(rkq);
                rkq->rkq_qlen += srcq->rkq_qlen;
                rkq->rkq_qsize += srcq->rkq_qsize;
        cnd_signal(&rkq->rkq_cond);

                rd_kafka_q_reset(srcq);
    } else
                //(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq这个有点多余, 直接rkq->rkq_fwdq 就成
        r = rd_kafka_q_concat0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
                       srcq,
                       rkq->rkq_fwdq ? do_lock : 0);
    if (do_lock)
        mtx_unlock(&rkq->rkq_lock);

    return r;
}
  • 内部函数,op元素插入队列rd_kafka_q_enq0:
代码语言:javascript
复制
rd_kafka_q_enq0 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int at_head) {
    // 优先级=0, 放到队尾
    if (likely(!rko->rko_prio))
        TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
    else if (at_head)
            // 强制插到头部
            TAILQ_INSERT_HEAD(&rkq->rkq_q, rko, rko_link);
    else
        // 按优先级确定位置 
        TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *,
                            rko_link, rd_kafka_op_cmp_prio);
    rkq->rkq_qlen++;
    rkq->rkq_qsize += rko->rko_len;
}
  • op元素插入队列rd_kafka_q_enq:
代码语言:javascript
复制
int rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
    rd_kafka_q_t *fwdq;
    mtx_lock(&rkq->rkq_lock);
        rd_dassert(rkq->rkq_refcnt > 0);

        // 如果当前队列不是ready状态, 走rd_kafka_op_reply流程
        if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
                /* Queue has been disabled, reply to and fail the rko. */
                mtx_unlock(&rkq->rkq_lock);

                return rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__DESTROY);
        }

        // 如rko->rko_serve为null的话, 把默认的serve赋值给它
        if (!rko->rko_serve && rkq->rkq_serve) {
                /* Store original queue's serve callback and opaque
                 * prior to forwarding. */
                rko->rko_serve = rkq->rkq_serve;
                rko->rko_serve_opaque = rkq->rkq_opaque;
        }

        // 如果有rkq_fwdq队列,优先入rkq_fwdq队列, 没有直接入rkq_q队列
    if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
        rd_kafka_q_enq0(rkq, rko, 0);
        cnd_signal(&rkq->rkq_cond);
        if (rkq->rkq_qlen == 1)
            rd_kafka_q_io_event(rkq);
        mtx_unlock(&rkq->rkq_lock);
    } else {
        mtx_unlock(&rkq->rkq_lock);
        rd_kafka_q_enq(fwdq, rko);
        rd_kafka_q_destroy(fwdq); //rd_kafka_q_fwd_get将引用计数+1,这里-1
    }

        return 1;
}
  • int rd_kafka_q_reenq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko)无锁版的rd_kafka_q_enq
  • 清除rd_kafka_toppar_t->version过期的op元素rd_kafka_q_purge_toppar_version
代码语言:javascript
复制
void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq,
                                      rd_kafka_toppar_t *rktp, int version) {
    rd_kafka_op_t *rko, *next;
    TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        int32_t cnt = 0;
        int64_t size = 0;
        rd_kafka_q_t *fwdq;

    mtx_lock(&rkq->rkq_lock);

       // 有rkq_fwdq就清除rkq_fwdq
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                mtx_unlock(&rkq->rkq_lock);
                rd_kafka_q_purge_toppar_version(fwdq, rktp, version);
                rd_kafka_q_destroy(fwdq);
                return;
        }

        /* Move ops to temporary queue and then destroy them from there
         * without locks to avoid lock-ordering problems in op_destroy() */
       // 比较version, 收集需要清除的op
        while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && rko->rko_rktp &&
               rd_kafka_toppar_s2i(rko->rko_rktp) == rktp &&
               rko->rko_version < version) {
                TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
                TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
                cnt++;
                size += rko->rko_len;
        }


        rkq->rkq_qlen -= cnt;
        rkq->rkq_qsize -= size;
    mtx_unlock(&rkq->rkq_lock);

        /清除
    next = TAILQ_FIRST(&tmpq);
    while ((rko = next)) {
        next = TAILQ_NEXT(next, rko_link);
        rd_kafka_op_destroy(rko);
    }
}
  • 最多处理队列中的一个op元素 rd_kafka_q_pop_serve, 先看有无符合条件(按version过滤)的op可处理, 没有则等待, 如果超时, 函数退出
代码语言:javascript
复制
rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms,
                                     int32_t version,
                                     rd_kafka_q_cb_type_t cb_type,
                                     rd_kafka_q_serve_cb_t *callback,
                                     void *opaque) {
    rd_kafka_op_t *rko;
        rd_kafka_q_t *fwdq;

        rd_dassert(cb_type);

        // RD_POLL_INFINITE等同于一直等待,直到condition被signal
    if (timeout_ms == RD_POLL_INFINITE)
        timeout_ms = INT_MAX;

    mtx_lock(&rkq->rkq_lock);

        rd_kafka_yield_thread = 0;
        if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                do {
                        rd_kafka_op_res_t res;
                        rd_ts_t pre;

                        /* Filter out outdated ops */
                retry:
                        //找到符合条件的op, 按version来过滤
                        while ((rko = TAILQ_FIRST(&rkq->rkq_q)) &&
                               !(rko = rd_kafka_op_filter(rkq, rko, version)))
                                ;

                        if (rko) {
                                /* Proper versioned op */
                                // 将找到的op弹出队列
                                rd_kafka_q_deq0(rkq, rko);

                                /* Ops with callbacks are considered handled
                                 * and we move on to the next op, if any.
                                 * Ops w/o callbacks are returned immediately */
                                // 处理op
                                res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko,
                                                         cb_type, opaque,
                                                         callback);
                                if (res == RD_KAFKA_OP_RES_HANDLED)
                                        goto retry; /* Next op */
                                else if (unlikely(res ==
                                                  RD_KAFKA_OP_RES_YIELD)) {
                                        /* Callback yielded, unroll */
                                        mtx_unlock(&rkq->rkq_lock);
                                        return NULL;
                                } else
                                        break; /* Proper op, handle below. */
                        }

                        /* No op, wait for one if we have time left */
                        if (timeout_ms == RD_POLL_NOWAIT)
                                break;

            pre = rd_clock();
                        //执行wait操作
            if (cnd_timedwait_ms(&rkq->rkq_cond,
                         &rkq->rkq_lock,
                         timeout_ms) ==
                thrd_timedout) {
                mtx_unlock(&rkq->rkq_lock);
                return NULL;
            }
            /* Remove spent time */
            timeout_ms -= (int) (rd_clock()-pre) / 1000;
            if (timeout_ms < 0)
                timeout_ms = RD_POLL_NOWAIT;

        } while (timeout_ms != RD_POLL_NOWAIT);

                mtx_unlock(&rkq->rkq_lock);

        } else {
                /* Since the q_pop may block we need to release the parent
                 * queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
        rko = rd_kafka_q_pop_serve(fwdq, timeout_ms, version,
                       cb_type, callback, opaque);
                rd_kafka_q_destroy(fwdq);
        }
    return rko;
}
  • 批量处理队列中的op, 无需按version过滤 rd_kafka_q_serve
代码语言:javascript
复制
int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms,
                      int max_cnt, rd_kafka_q_cb_type_t cb_type,
                      rd_kafka_q_serve_cb_t *callback, void *opaque) {
        rd_kafka_t *rk = rkq->rkq_rk;
    rd_kafka_op_t *rko;
    rd_kafka_q_t localq;
        rd_kafka_q_t *fwdq;
        int cnt = 0;

        rd_dassert(cb_type);

    mtx_lock(&rkq->rkq_lock);

        rd_dassert(TAILQ_EMPTY(&rkq->rkq_q) || rkq->rkq_qlen > 0);
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                int ret;
                /* Since the q_pop may block we need to release the parent
                 * queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
        ret = rd_kafka_q_serve(fwdq, timeout_ms, max_cnt,
                                       cb_type, callback, opaque);
                rd_kafka_q_destroy(fwdq);
        return ret;
    }

    if (timeout_ms == RD_POLL_INFINITE)
        timeout_ms = INT_MAX;

    /* Wait for op */
       // 如果当前队列为空, 就wait, 如果wait超时后队列还为空就直接退出
    while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && timeout_ms != 0) {
        if (cnd_timedwait_ms(&rkq->rkq_cond,
                     &rkq->rkq_lock,
                     timeout_ms) != thrd_success)
            break;

        timeout_ms = 0;
    }

    if (!rko) {
        mtx_unlock(&rkq->rkq_lock);
        return 0;
    }

       //将需要处理的op转移到临时的localq中
    /* Move the first `max_cnt` ops. */
    rd_kafka_q_init(&localq, rkq->rkq_rk);
    rd_kafka_q_move_cnt(&localq, rkq, max_cnt == 0 ? -1/*all*/ : max_cnt,
                0/*no-locks*/);

        mtx_unlock(&rkq->rkq_lock);

        rd_kafka_yield_thread = 0;

    /* Call callback for each op */
       // 处理op
        while ((rko = TAILQ_FIRST(&localq.rkq_q))) {
                rd_kafka_op_res_t res;

                rd_kafka_q_deq0(&localq, rko);
                res = rd_kafka_op_handle(rk, &localq, rko, cb_type,
                                         opaque, callback);
                /* op must have been handled */
                rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS);
                cnt++;

                if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                             rd_kafka_yield_thread)) {
                        /* Callback called rd_kafka_yield(), we must
                         * stop our callback dispatching and put the
                         * ops in localq back on the original queue head. */
                        // 当前thread资源被出让, 没处理完的op要还回到rkq中
                        if (!TAILQ_EMPTY(&localq.rkq_q))
                                rd_kafka_q_prepend(rkq, &localq);
                        break;
                }
    }

    rd_kafka_q_destroy_owner(&localq);

    return cnt;
}
  • 等待处理单个op:
代码语言:javascript
复制
rd_kafka_resp_err_t rd_kafka_q_wait_result (rd_kafka_q_t *rkq, int timeout_ms) {
        rd_kafka_op_t *rko;
        rd_kafka_resp_err_t err;

        rko = rd_kafka_q_pop(rkq, timeout_ms, 0);
        if (!rko)
                err = RD_KAFKA_RESP_ERR__TIMED_OUT;
        else {
                err = rko->rko_err;
                rd_kafka_op_destroy(rko);
        }

        return err;
}
  • 处理 fetch操作, 即获取kafka message rd_kafka_q_serve_rkmessages:
代码语言:javascript
复制
int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
                                 rd_kafka_message_t **rkmessages,
                                 size_t rkmessages_size) {
    unsigned int cnt = 0;
        TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
        rd_kafka_op_t *rko, *next;
        rd_kafka_t *rk = rkq->rkq_rk;
        rd_kafka_q_t *fwdq;

    mtx_lock(&rkq->rkq_lock);
        // 如果有 forward queue就处理之
        if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
                /* Since the q_pop may block we need to release the parent
                 * queue's lock. */
                mtx_unlock(&rkq->rkq_lock);
        cnt = rd_kafka_q_serve_rkmessages(fwdq, timeout_ms,
                          rkmessages, rkmessages_size);
                rd_kafka_q_destroy(fwdq);
        return cnt;
    }
        mtx_unlock(&rkq->rkq_lock);

        rd_kafka_yield_thread = 0;
    while (cnt < rkmessages_size) {
                rd_kafka_op_res_t res;

                mtx_lock(&rkq->rkq_lock);

               // 如果rkq_q为空, 就wait, 如果超时,退出
        while (!(rko = TAILQ_FIRST(&rkq->rkq_q))) {
            if (cnd_timedwait_ms(&rkq->rkq_cond, &rkq->rkq_lock,
                                             timeout_ms) == thrd_timedout)
                break;
        }

        if (!rko) {
                        mtx_unlock(&rkq->rkq_lock);
            break; /* Timed out */
                }

                // op弹出队列
        rd_kafka_q_deq0(rkq, rko);

                mtx_unlock(&rkq->rkq_lock);

                // 判断op是否过期,过期的话加入到tmpq, 之后统一清除
        if (rd_kafka_op_version_outdated(rko, 0)) {
                        /* Outdated op, put on discard queue */
                        TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
                        continue;
                }

                /* Serve non-FETCH callbacks */
                res = rd_kafka_poll_cb(rk, rkq, rko,
                                       RD_KAFKA_Q_CB_RETURN, NULL);
                if (res == RD_KAFKA_OP_RES_HANDLED) {
                        /* Callback served, rko is destroyed. */
                        continue;
                } else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
                                    rd_kafka_yield_thread)) {
                        /* Yield. */
                        break;
                }
                rd_dassert(res == RD_KAFKA_OP_RES_PASS);

        /* Auto-commit offset, if enabled. */
        if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) {
                        rd_kafka_toppar_t *rktp;
                        rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
            rd_kafka_toppar_lock(rktp);
            rktp->rktp_app_offset = rko->rko_u.fetch.rkm.rkm_offset+1;
                        if (rktp->rktp_cgrp &&
                rk->rk_conf.enable_auto_offset_store)
                                //存储offset, auto-commit时用
                                rd_kafka_offset_store0(rktp,
                               rktp->rktp_app_offset,
                                                       0/* no lock */);
            rd_kafka_toppar_unlock(rktp);
                }

        /* Get rkmessage from rko and append to array. */
        rkmessages[cnt++] = rd_kafka_message_get(rko);
    }

        /* Discard non-desired and already handled ops */
        next = TAILQ_FIRST(&tmpq);
        while (next) {
                rko = next;
                next = TAILQ_NEXT(next, rko_link);
                rd_kafka_op_destroy(rko);
        }

    return cnt;
}
rd_kafka_queue_s
  • 所在文件: src/rdkafka_queue.h
  • 这个是对外提供服务的接口,以rd_kafka_t和rd_kafka_q_t组合在一起
  • 定义:
代码语言:javascript
复制
struct rd_kafka_queue_s {
    rd_kafka_q_t *rkqu_q;
        rd_kafka_t   *rkqu_rk;
};
  • 创建 rd_kafka_queue_s:
代码语言:javascript
复制
rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk) {
    rd_kafka_q_t *rkq;
    rd_kafka_queue_t *rkqu;

    rkq = rd_kafka_q_new(rk);
    rkqu = rd_kafka_queue_new0(rk, rkq);
    rd_kafka_q_destroy(rkq); /* Loose refcount from q_new, one is held
                  * by queue_new0 */
    return rkqu;
}
  • 获取rdkafka和应用层交互使用的 rd_kafka_queue_s:
代码语言:javascript
复制
rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk) {
    return rd_kafka_queue_new0(rk, rk->rk_rep);
}
  • 获取consumer对应的 rd_kafka_queue_s:
代码语言:javascript
复制
rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) {
    if (!rk->rk_cgrp)
        return NULL;
    return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q);
}
  • 获取任一partition对应的rd_kafka_queue_s:
代码语言:javascript
复制
rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
                                                const char *topic,
                                                int32_t partition) {
        shptr_rd_kafka_toppar_t *s_rktp;
        rd_kafka_toppar_t *rktp;
        rd_kafka_queue_t *result;

        if (rk->rk_type == RD_KAFKA_PRODUCER)
                return NULL;

        s_rktp = rd_kafka_toppar_get2(rk, topic,
                                      partition,
                                      0, /* no ua_on_miss */
                                      1 /* create_on_miss */);

        if (!s_rktp)
                return NULL;

        rktp = rd_kafka_toppar_s2i(s_rktp);
        result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq);
        rd_kafka_toppar_destroy(s_rktp);

        return result;
}

Librdkafka源码分析-Content Table

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.01.08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Librdkafka源码分析-Content Table
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档