Но вернемся к нашему примеру. Можно сделать так, чтобы с помощью семафора потоковая функция проверяла, сколько заданий находится в очереди. Измененная версия программы приведена в листинге 4.12.
Листинг 4.12. (
job-queue3.c) Работа с очередью заданий с применением семафора
#include <malloc.h>
#include <pthread.h>
#include <semaphore.h>
struct job {
/* Ссылка на следующий элемент связанного списка. */
struct job* next;
/* Другие поля, описывающие требуемую операцию... */
};
/* Список отложенных заданий. */
struct job* job_queue;
/* Исключающий семафор, защищающий очередь. */
pthread_mutex_t job_queue_mutex =
PTHREAD_MUTEX_INITIALIZER;
/* Семафор, подсчитывающий число гаданий в очереди. */
sem_t job_queue_count;
/* Начальная инициализация очереди. */
void initialize_job_queue() {
/* Вначале очередь пуста. */
job_queue = NULL;
/* Устанавливаем начальное значение счетчика семафора
равным 0. */
sem_init(&job_queue_count, 0, 0);
}
/* Обработка заданий до тех пор, пока очередь не опустеет. */
void* thread_function(void* arg) {
while (1) {
struct job* next_job;
/* Дожидаемся готовности семафора. Если его значение больше
нуля, значит, очередь не пуста; уменьшаем счетчик на 1.
В противном случае операция блокируется до тех пор, пока
в очереди не появится новое задание. */
sem_wait(&job_queue_count);
/* Захват исключающего семафора, защищающего очередь. */
pthread_mutex_lock(&job_queue_mutex);
/* Мы уже знаем, что очередь не пуста, поэтому без лишней
проверки запрашиваем новое задание. */
next_job = job_queue;
/* Удаляем задание из списка. */
job_queue = job_queue->next;
/* освобождаем исключающий семафор, так как работа с
очередью окончена. */
pthread_mutex_unlock(&job_queue_mutex);
/* Выполняем задание. */
process_job(next_job);
/* Очистка. */
free(next_job);
}
return NULL;
}
/* Добавление нового задания в начало очереди. */
void enqueue_job(/* Передача необходимых данных... */) {
struct job* new_job;
/* Выделение памяти для нового объекта задания. */
new_job = (struct job*)malloc(sizeof(struct job));
/* Заполнение остальных полей структуры JOB... */
/* Захватываем исключающий семафор, прежде чем обратиться
к очереди. */
pthread_mutex_lock(&job_queue_mutex);
/* Помещаем новое задание в начало очереди. */
new_job->next = job_queue;
job_queue = new_job;
/* Устанавливаем семафор, сообщая о том, что в очереди появилось
новое задание. Если есть потоки, заблокированные в ожидании
семафора, один из них будет разблокирован и
обработает задание. */
sem_post(&job_queue_count);
/* Освобождаем исключающий семафор. */
pthread_mutex_unlock(&job_queue_mutex);
}
Прежде чем извлекать задание из очереди, каждый поток дожидается семафора. Если счетчик семафора равен нулю, т.е. очередь пуста, поток блокируется до тех пор, пока в очереди не появится новое задание и счетчик не станет положительным.
Функция enqueue_job() добавляет новое задание в очередь. Подобно потоковой функции, она захватывает исключающий семафор, перед тем как обратиться к очереди. После добавления задания функция enqueue_job() устанавливает семафор, сообщая потокам о том, что задание доступно. В программе, показанной в листинге 4.12, потоки никогда не завершаются: если задания не поступают в течение длительного времени, все потоки переводятся в режим блокирования функцией sem_wait().
4.4.6. Сигнальные (условные) переменные
Мы узнали, как с помощью исключающего семафора защитить переменную от одновременного доступа со стороны двух и более потоков и как посредством обычного семафора реализовать счетчик обращений, доступный нескольким потокам. Сигнальная переменная (называемая также условной переменной) — это третий элемент синхронизации в Linux. Благодаря ему можно задавать более сложные условия выполнения потоков.
Предположим, требуется написать потоковую функцию, которая входит в бесконечный цикл, выполняя на каждой итерации какие-то действия. Но работа цикла должна контролироваться флагом: действие выполняется только в том случае, когда он установлен.
В листинге 4.13 показан вариант такой программы. На каждой итерации цикла потоковая функция проверяет, установлен ли флаг. Поскольку к флагу обращается сразу несколько потоков, он защищается исключающим семафором. Подобная реализация является корректной, но она неэффективна. Если флаг не установлен, потоковая функция будет впустую тратить ресурсы процессора, занимаясь бесцельными проверками флага, а также захватывая и освобождая семафор. На самом деле необходимо как-то перевести функцию в неактивный режим, пока какой-нибудь другой поток не установит этот флаг.
Листинг 4.15. (
spin-condvar.c) Простейшая реализация сигнальной переменной
#include <pthread.h>
int thread_flag;
pthread_mutex_t thread_flag_mutex;
void initialize_flag() {
pthread_mutex_init(&thread_flag_mutex, NULL);
thread_flag = 0;
}
/* Если флаг установлен, многократно вызывается функция do_work().
В противном случае цикл работает вхолостую. */
void* thread_function(void* thread_arg) {
while (1) {
int flag_is_set;
/* Защищаем флаг с помощью исключающего семафора. */
pthread_mutex_lock(&thread_flag_mutex);
flag_is_set = thread_flag;
pthread_mutex_unlock(&thread_flag_mutex);
if (flag_is_set)
do_work();
/* Если флаг не установлен, ничего не делаем. Просто переходим
на следующую итерацию цикла. */
}
return NULL;
}
/* Задаем значение флага равным FLAG_VALUE. */
void set_thread_flag(int flag_value) {
/* Защищаем флаг с помощью исключающего семафора. */
pthread_mutex_lock(&thread_flag_mutex);
thread_flag = flag_value;
pthread_mutex_unlock(&thread_flag_mutex);
}
Сигнальная переменная позволяет организовать такую проверку, при которой поток либо выполняется, либо блокируется. Как и в случае семафора, поток может ожидать сигнальную переменную. Поток A, находящийся в режиме ожидания, блокируется до тех пор, пока другой поток. Б, не просигнализирует об изменении состояния этой переменной. Сигнальная переменная не имеет внутреннего счетчика, что отличает ее от семафора. Поток А должен перейти в состояние ожидания до того, как поток Б пошлет сигнал. Если сигнал будет послал раньше, он окажется потерянным и поток А заблокируется, пока какой-нибудь поток не пошлет сигнал еще раз.
Вот как можно сделать предыдущую программу более эффективной.
■ Функция thread_function() в цикле проверяет флаг. Если он не установлен, поток переходит в режим ожидания сигнальной переменной.
■ Функция set_thread_flag() устанавливает флаг и сигнализирует об изменении условной переменной. Если функция thread_function() была заблокирована в ожидании сигнала, она разблокируется и снова проверяет флаг.
Но существует одна проблема: возникает гонка между операцией проверки флага и операцией сигнализирования или ожидания сигнала. Предположим, что функция thread_function() проверяет флаг и обнаруживает, что он не установлен. В этот момент планировщик Linux прерывает выполнение данного потока и активизирует главную программу. По стечению обстоятельств программа как раз находится в функции set_thread_flag(). Она устанавливает флаг и сигнализирует об изменении условной переменной. Но поскольку в данный момент нет потока, ожидающего получения этого сигнала (вспомните, что функция thread_function() была прервана перед тем, как перейти в режим ожидания), сигнал окажется потерян. Когда Linux вновь активизирует дочерний поток, он начнет ждать сигнал, который, возможно, никогда больше не придет.
Чтобы избежать этой проблемы, необходимо одновременно захватить и флаг, и сигнальную переменную с помощью исключающего семафора. К счастью, в Linux это предусмотрено. Любая сигнальная переменная должна использоваться совместно с исключающим семафором для предотвращения состояния гонки. Наша потоковая функция должна следовать такому алгоритму: