Advertisement

[C++]固定大小线程池

阅读量:

线程池模型

线程池是并发编程中常用的模型。
线程是一种非常宝贵的资源,创建、销毁线程都是非常消耗时间的操作,所以我们的一个思路是在程序start up的时候,创建一个保存有多个线程的缓存,这样程序运行时就不会频繁的发生创建和销毁线程的操作,从而提高了并发的效率。
保存有多个线程的这个缓存,我们一般称其为线程池(ThreadPool)。如果线程池的中线程的数量可以动态变化,我们称其为动态大小的线程池,这里讨论并实现的是固定大小的线程池

线程池中维护多个线程(thread)的同时,维护一个任务队列。所谓的任务队列,就是需要我们去并发执行的一个个的任务,说的通俗一点,就是等待执行的一个个函数。一旦任务队列不空,取出一个任务给定某个空闲的线程去执行该任务。
这是一个典型的生产者和消费者的模型,所以我们要用到mutex和conditon variable原语。除此之外,我们还应该对“任务”有一个合理的抽象。
所以线程池这个类的数据成员应该有如下几个:

复制代码
mutex_t            mutex
    2.condition_t        cond
    3.vector_t<thread_t> threadVec
    4.queue_t<task_t>    taskQ
    
    
      
      
      
      
    

其中thread_t在C++11中可以使用std::thread来替代,可是这里的task_t我们还没有定义。
实际上,task_t是对某个执行过程的抽象,所以我们可以用C++11中的function语义来替代:

复制代码
    using task_t = function<void()>
    
    
      
    

数据成员定义完毕,那么一个线程池类应该支持什么样的操作呢。

复制代码
Start()
    2.Run(Task)
    3.Stop()
    
    
      
      
      
    

Start

Start操作是抽象了初始化的操作,在构造出一个线程池对象之后,我们需要对taskQ和threadVec做出初始化操作。taskQ的初始化实际上就是置空,threadVec的初始化有些复杂,我们需要创建一些std::thread的对象。
如果你了解thread的创建你可能会有一个困惑,创建thread时需要给定一个函数指针,意味着该thread将并发执行该函数指针指向的函数,那么这里的这个线程需要执行的函数是什么呢?
我们再来明确一下线程池的作用,一旦任务队列不空,我们就取出一个任务扔给某个空闲的线程,让该线程去执行任务。
我们称这些线程为worker,对应生产者消费者模型中的消费者,它消费生产队列(taskQ)中的物品(某个具体的task),所以创建thread给定的函数指针指向的就是:消费者从生产队列取出task并执行的这个过程。

复制代码
    Start()
    {
    for(i < nThreadCount)
        create thread(fetch_and_consume)
        threadVec.push(thread)
        thread.run()
     }
    
    fetch_and_consume()
    {
    lock(mutex)
    take one task
    execute the task
     }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

Run(task)

这个函数抽象的是生产者向生产队列中加入task的过程,暴露给使用线程池的用户使用。

复制代码
    Run(task)
    {
    lock(mutex)
    taskQ.Add(task)
    cond.notify()
     }  
    
    
      
      
      
      
      
      
    

Stop()

Stop会停止线程池的运作,一般在线程池的析构函数中主动调用,为了防止线程池析构时各worker线程还没完成他们的task,所以我们一般会在Stop中join各个worker线程。

复制代码
    Stop()
    {
    for all worker in threadVec
        worker.join()
     }
    
    
      
      
      
      
      
    

实现

