Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
495 views
in Technique[技术] by (71.8m points)

multithreading - c++ work queues with blocking

This question should be a little simpler than my last few. I've implemented the following work queue in my program:

Pool.h:

// tpool class
// It's always closed. :glasses:
#ifndef __POOL_H
#define __POOL_H
class tpool {
    public:
        tpool( std::size_t tpool_size );
        ~tpool();
        template< typename Task >
        void run_task( Task task ){
        boost::unique_lock< boost::mutex > lock( mutex_ );
            if( 0 < available_ ) {
                --available_;
                io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function< void() > ( task ) ) );
            }
        }
    private:
        boost::asio::io_service io_service_;
        boost::asio::io_service::work work_;
        boost::thread_group threads_;
        std::size_t available_;
        boost::mutex mutex_;
        void wrap_task( boost::function< void() > task );
};
extern tpool dbpool;
#endif

pool.cpp:

#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include "pool.h"
tpool::tpool( std::size_t tpool_size ) : work_( io_service_ ), available_( tpool_size ) {
    for ( std::size_t i = 0; i < tpool_size; ++i ){
        threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) );
    }
}
tpool::~tpool() {
    io_service_.stop();
    try {
        threads_.join_all();
    }
    catch( ... ) {}
}
void tpool::wrap_task( boost::function< void() > task ) {
    // run the supplied task
    try {
        task();
    } // suppress exceptions
    catch( ... ) {
    }
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
}
tpool dbpool( 50 );

The problem is, though, is that not all my calls to run_task() are being completed by worker threads. I'm not sure if it's because it's not entering into the queue or because the task vanishes when the thread that created it exits.

So my question is, is there anything special I have to give to boost::thread to make it wait until the queue is unlocked? and what is the expected lifetime of a task entered into a queue? Do the tasks go out of scope when the thread that created them exits? If so, how can I prevent that from happening?

Edit: I've made the following changes to my code:

template< typename Task >
void run_task( Task task ){ // add item to the queue
    io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function< void() > ( task ) ) );
}

and am now seeing all entries being entered correctly. However, I am left with one lingering question: What is the lifetime of tasks added to the queue? Do they cease to exists once the thread that created them exits?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Well. That's really quite simple; You're rejecting the tasks posted!

template< typename Task >
void run_task(task task){
    boost::unique_lock<boost::mutex> lock( mutex_ );
    if(0 < available_) {
        --available_;
        io_service_.post(boost::bind(&tpool::wrap_task, this, boost::function< void() > ( task )));
    }
}

Note that the lock "waits" until the mutex is not owned by a thread. This might already be the case, and possibly when available_ is already 0. Now the line

if(0 < available_) {

This line is simply the condition. It's not "magical" because you're holding the mutex_ locked. (The program doesn't even know that a relation exists between mutex_ and available_). So, if available_ <= 0 you will just skip posting the job.


Solution #1

You should use the io_service to queue for you. This is likely what you wanted to achieve in the first place. Instead of keeping track of "available" threads, io_service does the work for you. You control how many threads it may use, by running the io_service on as many threads. Simple.

Since io_service is already thread-safe, you can do without the lock.

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>

// tpool class
// It's always closed. :glasses:
#ifndef __POOL_H
#define __POOL_H
class tpool {
    public:
        tpool( std::size_t tpool_size );
        ~tpool();

        template<typename Task>
        void run_task(Task task){
            io_service_.post(task);
        }
    private:
        // note the order of destruction of members
        boost::asio::io_service io_service_;
        boost::asio::io_service::work work_;

        boost::thread_group threads_;
};

extern tpool dbpool;
#endif

#include <boost/asio/io_service.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
//#include "pool.h"

tpool::tpool(std::size_t tpool_size) : work_(io_service_) {
    for (std::size_t i = 0; i < tpool_size; ++i)
    {
        threads_.create_thread( 
                boost::bind(&boost::asio::io_service::run, &io_service_) 
            );
    }
}

tpool::~tpool() {
    io_service_.stop();

    try {
        threads_.join_all();
    }
    catch(...) {}
}

void foo() { std::cout << __PRETTY_FUNCTION__ << "
"; }
void bar() { std::cout << __PRETTY_FUNCTION__ << "
"; }

int main() {
    tpool dbpool(50);

    dbpool.run_task(foo);
    dbpool.run_task(bar);

    boost::this_thread::sleep_for(boost::chrono::seconds(1));
}

For shutdown purposes, you will want to enable "clearing" the io_service::work object, otherwise your pool will never exit.


Solution #2

Don't use io_service, instead roll your own queue implementation with a condition variable to notify a worker thread of new work being posted. Again, the number of workers is determined by the number of threads in the group.

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      }

      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          return std::move(job);
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

void the_work(int id)
{
    std::cout << "worker " << id << " entered
";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work
";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done
";
}

int main()
{
    thread_pool pool; // uses 1 thread per core

    for (int i = 0; i < 10; ++i)
        pool.enqueue(bind(the_work, i));
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...