
内容来自:程序员老廖
本项目围绕一个"任务队列 TaskQueue"展开,核心是用清晰、可讲解的代码实现一个贴近真实业务的异步任务调度系统,支持:
在不引入花哨语法、只用常见 C++11/14/17 特性的前提下,我们重点实现:
系统也会涉及到常用 C++11/14/17 语法。
视频教程与源码领取:C++进阶项目:工业级多任务队列设计与实现


这是任务队列中最关键的并发问题,会导致任务延迟执行甚至挂起。
在多线程环境下,工作线程和提交线程之间存在一个危险的时间窗口:

// ❌ 错误的实现
void TaskQueueSTD::postTask(std::unique_ptr<QueuedTask> task, TaskPriority prio) {
{
std::unique_lock<std::mutex> lock(pending_mutex_);
// 添加任务到队列
pending_normal_.push(std::move(entry));
} // 锁在这里释放
notifyWake(); // ❌ 在锁外调用,存在时间窗口!
}
void TaskQueueSTD::processTasks() {
while (true) {
auto task = getNextTask(); // 步骤1:检查队列(持有锁)
// 步骤2:释放锁
// ⚠️ 危险窗口:此时其他线程可能添加任务并唤醒
if (!task.run_task_) {
flag_notify_.wait(...); // 步骤3:开始等待(可能错过唤醒)
}
}
}核心原则:在释放锁后立即调用 notifyWake(),最小化时间窗口。
// ✅ 正确的实现
void TaskQueueSTD::postTask(std::unique_ptr<QueuedTask> task, TaskPriority prio) {
std::unique_lock<std::mutex> lock(pending_mutex_);
// 添加任务到队列
pending_normal_.push(std::move(entry));
lock.unlock(); // ✅ 显式释放锁
notifyWake(); // ✅ 立即通知,最小化时间窗口
}// 对比两种写法的时序差异
// ❌ 方式1:作用域结束自动释放锁
{
std::unique_lock<std::mutex> lock(pending_mutex_);
pending_normal_.push(std::move(entry));
} // 锁释放
// 可能执行其他清理代码
// 可能发生线程切换
notifyWake(); // 延迟较大
// ✅ 方式2:显式unlock + 立即notify
std::unique_lock<std::mutex> lock(pending_mutex_);
pending_normal_.push(std::move(entry));
lock.unlock(); // 精确控制释放时机
notifyWake(); // 紧接着通知,时间窗口最小当延迟任务的剩余时间小于 1ms 时,std::chrono::duration_cast 会将其截断为 0,导致工作线程无法区分:
// 获取下一个任务
auto diff = std::chrono::duration_cast<Millis>(delay_info.next_fire_at_ - tick);
result.sleep_time_ms_ = diff.count(); // 可能被截断为 0
// 工作线程处理
if (task.sleep_time_ms_ == 0) {
// ❌ 问题:无法区分"没任务"还是"即将到期"
flag_notify_.wait(vi::Event::kForever); // 可能导致任务延迟
}重新检查延迟队列来区分两种情况:
// ✅ 正确的处理逻辑
if (task.sleep_time_ms_ > 0) {
// 情况1:有明确的等待时间
flag_notify_.wait(static_cast<int>(task.sleep_time_ms_));
} else if (task.sleep_time_ms_ == 0) {
// 情况2:时间为0,需要区分原因
std::unique_lock<std::mutex> lock(pending_mutex_);
bool has_delayed = !delayed_queue_.empty();
lock.unlock();
if (has_delayed) {
// 有延迟任务即将到期,立即循环重新检查
// 不等待,直接进入下一次循环
} else {
// 没有任何任务,无限等待
flag_notify_.wait(vi::Event::kForever);
}
} else { // sleep_time_ms_ < 0
// 情况3:任务已过期,立即处理
// 不等待,直接进入下一次循环
}
周期任务通过自我重新投递实现循环执行:
template <typename Closure>
class PeriodicTask : public QueuedTask {
private:
bool run() override {
// 1. 执行业务逻辑
closure_();
// 2. 重新投递自己
TaskQueueBase* current = TaskQueueBase::current();
if (current) {
current->postDelayedTask(
std::unique_ptr<QueuedTask>(this),
interval_ms_);
// 3. 返回 false 表示所有权已转移
return false;
}
return true;
}
typename std::decay<Closure>::type closure_;
uint32_t interval_ms_{};
};
支持三种退避策略:
struct RetryStrategy {
enum class Type {
Fixed, // 固定间隔:base, base, base...
Linear, // 线性退避:base, 2*base, 3*base...
Exponential // 指数退避:base, 2*base, 4*base, 8*base...
};
Type type{Type::Fixed};
uint32_t base_delay_ms{1000};
};// 内部实现
std::queue<PendingEntry> pending_high_; // 高优先级
std::queue<PendingEntry> pending_normal_; // 普通优先级
std::queue<PendingEntry> pending_low_; // 低优先级
关键设计点:

