Wednesday, September 25, 2013

Great resouce on good code design

This post was a nice run down on the design principles used for reusable and reworkable designs.
http://net.tutsplus.com/tutorials/how-to-write-code-that-embraces-change/

And there are less excuses now with C++11. Decoupling interfaces is a blast with lamdbas, Of course the older c++ versions could use boost::bind, it was just as effective. In my post a week or 2 back u can see the almost full decoupling of a producer consumer system via the bind method Here - Check for the "setCallback" function.

logging top CPU and MEM usage

Hacked a script to dump a log of the top CPU and MEM processing for a machine a while back.. All the info for it was nicked from here:
http://unixskylab.wordpress.com/2009/01/05/my-favorite-aix-top-resource-consuming-commands/

I figure its a rather useful little script to keep handy... so here is a quick reproduction but I don't have an AIX machine at home so its 50-50 that the AIX version will work off the bat... And after a couple of tweeks its ready and tested for ubuntu...

For AIX (untested)
while [ 1 ]; do 
  DATE=`date "+%Y%m%d"` 
  LOG=cpu_mem.log.$DATE 
  if [ 1 ]; then 
    echo; 
    date; 
    ps aux | head -1; 
    echo "TOP MEM"; 
    ps aux | sort -rn +3 | head; 
    echo "TOP CPU"; 
    ps aux | sort -rn +2 | head -10; 
    sleep 10; 
  fi | tee -a $LOG 
done 

For Unbuntu
while [ 1 ]; do 
  DATE=`date "+%Y%m%d"` 
  LOG=cpu_mem.log.$DATE 
  if [ 1 ]; then 
    echo; 
    date; 
    ps aux | head -1; 
    echo "TOP MEM"; 
    ps aux | sort -rnk4 | head; 
    echo "TOP CPU"; 
    ps aux | sort -rnk3 | head -10; 
    sleep 10; 
  fi | tee -a $LOG 
done 

Git show the version prior to the commit hash

getting the verion prior to the current hash point in git is yet another one of the annoying little bits of pointless info that makes people hate unix tools...

http://stackoverflow.com/questions/338436/is-there-a-quick-git-command-to-see-an-old-version-of-a-file

The secret is the "~1" which means prior to the commit hash number you gave. EG
git show commithash~1:path/to/file 

Saturday, September 14, 2013

A basic async queueless and lockless multi threaded boost::asio producer consumer example

Sean Parents talk at this years GoingNative2013 event about "Tasks" and his critic of futures got me thinking about my use of multi threaded async asio responders

Seans talk is online at http://channel9.msdn.com/Events/GoingNative/2013/Cpp-Seasoning and it is well worth the time to watch it and the rest of the series.

I dont quite see how Seans "Tasks" idea can work without any kind of mutex/locks but i might have miss-understood him.. The asio io_service's post method is thread safe and im not certain(and to lazy to check) but it seems like that would suggest that the posted message queue inside the asio should be mutex protected... The sample code i have created here has no locks at all..

Besides that my code is unfortunately old(i just cut it down for a post out).. it still uses boosts threads and binds instead of the newer std threads and lamdbas. But hey whats a blog post without bugs, typos and caveats..

Basically this code is all about passing messages (ie a request for some jobs sub task to be done) between a producer thread and a consumer thread using the asios "post" method.

The code is a multi threaded async event processing model so it can be confusing to read. Boosts Asio offers a busy system for io_service (called boost::asio::io_service::work) to keep the threads alive but i prefer to use a async heartbeat event fired by a timer.. that way you can periodically monitor the threads health and take timed actions related to it eaiser... The core operation is that threads call into "run" and boot off the "start" to generate the initial post messages for send_msg and then the task goes on generates more "post" for both the producer and consumer sides of the job, until it reaches the end point at 3mil messages..

It uses no locks to achieve this and can pass the 3million messages(which are just ints) in about 9-10 secs on a intel i5-2450 notebook.

