Применение потоков хорошо там, где можно выполнять операции параллельно — например, в ряде математических задач (графика, цифровая обработка сигналов, и т.д.). Потоки также прекрасны там, где программа должна выполнять несколько независимых функций, при этом использующих общие данные — например, веб-сервер, который обслуживает несколько клиентов одновременно. Эти два класса задач мы здесь и рассмотрим.
Потоки в математических операциях
Предположим, что мы имеем графическую программу, выполняющую алгоритм трассировки луча. Каждая строка растра на экране зависит от содержимого основной базы данных (которая описывает генерируемую картинку). Ключевым моментом здесь является то, что каждая строка растра не зависит от остальных. Это обстоятельство (независимость строк растра) автоматически приводит к программированию данной задачи как многопоточной.
Ниже приведен однопоточный вариант:
int main (int argc, char **argv) {
int x1;
... // Выполнить инициализации
for (x1 = 0; x1 < num_x_lines; x1++) {
do_one_line(x1);
}
... // Вывести результат
}
Здесь мы видим, что программа итеративно по всем значениям рассчитает необходимые растровые строки.
В многопроцессорных системах эта программа будет использовать только один из процессоров. Почему? Потому что мы не указали операционной системе выполнять что-либо параллельно. Операционная система не настолько умна, чтобы посмотреть на программу и сказать: «Эй, секундочку! У нас ее 4 процессора, и похоже, что у нас тут несколько независимых потоков управления. Запущу-ка я это на всех 4 процессорах сразу!»
Так что это дело разработчика (ваше дело!) — сообщить QNX/Neutrino, какие разделы программы следует выполнять параллельно. Проще всего это можно было бы сделать так:
int main (int argc, char **argv) {
int x1;
... // Выполнить инициализации
for (x1 = 0; x1 < num_x_lines; x1++) {
pthread_create(NULL, NULL, do_one_line, (void*)x1);
}
... // Вывести результат
}
С таким упрощением связано множество проблем. Первая из них (и самая незначительная) состоит в том, что функцию do_one_line() придется модифицировать так, чтобы она могла в качестве своего аргумента принимать значение типа void* вместо int. Это можно легко исправить с помощью оператора приведения типа (typecast).
Вторая проблема несколько сложнее. Скажем, что разрешающая способность дисплея, для которой вы рассчитывали картинку, была равна 1280×1024. Нам пришлось бы создать 1280 потоков! В общем-то, для QNX/Neutrino это не проблема — QNX/Neutrino позволяет создавать до 32767 потоков в одном процессе! Однако, каждый поток должен иметь свой уникальный стек. Если ваш стек имеет разумный размер (скажем 8 Кб), эта программа израсходует под стек 1280×8 Кб (10 мегабайт!) ОЗУ. И ради чего? В вашей системе есть только 4 процессора. Это означает, что только 4 из этих 1280 потоков будут работать одновременно, а другие 1276 потоков будут ожидать доступа к процессору. (В действительности, в данном случае пространство под стек будет выделяться только по мере необходимости. Но тем не менее, это все равно расходование ресурсов впустую — есть ведь еще и другие издержки.)
Более красивым способом решения этой задачи было бы разбить ее на 4 части (по одной подзадаче на каждый процессор), и обрабатывать каждую часть как отдельный поток:
int num_lines_per_cpu;
int num_cpus;
int main (int argc, char **argv) {
int cpu;
... // Выполнить инициализации
// Получить число процессоров
num_cpus = _syspage_ptr->num_cpu;
num_lines_per_cpu = num_x_lines / num_cpus;
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_create(NULL, NULL, do_one_batch, (void*)cpu);
}
... // Вывести результат
}
void* do_one_batch(void *c) {
int cpu = (int)c;
int x1;
for (x1 = 0; x1 < num_lines_per_cpu; x1++) {
do_line_line(x1 + cpu * num_lines_per_cpu);
}
}
Здесь мы запускаем только num_cpus потоков. Каждый поток будет выполняться на отдельном процессоре. А поскольку мы имеем дело с небольшим числом потоков, мы тем самым не засоряем память ненужными стеками. Обратите внимание, что мы получили число процессоров путем разыменования глобальной переменной — указателя на системную страницу _syspage_ptr. (Дополнительную информацию относительно системной страницы можно найти в книге «Building Embedded Systems» (поставляется в комплекте документации по QNX/ Neutrino — прим. ред.) или в заголовочном файле <sys/syspage.h>).
Программирование для одного или нескольких процессоров
Последняя программа в первую очередь интересна тем, что будет корректно функционировать в системе с одиночным процессором тоже. Просто будет создан только один поток, который и выполнит всю работу. Дополнительные издержки (один стек) с лихвой окупаются гибкостью программы, умеющей работать быстрее в многопроцессорной системе.
Синхронизация по отношению к моменту завершения потока
Я уже упоминал, что с приведенным выше упрощенным примером программы связана масса проблем. Так вот, еще одна связанная с ним проблема состоит в том, что функция main() сначала запускает целый букет потоков, а затем отображает результаты. Но как функция узнает, когда уже можно выводить результаты?
Заставлять main() заниматься опросом, закончены ли вычисления, противоречит самому замыслу ОС реального времени.
int main (int argc, char **argv) {
...
// Запустить потоки, как раньше
while (num_lines_completed < num_x_lines) {
sleep(1);
}
}
He вздумайте писать такие программы!
Для решения этой задачи существуют два изящных решения: применение функций pthread_join() и barrier_wait().
«Присоединение» (joining)
Самый простой метод синхронизации — это «присоединение» потоков. Реально это действие означает ожидание завершения.
Присоединение выполняется одним потоком, ждущим завершения другого потока. Ждущий поток вызывает pthread_join():
#include <pthread.h>
int pthread_join(pthread_t thread, void **value_ptr);
Функции pthread_join() передается идентификатор потока, к которому вы желаете присоединиться, а также необязательный аргумент value_ptr, который может быть использован для сохранения возвращаемого присоединяемым потоком значения (Вы можете передать вместо этого параметра NULL, если это значение для вас не представляет интереса — в данном примере мы так и сделаем).
Где нам брать идентификатор потока? Мы игнорировали его в функции pthread_create(), передав NULL в качестве первого параметра. Давайте исправим нашу программу:
int num_lines_per_cpu;
int num_cpus;
int main(int argc, char **argv) {
int cpu;
pthread_t *thread_ids;
... // Выполнить инициализации
thread_ids = malloc(sizeof(pthread_t) * num_cpus);
num_lines_per_cpu = num_x_lines / num_cpus;
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_create(
&thread_ids[cpu], NULL, do_one_batch, (void*)cpu);
}
// Синхронизироваться с завершением всех потоков
for (cpu = 0; cpu < num_cpus; cpu++) {
pthread_join(thread_ids[cpu], NULL);
}
... // Вывести результат
}
Обратите внимание, что на этот раз мы передали функции pthread_create() в качестве первого аргумента указатель на pthread_t. Там и будет сохранен идентификатор вновь созданного потока. После того как первый цикл for завершится, у нас будет num_cpu работающих потоков, плюс поток, выполняющий main(). Потребление ресурсов процессора потоком main() нас мало интересует — этот поток потратит все свое время на ожидание.
Ожидание достигается применением функции pthread_join() к каждому из наших потоков. Сначала мы ждем завершения потока thread_ids[0]. Когда он завершится, функция pthread_join() разблокируется. Следующая итерация цикла for заставит нас ждать завершения потока thread_ids[1], и так далее для всех num_cpus потоков.
В этот момент возникает законный вопрос: «А что если потоки завершат работу в обратном порядке?» Другими словами, если имеются 4 процессора, и по какой-либо причине поток, выполняющийся на последнем процессоре (с номером 3), завершит работу первым, затем завершится поток, выполняющийся на процессоре с номером 2, и так далее? Вся прелесть приведенной схемы заключается в том, что ничего плохого не произойдет.