我使用boost::lockfree::spsc_queue将流数据从线程发送到工作线程。
这些是项目的结构:
struct spsc_queue_item
{
uint8_t ID;
void *data;
};
数据由spsc_queue.push插入,另一个线程通过spsc_queue.pop读取。
但是我也有一些关于工作线程的“命令”。就像ID 0是“启动过滤器”,ID 1是“停止过滤器”,ID 2 id“数据”.
因此,如果许多“数据”被推送到队列中,那么像“停止筛选器”这样的命令将被延迟,因为首先要处理“数据”项。但是,如果命令“停止筛选”出现在“数据”项中,则“数据”项是无用的,可以丢弃。
现在我知道还有成员函数"consume_one“和"consume_all”。
但是,我没有找到一个例子,如何使用函子与这些函数。我的想法是以consume_one为例,首先检查带有ID==0或ID==1的项是否在队列中,然后再继续使用ID==2处理数据项。
有人有一个很小的例子,如何使用函子过滤出发布到请求ID的项目?
或者,是否有其他快速方法可以通过“优先级”标志从队列中获取项?
为了响应sehe的回答,更新了,提供了更多信息:
谢谢你提供这些信息。
知道怎么做才能更好吗?
我需要向工作线程发出信号,例如“启动过滤器”、“停止筛选器”、.
我在考虑使用事件:
SetEvent(hStartFilter);
但是在这里,我必须为每个命令使用、创建和关闭一个事件。
“数据”也可以有不同的ID。与工作线程一样,线程接收:
"start filter" with ID=0
"start filter" with ID=1
然后,使用ID0和ID1的“数据”进入队列。现在线程用ID0接收“停止过滤器”。因此,使用ID0的数据队列中的所有项目都是过时的,可以删除。
我的第一个测试是从队列中抓取所有的物品。检查每一个匹配的ID并删除项目。剩下的其他项目将在之后被推回队列。但是,如果有大量不同if的数据(最多32),这将是大量的CPU使用和耗时操作。队列的最大大小是2048项。
还有更好的方法吗?
发布于 2016-01-13 04:16:05
队列具有FIFO语义。没有过滤的方法,除非您只想丢弃不符合您的标准的元素。(只需包装pop
函数)
这方面的典型解决方案是使用
如果数据队列中不能有超过x个元素,请考虑使用循环缓冲区。spsc_queue
使用环形缓冲区作为底层存储。
UPDATE作为对问题编辑的响应,我决定使用每个ID的过滤状态的带外信令创建一个演示。
让我先从通常的定义开始:
static constexpr uint8_t NUM_SOURCES = 32;
现在,由警察和生产者共同使用的共同定义:
namespace queueing {
using data_t = std::vector<char>; // just for demo
struct spsc_queue_item {
uint8_t ID;
data_t data;
};
// process control
boost::atomic_bool shutdown_flag { false };
namespace statistics {
namespace events {
boost::atomic_size_t occurred { 0 };
}
namespace packets {
boost::atomic_size_t queued { 0 };
boost::atomic_size_t dropped { 0 };
boost::atomic_size_t processed { 0 };
boost::atomic_size_t skipped[NUM_SOURCES] = {};
}
boost::atomic_size_t idle_cycles { 0 };
void report();
}
// business logic
boost::atomic_bool source_enabled [NUM_SOURCES] = {}; // true:started (process) / false:stopped (skip)
boost::lockfree::spsc_queue<spsc_queue_item, boost::lockfree::capacity<2048> > shared_queue;
}
正如您所看到的,我更改了数据(因为没有void*
更容易演示)。此外,在rest运行结束时,我还添加了许多有用的统计数据,这些统计数据可以是report()
-ed。
void producer_thread() {
using namespace boost;
namespace stats = queueing::statistics;
// helpers to generate random data packets or start/stop filter events
enum kind_t { knd_data, knd_start, knd_stop };
queueing::data_t const empty {};
struct event_t { kind_t kind; spsc_queue_item item; };
// ...
// now generate queue items in a loop
while (!queueing::shutdown_flag) {
auto evt = gen_event();
std::this_thread::sleep_for(std::chrono::nanoseconds(engine()%102400));
switch(evt.kind) {
case knd_data:
stats::events::occurred++;
if (queueing::shared_queue.push(evt.item)) {
stats::packets::queued++;
} else {
stats::packets::dropped++;
}
break;
case knd_start: {
bool expected = false;
if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, true))
std::cout << "+";// << static_cast<int>(evt.item.ID);
}
break;
case knd_stop: {
bool expected = true;
if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, false))
std::cout << "-";// << static_cast<int>(evt.item.ID);
}
break;
}
}
}
线程函数的主体是非常直接的,但值得注意的是,start
和stop
事件没有通过队列进行通信。
制片人甚至更简单。它所做的就是耗尽队列,更新一些统计计数器。在处理项之前,将检查相应的筛选状态(source_enabled
):
void consumer_thread() {
namespace stats = queueing::statistics;
queueing::spsc_queue_item item;
auto consume_pending = [&] {
while (queueing::shared_queue.pop(item)) {
if (queueing::source_enabled[item.ID])
fake_process(item); // if filtering started, process
else
stats::packets::skipped[item.ID]++; // if filtering stopped, skip
}
};
while (!queueing::shutdown_flag) {
consume_pending();
stats::idle_cycles++;
}
consume_pending(); // drain any remaining queued items, to avoid race with shutdown_flag
}
现在,一切都应该是不言自明的,所以,拼凑一个main()
函数:
int main() {
using namespace std;
// check no source_enabled flags are set at start
assert(0 == count(begin(queueing::source_enabled), end(queueing::source_enabled), true));
auto producer = thread(producer_thread);
auto consumer = thread(consumer_thread);
this_thread::sleep_for(chrono::seconds(1));
queueing::shutdown_flag = true;
if (producer.joinable()) producer.join();
if (consumer.joinable()) consumer.join();
queueing::statistics::report();
}
我们的程序运行这两个线程大约1秒,并等待他们加入。然后,它报告统计数据,在我的系统上,看起来像:
++-+++++++--+++-++++-++-+++---+-+-+-+++++-+--+---+++-++---+-++-++-+-+++---++--+++-++---+----+-+-+-+--+++-++--+--+--++--+-+-+-+--+--+++--++-+-++-++-+--+--+++-++-+---+----++-+++-+-++-+----+--+-+-+--+++--+++++-+-+--++-+--++++-+-+---++-+---+-+--++---++++----+-+---+-+-+-+--+-++--+-+++--+++-+----+-+-+-+++-+++--+-++-++++++---++--+-++-++---+-+-++--+-+-----++---+-+-+--+++--++---++--+-+++-++++-+++-+-+--+++-+-+----+-++++-+--+++----+++-------+-++-+-+-++++-++++---++-+---+-++-----+-++++----+++-++++--+--+-----+-++++----++++-+++-+---+---+-+-++++-++---+-++-+-+-+++-+-+--+-----++-+++---+-++---+++-++-+--+++++------++---+-++++-+-+-+--++++-++++-+--+++-++---+-----++-+-++-+-+++--++-+-+-++-++-----+-++--+--+--+-------++--+-++-+--++-++-++--+-+-++-+-+++-++++-+---+--+++--++--+-+++++-+-----++--++--+++--++-+---++----+--+-+--++-++---+++++++-+--+-++---+----+-+-+--+-+-+--++++-++--+--+-+---+++-+++++++-++-+-----+--++------+-++++++--++-++-+---+-++---++-++------+-++--+-++-+++--+++-+++-+-+--+-+--+--+---+-+-+-+--+-++-+-++---+++-+-+-++--+-++-+---++--+-+--++-+++-+--+++---+----+--++-++++++-++-+----+++-+-+--+++-----+---+--++-+--+-++++++-+-+++--+++---+-+-++++-++-+-+----++++----+++-++----+---++-+---++-+-+-++--+++---+--+++----++-++-+++--+--+---+++--+--+--+--+--++++-++++---+-+-+--+-+-+--++++--+-+--++--++++----++-++++++-+--+-+------+-----+++----++-+++++-+--+--+---++-+-++-+--++++-+++---+++-+----+--+++++-+-+--+++--++-+++-+-++---++-++-+-+-+--+-++--+---+-+++--+++++-----+-++-+-+++-+-+-------++++---+-+-++-+--+++++---+--++-+-++-+++----+++-++++---++------+-+---++++--+-+---+++------++++++---++-+----+-+++-+--++-+-+-+-----+-++-++-++--++-+-+-++++++--++---+-+-+-+-+-+-++-++-++----++--++-+++-++---+++--+++---+++--+-+++----++--+-+-+++---++---++-+--+++++-+---++----++--+++-+--+-+++++++-+--+---+--+---+----+-++-++-+--++--+--++-++---+++++--+-+---+-++-+-+----+++-++-+-+--+---+-++-+-----++---++++--+++++-+---+-++--+-+-+----+--++++-+-----++++--++-+-+++++----+++---+++++++--+---+--+--++++--+++-----+-++--+-+-----+++++----+-++++---+-++--+-++-+++--+++-+-+++++--+----++--+--+-+-++-----++-+--++--++++++-+-+++----++++---++-+--+-+------+-+--+++++--+++--++-----+--++-++-+++++-++-------+----++-++--+--++--++++-++---+-+++++----+-++-++---+++---+-++-++----++--++--+++++-+--+-----+-+-+-+-+++-+--++-+-+++--+-+-+++-+-++--+-+-+-+--+-+-+++++---+---+-++-+---++-+-++-+-+++-++-++-+-++-------++---+-++-++++-++--++--+-++-+++---++++--+----+---+-++-+++--+-+++---+-++-++----+--+--+-++--+-++-++++++--+-++-+--+---+-+--+-+--++---+--+-++--++--+--++-++++----+--+--+++-+++-+-+-++--++-+-+---+-+-------+--++++++-++++++-++-+-++-+---+--+-+-++--+++---+----+--+--+-++----+-+-++-++-++-+++--++---++-------+++++--+-+++++++--+--+-+--++--++--++-+--+--+++----+++++-++-------++---+-+--++-++--+++-+-+-+-+------+-+--+++++-+-+--++-++-++--+++++++---+-++--+++-+++--++++-++--+-+---+----+----+---+--+-+++-+-+++++---+--++--+-+++-+++++--+---+-+++++-+---++++--+-++----+---++----+++---+++++-+-++--+--+-++-++----+---++-++-+-+-+---+++-++-+++-+---+++--+-+-----++-+---++-+---++---+-++--++++-+--++-+-++----+-+-+--++--++++--+--++--+--+-+-+++++++--++-+-+-+++--+---+++--++++++--+-+-----+---++-+++--+++--++---+++--+--+-++++-----+++-----++++--++--+-+--
Events occurred: 3061
Queued packets: 3061
Dropped packets: 0
Processed packets: 1464
Filtered (per source) 58 48 53 51 47 39 45 42 53 52 57 50 63 43 49 57 45 58 40 42 56 54 58 52 44 53 61 41 50 33 51 52
Total filtered: 1597
Idle cycles: 26408166
第一行(++-+++++++--+++-++++-++-+++---+
.)是一个简写符号,表示source_enabled[]
标志中有效更改的数量。
您可以看到,在这种情况下,队列没有饱和,并且使用者线程有相当多的空闲周期。
演示住在Coliru
完整的清单供参考:
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/atomic.hpp>
#include <boost/random.hpp>
#include <boost/bind.hpp>
#include <thread>
static constexpr uint8_t NUM_SOURCES = 32;
namespace queueing {
using data_t = std::vector<char>; // just for demo
struct spsc_queue_item {
uint8_t ID;
data_t data;
};
// process control
boost::atomic_bool shutdown_flag { false };
namespace statistics {
namespace events {
boost::atomic_size_t occurred { 0 };
}
namespace packets {
boost::atomic_size_t queued { 0 };
boost::atomic_size_t dropped { 0 };
boost::atomic_size_t processed { 0 };
boost::atomic_size_t skipped[NUM_SOURCES] = {};
}
boost::atomic_size_t idle_cycles { 0 };
void report() {
namespace stats = queueing::statistics;
std::cout << "\n";
std::cout << "Events occurred: " << stats::events::occurred << "\n";
std::cout << "Queued packets: " << stats::packets::queued << "\n";
std::cout << "Dropped packets: " << stats::packets::dropped << "\n";
std::cout << "Processed packets: " << stats::packets::processed << "\n";
std::cout << "Filtered (per source) ";
std::copy(std::begin(stats::packets::skipped), std::end(stats::packets::skipped),
std::ostream_iterator<size_t>(std::cout, " "));
std::cout << "\n";
auto total_filtered = std::accumulate(std::begin(stats::packets::skipped), std::end(stats::packets::skipped), 0ul);
std::cout << "Total filtered: " << total_filtered << "\n";
std::cout << "Idle cycles: " << stats::idle_cycles << "\n";
}
}
// business logic
boost::atomic_bool source_enabled [NUM_SOURCES] = {}; // true:started (process) / false:stopped (skip)
boost::lockfree::spsc_queue<spsc_queue_item, boost::lockfree::capacity<2048> > shared_queue;
}
void producer_thread() {
using namespace boost;
namespace stats = queueing::statistics;
// generate random data packets or start/stop filter events
using queueing::spsc_queue_item;
mt19937 engine;
auto gen_srce = bind(uniform_int<uint8_t>(0, NUM_SOURCES-1), ref(engine));
auto gen_data = [&] {
std::vector<char> v;
std::generate_n(back_inserter(v), engine()%1024, bind(uniform_int<uint8_t>{}, ref(engine)));
return v;
};
enum kind_t { knd_data, knd_start, knd_stop };
auto gen_kind = bind(uniform_int<uint8_t>(knd_data, knd_stop), ref(engine));
queueing::data_t const empty {};
//
struct event_t { kind_t kind; spsc_queue_item item; };
auto gen_event = [&] {
auto kind = static_cast<kind_t>(gen_kind());
return event_t {
kind,
spsc_queue_item {
gen_srce(),
kind == knd_data? gen_data() : empty
}
};
};
// now that we can easily generate queue items, let's do so in a loop
while (!queueing::shutdown_flag) {
auto evt = gen_event();
std::this_thread::sleep_for(std::chrono::nanoseconds(engine()%102400));
switch(evt.kind) {
case knd_data:
stats::events::occurred++;
if (queueing::shared_queue.push(evt.item)) {
stats::packets::queued++;
} else {
stats::packets::dropped++;
}
break;
case knd_start:
{
bool expected = false;
if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, true))
std::cout << "+";// << static_cast<int>(evt.item.ID);
}
break;
case knd_stop:
{
bool expected = true;
if (queueing::source_enabled[evt.item.ID].compare_exchange_weak(expected, false))
std::cout << "-";// << static_cast<int>(evt.item.ID);
}
break;
}
}
}
void fake_process(queueing::spsc_queue_item const& item) {
// pretend it takes time proportional to the amount of data
std::this_thread::sleep_for(std::chrono::microseconds(item.data.size()));
queueing::statistics::packets::processed++;
}
void consumer_thread() {
namespace stats = queueing::statistics;
queueing::spsc_queue_item item;
auto consume_pending = [&] {
while (queueing::shared_queue.pop(item)) {
if (queueing::source_enabled[item.ID])
fake_process(item); // if filtering started, process
else
stats::packets::skipped[item.ID]++; // if filtering stopped, skip
}
};
while (!queueing::shutdown_flag) {
consume_pending();
stats::idle_cycles++;
}
consume_pending(); // drain any remaining queued items, to avoid race with shutdown_flag
}
#include <cassert>
int main() {
using namespace std;
// check no source_enabled flags are set at start
assert(0 == count(begin(queueing::source_enabled), end(queueing::source_enabled), true));
auto producer = thread(producer_thread);
auto consumer = thread(consumer_thread);
this_thread::sleep_for(chrono::seconds(1));
queueing::shutdown_flag = true;
if (producer.joinable()) producer.join();
if (consumer.joinable()) consumer.join();
queueing::statistics::report();
}
https://stackoverflow.com/questions/34766287
复制