Рассматриваемые примитивы служат принципиально различным целям. Мьютекс, как уже было сказано ранее, предназначен в первую очередь для регламентации доступа к участкам программного кода. Семафоры же больше предназначены для регламентации порядка доступа к определенным объектам данных. Классическими задачами этого класса являются задачи «производитель-потребитель», когда M производителей создают некоторые объекты данных (читая эти данные с реальных внешних устройств, или создавая их как результат только внутренних вычислений, или любым другим способом), а N потребителей независимо берут произведенные объекты данных на последующую обработку.
Это настолько общий и часто встречающийся класс задач, что покажем для него простейший «скелет» в виде отдельного приложения, в котором отслеживание порядка доступа потребителей будет осуществлять счетный семафор (файл sy22.cc). Для простоты понимания приложение сделано как трансформация кода предшествующей группы тестов. В качестве имитации производства объекта данных, как и в качестве его обработки потребителем, используется пассивная пауза (delay()) на случайную величину (производство и обработка объектов данных в коде не показаны, так как это не относится к существу рассматриваемого — нас интересуют процессы синхронизации этих операций, а не сами операции).
Кроме основной нашей цели это приложение дополнительно демонстрирует:
• Практическое использование принудительного завершения (отмены) потоков «извне» с управлением состоянием завершаемости потоков и расстановкой точек отмены, о чем мы уже говорили ранее.
• Использование атомарных (непрерываемых) операций (например, atomic_add_value()), о которых мы будем говорить чуть позже.
• Использование реентерабельных форм функций стандартной библиотеки, безопасных в многопоточной среде (rand_r() вместо rand()).
Один производитель — T потребителей
#include <stdlib.h>
#include <stdio.h>
#include <iostream.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <semaphore.h>
#include <atomic.h>
const int D = 10;
unsigned int T = 2;
static sem_t sem;
pthread_t* tid;
void* writer(void* data) {
unsigned long i = (int)(data); // общий размер выборки
unsigned int s = 1;
while (i-- > 0) {
delay((long)rand_r(&s) * D / RAND_MAX + 1);
sem_post(&sem); // объект данных произведен
}
for (i = 0; i < T; i++) pthread_cancel(tid[i + 1]);
return NULL;
}
static char *str; // строка результирующей диагностики
static volatile unsigned ind = 0;
void* reader(void*) {
char tid[8];
sprintf(tid, "%X", pthread_self());
unsigned int s = rand();
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
while(true) {
sem_wait(&sem); // получен объект данных
str[atomic_add_value(&ind, 1)] = *tid;
pthread_testcancel();
delay((long)rand_r(&s) * D * T / RAND_MAX + 1);
}
return NULL;
}
int main(int argc, char *argv[]) {
unsigned long N = 1000;
int opt, val;
while ((opt = getopt(argc, argv, "n:t:")) != -1) {
switch(opt) {
case 'n':
if (sscanf(optarg, "%i", &val) != 1)
cout << "parse command line error" << endl, exit(EXIT_FAILURE);
if (val > 0) N = val;
break;
case 't':
if (sscanf(optarg, "%i", &val) != 1)
cout << "parse command line error" << endl, exit(EXIT_FAILURE);
if (val > 0) T = val;
break;
default:
exit(EXIT_FAILURE);
}
}
str = new char[N + 1];
tid = new pthread_t[T + 1];
if (sem_init(&sem, 0, 0))
perror("semaphore init"), exit(EXIT_FAILURE);
if (pthread_create(tid, NULL, writer, (void*)N) >= EOK)
perror("writer create error"), exit(EXIT_FAILURE);
for (int i = 0; i < T; i++)
if (pthread_create(tid + i + 1, NULL, reader, NULL) != EOK)
perror("reader create error"), exit(EXIT_FAILURE);
for (int i = 0; i < T; i++)
pthread_join(tid[i], NULL);
sem_destroy(&sem);
delete [] tid;
str[ind] = ' ';
cout << str << endl;
delete [] str;
exit(EXIT_SUCCESS);
}
Вот как выглядит результат выполнения этой программы (во избежание внесения дополнительного синхронизма в качестве общего числа циклов «производства» и числа потоков потребителей выбраны взаимно простые числа):
# sy22 -n200 -t13
3456789ABCDEF7936A8547E39DCB45F67A59B84D37EC64F395B6AEF78B9DF34CB53B86A5FEDF975B3A8EC46FB8AD954736FA78C3ED46F7B594EC7B83AC6F9D4BCE569A73F86BCAD74C536EB79F5C8DA5B463EFBC7D937AEC85FDE4566CAF69DE7F385CA6
Хорошо видно, как строго последовательный поначалу порядок доступа потребителей к объектам данных десинхронизируется и становится хаотическим: каждый освободившийся потребитель приступает к работе над следующим объектом данных, как только тот становится доступен.
Атомарные операции не относятся к элементам синхронизации параллельных ветвей программы. Но им следует уделить внимание по двум причинам. Во-первых, атомарные операции — это простое и эффективное средство, позволяющее во многих случаях избежать использования механизмов синхронизации. А во-вторых, атомарные операции зачастую выпадают из рассмотрения из-за их двойственного положения: при обсуждении параллелизма и синхронизации они не рассматриваются, потому что не являются элементами синхронизации, а при обсуждении последовательных программ не рассматриваются потому, что здесь в них просто нет необходимости.
Атомарные операции — это операции, для которых гарантируется их непрерываемость даже при выполнении на симметричных мультипроцессорных платформах. Выполнение атомарных операций не прерывается даже асинхронными аппаратными прерываниями. Таким образом, эта группа операций является также и безопасной в многопоточном окружении.
Действительно, наиболее часто примитивы синхронизации применяются для создания критической секции кода с целью предотвращения возможности одновременного воздействия на объекты данных со стороны нескольких параллельно развивающихся ветвей программы.
При одновременной работе с данными из различных потоков состояние данных после такого воздействия должно считаться «неопределенным», при этом последствия могут быть более тяжкими, чем просто некорректное состояние данных - структура сложных объектов может быть просто разрушена.
В многопоточной среде элементарные и привычные операции могут таить в себе опасности. Действительно, простейший оператор вида:
i = i + 1;
содержит в себе опасность, если этот оператор записан в функции потока, выполняемой несколькими экземплярами потоков (совершенно типичный случай). Не менее опасен, но менее очевиден по внешнему виду и оператор:
i += 1;
Даже операторы инкремента и декремента (++i и --i), которые в системе команд практически всех типов процессоров выполняются как атомарные и которые являются основой для реализации семафорных операций, в симметричной мультипроцессорной архитектуре перестают быть безопасными. Хуже того, привычные программисту операции стандартной библиотеки и просто синтаксические конструкции языка становятся небезопасными в многопоточной среде. Вот еще два примера:
1. Оператор копирования нетипизированного блока памяти, безбоязненно используемый десятилетиями:
void* memcpy(void* dst, const void* src, size_t length);
2. Операторы присваивания, инициализации или сравнения структурированных объектов данных:
struct X {
X(const X& y) { ... }
friend bool operator==(const X& f, const X& s) { ... }
// оператор присваивания мы не переопределяем, используется
// присваивание по умолчанию - побайтовое копирование
};
...
X A;
...
X B(А); // потенциальная ошибка
...
B = A; // потенциальная ошибка
if (А == В) { ... } // потенциальная ошибка
Примечание
Обратите внимание, что все объекты данных, для которых могут наблюдаться обсуждаемые эффекты, должны быть доступны вне потока, то есть быть глобальными с точки зрения видимости в потоке.
Именно для безопасного манипулирования данными в параллельной среде QNX API и вводятся атомарные операции. Десять атомарных функций делятся на две симметричные группы по виду своего именования и логике функционирования. Все атомарные операции осуществляются только над одним типом данных unsigned int, но, как будет показано далее, это не такое уж и сильное ограничение. Сам объект, над которым осуществляется атомарная операция (типа unsigned int), — это самая обычная переменная целочисленного типа, только описанная с квалификатором volatile.