ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • packaged_task를 이용한 스레드 풀 구현 2
    Modern C++/Task 기반 비동기 프로그래밍 2020. 8. 28. 22:58

    이전 장에 std::async 와 동일한 작업을 수행하는 함수를 구현해 보았다. 이제는 스레드에서 호출되는 람다를 queue에 저장하는 코드를 구현해 보자 

     

    2. queue 에 task 푸시

    #include <iostream>
    #include <future>
    #include <functional>
    #include <queue>
    
    //람다 저장용 task_queue
    std::queue<std::function<void()>> task_queue;
    #include <functional>
    
    template <typename Func, typename... Args>
    std::future<std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>> async(Func&& func, Args... args) {
        using Result = std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>;
        //task 객체가 현재 범위를 벋어나면 사용 할 수 없으므로 동적으로 할당 
        auto task = new std::packaged_task<Result()>(
                std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
    
        auto future = task->get_future();
        //function<void()> 형식으로 변환 후 task_queue에 push
        
        task_queue.push([task = task](){
        	//packged_task를 수행후 객체 삭제 
            (*task)();
            delete task;
        });
        return future;
    }
    
    //테스트 코드
    int main() {
        auto fut = async([](int a, int b) {
            return a + b;
        }, 10, 20);
        auto func = task_queue.front();
        task_queue.pop();
        func();
        std::cout << fut.get() << std::endl;
        return 0;
    }

    std::packaged_task<Result()>는 람다로 캡쳐되는 데 큐에 저장되고 다른 스레드에 실행될 때 현재 범위를 벗어나므로 해당 태스크를 동적할당한다. 따라서 람다는 해당 패키지 태스크를 실행한 뒤 이를 직접 삭제해야 한다. 

    이를 std::unique_ptr로 작성하면 어떻게 될 지 알아보자. 

    template <typename Func, typename... Args>
    std::future<std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>> async(Func&& func, Args... args) {
        ...
        auto task = std::make_unique<std::packaged_task<Result()>>(
                std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        ...
        task_queue.push([task = std::move(task)](){
            (*task)();
            //delete task;
        });
        return future;
    }
    

    위 함수는 task를 std::unique_ptr을 이용해 생성하고 람다 캡쳐 내 이를 이동 할당 시켰다. 하지만 해당 코드는 컴파일 되지 않는다. 

    이는 std::function<> 은 이동 연산을 지원하지 않기 때문에 내부적으로 복사가 발생하는 데, std::unique_ptr은 복사될 수 없으므로 이 코드가 컴파일 되지 않는다. 

    따라서 생 포인터를 사용하거나 std::shared_ptr을 사용해야 하는 데 여기서는 공유 포인터를 사용하기로 한다. 

     

    이제 이를 바탕으로 TaskQueue 를 구현해보자.

    TaskQueue 정의 

    template <size_t Size = 2>
    class TaskQueue {
    public:
        TaskQueue();
        ~TaskQueue();
        constexpr size_t number_of_schedulers() { return Size; }
    
        template<typename Func, typename... Args>
        std::future<std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>>
        async(Func&& func, Args&&... args);
    
    private:
        void dispatch();
    private:
        std::queue<std::function<void()>> tasks_;
        std::array<std::thread, Size> schedulers_;
        std::mutex m_tasks_;
        std::condition_variable cv_tasks_;
        bool b_stop = false;
    };

    async() 함수는 callable target( 함수, 람다, 함수 객체등) 을 받아 태스크 규에 푸시한다. 

    스케줄러는 스레드로 dispatch()를 수행한다. dispatch() 함수는 루프를 돌면서 큐에 대기 중인 태스크를 가져와 이를 수행한다. 

    큐는 동기화를 위해 뮤텍스를 사용하고 큐에 태스크 입력시 조건 변수를 통해 통지하고 dispatch() 함수는  큐에 대기 중인  태스크가 없을 경우 큐에 태스크가 들어올 때 까지 대기한다.

    이를 구현한 태스크 큐(스레드 풀)의 구현 코드는 아래와 같다.  

    TaskQueue.h

    #pragma once
    
    #include <queue>
    #include <mutex>
    #include <future>
    #include <functional>
    #include <condition_variable>
    #include <array>
    using namespace std;
    
    template <size_t Size = 2>
    class TaskQueue {
    public:
        TaskQueue();
        ~TaskQueue();
        constexpr size_t number_of_schedulers() { return Size; }
    
        template<typename Func, typename... Args>
        std::future<std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>>
        async(Func&& func, Args&&... args);
    
    private:
        void dispatch();
    private:
        std::queue<std::function<void()>> tasks_;
        std::array<std::thread, Size> schedulers_;
        std::mutex m_tasks_;
        std::condition_variable cv_tasks_;
        bool b_stop = false;
    };
    
    
    template <size_t Size>
    TaskQueue<Size>::TaskQueue() {
        for(auto& scheduler : schedulers_) {
            scheduler = std::move(std::thread([this]{dispatch();}));
        }
    }
    
    template <size_t Size>
    TaskQueue<Size>::~TaskQueue() {
        b_stop = true;
        cv_tasks_.notify_all();
        for(auto& scheduler : schedulers_) {
            scheduler.join();
        }
    }
    
    template <size_t Size>
    template<typename Func, typename... Args>
    std::future<std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>>
    TaskQueue<Size>::async(Func&& func, Args&&... args) {
        using Result = std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>;
        auto task = std::make_shared<std::packaged_task<Result()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
        auto future = task->get_future();
        std::unique_lock<std::mutex> lock(m_tasks_);
        tasks_.push([task = task]{(*task)();});
        cv_tasks_.notify_one();
        return future;
    }
    
    template <size_t Size>
    void TaskQueue<Size>::dispatch() {
        while (!b_stop) {
            std::unique_lock<std::mutex> lock(m_tasks_);
            cv_tasks_.wait(lock, [this] { return !tasks_.empty() || b_stop; });
            if (b_stop && tasks_.empty()) {
                return;
            }
            auto task = std::move(tasks_.front());
            tasks_.pop();
            lock.unlock();
            task();
        }
    }

    아래는 10만개의 랜덤한 정수 값의 합을 태스크 큐와 메인 스레드에서 수행하는 시간을 비교하는 테스트 프로그램이다. 

    #include <iostream>
    #include <random>
    #include "TaskQueue.h"
    
    int main() {
        int sum = 0;
        auto taskQueue = TaskQueue<4>();
        constexpr size_t num_threads = taskQueue.number_of_schedulers();
    
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<int> dis(0, 100);
    
        std::array<int, 1000000> numbers = {0,};
        std::for_each(numbers.begin(), numbers.end(), [&](int& a) {
           a = dis(gen);
        });
    
        int unit = numbers.size() / num_threads;
        std::array<std::future<int>, num_threads> futures;
        auto start = std::chrono::steady_clock::now();
        for(int i = 0; i < num_threads; i++) {
            auto begin = numbers.begin() + i * unit;
            auto end = begin + unit;
            futures[i] = taskQueue.async([](int* b, int *e) {
               return std::accumulate(b, e, 0, [](int a, int b) {
                  return a + b;
               });
            }, begin, end);
        }
    
        for_each(futures.begin(), futures.end(), [&sum](future<int>& fut){
            sum += fut.get();
        });
    
        auto elapsed_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
                std::chrono::steady_clock::now() - start
        );
    
        std::cout << "asyncronous elipsed_time: " << elapsed_time.count() << "ns, sum= " << sum << std::endl;
    
        start = std::chrono::steady_clock::now();
        sum = std::accumulate(numbers.begin(), numbers.end(), 0, [](int a, int b) {
            return a + b;
        });
    
        elapsed_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
                std::chrono::steady_clock::now() - start
        );
    
        std::cout << "synchronos elipsed_time: " << elapsed_time.count() << "ns, sum= " << sum << std::endl;
    
        return 0;
    }
    
    

     

    여기까지 TaskQueue 를 구현하고 호출가능 타겟들을 인수로 받아 이를 각각의 스레드로 스케줄링해 이를 처리하는 스레드 풀의 구현을 완료한다. 

    댓글

Designed by Tistory.