Мобильное программирование приложений реального времени в стандарте POSIX

       

Создание и терминирование потоков управления


Для создания нового потока управления служит функция pthread_create() (см. листинг 1.24).

#include <pthread.h> int pthread_create ( pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine) (void *), void *restrict arg);

Листинг 1.24. Описание функции pthread_create(). (html, txt)

Выполнение созданного потока управления начнется с вызова (*start_routine) (arg); возврат из этой функции приведет к терминированию потока с возвращаемым значением в качестве статуса завершения. Из этого правила стандартом предусмотрено одно, вполне естественное исключение: для потока, выполнение которого началось с функции main(), возврат из нее означает завершение процесса, содержащего поток, со всеми вытекающими отсюда последствиями.

Аргумент attr задает атрибуты нового потока; если значение attr равно NULL, используются зависящие от реализации подразумеваемые атрибуты.

От "родительского" вновь созданный поток управления наследует немногое: маску сигналов и характеристики вещественной арифметики.

К числу средств создания потоков можно отнести и функцию fork(). Правда, здесь нас будет интересовать не она сама, а ассоциированные с ней обработчики, зарегистрированные с помощью функции pthread_atfork() (см. листинг 1.25).

#include <pthread.h> int pthread_atfork ( void (*prepare) (void), void (*parent) (void), void (*child) (void));

Листинг 1.25. Описание функции pthread_atfork(). (html, txt)

В каждом обращении к pthread_atfork() фигурируют три обработчика (если, конечно, в качестве значения аргумента не задан пустой указатель). Первый ((*prepare)()) выполняется в контексте потока, вызвавшего fork(), до разветвления процесса; второй ((*parent)()) – в том же контексте, но после разветвления; третий ((*child)()) – в контексте единственного потока порожденного процесса.

С помощью pthread_atfork() можно зарегистрировать несколько троек обработчиков. Первые элементы троек вызываются в порядке, обратном по отношению к регистрации; вторые и третьи выполняются в прямом порядке.


Для создания нового потока управления служит функция pthread_create() (см. листинг 1.24).

#include <pthread.h> int pthread_create ( pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine) (void *), void *restrict arg);

Листинг 1.24. Описание функции pthread_create().

Выполнение созданного потока управления начнется с вызова (*start_routine) (arg); возврат из этой функции приведет к терминированию потока с возвращаемым значением в качестве статуса завершения. Из этого правила стандартом предусмотрено одно, вполне естественное исключение: для потока, выполнение которого началось с функции main(), возврат из нее означает завершение процесса, содержащего поток, со всеми вытекающими отсюда последствиями.

Аргумент attr задает атрибуты нового потока; если значение attr равно NULL, используются зависящие от реализации подразумеваемые атрибуты.

От "родительского" вновь созданный поток управления наследует немногое: маску сигналов и характеристики вещественной арифметики.

К числу средств создания потоков можно отнести и функцию fork(). Правда, здесь нас будет интересовать не она сама, а ассоциированные с ней обработчики, зарегистрированные с помощью функции pthread_atfork() (см. листинг 1.25).

#include <pthread.h> int pthread_atfork ( void (*prepare) (void), void (*parent) (void), void (*child) (void));



Листинг 1.25. Описание функции pthread_atfork().

В каждом обращении к pthread_atfork() фигурируют три обработчика (если, конечно, в качестве значения аргумента не задан пустой указатель). Первый ((*prepare)()) выполняется в контексте потока, вызвавшего fork(), до разветвления процесса; второй ((*parent)()) – в том же контексте, но после разветвления; третий ((*child)()) – в контексте единственного потока порожденного процесса.

С помощью pthread_atfork() можно зарегистрировать несколько троек обработчиков. Первые элементы троек вызываются в порядке, обратном по отношению к регистрации; вторые и третьи выполняются в прямом порядке.

Как и процесс, поток управления можно терминировать изнутри и извне. С одним, неявным, но наиболее естественным способом "самоликвидации" – выходом из стартовой функции потока – мы уже познакомились. Тот же эффект достигается вызовом функции pthread_exit() (см. листинг 1.26).

#include <pthread.h> void pthread_exit (void *value_ptr);

Листинг 1.26. Описание функции pthread_exit().

Терминирование потока управления в идейном плане существенно сложнее создания. Чтобы осветить все тонкости, необходимо ввести несколько новых понятий и описать целый ряд функций. Пока мы укажем лишь, что терминирование последнего потока в процессе вызывает его (процесса) завершение с нулевым кодом.

Из общих соображений (например, если исходить из аналогии между процессами и потоками управления) очевидно, что должна существовать возможность дождаться завершения заданного потока управления. Эта возможность реализуется функцией pthread_join() (см. листинг 1.27), напоминающей waitpid().

#include <pthread.h> int pthread_join ( pthread_t thread, void **value_ptr_ptr);

Листинг 1.27. Описание функции pthread_join().

Поток управления, вызвавший функцию pthread_join(), приостанавливает выполнение до завершения потока, идентификатор которого задан аргументом thread. При успешном возврате из pthread_join() результат, как и положено, равен нулю, а по указателю value_ptr_ptr (если он не пуст) помещается значение (указатель value_ptr), переданное в качестве аргумента функции pthread_exit(). Тем самым ждуший поток получает данные о статусе завершения ожидаемого.

