#pragma once
#include <functional>
#include <future>
#include <memory>
#include <utility>
#include <Common/ThreadPool.h>
namespace DB
{
/// Abstract task runner.
/// A caller hands over a callable and receives a `std::future<void>` that can be
/// used to wait for the task to finish; how and where the task is run is up to
/// the concrete implementation.
class Executor
{
public:
    /// Virtual so that implementations can be destroyed through an `Executor` pointer or reference.
    virtual ~Executor() = default;

    /// Submits `task` for execution.
    /// @param task  Callable taking no arguments and returning nothing.
    /// @return      Future associated with the execution of `task`.
    virtual std::future<void> execute(std::function<void()> task) = 0;
};
/// Executes the task synchronously, on the calling thread.
/// Per the original note, this is meant for the case when a disk doesn't support async operations.
class SyncExecutor : public Executor
{
public:
    SyncExecutor() = default;

    /// Runs `task` to completion before returning.
    /// @param task  Callable to invoke on the calling thread.
    /// @return      Future whose shared state is already ready: it holds a value if
    ///              `task` returned normally, or the exception that escaped from it.
    ///              Exceptions from `task` are never rethrown from this function itself.
    std::future<void> execute(std::function<void()> task) override
    {
        /// The promise never leaves this function, so it lives on the stack;
        /// the shared state it creates is kept alive by the returned future.
        std::promise<void> promise;
        try
        {
            task();
            promise.set_value();
        }
        catch (...)
        {
            try
            {
                promise.set_exception(std::current_exception());
            }
            catch (...)
            {
                /// Best effort: if the exception cannot be stored, the promise destructor
                /// marks the shared state with a broken-promise error instead.
            }
        }
        return promise.get_future();
    }
};
/// Runs tasks asynchronously using a thread pool.
class AsyncExecutor : public Executor
{
public:
    /// @param name_             Name stored in the executor (not read by any code visible in this class).
    /// @param thread_pool_size  Value passed to the `ThreadPool` constructor.
    explicit AsyncExecutor(const String & name_, int thread_pool_size) : name(name_), pool(thread_pool_size) { }

    /// Schedules `task` on the pool and returns a future tied to its completion.
    /// @param task  Callable to run inside the pool job.
    /// @return      Future that receives a value when `task` returns normally, or the
    ///              exception that escaped from it (which is also logged).
    /// Anything thrown by `scheduleOrThrowOnError` itself propagates to the caller.
    std::future<void> execute(std::function<void()> task) override
    {
        /// Shared ownership keeps the closure copyable (std::promise itself is move-only)
        /// and lets the promise outlive this call until the job has finished.
        auto promise = std::make_shared<std::promise<void>>();

        /// `task` is already a by-value copy owned by this call, so move it into the job
        /// instead of copying the std::function a second time.
        pool.scheduleOrThrowOnError(
            [promise, task = std::move(task)]()
            {
                try
                {
                    task();
                    promise->set_value();
                }
                catch (...)
                {
                    tryLogCurrentException("Failed to run async task");
                    try
                    {
                        promise->set_exception(std::current_exception());
                    }
                    catch (...)
                    {
                        /// Best effort: nothing more can be done if the exception cannot be stored.
                    }
                }
            });

        return promise->get_future();
    }

    /// Forwards the new thread limit to the underlying pool.
    void setMaxThreads(size_t threads) { pool.setMaxThreads(threads); }

private:
    String name;
    ThreadPool pool;
};
}
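// Original-content notice: this article was published on the Tencent Cloud Developer Community
// with the author's authorization and may not be reproduced without permission.
// In case of infringement, contact cloudcommunity@tencent.com to request removal.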