Recap on EBus
In our last post on the eBus implementation, we talked about how you can implement the Observer pattern to decouple your code's callers from its callees. The obvious use case is registering observers for certain events, such as ticks.
For example, a keyboard event listener may look like this:
//the listener will override those methods
struct keyboard_input_listener : public ebus_handler<keyboard_events>
{
    virtual void on_enter(keyboard_t* keyboard,
                          surface_t* surf, uint32_t key) override;
    virtual void on_leave(keyboard_t* keyboard, surface_t* surf) override;
    virtual void on_key(keyboard_t* keyboard,
                        uint32_t key, uint32_t state) override;
    //other events ...
};

//then listen on the events.
keyboard_input_listener listener;
listener.connect();

//at a certain point, a system will trigger the events
ebus<keyboard_events>::event(&keyboard_events::on_enter, ...);
ebus<keyboard_events>::event(&keyboard_events::on_key, ...);
///...
ebus<keyboard_events>::event(&keyboard_events::on_leave, ...);
But the power of the eBus goes far beyond this. On top of this system, we can build other systems as well. In this post, we are going to build an (async) task system.
What exactly is an (async) task system
Think of the following code executing at run time, line by line (well, not exactly, since both the compiler and the CPU will optimize for us). We start by opening test.txt for reading, read it line by line, and close it; then we open it again, append a thousand random integers to it, and close it again.
#include <fstream>
#include <iostream>
#include <random>
#include <string>
using namespace std;

int main()
{
    ifstream fr("test.txt");
    if (fr.is_open()) {
        string s;
        while (getline(fr, s))
            cout << s << endl;
        fr.close();
    } else {
        cerr << "error opening file" << endl;
        return -1;
    }

    ofstream fo("test.txt", ios::out | ios::app);
    if (fo.is_open()) {
        random_device dev;
        mt19937 rng(dev());
        uniform_int_distribution<mt19937::result_type> dist6(1, 100);
        for (int i = 0; i < 1000; i++)
            fo << dist6(rng) << endl;
        fo.close();
    } else {
        cerr << "error opening file" << endl;
        return -1;
    }
}
With some basic programming experience, you know that the code above will spend most of its time waiting for I/O, and you can verify with the time command that most of it is spent on the system side. Depending on the workload of your application, you may not want to burn user CPU time just waiting. This is a small example of course, but if you are reading a large file, such as 3D assets, the program can easily stall for a very long time.
It would be nice if we could submit a task, do something else, and continue with the next step once the task is done. This is basically what a task system provides.
Task system with std::async
Before we implement such a task interface on EBus, I need to mention that there is already a std::async interface for task scheduling. std::async(F&& f, Args&&... args) returns a std::future; when you try to get the result from that std::future, you may be blocked until it is available.
#include <future>
#include <numeric>

template<typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
    auto len = end - beg;
    if (len < 1000)
        return std::accumulate(beg, end, 0);

    RandomIt mid = beg + len / 2;
    //the handle is returned to you immediately, the actual sum is deferred.
    std::future<int> handle = std::async(std::launch::async,
                                         parallel_sum<RandomIt>, mid, end);
    //you can continue with the other half of the work without being blocked by the async half
    int sum = parallel_sum<RandomIt>(beg, mid);
    //handle.get() is the only wait you have to do
    return sum + handle.get();
}
This is a simple example of async task submission. As the comments suggest, the first recursive parallel_sum() is submitted as a task asynchronously (on a different thread), while the second parallel_sum() is done by the current thread. You are not blocked until handle.get(), and by that time the first async half is hopefully done already. In a sense the code becomes naturally multi-threaded, without explicitly creating a std::thread.
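For completeness, here is a minimal usage sketch of the function above; the vector size and the printed value are just illustrative.

#include <iostream>
#include <vector>

int main()
{
    //one hundred thousand ones, large enough to trigger the async split.
    std::vector<int> v(100000, 1);
    std::cout << parallel_sum(v.begin(), v.end()) << '\n'; //prints 100000
}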
The async workflow hides many details of thread dispatching and waiting from the user, so it creates a continuous flow and makes the code intuitive to understand.
Task system using EBus
You may ask: if the async feature is already provided by the standard library, why implement it myself? Beyond doing it for fun, there are actually some good reasons:
- I may not want to be limited to std::future for waiting; I may have other types of thread-blocking mechanisms.
- Similarly, I may not want to schedule only CPU tasks; I may want to schedule some GPU tasks as well.
- It is not possible to reschedule a task with std::async (I will get to that in a following post).
The interface
If you are convinced, then how do we implement something like std::async using ebus, you may ask? Well, it is through the event dispatching interface we had before.
//broadcasting events.
static void
ebus<interface>::broadcast(function_t&& func, args_t&&... args);
//dispatch a single event.
static void
ebus<interface>::event(size_t id, function_t&& func, args_t&&... args);
//invoking a method without an instance and getting the result back
static void
ebus<interface>::invoke(result_t& result, function_t&& func, args_t&&... args);
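As a quick illustration of invoke, here is a hedged sketch; calculator_iface, calculator and m_add are hypothetical names made up for this example, following the same connect() pattern as the keyboard listener earlier.

//a hypothetical interface, only to illustrate invoke; not part of the library.
struct calculator_iface : public ebus_iface<ebus_type::GLOBAL>
{
    virtual int m_add(int a, int b) = 0;
};

struct calculator : public ebus_handler<calculator_iface>
{
    int m_add(int a, int b) override { return a + b; }
};

//then, e.g. inside some function:
calculator calc;
calc.connect();

int result = 0;
//call m_add on whoever is listening and collect the result.
ebus<calculator_iface>::invoke(result, &calculator_iface::m_add, 1, 2);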
These dispatch functions allow us to call methods without holding a class instance, as long as there is someone listening. Now imagine we declare a task scheduler interface for ebus.
struct task_scheduler_iface : public ebus_iface<ebus_type::GLOBAL>
{
    //the listener needs to implement adding a task asynchronously
    virtual void m_add_task(task_base::ptr task) = 0;
    //a convenience static method for ease of use
    static void add_task(task_base::ptr);
};
Under the hood, it is really as simple as dispatching the event.
using task_scheduler_bus = ebus<task_scheduler_iface>;

void
task_scheduler_iface::add_task(task_base::ptr task)
{
    //the task scheduling is implemented as a broadcast event.
    task_scheduler_bus::broadcast(&task_scheduler_iface::m_add_task, std::ref(task));
}
//the user would simply call
task_scheduler_iface::add_task(task_base::ptr(task));
I chose to declare a task_base object, a class with pure virtual functions; you will see why a task_base interface is really useful compared to a single std::function in the next post.
struct task_base
{
    using ptr = INTRUSIVE_NS::intrusive_ptr<task_base>;
    using exec_fn = std::function<bool(void)>;

    task_base(const exec_fn& fn = []() { return true; }) :
        m_function(fn)
    {
    }
    virtual ~task_base() {}

    bool exec() { return m_function(); }
    // this defines a done event for the task; the subclass decides what it means
    virtual void task_done() = 0;

    //////////////////////////////////////////////////////////////////////////
    // for intrusive_ptr support
    virtual void add_ref() = 0; // a default impl is ++ref_count
    virtual void release() = 0; // a default impl is delete if ref_count <= 0
    //////////////////////////////////////////////////////////////////////////

    exec_fn m_function; // returns true on success.
};
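To make this concrete, here is a hedged sketch of what a minimal task_base subclass and its submission could look like; lambda_task and its atomic reference count are my own illustration, not part of the ebus library.

#include <atomic>

//a hypothetical minimal task; the real implementation may differ.
struct lambda_task : public task_base
{
    lambda_task(const exec_fn& fn) : task_base(fn) {}

    void task_done() override
    {
        //a real task would signal a "done" event here.
    }

    //intrusive_ptr support via a simple atomic reference count.
    void add_ref() override { ++m_ref_count; }
    void release() override
    {
        if (--m_ref_count <= 0)
            delete this;
    }

    std::atomic<int> m_ref_count{0};
};

//usage: wrap the blocking work in a lambda and hand it to the scheduler.
task_scheduler_iface::add_task(task_base::ptr(new lambda_task([]() {
    //e.g. read the large file here.
    return true;
})));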
Implementing async task scheduler
At the end of the day, someone has to listen on the interface and do the actual work: the scheduling. But because of the decoupling of interface and implementation, you can implement one yourself to your liking. The ebus library provides a standard implementation here called default_task_scheduler.
class default_task_scheduler : public ebus_handler<task_scheduler_iface>
{
    using handler_t = ebus_handler<task_scheduler_iface>;
public:
    void m_add_task(task_base::ptr task) override;

    default_task_scheduler();
    ~default_task_scheduler();
private:
    std::vector<std::unique_ptr<task_worker>> m_workers;
    std::vector<std::thread> m_worker_threads;
};
The scheduler manages a number of workers and listens for users calling task_scheduler_iface::add_task(). Under the hood, it has a load-balancing mechanism to delegate each task to the least busy worker. The scheduler and the workers form a typical producer-consumer relationship, with each worker trying to exhaust its work queue and sleeping until the scheduler provides new tasks.
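The worker internals are not shown in this post, but a hedged sketch of such a consumer loop, assuming a mutex-protected queue and a condition variable (all member names below are my own, not the actual task_worker), could look like this:

#include <atomic>
#include <condition_variable>
#include <deque>
#include <mutex>

//a hypothetical worker; the real task_worker in ebus may differ.
class task_worker
{
public:
    //called by the scheduler (the producer).
    void push(task_base::ptr task)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push_back(std::move(task));
        }
        m_cv.notify_one();
    }

    //runs on one of the scheduler's threads (the consumer).
    void run()
    {
        while (m_running) {
            task_base::ptr task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                //sleep until a new task arrives or we are asked to stop.
                m_cv.wait(lock, [this] { return !m_queue.empty() || !m_running; });
                if (m_queue.empty())
                    continue;
                task = std::move(m_queue.front());
                m_queue.pop_front();
            }
            if (task->exec())
                task->task_done();
        }
    }

private:
    std::deque<task_base::ptr> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::atomic<bool> m_running{true};
};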
Closing remarks
So far, we've seen the implementation of an async task scheduler via a special ebus interface, task_scheduler_iface, and we can see how powerful the ebus interface is. In the next posts we will take a step further and see how we can extend beyond scheduling a single task at a time. Stay tuned.