Отметим, что трактовка значения value_ptr возлагается на приложение. Например, оно может считать его целым числом, а не указателем; по этой причине операционная система при выполнении функции pthread_exit() не вправе выдавать ошибку типа "неверный адрес" каким бы ни был аргумент value_ptr. Если он все же является указателем, то ему нельзя присваивать адрес автоматической переменной, поскольку к моменту его использования ожидаемый поток уже завершится и состояние его автоматических переменных станет неопределенным.

Второе общее соображение касается того обстоятельства, что вызов такой функции, как ожидание завершения (pthread_join()) способен приостановить выполнение вызывающего потока управления на неопределенное время, в течение которого ему может быть доставлен обрабатываемый сигнал. Как правило, в подобных ситуациях выполнение функций (таких, например, как read()) завершается с частично достигнутым результатом (например, с числом прочитанных байт, меньшим запрошенного) и кодом ошибки EINTR, нуждающимся в нестандартной обработке, далеко не всегда реализуемой разработчиками приложений. Из-за этого в программах появляются дефекты, которые трудно воспроизвести и, соответственно, исправить.

Согласно стандарту POSIX-2001, функции, обслуживающие потоки управления, свободны от этого недостатка. Они никогда не завершаются с частичным результатом и не выдают код ошибки EINTR. Восстановление нормального состояния после того, как ожидание было прервано доставкой и обработкой сигнала, возлагается на операционную систему, а не на приложение.

Третье общее соображение состоит в том, что такое критически важное событие, как завершение потока управления, не может оставаться без функций-обработчиков. Стандартом POSIX-2001 предусмотрено существование не одного, а целого стека подобных обработчиков, ассоциированного с потоком управления. Операции над этим стеком возложены на функции pthread_cleanup_push() и pthread_cleanup_pop() (см. листинг 1.28).

#include <pthread.h>

void pthread_cleanup_push ( void (*routine) (void *), void *arg);

void pthread_cleanup_pop (int execute);

Листинг 1.28. Описание функций pthread_cleanup_push() и pthread_cleanup_pop().

Функция pthread_cleanup_push() помещает заданный аргументами routine и arg обработчик в стек обработчиков вызывающего потока. Функция pthread_cleanup_pop() извлекает верхний обработчик из этого стека и, если значение аргумента execute отлично от нуля, вызывает его (как (*routine) (arg)).

Разумеется, все обработчики, начиная с верхнего, извлекаются из стека и вызываются при терминировании потока управления (вне зависимости от того, объясняется ли терминирование внутренними или внешними причинами). В частности, это происходит после того, как поток обратится к функции pthread_exit().

Напомним, что обработчики завершения существуют и для процессов (они регистрируются с помощью функции atexit()), однако применительно к потокам управления идея стека обработчиков оформлена в более явном и систематическом виде.

Пару функций pthread_cleanup_push() и pthread_cleanup_pop() можно представлять себе как открывающую и закрывающую скобки, оформленные в виде отдельных инструкций языка C и обрамляющие обслуживаемый обработчиком участок программы. Согласно стандарту POSIX-2001, этот участок должен представлять собой фрагмент одной лексической области видимости (блока), а pthread_cleanup_push() и pthread_cleanup_pop() могут быть реализованы как макросы (см. листинг 1.29).

#define pthread_cleanup_push (rtn, arg) { \ struct _pthread_handler_rec \ __cleanup_handler, \ **__head; \ __cleanup_handler.rtn = rtn; \ __cleanup_handler.arg = arg; \ (void) pthread_getspecific \ (_pthread_handler_key, &__head); \ __cleanup_handler.next = *__head; \ *__head = &__cleanup_handler;

#define pthread_cleanup_pop (ex) \ *__head = __cleanup_handler.next; \ if (ex) (*__cleanup_handler.rtn) \ (__cleanup_handler.arg); \ }

Листинг 1.29. Возможная реализация функций pthread_cleanup_push() и pthread_cleanup_pop() как макросов.

Обратим внимание на то, что в определении макроса pthread_cleanup_push() открывается внутренний блок, в котором декларируются два необходимых объекта – структура __cleanup_handler, описывающая обработчик, и указатель __head на вершину стека, представленного в виде односвязанного (линейного) списка. В определении pthread_cleanup_pop() этот блок закрывается. Так что даже из соображений синтаксической корректности вызовы pthread_cleanup_push() и pthread_cleanup_pop() должны быть парными и располагаться в одном блоке, но более существенной нам представляется корректность семантическая.

Если поток управления по внутренним или внешним причинам терминируется при выполнении обслуживаемого обработчиком участка, тот должен обеспечить аккуратное завершение с восстановлением (если это необходимо) целостного, корректного состояния объектов, видимых в блоке, и освобождением ресурсов, лексически доступных из текущей области видимости. Иными словами, характер обработки завершения определяется программным контекстом, в котором прервано выполнение потока управления. Отсюда и привязка стека обработчиков к блочной структуре программы.

После того, как выполнятся все обработчики завершения, в неспецифицированном порядке вызываются деструкторы индивидуальных данных (если у потока управления таковые имеются). То, что деструкторы вызываются после обработчиков завершения, делает доступными для последних индивидуальные данные потоков управления.