// 单线程队列(默认)
auto queue1 = TaskQueue::create("worker1"); // 1个工作线程
// 线程池模式
auto queue_pool = std::make_unique<TaskQueue>(
std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new TaskQueueSTD("pool", 4) // 4个工作线程
)
);
关键点:
struct CapacityConfig {
std::size_t max_pending{0}; // 即时任务队列容量(0=不限制)
std::size_t max_delayed{0}; // 延迟任务队列容量(0=不限制)
std::function<void(std::unique_ptr<QueuedTask>)> on_reject; // 拒绝回调
};
// 使用示例
TQ("worker1")->configureCapacity({
.max_pending = 1000,
.max_delayed = 500,
.on_reject = [](std::unique_ptr<QueuedTask> task) {
std::cerr << "任务被拒绝:队列已满" << std::endl;
}
});
// 执行任务时的异常捕获
QueuedTask* release_ptr = task.run_task_.release();
try {
if (release_ptr->run()) {
delete release_ptr;
}
} catch (const std::exception& e) {
std::cerr << "[TaskQueueSTD:" << name_ << "] std::exception: "
<< e.what() << std::endl;
delete release_ptr;
} catch (...) {
std::cerr << "[TaskQueueSTD:" << name_ << "] unknown exception" << std::endl;
delete release_ptr;
}struct QueueStats {
std::uint64_t executed_task_count{0}; // 已执行任务总数
std::size_t pending_task_count{0}; // 待执行即时任务数
std::size_t delayed_task_count{0}; // 待执行延迟任务数
};
// 使用示例
auto stats = TQ("worker1")->stats();
std::cout << "执行=" << stats.executed_task_count
<< ", 待处理=" << stats.pending_task_count
<< ", 延迟=" << stats.delayed_task_count << std::endl;// CurrentTaskQueueSetter 使用 RAII 模式
class CurrentTaskQueueSetter {
public:
explicit CurrentTaskQueueSetter(TaskQueueBase* taskQueue)
: _previous(_current) {
_current = taskQueue; // 构造时设置
}
~CurrentTaskQueueSetter() {
_current = _previous; // 析构时恢复
}
private:
TaskQueueBase* const _previous;
};
// 使用
void processTasks() {
CurrentTaskQueueSetter setCurrent(this); // 自动管理 thread_local
// 任务执行期间,current() 返回正确的队列指针
}// 任务所有权流转
Client TaskQueue QueuedTask
| | |
|--postTask(task)------->| |
| (move ownership) | |
| |--store in queue------->|
| | |
| Worker Thread |
| | |
| |<--getNextTask()--------|
| | |
| |--run()---------------->|
| | |
| | return true: delete |
| | return false: requeue |template <typename Closure>
class ClosureTask : public QueuedTask {
public:
explicit ClosureTask(Closure&& closure)
: closure_(std::forward<Closure>(closure)) {}
private:
typename std::decay<Closure>::type closure_;
};为什么使用 std::decay?
using Clock = std::chrono::steady_clock;
using TimePoint = Clock::time_point;
using Millis = std::chrono::milliseconds;
// 计算延迟任务触发时间
auto fire_at = Clock::now() + Millis(delay_ms);
// 计算剩余时间
auto diff = std::chrono::duration_cast<Millis>(fire_at - now());
int64_t remaining_ms = diff.count();namespace {
thread_local TaskQueueBase* _current = nullptr;
}
// 每个线程有独立的 _current 副本
TaskQueueBase* TaskQueueBase::current() {
return _current;
}// 只接受非 unique_ptr<QueuedTask> 类型的闭包
template <class Closure,
typename std::enable_if<
!std::is_convertible<Closure, std::unique_ptr<QueuedTask>>::value
>::type* = nullptr>
void postTask(Closure&& closure) {
postTask(ToQueuedTask(std::forward<Closure>(closure)));
}TQ("heartbeat")->postPeriodicTask([](){
// 每30秒上报一次心跳
sendHeartbeat();
}, 30000);auto order_id = createOrder();
auto task_id = TQ("order")->postCancellableDelayedTask([order_id](){
// 30分钟后自动取消订单
cancelOrder(order_id);
}, 30 * 60 * 1000);
// 用户支付成功,取消定时任务
if (paymentSuccess) {
TQ("order")->cancelTask(task_id);
}TQ("network")->postRetryTask([]() -> bool {
return sendRequest(); // 返回 true 表示成功
}, 3, { // 最多重试3次
.type = TaskQueue::RetryStrategy::Type::Exponential,
.base_delay_ms = 1000 // 1s, 2s, 4s 指数退避
});// CPU 密集型任务
size_t thread_count = std::thread::hardware_concurrency();
// IO 密集型任务
size_t thread_count = std::thread::hardware_concurrency() * 2;
auto queue = std::make_unique<TaskQueue>(
std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
new TaskQueueSTD("worker", thread_count)
)
);本任务队列系统通过清晰的设计和严谨的实现,展示了:
这是一个生产级别的任务队列实现,适合用于:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。