UPDATE: forgot to mention there is no backoff throttle between the producer and consumer so the producer can flood out the system with messages. If you want to fix that just add a Queue or Circular buffer for the messages between them. The producer times out when the queue level hits an upper threshold, and posts are sent only if the consumer is at the lower threashold/idling/or a check is done at each heartbeat.

Another option is to do this via a virtual queue(like the counters from the circular buffer) e.g use 2 message counters one for consumed and one for produced.. the delta of them is the queue message dept.. youll note that this double counter option is also lockless..

#include <iostream>

#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

#include <boost/thread/mutex.hpp>
#include <boost/function.hpp>

/* RESULTS
DEBUG 116 send msg:3000000
DEBUG 56 recv msg:3000000
DEBUG 101 tock
DEBUG 90 run ended ??
DEBUG 42 tick
DEBUG 32 run ended ??

real    0m9.050s
user    0m0.000s
sys     0m0.031s
 */

const int EXIT_COUNT=3000000;

typedef uint32_t Message;

class Consumer
{
public:
    Consumer() :
        timer_(io_service_)
    {}

    void run()
    {
        // WARNING THREAD CORE LOOP!
        std::cout << "DEBUG " << __LINE__ << " run started\n";

        running_ = true;
        start();

        io_service_.run();
        running_ = false;

        std::cout << "DEBUG " << __LINE__ << " run ended ??\n";
    }

    void start()
    {
        io_service_.post(boost::bind(&Consumer::do_heartbeat, this));
    }

    void do_heartbeat()
    {
        std::cout << "DEBUG " << __LINE__ << " tick\n";
        if (running_)
        {
            timer_.expires_from_now(boost::posix_time::seconds(1));
            timer_.async_wait(boost::bind(&Consumer::do_heartbeat, this));
        }
    }

    void recv_msg(Message msg)
    {
        if (msg >= EXIT_COUNT)
            running_ = false;

        if (msg % 1000 == 0)
            std::cout << "DEBUG " << __LINE__ << " recv msg:" << msg << "\n";
    }

    void check_que(Message msg)
    {
        //WARNING external thread entry point
        io_service_.post(boost::bind(&Consumer::recv_msg, this, msg));
    }

    boost::asio::io_service io_service_;
    boost::asio::deadline_timer timer_;
    bool running_;
};


class Producer
{
public:
    Producer() :
        timer_(io_service_),
        count_(0)
    {}

    void run()
    {
        // WARNING THREAD CORE LOOP!
        std::cout << "DEBUG " << __LINE__ << " run started\n";

        running_ = true;
        start();

        io_service_.run();
        running_ = false;

        std::cout << "DEBUG " << __LINE__ << " run ended ??\n";
    }

    void start()
    {
        io_service_.post(boost::bind(&Producer::do_heartbeat, this));
        io_service_.post(boost::bind(&Producer::send_msg, this));
    }

    void do_heartbeat()
    {
        std::cout << "DEBUG " << __LINE__ << " tock\n";
        if (running_)
        {
            timer_.expires_from_now(boost::posix_time::seconds(1));
            timer_.async_wait(boost::bind(&Producer::do_heartbeat, this));
        }
    }

    void send_msg()
    {
        count_++;

        callback_(count_);

        if (count_ % 1000 == 0)
            std::cout << "DEBUG " << __LINE__ << " send msg:" << count_ << "\n";

        if (count_ > EXIT_COUNT)
            running_ = false;
        else
            io_service_.post(boost::bind(&Producer::send_msg, this));
    }

    template <class C>
    void setCallback(C callback)
    {
        callback_ = callback;
    }

    boost::function<void (Message)> callback_; // reciver for signaling..

    uint32_t count_;

    boost::asio::io_service io_service_;
    boost::asio::deadline_timer timer_;
    bool running_;
};

int main(int argc, char* argv[])
{
    try
    {
        Producer p;
        Consumer c;

        p.setCallback(boost::bind(&Consumer::check_que, &c, _1));

        boost::thread pThread(boost::bind(&Producer::run, &p ));
        boost::thread cThread(boost::bind(&Consumer::run, &c ));

        pThread.join();
        cThread.join();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }

    return 0;
}