Отметим, что если поток – не последний в процессе, то при его завершении, разумеется, не происходит автоматического освобождения видимых приложению ресурсов процесса (таких, например, как файловые дескрипторы) и не выполняются другие общепроцессные зачистки (такие, как выполнение функций, зарегистрированных посредством atexit()).

Заказать терминирование извне потока управления с заданным идентификатором можно, воспользовавшись функцией pthread_cancel() (см. листинг 1.30).

#include <pthread.h> int pthread_cancel (pthread_t thread);

Листинг 1.30. Описание функции pthread_cancel().

Напомним, что на выполнение "заказа" влияют состояние восприимчивости и тип терминирования, установленные для потока, а также достижение точки терминирования. Эти атрибуты опрашиваются и изменяются с помощью функций pthread_setcancelstate(), pthread_setcanceltype() и pthread_testcancel() (см. листинг 1.31).

#include <pthread.h>

int pthread_setcancelstate ( int state, int *oldstate);

int pthread_setcanceltype ( int type, int *oldtype);

void pthread_testcancel (void);

Листинг 1.31. Описание функций pthread_setcancelstate(), pthread_setcanceltype(), pthread_testcancel().

Функции pthread_setcancelstate() и pthread_setcanceltype() атомарным образом, в рамках неделимой транзакции устанавливают новые значения (state и type) для состояния восприимчивости и типа и помещают по заданным указателям (соответственно, oldstate и oldtype) старые значения. Допустимыми значениями для состояния восприимчивости к терминированию являются PTHREAD_CANCEL_ENABLE (терминирование разрешено – подразумеваемое значение для вновь созданных потоков управления) и PTHREAD_CANCEL_DISABLE, для типа – PTHREAD_CANCEL_DEFERRED (отложенное терминирование – подразумеваемое значение) и PTHREAD_CANCEL_ASYNCHRONOUS (немедленное, асинхронное терминирование).

Функция pthread_testcancel() создает в вызывающем потоке точку терминирования, то есть проверяет наличие ждущего заказа на терминирование и, при наличии такового, инициирует его обработку (если она разрешена).

Из общих соображений следует, что манипуляции с атрибутами терминирования необходимы для обеспечения атомарности транзакций и сохранения программных инвариантов. При входе в транзакцию терминирование запрещают, при выходе восстанавливают старое значение. Аналогично, если в пределах критического интервала нарушается некий программный инвариант, на этом интервале поток должен защититься от терминирования. Ресурсы, ассоциированные с потоком, обязаны оставаться в корректном состоянии, а осмысленные действия – или выполняться до конца, или не выполняться вообще.

Возвращаясь к функции pthread_cancel(), отметим, что иногда обработку заказа на терминирование сравнивают с реакцией на доставку сигнала. На наш взгляд, к подобной аналогии нужно относиться осторожно, поскольку она довольно поверхностна и основана в первую очередь на том, что и сигналы, и заказы на терминирование являются средствами асинхронного программного воздействия на потоки управления, и других механизмов подобной направленности стандарт POSIX-2001 не предусматривает. В частности, доставка как сигналов, так и заказов на терминирование способна вывести поток управления из состояния ожидания, быть может, потенциально бесконечного. Еще одна параллель – запрещение терминирования при входе в обработчик завершения и блокирование сигнала при входе в функцию его обработки.

Противоречат отмеченной аналогии следующие обстоятельства. Во-первых, обработка сигналов зачастую направлена на продолжение, а не завершение выполнения и, следовательно, носит принципиально иной, чем у обработчика завершения, характер. Во-вторых, для обработки терминирования предоставлено больше средств (не одна функция, как в случае сигналов, а целый стек обработчиков завершения плюс деструкторы индивидуальных данных) и они строже регламентированы (добавление и изъятие обработчиков стандартизованы как парные операции, выполняемые в пределах одной лексической области видимости, отложенное терминирование производится только в определенных точках программы, нелокальные переходы (siglongjmp()), являющиеся стандартным элементом обработки сигналов, применительно к обработчикам завершения допускаются с многочисленными оговорками и т.п.). Конечно, терминирование извне может быть реализовано на основе механизма сигналов, но есть и другие возможности.

Разумеется, функция pthread_cancel() не ожидает выполнения заказа на терминирование, так что вызывающий и целевой потоки управления продолжают выполняться параллельно, пока последний не осуществит всех специфицированных действий по зачистке и не завершится, выдав ждущим этого события потокам значение PTHREAD_CANCELED.

Помимо заказа на терминирование, потоку управления можно направить и "честный" сигнал, воспользовавшись средством "внутрипроцессного межпотокового" взаимодействия – функцией pthread_kill() (см. листинг 1.32).

#include <signal.h> int pthread_kill ( pthread_t thread, int sig);

Листинг 1.32. Описание функции pthread_kill().

Как и в случае функции kill(), при нулевом значении аргумента sig проверяется корректность заданного идентификатора потока, но никакой сигнал не генерируется.