复制代码
    #ifndef  SIXDAY_THREAD_POOL_H
    #define  SIXDAY_THREAD_POOL_H
    
    #include <thread>
    #include <vector>
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    #include <cstdint>
    
    namespace sixday
    {
    	class FixSizeThreadPool
    	{
    	public:
    
    		using Task = std::function<void()>;
    
    	public:
    
    		explicit FixSizeThreadPool(int32_t nFixedSize);
    		~FixSizeThreadPool();
    
    		FixSizeThreadPool() = delete;
    		FixSizeThreadPool(const FixSizeThreadPool&) = delete;
    		FixSizeThreadPool(const FixSizeThreadPool&&) = delete;
    		FixSizeThreadPool& operator=(const FixSizeThreadPool&) = delete;
    		FixSizeThreadPool& operator=(const FixSizeThreadPool&&) = delete;
    
    		void Start();
    		void Run(const Task& task);
    		void Stop();
    
    	private:
    
    		std::vector<std::thread> m_Worker;
    
    		std::queue<Task> m_TaskQ;
    
    		std::mutex m_Mutex;
    
    		std::condition_variable m_Cond;
    
    		bool m_bIsRunning;
    
    		int32_t m_nWorkerCount;
    
    		void RunInThread();
    
    		Task Take();
    	};
    }
    
    #endif // ! SIXDAY_THREAD_POOL_H
    
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
复制代码
    #include "ThreadPool.h"
    #include <cassert>
    
    namespace sixday
    {
    	FixSizeThreadPool::FixSizeThreadPool(int32_t nFixedSize)
    	{
    		assert(nFixedSize > 0);
    		m_Worker.reserve(nFixedSize);
    		m_nWorkerCount = nFixedSize;
    		m_bIsRunning = false;
    	}
    
    	FixSizeThreadPool::~FixSizeThreadPool()
    	{
    		if (m_bIsRunning)
    		{
    			Stop();
    		}
    	}
    
    	void FixSizeThreadPool::Start()
    	{
    		assert(m_nWorkerCount > 0);
    		m_bIsRunning = true;
    		for (int32_t i = 0; i < m_nWorkerCount; ++i)
    		{
    			auto func = std::bind(&FixSizeThreadPool::RunInThread, this);
    			m_Worker.push_back(std::thread(func));
    		}
    	}
    
    	void FixSizeThreadPool::Run(const Task & task)
    	{	
    		std::unique_lock<std::mutex> lock(m_Mutex);
    		m_TaskQ.push(task);
    		m_Cond.notify_one();
    	}
    
    	void FixSizeThreadPool::Stop()
    	{
    		{
    			std::unique_lock<std::mutex> lock(m_Mutex);
    			m_bIsRunning = false;
    			m_Cond.notify_all();
    		}
    
    		for (auto& thread : m_Worker)
    		{
    			thread.join();
    		}
    	}
    
    	void FixSizeThreadPool::RunInThread()
    	{
    		while (m_bIsRunning)
    		{
    			Task task = Take();
    			if (task != nullptr)
    			{
    				task();
    			}
    		}
    	}
    
    	FixSizeThreadPool::Task FixSizeThreadPool::Take()
    	{
    		std::unique_lock<std::mutex> lock(m_Mutex);
    		while (m_TaskQ.empty() && m_bIsRunning)
    		{
    			m_Cond.wait(lock);
    		}
    
    		Task task = nullptr;
    		if (!m_TaskQ.empty())
    		{
    			task = m_TaskQ.front();
    			m_TaskQ.pop();
    		}
    		return task;
    	}
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

//Main.cpp

复制代码
    #include "CountDownLatch.h"
    #include "ThreadPool.h"
    #include <cstdio>
    using namespace sixday;
    
    static const int32_t LoopMax = 1000000;
    void PrintHello()
    {
    	for(int32_t i = 0 ; i < LoopMax; ++i)
    		printf("hello\n");
    }
    
    void PrintWorld()
    {
    	for (int32_t i = 0; i < LoopMax; ++i)
    		printf("world\n");
    }
    
    void PrintSay()
    {
    	for (int32_t i = 0; i < LoopMax; ++i)
    		printf("say\n");
    }
    
    void PrintName()
    {
    	for (int32_t i = 0; i < LoopMax; ++i)
    		printf("fancy\n");
    }
    
    int main()
    {
    	CountDownLatch latch(1);
    
    	FixSizeThreadPool threadpool(5);
    	threadpool.Start();
    	threadpool.Run(PrintHello);
    	threadpool.Run(PrintWorld);
    	threadpool.Run(PrintName);
    	threadpool.Run(PrintSay);
    
    	latch.Wait();
    	return 0;
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

全部评论 (0)

还没有任何评论哟~