Личный опыт разработки ПО

Сборник рецептов

Архив за 11 2011

Многопоточность, баги и списки инициализации

19 комментариев »

Разработка многопоточных программ – задача нетривиальная. Локализовать место ошибки сложно, отладка затруднена, иногда даже возникает желание все свалить на ошибки в сторонних библиотеках и/или компилятор/операционную систему. Так делать не надо. Конечно вероятность такая есть, но она несравненно меньше того, что виновником ошибки являетесь именно вы.

Например приложение работает хорошо, ну скажем в 95 случаях из 100, но в оставшихся пяти оно может зависнуть или вылететь с segmentation fault. А может упасть.

Посмотрим на следующий код:

#include <condition_variable>
#include <iostream>
#include <queue>
#include <mutex>
#include <thread>
 
class Worker
{
public:
    Worker()
        : shutdown_(false)
        , thread_(&Worker::taskProcessor, this)
    {
    }
 
    ~Worker()
    {
        {
            std::lock_guard<decltype(mutex_)> lock(mutex_);
            shutdown_ = true;
        }
 
        hasTask_.notify_one();
 
        thread_.join();
    }
 
    void addTask(int taskId)
    {
        {
            std::lock_guard<decltype(mutex_)> lock(mutex_);
            std::cout << "Adding task " << taskId << std::endl;
            tasks_.push(taskId);
        }
 
        hasTask_.notify_one();
    }
 
private:
    void taskProcessor()
    {
        while (true)
        {
            int task = 0;
 
            {
                std::unique_lock<decltype(mutex_)> lock(mutex_);
 
                if (shutdown_)
                    return;
 
                while (tasks_.empty())
                {
                    hasTask_.wait(lock);
 
                    if (shutdown_)
                        return;
                }
 
                task = tasks_.front();
                tasks_.pop();
            }
 
            // освободили мютекс, чтобы не блокировать другие потоки и делаем работу
            std::cout << "Processing task " << task << std::endl;
        }
    }
 
private:
    bool shutdown_;
 
    std::queue<int> tasks_;
 
    std::thread thread_;
    std::mutex mutex_;
    std::condition_variable hasTask_;
};
 
int main(int argc, char* argv[])
{
    Worker worker;
 
    for (int i = 1; i < 6; ++i)
    {
        worker.addTask(i);
    }
 
    return 0;
}

Здесь приведен простой класс, объект которого обрабатывает некоторые задачи в параллельном потоке. Из основного потока функцией addTask можно добавить задачу в очередь и известить об этом параллельный поток. Поток проснувшись, возьмет из очереди задачу, обработает ее и если задач больше нет – уснет. В деструкторе, прежде чем объект будет уничтожен происходит ожидание обработки всех задач в очереди.

На первый взгляд все прозрачно, но в некоторых случаях, мы вылетим по ассерту из стандартной библиотеки (и даже можем начать грешить на его разработчиков) или зависнем, но повторюсь – ошибка здесь наша.

Проблема в списке инициализации. Как мы помним, конструирование членов класса происходит в порядке их объявления. В нашем случае сначала будет создан объект типа std::thread и только после него std::mutex. Если звезды сложатся удачно, то запуск параллельного потока займет некоторое время, за которое в основном потоке будет сконструирован мютекс и программа будет работать. Если со звездами не сложиться, то поток начав работу попробует заблокировать мютекс который еще не сконструирован, со всеми вытекающими. Решение простое, но не самое лучшее, так как делает код хрупким – изменить положение членов в списке инициализации:

boost::mutex Mutex_;
std::condition_variable hasTask_;
std::thread thread_;

Лучше будет разделить конструирование и инициализацию:

class Worker
{
public:
	static std::unique_ptr<Worker> create()
	{
		std::unique_ptr<Worker> worker(new Worker());
		worker->init();
		return worker;
	}
 
	...
 
private:
	Worker()
		: shutdown_(false)
	{
	}
 
	void init()
	{
		thread_ = std::thread(&Worker::taskProcessor, this));
	}
 
	...
 
};

15th Ноябрь 2011
21:38

Рубрика: C++

Метки: , ,