Напомним (см. курс [1]), что сигналы генерируются для конкретного потока управления (так, в частности, поступает функция pthread_kill()) или для процесса в целом (как это делает функция kill()), но доставляются они всегда одному потоку, только во втором случае его выбор определяется реализацией из соображений простоты доставки: обычно берется активный поток, если он не блокирует данный сигнал. Естественно, если для сигнала определена функция обработки, она выполняется в контексте целевого потока управления. Другие возможные действия (терминирование, остановка) всегда применяются к процессу в целом.

Для иллюстрации изложенного приведем небольшую программу (см. листинг 1.33), в которой создается поток управления с последующей доставкой ему сигнала SIGINT. Возможные результаты работы этой программы показаны на листинге 1.34.

/* * * * * * * * * * * * * * * * * */ /* Программа демонстрирует генерацию */ /* и доставку сигналов */ /* потокам управления */ /* * * * * * * * * * * * * * * * * */

#include <unistd.h> #include <stdio.h> #include <pthread.h> #include <signal.h> #include <errno.h>

/* * * * * * * * * * * * * * / /* Функция обработки сигнала */ /* * * * * * * * * * * * * * / static void signal_handler (int dummy) { printf ("Идентификатор потока, обрабатывающего сигнал: %lx\n", pthread_self ()); }

/* * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока управления, */ /* которому будет направлен сигнал */ /* * * * * * * * * * * * * * * * * * * */ static void *thread_start (void *dummy) { printf ("Идентификатор нового потока управления: %lx\n", pthread_self ()); while (1) { sleep (1); }

return (NULL); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGINT, */ /* создает поток управления и посылает ему сигнал */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { pthread_t thread_id; struct sigaction act;

/* Установим реакцию на сигнал SIGINT */ act.sa_handler = signal_handler; (void) sigemptyset (&act.sa_mask); act.sa_flags = 0; (void) sigaction (SIGINT, &act, (struct sigaction *) NULL);

if ((errno = pthread_create (&thread_id, NULL, thread_start, NULL)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } printf ("Идентификатор созданного потока управления: %lx\n", thread_id);

(void) pthread_kill (thread_id, SIGINT); printf ("После вызова pthread_kill()\n");

sleep (1); printf ("Выспались...\n");

return (0); }

Листинг 1.33. Пример использования механизма сигналов в многопотоковой программе.

Идентификатор созданного потока управления: 402 После вызова pthread_kill() Идентификатор потока, обрабатывающего сигнал: 402 Идентификатор нового потока управления: 402 Выспались...

Листинг 1.34. Возможные результаты работы многопотоковой программы, использующей механизм сигналов.

Обратим внимание на два любопытных (хотя и довольно очевидных) момента. Во-первых, начальный поток управления процесса продолжает выполнение после вызова pthread_kill() и успевает сообщить об этом. Затем он на секунду засыпает, активным становится вновь созданный поток и первым делом приступает к обработке доставленного ему сигнала. Уже после возврата из функции обработки начинается выполнение инструкций стартовой функции потока управления.

Читателю предлагается самостоятельно проанализировать, как будет вести себя приведенная программа при подразумеваемом способе обработки сигнала SIGINT.

Отмеченную выше устойчивость ожидания в функциях, обслуживающих потоки управления, к доставке и обработке сигналов проиллюстрируем программой, показанной на листинге 1.35.

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа демонстрирует взаимодействие сигналов */ /* и ожидания завершения потока управления */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h> #include <pthread.h> #include <signal.h> #include <errno.h>

/* * * * * * * * * * * * * * */ /* Функция обработки сигнала */ /* * * * * * * * * * * * * * */ static void signal_handler (int dummy) { printf ("Идентификатор потока, обрабатывающего сигнал: %lx\n", pthread_self ()); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция создаваемого потока управления */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void *thread_start (void *thread_id) { printf ("Идентификатор нового потока управления: %lx\n", pthread_self ()); (void) pthread_kill ((pthread_t) thread_id, SIGINT);

return ((void *) pthread_self ()); } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGINT, */ /* создает поток управления и ожидает его завершения */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { pthread_t thread_id; struct sigaction act; void *pv;

/* Установим реакцию на сигнал SIGINT */ act.sa_handler = signal_handler; (void) sigemptyset (&act.sa_mask); act.sa_flags = 0; (void) sigaction (SIGINT, &act, (struct sigaction *) NULL);

if ((errno = pthread_create (&thread_id, NULL, thread_start, (void *) pthread_self ())) != 0) { perror ("PTHREAD_CREATE"); return (errno); } printf ("Идентификаторы начального и созданного потоков " "управления: " "%lx %lx\n", pthread_self (), thread_id);

/* Дождемся завершения созданного потока управления */ if ((errno = pthread_join (thread_id, &pv)) != 0) { perror ("PTHREAD_JOIN"); return (errno); } printf ("Статус завершения созданного потока " "управления: %p\n", pv);

return (0); }

Листинг 1.35. Пример программы, обрабатывающей сигнал во время ожидания завершения потока управления.

Если посмотреть на возможные результаты работы этой программы (см. листинг 1.36), можно сделать вывод, что, несмотря на получение и обработку сигнала, функция pthread_join отрабатывает с нормальным (нулевым) результатом, получая статус завершения "ожидаемого" потока.

Идентификаторы начального и созданного потоков управления: 400 402 Идентификатор нового потока управления: 402 Идентификатор потока, обрабатывающего сигнал: 400 Статус завершения созданного потока управления: 0x402

Листинг 1.36. Возможные результаты работы программы, обрабатывающей сигнал во время ожидания завершения потока управления.

И здесь читателю рекомендуется самостоятельно выяснить, как поведет себя приведенная программа при подразумеваемой реакции на сигнал SIGINT.

Еще одна полезная операция, связанная с обработкой завершения потока управления, – его динамическое обособление, выполняемое функцией pthread_detach() (см. листинг 1.37).

#include <pthread.h> int pthread_detach (pthread_t thread);

Листинг 1.37. Описание функции pthread_detach().

При завершении обособленного потока операционная система может освободить использовавшуюся им память.

Может показаться, что возможность динамического обособления является излишней (мол, достаточно соответствующего атрибута, принимаемого во внимание при создании потока функцией pthread_create()), однако это не так. Во-первых, начальный поток процесса создается нестандартным образом и обособить его статически невозможно. Во-вторых, если терминируется поток, ждущий в функции pthread_join(), обработчик его завершения должен обособить того, чье завершение является предметом ожидания, иначе с утилизацией памяти могут возникнуть проблемы.

Если приложение заботится об аккуратном освобождении памяти, то для всех потоков управления, созданных с атрибутом PTHREAD_CREATE_JOINABLE, следует предусмотреть вызов либо pthread_join(), либо pthread_detach().

В качестве примера многопотоковой программы приведем серверную часть рассматривавшегося в курсе [1] приложения, копирующего строки со стандартного ввода на стандартный вывод с "прокачиванием" их через потоковые сокеты (см. листинг 1.38).

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа процесса (будем называть его серверным), */ /* принимающего запросы на установления соединения и */ /* запускающего потоки управления для их обслуживания */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h> #include <netdb.h> #include <pthread.h> #include <sys/socket.h> #include <arpa/inet.h> #include <errno.h>

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая (и единственная) функция потоков управления, */ /* обслуживающих запросы на копирование строк, */ /* поступающих из сокета */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *srv_thread_start (void *ad) { FILE *fpad; /* Поток данных, соответствующий */ /* дескриптору ad */ char line [LINE_MAX]; /* Буфер для принимаемых строк */ /* Структура для записи адреса */ struct sockaddr_in sai; /* Длина адреса */ socklen_t sai_len = sizeof (struct sockaddr_in);

/* Опросим адрес партнера по общению (передающего сокета) */ if (getpeername ((int) ad, (struct sockaddr *) &sai, &sai_len) < 0) { perror ("GETPEERNAME"); return (NULL); }

/* По файловому дескриптору ad сформируем */ /* буферизованный поток данных */ if ((fpad = fdopen ((int) ad, "r")) == NULL) { perror ("FDOPEN"); return (NULL); }

/* Цикл чтения строк из сокета */ /* и выдачи их на стандартный вывод */ while (fgets (line, sizeof (line), fpad) != NULL) { printf ("Вы ввели и отправили с адреса %s, " "порт %d :", inet_ntoa (sai.sin_addr), ntohs (sai.sin_port)); fputs (line, stdout); }

/* Закрытие соединения */ shutdown ((int) ad, SHUT_RD); (void) fclose (fpad);

return (NULL); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* В функции main() принимаются запросы на установление */ /* соединения и запускаются потоки управления для их обслуживания */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { int sd; /* Дескриптор слушающего сокета */ int ad; /* Дескриптор приемного сокета */ /* Буфер для принимаемых строк */ struct addrinfo hints = {AI_PASSIVE, AF_INET, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL}; /* Указатель – выходной аргумент getaddrinfo */ struct addrinfo *addr_res; int res; /* Результат getaddrinfo */ pthread_attr_t patob; /* Атрибутный объект для создания */ /* потоков управления */ pthread_t adt_id; /* Идентификатор обслуживающего потока управления */

/* Создадим слушающий сокет */ if ((sd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { perror ("SOCKET"); return (1); }

/* Привяжем этот сокет к адресу сервиса spooler */ /* на локальном хосте */ if ((res = getaddrinfo (NULL, "spooler", &hints, &addr_res)) != 0) { fprintf (stderr, "GETADDRINFO: %s\n", gai_strerror (res)); return (2); } if (bind (sd, addr_res->ai_addr, addr_res->ai_addrlen) < 0) { perror ("BIND"); return (3); }

/* Можно освободить память, которую запрашивала */ /* функция getaddrinfo() */ freeaddrinfo (addr_res);

/* Пометим сокет как слушающий */ if (listen (sd, SOMAXCONN) < 0) { perror ("LISTEN"); return (4); }

/* Инициализируем атрибутный объект потоков управления */ if ((errno = pthread_attr_init (&patob)) != 0) { perror ("PTHREAD_ATTR_INIT"); return (errno); } /* Потоки управления будем создавать обособленными */ (void) pthread_attr_setdetachstate (&patob, PTHREAD_CREATE_DETACHED);

/* Цикл приема соединений и запуска */ /* обслуживающих потоков управления */ while (1) { /* Примем соединение. */ /* Адрес партнера по общению нас */ /* в данном случае не интересует */ if ((ad = accept (sd, NULL, NULL)) < 0) { perror ("ACCEPT"); return (6); }

/* Запустим обслуживающий поток управления */ if ((errno = pthread_create (&adt_id, &patob, srv_thread_start,(void *) ad)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } }

return (0); }

Листинг 1.38. Пример многопотоковой программы, обслуживающей запросы на копирование строк, поступающих через сокеты.

Многопотоковая реализация в данном случае уместнее многопроцессной: она и выглядит проще (поскольку потоки проще создавать и не обязательно в явном виде ждать их завершения), и ресурсов потребляет меньше.

Можно надеяться, что и следующая программа (см. листинг 1.39), реализующая идею обработки данных с контролем времени, подтверждает, что применение потоков управления позволяет сделать исходный текст более простым и наглядным по сравнению с "беспотоковым" вариантом (см. курс [1]). Удалось избавиться от такого сугубо "неструктурного" средства, как нелокальные переходы, и от ассоциированных с ними тонкостей, чреватых ошибками.

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа вызывает функции обработки в рамках */ /* порождаемых потоков управления и контролирует время */ /* их выполнения с помощью интервального таймера */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h> #include <pthread.h> #include <sys/time.h> #include <signal.h> #include <errno.h>

/* Период интервального таймера (в секундах) */ #define IT_PERIOD 1

static pthread_t cthread_id; /* Идентификатор текущего */ /* потока управления, */ /* обрабатывающего данные */

static int in_proc_data = 0; /* Признак активности */ /* потока обработки данных */

static double s; /* Результат функций */ /* обработки данных */

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция обработки срабатывания таймера реального */ /* времени (сигнал SIGALRM) */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_sigalrm (int dummy) { if (in_proc_data) { /* Не имеет значения, какой поток обрабатывает сигнал */ /* и заказывает терминирование (быть может, себя) */ (void) pthread_cancel (cthread_id); } }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Обработчик завершения потока управления. */ /* Сбрасывает признак активности потока обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_cleanup_handler (void *arg) { in_proc_data = (int) arg; }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока управления, обрабатывающего */ /* данные. Аргумент – указатель на функцию обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *start_func (void *proc_data_func) { /* Поместим в стек обработчик завершения */ pthread_cleanup_push (proc_data_cleanup_handler, 0); in_proc_data = 1; /* Время пошло ... */

/* На время выполнения функции обработки данных установим */ /* асинхронный тип терминирования, иначе оно не сработает */ (void) pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

/* Выполним функцию обработки данных */ ((void (*) (void)) (proc_data_func)) ();

/* Установим отложенный тип терминирования, */ /* иначе изъятие обработчика из стека */ /* будет небезопасным действием */ (void) pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);

/* Выполним обработчик завершения и удалим его из стека */ pthread_cleanup_pop (1); return (NULL); }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Первая функция обработки данных (вычисляет ln (2)) */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_1 (void) { double d = 1; int i;

s = 0; for (i = 1; i <= 100000000; i++) { s += d / i; d = -d; } }

/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Вторая функция обработки данных (вычисляет sqrt (2))*/ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static void proc_data_2 (void) { s = 1; do { s = (s + 2 / s) * 0.5; } while ((s * s – 2) > 0.000000001); } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция main() задает способ обработки сигнала SIGALRM, */ /* взводит периодический таймер реального времени */ /* и запускает в цикле потоки обработки данных */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { /* Массив указателей на функции обработки данных */ void (*fptrs []) (void) = {proc_data_1, proc_data_2, NULL}; /* Указатель на указатель на */ /* текущую функцию обработки данных */ void (**tfptr) (void); void *pstat; /* Статус завершения потока */ /* обработки данных */ struct itimerval itvl; struct sigaction sact; int i;

/* Установим реакцию на сигнал SIGALRM */ sact.sa_handler = proc_sigalrm; sact.sa_flags = 0; (void) sigemptyset (&sact.sa_mask); if (sigaction (SIGALRM, &sact, NULL) < 0) { perror ("SIGACTION"); return (1); }

/* Сделаем таймер реального времени периодическим */ itvl.it_interval.tv_sec = IT_PERIOD; itvl.it_interval.tv_usec = 0;

/* Цикл запуска потоков обработки данных. */ /* Выполним его дважды */ for (i = 0; i < 2; i++) { for (tfptr = fptrs; *tfptr != NULL; tfptr++) { /* Взведем интервальный таймер реального времени */ itvl.it_value.tv_sec = IT_PERIOD; itvl.it_value.tv_usec = 0; if (setitimer (ITIMER_REAL, &itvl, NULL) < 0) { perror ("SETITIMER"); return (2); }

/* Создадим поток обработки данных, */ /* затем дождемся его завершения */ if ((errno = pthread_create (&cthread_id, NULL, start_func, (void *) *tfptr)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } if ((errno = pthread_join (cthread_id, &pstat)) != 0) { perror ("PTHREAD_JOIN"); return (errno); }

if (pstat == PTHREAD_CANCELED) { printf ("Частичный результат функции " "обработки данных: %g\n", s); } else { printf ("Полный результат функции " "обработки данных: %g\n", s); } } }

return 0; }

Листинг 1.39. Пример многопотоковой программы, осуществляющей обработку данных с контролем времени.

Возможные результаты работы приведенной программы показаны на листинге 1.40.

Частичный результат функции обработки данных: 0.693147 Полный результат функции обработки данных: 1.41421 Частичный результат функции обработки данных: 0.693147 Полный результат функции обработки данных: 1.41421

Листинг 1.40. Возможные результаты работы многопотоковой программы, осуществляющей обработку данных с контролем времени.

К сожалению, там, где есть недетерминированность и асинхронность, без тонкостей все равно не обойтись. Мы обратим внимание на три из них. Во-первых, возможно срабатывание таймера и доставка сигнала SIGALRM до того, как завершится (или даже начнется) первый вызов pthread_create() и будет инициализирована переменная cthread_id. Чтобы не допустить терминирования "неопределенного" потока управления в функции обработки сигнала, введен признак активности потока обработки данных. Если он установлен, переменная cthread_id заведомо инициализирована.

Во-вторых, не имеет значения, в контексте какого из потоков выполняется функция обработки сигнала – вполне допустимо, чтобы поток заказал собственное терминирование.

В-третьих, поскольку функции обработки данных не содержат точек терминирования, на время их выполнения необходимо установить асинхронный тип терминирования, иначе оно попросту не сработает. Однако использование этого типа крайне опасно, поэтому при первой возможности следует вернуться к более надежному отложенному типу. Перед манипуляциями со стеком обработчиков завершения установка данного типа или запрет терминирования являются обязательными.

Как мы уже упоминали, потоки управления иногда называют легковесными процессами. Любопытно оценить, какова степень их легковесности, насколько накладные расходы на их обслуживание меньше, чем для "настоящих" процессов.

На листингах 1.41 и 1.42 показана программа, которая в цикле порождает практически пустые процессы и дожидается их завершения; на листинге 1.43 приведены данные о времени ее работы, полученные с помощью команды time -p. Даже если сделать процессам послабление и убрать вызов execl(), времена получатся довольно большими (см. листинг 1.44) в сравнении с аналогичными данными для варианта с потоками управления (см. листинги 1.45 и 1.46).

#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <sys/wait.h>

#define N 10000

int main (void) { int i;

for (i = 0; i < N; i++) { switch (fork ()) { case -1: perror ("FORK"); return (1); case 0: /* Порожденный процесс */ (void) execl ("./dummy", "dummy", (char *) 0); exit (0); default: /* Родительский процесс */ (void) wait (NULL); } }

return 0; }

Листинг 1.41. Пример программы, порождающей в цикле практически пустые процессы.

int main (void) { return 0; }

Листинг 1.42. Содержимое файла dummy.c

real 34.97 user 12.36 sys 22.61

Листинг 1.43. Возможные результаты измерения времени работы программы, порождающей в цикле практически пустые процессы (вариант с вызовом execl()).

real 11.49 user 2.38 sys 9.11

Листинг 1.44. Возможные результаты измерения времени работы программы, порождающей в цикле практически пустые процессы (вариант без вызова execl()).

#include <unistd.h> #include <stdio.h> #include <pthread.h> #include <errno.h>

#define N 10000

static void *thread_start (void *arg) { pthread_exit (arg); }

int main (void) { pthread_t thread_id; int i;

for (i = 0; i < N; i++) { if ((errno = pthread_create ( &thread_id, NULL, thread_start, NULL)) != 0) { perror ("PTHREAD_CREATE"); return (errno); } if ((errno = pthread_join ( thread_id, NULL)) != 0) { perror ("PTHREAD_JOIN"); return (errno); } }

return (0); }

Листинг 1.45. Пример программы, порождающей в цикле потоки управления.

real 2.08 user 0.52 sys 1.56

Листинг 1.46. Возможные результаты измерения времени работы программы, порождающей в цикле потоки управления.

В первом приближении можно считать, что потоки управления на порядок дешевле процессов. На самом деле, в реальных ситуациях, когда процессы существенно превосходят по размеру приведенные выше "пустышки", различие будет еще больше.

Можно сделать вывод, что потоки управления допускают довольно свободное использование, накладные расходы на их обслуживание невелики, особенно в сравнении с обслуживанием процессов. Поэтому, проектируя приложение с параллельно выполняемыми компонентами, следует в первую очередь проанализировать возможность многопотоковой реализации. Главное препятствие в осуществлении подобной возможности – разделение потоками одного адресного пространства. Только если это препятствие окажется непреодолимым, целесообразно воспользоваться механизмом процессов.

<


Как и процесс, поток управления можно терминировать изнутри и извне. С одним, неявным, но наиболее естественным способом "самоликвидации" – выходом из стартовой функции потока – мы уже познакомились. Тот же эффект достигается вызовом функции pthread_exit() (см. листинг 1.26).

#include <pthread.h> void pthread_exit (void *value_ptr);

Листинг 1.26. Описание функции pthread_exit(). (html, txt)

Терминирование потока управления в идейном плане существенно сложнее создания. Чтобы осветить все тонкости, необходимо ввести несколько новых понятий и описать целый ряд функций. Пока мы укажем лишь, что терминирование последнего потока в процессе вызывает его (процесса) завершение с нулевым кодом.

Из общих соображений (например, если исходить из аналогии между процессами и потоками управления) очевидно, что должна существовать возможность дождаться завершения заданного потока управления. Эта возможность реализуется функцией pthread_join() (см. листинг 1.27), напоминающей waitpid().

#include <pthread.h> int pthread_join ( pthread_t thread, void **value_ptr_ptr);

Листинг 1.27. Описание функции pthread_join(). (html, txt)

Поток управления, вызвавший функцию pthread_join(), приостанавливает выполнение до завершения потока, идентификатор которого задан аргументом thread. При успешном возврате из pthread_join() результат, как и положено, равен нулю, а по указателю value_ptr_ptr (если он не пуст) помещается значение (указатель value_ptr), переданное в качестве аргумента функции pthread_exit(). Тем самым ждуший поток получает данные о статусе завершения ожидаемого.

Отметим, что трактовка значения value_ptr возлагается на приложение. Например, оно может считать его целым числом, а не указателем; по этой причине операционная система при выполнении функции pthread_exit() не вправе выдавать ошибку типа "неверный адрес" каким бы ни был аргумент value_ptr. Если он все же является указателем, то ему нельзя присваивать адрес автоматической переменной, поскольку к моменту его использования ожидаемый поток уже завершится и состояние его автоматических переменных станет неопределенным.



Второе общее соображение касается того обстоятельства, что вызов такой функции, как ожидание завершения (pthread_join()) способен приостановить выполнение вызывающего потока управления на неопределенное время, в течение которого ему может быть доставлен обрабатываемый сигнал. Как правило, в подобных ситуациях выполнение функций (таких, например, как read()) завершается с частично достигнутым результатом (например, с числом прочитанных байт, меньшим запрошенного) и кодом ошибки EINTR, нуждающимся в нестандартной обработке, далеко не всегда реализуемой разработчиками приложений. Из-за этого в программах появляются дефекты, которые трудно воспроизвести и, соответственно, исправить.

Согласно стандарту POSIX-2001, функции, обслуживающие потоки управления, свободны от этого недостатка. Они никогда не завершаются с частичным результатом и не выдают код ошибки EINTR. Восстановление нормального состояния после того, как ожидание было прервано доставкой и обработкой сигнала, возлагается на операционную систему, а не на приложение.

Третье общее соображение состоит в том, что такое критически важное событие, как завершение потока управления, не может оставаться без функций-обработчиков. Стандартом POSIX-2001 предусмотрено существование не одного, а целого стека подобных обработчиков, ассоциированного с потоком управления. Операции над этим стеком возложены на функции pthread_cleanup_push() и pthread_cleanup_pop() (см. листинг 1.28).

#include <pthread.h>

void pthread_cleanup_push ( void (*routine) (void *), void *arg);

void pthread_cleanup_pop (int execute);

Листинг 1.28. Описание функций pthread_cleanup_push() и pthread_cleanup_pop(). (html, txt)

Функция pthread_cleanup_push() помещает заданный аргументами routine и arg обработчик в стек обработчиков вызывающего потока. Функция pthread_cleanup_pop() извлекает верхний обработчик из этого стека и, если значение аргумента execute отлично от нуля, вызывает его (как (*routine) (arg)).

Разумеется, все обработчики, начиная с верхнего, извлекаются из стека и вызываются при терминировании потока управления (вне зависимости от того, объясняется ли терминирование внутренними или внешними причинами).


В частности, это происходит после того, как поток обратится к функции pthread_exit().

Напомним, что обработчики завершения существуют и для процессов (они регистрируются с помощью функции atexit()), однако применительно к потокам управления идея стека обработчиков оформлена в более явном и систематическом виде.

Пару функций pthread_cleanup_push() и pthread_cleanup_pop() можно представлять себе как открывающую и закрывающую скобки, оформленные в виде отдельных инструкций языка C и обрамляющие обслуживаемый обработчиком участок программы. Согласно стандарту POSIX-2001, этот участок должен представлять собой фрагмент одной лексической области видимости (блока), а pthread_cleanup_push() и pthread_cleanup_pop() могут быть реализованы как макросы (см. листинг 1.29).

#define pthread_cleanup_push (rtn, arg) { \ struct _pthread_handler_rec \ __cleanup_handler, \ **__head; \ __cleanup_handler.rtn = rtn; \ __cleanup_handler.arg = arg; \ (void) pthread_getspecific \ (_pthread_handler_key, &__head); \ __cleanup_handler.next = *__head; \ *__head = &__cleanup_handler;

#define pthread_cleanup_pop (ex) \ *__head = __cleanup_handler.next; \ if (ex) (*__cleanup_handler.rtn) \ (__cleanup_handler.arg); \ }

Листинг 1.29. Возможная реализация функций pthread_cleanup_push() и pthread_cleanup_pop() как макросов. (html, txt)

Обратим внимание на то, что в определении макроса pthread_cleanup_push() открывается внутренний блок, в котором декларируются два необходимых объекта – структура __cleanup_handler, описывающая обработчик, и указатель __head на вершину стека, представленного в виде односвязанного (линейного) списка. В определении pthread_cleanup_pop() этот блок закрывается. Так что даже из соображений синтаксической корректности вызовы pthread_cleanup_push() и pthread_cleanup_pop() должны быть парными и располагаться в одном блоке, но более существенной нам представляется корректность семантическая.



© 2003-2007 INTUIT.ru. Все права защищены.

Содержание раздела