## Saturday, September 14, 2013

### A basic async, queueless and lockless multi-threaded boost::asio producer/consumer example

Sean Parent's talk at this year's GoingNative 2013 event about "Tasks" and his critique of futures got me thinking about my use of multi-threaded async asio responders.

Sean's talk is online at http://channel9.msdn.com/Events/GoingNative/2013/Cpp-Seasoning and it is well worth the time to watch it and the rest of the series.

I don't quite see how Sean's "Tasks" idea can work without any kind of mutex/locks, but I might have misunderstood him. The asio io_service's post method is thread safe, and I'm not certain (and too lazy to check), but that would suggest the posted message queue inside asio is mutex protected. The sample code I have created here has no explicit locks at all.

Besides that, my code is unfortunately old (I just cut it down for this post). It still uses Boost's threads and binds instead of the newer std threads and lambdas. But hey, what's a blog post without bugs, typos and caveats?

Basically this code is all about passing messages (i.e. a request for some job's sub-task to be done) between a producer thread and a consumer thread using asio's "post" method.

The code is a multi-threaded async event processing model, so it can be confusing to read. Boost's Asio offers a busy marker for io_service (called boost::asio::io_service::work) to keep the threads alive, but I prefer to use an async heartbeat event fired by a timer. That way you can periodically monitor the thread's health and take timed actions related to it more easily. The core operation is that each thread calls into "run", which kicks off "start" to generate the initial post messages (the heartbeat on both sides, plus send_msg on the producer). The task then goes on generating more "post" calls for both the producer and consumer sides of the job until it reaches the end point at 3 million messages.

It uses no locks to achieve this and can pass the 3 million messages (which are just ints) in about 9-10 seconds on an Intel i5-2450 notebook.

UPDATE: I forgot to mention there is no backoff throttle between the producer and consumer, so the producer can flood the system with messages. If you want to fix that, just add a queue or circular buffer for the messages between them. The producer pauses when the queue level hits an upper threshold, and posts are sent only if the consumer is at the lower threshold or idling, or after a check done at each heartbeat.
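As a rough sketch of that upper/lower threshold idea (it is not part of the code in this post, and the `Throttle` class and its `may_send` method are made-up names for illustration), the pause/resume decision could look something like this. It assumes the producer can get hold of the current queue depth somehow and that only the producer thread calls it:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: decides whether the producer may post another message.
// Sending pauses once the depth reaches `high` and resumes only after the
// depth has dropped back to `low`, so the producer does not flip-flop
// around a single limit.
class Throttle
{
public:
    Throttle(std::size_t low, std::size_t high) :
        low_(low), high_(high), paused_(false)
    {
        assert(low < high);
    }

    // Feed in the current queue depth; returns true if sending is allowed.
    bool may_send(std::size_t depth)
    {
        if (depth >= high_)
            paused_ = true;     // upper threshold hit: back off
        else if (depth <= low_)
            paused_ = false;    // consumer has caught up: resume
        return !paused_;        // in between: keep the previous decision
    }

private:
    std::size_t low_;
    std::size_t high_;
    bool paused_;
};
```

In the producer, `send_msg` could ask `may_send` before posting the next message and, when it says no, leave it to the next heartbeat to check again.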

Another option is to do this via a virtual queue (like the counters from the circular buffer), e.g. use two message counters, one for consumed and one for produced. The delta between them is the queue message depth. You'll note that this double counter option is also lockless.
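A minimal sketch of the two-counter idea, assuming exactly one producer thread and one consumer thread. `VirtualQueue`, `try_produce`, `consumed` and `depth` are hypothetical names, and using std::atomic (C++11) for the counters is my assumption here; plain integers shared between two threads would be a data race.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical "virtual queue": it stores no messages, only how many
// were produced and how many were consumed.
class VirtualQueue
{
public:
    explicit VirtualQueue(std::uint32_t limit) :
        produced_(0), consumed_(0), limit_(limit)
    {
        assert(limit > 0);
    }

    // Producer thread only: count one more message as produced, unless the
    // consumer is already `limit` messages behind.
    bool try_produce()
    {
        if (depth() >= limit_)
            return false;           // back off, e.g. retry at the next heartbeat
        produced_.fetch_add(1);
        return true;
    }

    // Consumer thread only: call once after each message has been handled.
    void consumed()
    {
        consumed_.fetch_add(1);
    }

    // Messages produced but not yet consumed. consumed_ is read first, so a
    // concurrent update can only make the result too large, never negative.
    std::uint32_t depth() const
    {
        const std::uint32_t c = consumed_.load();
        const std::uint32_t p = produced_.load();
        return p - c;               // unsigned subtraction survives wrap-around
    }

private:
    std::atomic<std::uint32_t> produced_;
    std::atomic<std::uint32_t> consumed_;
    const std::uint32_t limit_;
};
```

Hooked into this post's code, the producer would call `try_produce()` before invoking its callback and the consumer would call `consumed()` at the end of `recv_msg`. Each counter has a single writer, which is what keeps this free of locks.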

#include <stdint.h>   // uint32_t
#include <iostream>

#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/function.hpp>

/* RESULTS
DEBUG 116 send msg:3000000
DEBUG 56 recv msg:3000000
DEBUG 101 tock
DEBUG 90 run ended ??
DEBUG 42 tick
DEBUG 32 run ended ??

real    0m9.050s
user    0m0.000s
sys     0m0.031s
*/

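typedef uint32_t Message;

// Both sides stop once this many messages have been passed.
// Same type as Message so the comparisons below are not signed/unsigned.
const Message EXIT_COUNT = 3000000;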

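class Consumer
{
public:
    Consumer() :
        timer_(io_service_),
        running_(false)
    {}

    // Runs the consumer's event loop on the calling thread.
    // Returns once running_ is false and the last heartbeat has fired.
    void run()
    {
        std::cout << "DEBUG " << __LINE__ << " run started\n";

        running_ = true;
        start();

        io_service_.run();
        running_ = false;

        std::cout << "DEBUG " << __LINE__ << " run ended ??\n";
    }

    void start()
    {
        io_service_.post(boost::bind(&Consumer::do_heartbeat, this));
    }

    // Re-arms itself every second; the pending timer keeps run() alive.
    void do_heartbeat()
    {
        std::cout << "DEBUG " << __LINE__ << " tick\n";
        if (running_)
        {
            timer_.expires_from_now(boost::posix_time::seconds(1));
            timer_.async_wait(boost::bind(&Consumer::do_heartbeat, this));
        }
    }

    // Executed on the consumer thread for every posted message.
    void recv_msg(Message msg)
    {
        if (msg >= EXIT_COUNT)
            running_ = false;

        if (msg % 1000 == 0)
            std::cout << "DEBUG " << __LINE__ << " recv msg:" << msg << "\n";
    }

    // Called from the producer thread; post() hands the message over
    // to the consumer's io_service.
    void check_que(Message msg)
    {
        io_service_.post(boost::bind(&Consumer::recv_msg, this, msg));
    }

    boost::asio::io_service io_service_;
    boost::asio::deadline_timer timer_; // must be declared after io_service_
    bool running_;
};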

class Producer
{
public:
    Producer() :
        count_(0),
        timer_(io_service_),
        running_(false)
    {}

    // Runs the producer's event loop on the calling thread.
    void run()
    {
        std::cout << "DEBUG " << __LINE__ << " run started\n";

        running_ = true;
        start();

        io_service_.run();
        running_ = false;

        std::cout << "DEBUG " << __LINE__ << " run ended ??\n";
    }

    void start()
    {
        io_service_.post(boost::bind(&Producer::do_heartbeat, this));
        io_service_.post(boost::bind(&Producer::send_msg, this));
    }

    // Re-arms itself every second; the pending timer keeps run() alive.
    void do_heartbeat()
    {
        std::cout << "DEBUG " << __LINE__ << " tock\n";
        if (running_)
        {
            timer_.expires_from_now(boost::posix_time::seconds(1));
            timer_.async_wait(boost::bind(&Producer::do_heartbeat, this));
        }
    }

    // Sends one message to the receiver, then posts itself again
    // until EXIT_COUNT messages have gone out.
    void send_msg()
    {
        count_++;

        callback_(count_);

        if (count_ % 1000 == 0)
            std::cout << "DEBUG " << __LINE__ << " send msg:" << count_ << "\n";

        // >= (not >) so exactly EXIT_COUNT messages are sent, matching the consumer.
        if (count_ >= EXIT_COUNT)
            running_ = false;
        else
            io_service_.post(boost::bind(&Producer::send_msg, this));
    }

    template <class C>
    void setCallback(C callback)
    {
        callback_ = callback;
    }

    boost::function<void (Message)> callback_; // receiver for signaling

    uint32_t count_;

    boost::asio::io_service io_service_;
    boost::asio::deadline_timer timer_; // must be declared after io_service_
    bool running_;
};

int main(int argc, char* argv[])
{
    try
    {
        Producer p;
        Consumer c;

        p.setCallback(boost::bind(&Consumer::check_que, &c, _1));