struct rd_kafka_q_s {
mtx_t rkq_lock; // 对队列操作的加锁用
cnd_t rkq_cond; // 队列中放入新的元素时, 用条件变量唤醒相应的等待线程
struct rd_kafka_q_s *rkq_fwdq; /* Forwarded/Routed queue.
* Used in place of this queue
* for all operations. */
// 放入队列的操作都存在这个tailq队列里
struct rd_kafka_op_tailq rkq_q; /* TAILQ_HEAD(, rd_kafka_op_s) */
int rkq_qlen; /* Number of entries in queue */
int64_t rkq_qsize; /* Size of all entries in queue */
int rkq_refcnt; //引用计数
int rkq_flags; // 当前队列的状态,以下三个可选值
#define RD_KAFKA_Q_F_ALLOCATED 0x1 /* Allocated: rd_free on destroy */
#define RD_KAFKA_Q_F_READY 0x2 /* Queue is ready to be used.
* Flag is cleared on destroy */
#define RD_KAFKA_Q_F_FWD_APP 0x4 /* Queue is being forwarded by a call
* to rd_kafka_queue_forward. */
rd_kafka_t *rkq_rk; //此队列关联的rd_kafka_t句柄
//队列中放入新的元素时, 用向fd写入数据的方式唤醒相应的等待线程
struct rd_kafka_q_io *rkq_qio; /* FD-based application signalling */
// 队列中的操作被执行中所执行的回调函数
/* Op serve callback (optional).
* Mainly used for forwarded queues to use the original queue's
* serve function from the forwarded position.
* Shall return 1 if op was handled, else 0. */
rd_kafka_q_serve_cb_t *rkq_serve;
void *rkq_opaque;
// 当前队列的名字,主要是调试用
#if ENABLE_DEVEL
char rkq_name[64]; /* Debugging: queue name (FUNC:LINE) */
#else
const char *rkq_name; /* Debugging: queue name (FUNC) */
#endif
};
rd_kafka_q_purge0
:int rd_kafka_q_purge0 (rd_kafka_q_t *rkq, int do_lock) {
rd_kafka_op_t *rko, *next;
TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
rd_kafka_q_t *fwdq;
int cnt = 0;
if (do_lock)
mtx_lock(&rkq->rkq_lock);
// 如果rkq_fwdq有值不为空, 就清除rkq_fwdq队列
// rd_kafka_q_fwd_get这方法会将rkq_fwdq的引用计数加1
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
if (do_lock)
mtx_unlock(&rkq->rkq_lock);
cnt = rd_kafka_q_purge(fwdq);
rd_kafka_q_destroy(fwdq);
return cnt;
}
/* Move ops queue to tmpq to avoid lock-order issue
* by locks taken from rd_kafka_op_destroy(). */
// 将rkq->rkq_q这个tailq的队列挂到tmpq这个临时tailq上面, 减少lock的时间
TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link);
/* Zero out queue */
rd_kafka_q_reset(rkq);
if (do_lock)
mtx_unlock(&rkq->rkq_lock);
/* Destroy the ops */
// destroy 队列中的每一个op
next = TAILQ_FIRST(&tmpq);
while ((rko = next)) {
next = TAILQ_NEXT(next, rko_link);
rd_kafka_op_destroy(rko);
cnt++;
}
return cnt;
}
rd_kafka_q_destroy0
:static RD_INLINE RD_UNUSED
void rd_kafka_q_destroy0 (rd_kafka_q_t *rkq, int disable) {
int do_delete = 0;
if (disable) {
/* To avoid recursive locking (from ops being purged
* that reference this queue somehow),
* we disable the queue and purge it with individual
* locking. */
rd_kafka_q_disable0(rkq, 1/*lock*/);
// 清除队列中所有的op元素
rd_kafka_q_purge0(rkq, 1/*lock*/);
}
mtx_lock(&rkq->rkq_lock);
rd_kafka_assert(NULL, rkq->rkq_refcnt > 0);
do_delete = !--rkq->rkq_refcnt;
mtx_unlock(&rkq->rkq_lock);
// 根据引用计数来确定是否调用rd_kafka_q_destroy_final
if (unlikely(do_delete))
rd_kafka_q_destroy_final(rkq);
}
void rd_kafka_q_fwd_set0 (rd_kafka_q_t *srcq, rd_kafka_q_t *destq,
int do_lock, int fwd_app) {
if (do_lock)
mtx_lock(&srcq->rkq_lock);
if (fwd_app)
srcq->rkq_flags |= RD_KAFKA_Q_F_FWD_APP;
// 先destroy原有的rkq_fwdq
if (srcq->rkq_fwdq) {
rd_kafka_q_destroy(srcq->rkq_fwdq);
srcq->rkq_fwdq = NULL;
}
if (destq) {
//被forward的destq的引用计数加1
rd_kafka_q_keep(destq);
/* If rkq has ops in queue, append them to fwdq's queue.
* This is an irreversible operation. */
if (srcq->rkq_qlen > 0) {
rd_dassert(destq->rkq_flags & RD_KAFKA_Q_F_READY);
// 拼接destq和原有的srcq, 下面详细分析
rd_kafka_q_concat(destq, srcq);
}
//重新赋值rkq_fwdq
srcq->rkq_fwdq = destq;
}
if (do_lock)
mtx_unlock(&srcq->rkq_lock);
}
rd_kafka_q_concat0
:int rd_kafka_q_concat0 (rd_kafka_q_t *rkq, rd_kafka_q_t *srcq, int do_lock) {
int r = 0;
while (srcq->rkq_fwdq) /* Resolve source queue */
srcq = srcq->rkq_fwdq;
if (unlikely(srcq->rkq_qlen == 0))
return 0; /* Don't do anything if source queue is empty */
if (do_lock)
mtx_lock(&rkq->rkq_lock);
if (!rkq->rkq_fwdq) {
rd_kafka_op_t *rko;
rd_dassert(TAILQ_EMPTY(&srcq->rkq_q) ||
srcq->rkq_qlen > 0);
if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
if (do_lock)
mtx_unlock(&rkq->rkq_lock);
return -1;
}
/* First insert any prioritized ops from srcq
* in the right position in rkq. */
// 先将srcq->rkq_q中rko的优先级>0的转移到rkq->rkq_q上, 并且按优先级插入适当位置
while ((rko = TAILQ_FIRST(&srcq->rkq_q)) && rko->rko_prio > 0) {
TAILQ_REMOVE(&srcq->rkq_q, rko, rko_link);
TAILQ_INSERT_SORTED(&rkq->rkq_q, rko,
rd_kafka_op_t *, rko_link,
rd_kafka_op_cmp_prio);
}
// 将srcq->rkq_q中剩下的优先级=0的拼到rkq->rkq_q队尾
TAILQ_CONCAT(&rkq->rkq_q, &srcq->rkq_q, rko_link);
if (rkq->rkq_qlen == 0)
rd_kafka_q_io_event(rkq);
rkq->rkq_qlen += srcq->rkq_qlen;
rkq->rkq_qsize += srcq->rkq_qsize;
cnd_signal(&rkq->rkq_cond);
rd_kafka_q_reset(srcq);
} else
//(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq这个有点多余, 直接rkq->rkq_fwdq 就成
r = rd_kafka_q_concat0(rkq->rkq_fwdq ? rkq->rkq_fwdq : rkq,
srcq,
rkq->rkq_fwdq ? do_lock : 0);
if (do_lock)
mtx_unlock(&rkq->rkq_lock);
return r;
}
rd_kafka_q_enq0
:rd_kafka_q_enq0 (rd_kafka_q_t *rkq, rd_kafka_op_t *rko, int at_head) {
// 优先级=0, 放到队尾
if (likely(!rko->rko_prio))
TAILQ_INSERT_TAIL(&rkq->rkq_q, rko, rko_link);
else if (at_head)
// 强制插到头部
TAILQ_INSERT_HEAD(&rkq->rkq_q, rko, rko_link);
else
// 按优先级确定位置
TAILQ_INSERT_SORTED(&rkq->rkq_q, rko, rd_kafka_op_t *,
rko_link, rd_kafka_op_cmp_prio);
rkq->rkq_qlen++;
rkq->rkq_qsize += rko->rko_len;
}
rd_kafka_q_enq
:int rd_kafka_q_enq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko) {
rd_kafka_q_t *fwdq;
mtx_lock(&rkq->rkq_lock);
rd_dassert(rkq->rkq_refcnt > 0);
// 如果当前队列不是ready状态, 走rd_kafka_op_reply流程
if (unlikely(!(rkq->rkq_flags & RD_KAFKA_Q_F_READY))) {
/* Queue has been disabled, reply to and fail the rko. */
mtx_unlock(&rkq->rkq_lock);
return rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__DESTROY);
}
// 如rko->rko_serve为null的话, 把默认的serve赋值给它
if (!rko->rko_serve && rkq->rkq_serve) {
/* Store original queue's serve callback and opaque
* prior to forwarding. */
rko->rko_serve = rkq->rkq_serve;
rko->rko_serve_opaque = rkq->rkq_opaque;
}
// 如果有rkq_fwdq队列,优先入rkq_fwdq队列, 没有直接入rkq_q队列
if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
rd_kafka_q_enq0(rkq, rko, 0);
cnd_signal(&rkq->rkq_cond);
if (rkq->rkq_qlen == 1)
rd_kafka_q_io_event(rkq);
mtx_unlock(&rkq->rkq_lock);
} else {
mtx_unlock(&rkq->rkq_lock);
rd_kafka_q_enq(fwdq, rko);
rd_kafka_q_destroy(fwdq); //rd_kafka_q_fwd_get将引用计数+1,这里-1
}
return 1;
}
int rd_kafka_q_reenq (rd_kafka_q_t *rkq, rd_kafka_op_t *rko)
无锁版的rd_kafka_q_enq
rd_kafka_q_purge_toppar_version
void rd_kafka_q_purge_toppar_version (rd_kafka_q_t *rkq,
rd_kafka_toppar_t *rktp, int version) {
rd_kafka_op_t *rko, *next;
TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
int32_t cnt = 0;
int64_t size = 0;
rd_kafka_q_t *fwdq;
mtx_lock(&rkq->rkq_lock);
// 有rkq_fwdq就清除rkq_fwdq
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
mtx_unlock(&rkq->rkq_lock);
rd_kafka_q_purge_toppar_version(fwdq, rktp, version);
rd_kafka_q_destroy(fwdq);
return;
}
/* Move ops to temporary queue and then destroy them from there
* without locks to avoid lock-ordering problems in op_destroy() */
// 比较version, 收集需要清除的op
while ((rko = TAILQ_FIRST(&rkq->rkq_q)) && rko->rko_rktp &&
rd_kafka_toppar_s2i(rko->rko_rktp) == rktp &&
rko->rko_version < version) {
TAILQ_REMOVE(&rkq->rkq_q, rko, rko_link);
TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
cnt++;
size += rko->rko_len;
}
rkq->rkq_qlen -= cnt;
rkq->rkq_qsize -= size;
mtx_unlock(&rkq->rkq_lock);
/清除
next = TAILQ_FIRST(&tmpq);
while ((rko = next)) {
next = TAILQ_NEXT(next, rko_link);
rd_kafka_op_destroy(rko);
}
}
rd_kafka_q_pop_serve
, 先看有无符合条件(按version过滤)的op可处理, 没有则等待, 如果超时, 函数退出rd_kafka_op_t *rd_kafka_q_pop_serve (rd_kafka_q_t *rkq, int timeout_ms,
int32_t version,
rd_kafka_q_cb_type_t cb_type,
rd_kafka_q_serve_cb_t *callback,
void *opaque) {
rd_kafka_op_t *rko;
rd_kafka_q_t *fwdq;
rd_dassert(cb_type);
// RD_POLL_INFINITE等同于一直等待,直到condition被signal
if (timeout_ms == RD_POLL_INFINITE)
timeout_ms = INT_MAX;
mtx_lock(&rkq->rkq_lock);
rd_kafka_yield_thread = 0;
if (!(fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
do {
rd_kafka_op_res_t res;
rd_ts_t pre;
/* Filter out outdated ops */
retry:
//找到符合条件的op, 按version来过滤
while ((rko = TAILQ_FIRST(&rkq->rkq_q)) &&
!(rko = rd_kafka_op_filter(rkq, rko, version)))
;
if (rko) {
/* Proper versioned op */
// 将找到的op弹出队列
rd_kafka_q_deq0(rkq, rko);
/* Ops with callbacks are considered handled
* and we move on to the next op, if any.
* Ops w/o callbacks are returned immediately */
// 处理op
res = rd_kafka_op_handle(rkq->rkq_rk, rkq, rko,
cb_type, opaque,
callback);
if (res == RD_KAFKA_OP_RES_HANDLED)
goto retry; /* Next op */
else if (unlikely(res ==
RD_KAFKA_OP_RES_YIELD)) {
/* Callback yielded, unroll */
mtx_unlock(&rkq->rkq_lock);
return NULL;
} else
break; /* Proper op, handle below. */
}
/* No op, wait for one if we have time left */
if (timeout_ms == RD_POLL_NOWAIT)
break;
pre = rd_clock();
//执行wait操作
if (cnd_timedwait_ms(&rkq->rkq_cond,
&rkq->rkq_lock,
timeout_ms) ==
thrd_timedout) {
mtx_unlock(&rkq->rkq_lock);
return NULL;
}
/* Remove spent time */
timeout_ms -= (int) (rd_clock()-pre) / 1000;
if (timeout_ms < 0)
timeout_ms = RD_POLL_NOWAIT;
} while (timeout_ms != RD_POLL_NOWAIT);
mtx_unlock(&rkq->rkq_lock);
} else {
/* Since the q_pop may block we need to release the parent
* queue's lock. */
mtx_unlock(&rkq->rkq_lock);
rko = rd_kafka_q_pop_serve(fwdq, timeout_ms, version,
cb_type, callback, opaque);
rd_kafka_q_destroy(fwdq);
}
return rko;
}
rd_kafka_q_serve
int rd_kafka_q_serve (rd_kafka_q_t *rkq, int timeout_ms,
int max_cnt, rd_kafka_q_cb_type_t cb_type,
rd_kafka_q_serve_cb_t *callback, void *opaque) {
rd_kafka_t *rk = rkq->rkq_rk;
rd_kafka_op_t *rko;
rd_kafka_q_t localq;
rd_kafka_q_t *fwdq;
int cnt = 0;
rd_dassert(cb_type);
mtx_lock(&rkq->rkq_lock);
rd_dassert(TAILQ_EMPTY(&rkq->rkq_q) || rkq->rkq_qlen > 0);
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
int ret;
/* Since the q_pop may block we need to release the parent
* queue's lock. */
mtx_unlock(&rkq->rkq_lock);
ret = rd_kafka_q_serve(fwdq, timeout_ms, max_cnt,
cb_type, callback, opaque);
rd_kafka_q_destroy(fwdq);
return ret;
}
if (timeout_ms == RD_POLL_INFINITE)
timeout_ms = INT_MAX;
/* Wait for op */
// 如果当前队列为空, 就wait, 如果wait超时后队列还为空就直接退出
while (!(rko = TAILQ_FIRST(&rkq->rkq_q)) && timeout_ms != 0) {
if (cnd_timedwait_ms(&rkq->rkq_cond,
&rkq->rkq_lock,
timeout_ms) != thrd_success)
break;
timeout_ms = 0;
}
if (!rko) {
mtx_unlock(&rkq->rkq_lock);
return 0;
}
//将需要处理的op转移到临时的localq中
/* Move the first `max_cnt` ops. */
rd_kafka_q_init(&localq, rkq->rkq_rk);
rd_kafka_q_move_cnt(&localq, rkq, max_cnt == 0 ? -1/*all*/ : max_cnt,
0/*no-locks*/);
mtx_unlock(&rkq->rkq_lock);
rd_kafka_yield_thread = 0;
/* Call callback for each op */
// 处理op
while ((rko = TAILQ_FIRST(&localq.rkq_q))) {
rd_kafka_op_res_t res;
rd_kafka_q_deq0(&localq, rko);
res = rd_kafka_op_handle(rk, &localq, rko, cb_type,
opaque, callback);
/* op must have been handled */
rd_kafka_assert(NULL, res != RD_KAFKA_OP_RES_PASS);
cnt++;
if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
rd_kafka_yield_thread)) {
/* Callback called rd_kafka_yield(), we must
* stop our callback dispatching and put the
* ops in localq back on the original queue head. */
// 当前thread资源被出让, 没处理完的op要还回到rkq中
if (!TAILQ_EMPTY(&localq.rkq_q))
rd_kafka_q_prepend(rkq, &localq);
break;
}
}
rd_kafka_q_destroy_owner(&localq);
return cnt;
}
rd_kafka_resp_err_t rd_kafka_q_wait_result (rd_kafka_q_t *rkq, int timeout_ms) {
rd_kafka_op_t *rko;
rd_kafka_resp_err_t err;
rko = rd_kafka_q_pop(rkq, timeout_ms, 0);
if (!rko)
err = RD_KAFKA_RESP_ERR__TIMED_OUT;
else {
err = rko->rko_err;
rd_kafka_op_destroy(rko);
}
return err;
}
rd_kafka_q_serve_rkmessages
:int rd_kafka_q_serve_rkmessages (rd_kafka_q_t *rkq, int timeout_ms,
rd_kafka_message_t **rkmessages,
size_t rkmessages_size) {
unsigned int cnt = 0;
TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
rd_kafka_op_t *rko, *next;
rd_kafka_t *rk = rkq->rkq_rk;
rd_kafka_q_t *fwdq;
mtx_lock(&rkq->rkq_lock);
// 如果有 forward queue就处理之
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
/* Since the q_pop may block we need to release the parent
* queue's lock. */
mtx_unlock(&rkq->rkq_lock);
cnt = rd_kafka_q_serve_rkmessages(fwdq, timeout_ms,
rkmessages, rkmessages_size);
rd_kafka_q_destroy(fwdq);
return cnt;
}
mtx_unlock(&rkq->rkq_lock);
rd_kafka_yield_thread = 0;
while (cnt < rkmessages_size) {
rd_kafka_op_res_t res;
mtx_lock(&rkq->rkq_lock);
// 如果rkq_q为空, 就wait, 如果超时,退出
while (!(rko = TAILQ_FIRST(&rkq->rkq_q))) {
if (cnd_timedwait_ms(&rkq->rkq_cond, &rkq->rkq_lock,
timeout_ms) == thrd_timedout)
break;
}
if (!rko) {
mtx_unlock(&rkq->rkq_lock);
break; /* Timed out */
}
// op弹出队列
rd_kafka_q_deq0(rkq, rko);
mtx_unlock(&rkq->rkq_lock);
// 判断op是否过期,过期的话加入到tmpq, 之后统一清除
if (rd_kafka_op_version_outdated(rko, 0)) {
/* Outdated op, put on discard queue */
TAILQ_INSERT_TAIL(&tmpq, rko, rko_link);
continue;
}
/* Serve non-FETCH callbacks */
res = rd_kafka_poll_cb(rk, rkq, rko,
RD_KAFKA_Q_CB_RETURN, NULL);
if (res == RD_KAFKA_OP_RES_HANDLED) {
/* Callback served, rko is destroyed. */
continue;
} else if (unlikely(res == RD_KAFKA_OP_RES_YIELD ||
rd_kafka_yield_thread)) {
/* Yield. */
break;
}
rd_dassert(res == RD_KAFKA_OP_RES_PASS);
/* Auto-commit offset, if enabled. */
if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) {
rd_kafka_toppar_t *rktp;
rktp = rd_kafka_toppar_s2i(rko->rko_rktp);
rd_kafka_toppar_lock(rktp);
rktp->rktp_app_offset = rko->rko_u.fetch.rkm.rkm_offset+1;
if (rktp->rktp_cgrp &&
rk->rk_conf.enable_auto_offset_store)
//存储offset, auto-commit时用
rd_kafka_offset_store0(rktp,
rktp->rktp_app_offset,
0/* no lock */);
rd_kafka_toppar_unlock(rktp);
}
/* Get rkmessage from rko and append to array. */
rkmessages[cnt++] = rd_kafka_message_get(rko);
}
/* Discard non-desired and already handled ops */
next = TAILQ_FIRST(&tmpq);
while (next) {
rko = next;
next = TAILQ_NEXT(next, rko_link);
rd_kafka_op_destroy(rko);
}
return cnt;
}
struct rd_kafka_queue_s {
rd_kafka_q_t *rkqu_q;
rd_kafka_t *rkqu_rk;
};
rd_kafka_queue_s
:rd_kafka_queue_t *rd_kafka_queue_new (rd_kafka_t *rk) {
rd_kafka_q_t *rkq;
rd_kafka_queue_t *rkqu;
rkq = rd_kafka_q_new(rk);
rkqu = rd_kafka_queue_new0(rk, rkq);
rd_kafka_q_destroy(rkq); /* Loose refcount from q_new, one is held
* by queue_new0 */
return rkqu;
}
rd_kafka_queue_s
:rd_kafka_queue_t *rd_kafka_queue_get_main (rd_kafka_t *rk) {
return rd_kafka_queue_new0(rk, rk->rk_rep);
}
rd_kafka_queue_s
:rd_kafka_queue_t *rd_kafka_queue_get_consumer (rd_kafka_t *rk) {
if (!rk->rk_cgrp)
return NULL;
return rd_kafka_queue_new0(rk, rk->rk_cgrp->rkcg_q);
}
rd_kafka_queue_s
:rd_kafka_queue_t *rd_kafka_queue_get_partition (rd_kafka_t *rk,
const char *topic,
int32_t partition) {
shptr_rd_kafka_toppar_t *s_rktp;
rd_kafka_toppar_t *rktp;
rd_kafka_queue_t *result;
if (rk->rk_type == RD_KAFKA_PRODUCER)
return NULL;
s_rktp = rd_kafka_toppar_get2(rk, topic,
partition,
0, /* no ua_on_miss */
1 /* create_on_miss */);
if (!s_rktp)
return NULL;
rktp = rd_kafka_toppar_s2i(s_rktp);
result = rd_kafka_queue_new0(rk, rktp->rktp_fetchq);
rd_kafka_toppar_destroy(s_rktp);
return result;
}