Передача и прием сообщений в реальном времени
Средства локальной передачи и приема сообщений присутствуют, вероятно, во всех или почти во всех операционных системах. Во многих версиях ОС Unix представлено по нескольку разновидностей подобных средств. Следуя за исторически сложившимися реализациями, стандарт POSIX-2001 предусматривает два вида очередей сообщений и обслуживающих их функций. Один из них был рассмотрен нами в курсе [1]. Здесь мы опишем вторую разновидность, предназначенную для использования в системах реального времени.
Описываемые очереди сообщений являются общесистемными. Они доступны по именам, которые могут быть маршрутными именами файлов.
Над очередями сообщений определены следующие группы функций:
- открытие очереди;
- отправка сообщения в очередь;
- прием (синхронный или асинхронный) сообщения из очереди;
- изменение атрибутов очереди;
- регистрация на получение уведомления о появлении сообщения в очереди;
- закрытие очереди;
- удаление очереди.
Одну очередь могут открыть несколько посылающих и/или принимающих сообщения процессов. При открытии может производиться контроль прав доступа.
Для каждой очереди задается фиксированная верхняя граница размера сообщений, которые могут быть в эту очередь отправлены.
На порядок приема влияет имеющийся механизм приоритетов сообщений.
Процесс может получать асинхронные уведомления о том, что в очереди появилось сообщение.
Для открытия очереди служит функция mq_open() (см. листинг 4.1), которая, по аналогии с файлами, создает описание открытой очереди и ссылающийся на него дескриптор типа mqd_t, возвращаемый в качестве нормального результата.
#include <mqueue.h> mqd_t mq_open ( const char *name, int oflag, ...);
Листинг 4.1. Описание функции mq_open(). (html, txt)
Трактовка имени очереди (аргумент name) зависит от реализации. Оговаривается только, что оно должна подчиняться ограничениям, налагаемым на маршрутные имена, и что если его первым символом является '/', то процессы, вызывающие mq_open() с одинаковыми значениями name, ссылаются на одну и ту же очередь (если, конечно, ее не удаляли).
При реализации дескрипторов очередей сообщений могут применяться файловые дескрипторы. В таком случае приложение может открыть не менее OPEN_MAX файлов и очередей.
Аргумент oflag специфицирует запрашиваемые виды доступа к очереди: на прием (чтение) и отправку (запись). Контроль прав доступа для очередей сообщений производится так же, как и для файлов.
В целом набор флагов и их трактовка для очередей сообщений те же, что и для файлов: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK.
Если установлен флаг O_CREAT, то при вызове функции mq_open() необходимо задать два дополнительных аргумента: режим доступа (тип mode_t) и указатель на атрибутный объект (тип struct mq_attr *) создаваемой очереди сообщений.
Согласно стандарту POSIX-2001, структура типа mq_attr, описанная в заголовочном файле <mqueue.h>, содержит по крайней мере следующие поля.
long mq_flags; /* Флаги очереди сообщений */ long mq_maxmsg; /* Максимальное число сообщений в очереди */ long mq_msgsize; /* Максимальный размер сообщения в очереди */ long mq_curmsgs; /* Текущее число сообщений в очереди */
Для опроса и/или установки атрибутов очереди служат функции mq_getattr() и mq_setattr() (см. листинг 4.2). Впрочем, про установку атрибутов сказано, пожалуй слишком сильно: посредством вызова 2 можно изменить лишь состояние флага 2 (и, возможно, некоторых других флагов, зависящих от реализации).
#include <mqueue.h>
int mq_getattr ( mqd_t mqdes, struct mq_attr *mqstat);
int mq_setattr (mqd_t mqdes, const struct mq_attr *restrict mqstat, struct mq_attr *restrict omqstat);
Листинг 4.2. Описание функций mq_getattr() и mq_setattr(). (html, txt)
После того, как процесс завершил работу с очередью сообщений, соответствующий дескриптор следует закрыть, воспользовавшись функцией mq_close() (см. листинг 4.3).
#include <mqueue.h> int mq_close (mqd_t mqdes);
Листинг 4.3. Описание функции mq_close(). (html, txt)
Если очередь сообщений стала совсем ненужной, ее можно удалить с помощью функции mq_unlink() (см.листинг 4.4).
#include <mqueue.h> int mq_unlink (const char *name);
Листинг 4.4. Описание функции mq_unlink(). (html, txt)
При реализации дескрипторов очередей сообщений могут применяться файловые дескрипторы. В таком случае приложение может открыть не менее OPEN_MAX файлов и очередей.
Аргумент oflag специфицирует запрашиваемые виды доступа к очереди: на прием (чтение) и отправку (запись). Контроль прав доступа для очередей сообщений производится так же, как и для файлов.
В целом набор флагов и их трактовка для очередей сообщений те же, что и для файлов: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK.
Если установлен флаг O_CREAT, то при вызове функции mq_open() необходимо задать два дополнительных аргумента: режим доступа (тип mode_t) и указатель на атрибутный объект (тип struct mq_attr *) создаваемой очереди сообщений.
Согласно стандарту POSIX-2001, структура типа mq_attr, описанная в заголовочном файле <mqueue.h>, содержит по крайней мере следующие поля.
long mq_flags; /* Флаги очереди сообщений */ long mq_maxmsg; /* Максимальное число сообщений в очереди */ long mq_msgsize; /* Максимальный размер сообщения в очереди */ long mq_curmsgs; /* Текущее число сообщений в очереди */
Для опроса и/или установки атрибутов очереди служат функции mq_getattr() и mq_setattr() (см. листинг 4.2). Впрочем, про установку атрибутов сказано, пожалуй слишком сильно: посредством вызова 2 можно изменить лишь состояние флага 2 (и, возможно, некоторых других флагов, зависящих от реализации).
#include <mqueue.h>
int mq_getattr ( mqd_t mqdes, struct mq_attr *mqstat);
int mq_setattr (mqd_t mqdes, const struct mq_attr *restrict mqstat, struct mq_attr *restrict omqstat);
Листинг 4.2. Описание функций mq_getattr() и mq_setattr().
После того, как процесс завершил работу с очередью сообщений, соответствующий дескриптор следует закрыть, воспользовавшись функцией mq_close() (см. листинг 4.3).
#include <mqueue.h> int mq_close (mqd_t mqdes);
Листинг 4.3. Описание функции mq_close().
Если очередь сообщений стала совсем ненужной, ее можно удалить с помощью функции mq_unlink() (см. листинг 4.4).
#include <mqueue.h> int mq_unlink (const char *name);
Листинг 4.4. Описание функции mq_unlink().
Переходя к описанию содержательных действий с очередями сообщений, укажем, что отправка осуществляется функциями mq_send() и mq_timedsend() (см. листинги 4.5 и 4.6).
#include <mqueue.h> int mq_send (mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
Листинг 4.5. Описание функции mq_send(). #include <mqueue.h> #include <time.h> int mq_timedsend ( mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abstime);
Листинг 4.6. Описание функции mq_timedsend().
Более точно: функции mq_send() и mq_timedsend() помещают сообщение из msg_len байт, на которое указывает аргумент msg_ptr, в очередь, заданную дескриптором mqdes (если она не полна), в соответствии с приоритетом msg_prio (большим значениям msg_prio соответствует более высокий приоритет сообщения; допустимый диапазон - от 0 до MQ_PRIO_MAX - 1).
Если очередь полна, а флаг O_NONBLOCK не установлен, вызов mq_send() блокируется до появления свободного места. Функция mq_timedsend() в таких случаях контролирует длительность ожидания (до заданного аргументом abstime абсолютного момента времени по часам CLOCK_REALTIME).
Для извлечения (разумеется, с удалением) сообщений из очереди служат функции mq_receive() и mq_timedreceive() (см. листинги 4.7 и 4.8). Извлекается самое старое из сообщений с самым высоким приоритетом и помещается в буфер, на который указывает аргумент msg_ptr. Если размер буфера (значение аргумента msg_len) меньше атрибута очереди mq_msgsize, вызов завершается неудачей. Если значение msg_prio_ptr отлично от NULL, в указуемый объект помещается приоритет принятого сообщения.
#include <mqueue.h> ssize_t mq_receive ( mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio_ptr);
Листинг 4.7. Описание функции mq_receive().
#include <mqueue.h> #include <time.h> ssize_t mq_timedreceive ( mqd_t mqdes, char *restrict msg_ptr, size_t msg_len, unsigned *restrict msg_prio_ptr, const struct timespec *restrict abstime);
Листинг 4.8. Описание функции mq_timedreceive().
Если очередь пуста, а флаг O_NONBLOCK не установлен, вызов mq_receive() блокируется до появления сообщения. Функция mq_timedreceive() в таких случаях контролирует длительность ожидания.
Нормальным результатом обеих функций является размер в байтах извлеченного из очереди сообщения. Как всегда, признаком ошибки служит -1.
Посредством функции mq_notify() (см. листинг 4.9) процесс может зарегистрироваться на получение уведомления о том, что в очередь, бывшую до этого пустой, поступило сообщение.
#include <mqueue.h> int mq_notify (mqd_t mqdes, const struct sigevent *notification);
Листинг 4.9. Описание функции mq_notify().
Для уведомлений используется механизм сигналов реального времени. В каждый момент времени только один процесс может быть зарегистрированным, а сама регистрация является одноразовой: после поступления уведомления она отменяется. Процесс может добровольно отменить регистрацию, если передаст NULL в качестве значения аргумента notification.
Если, наряду с зарегистрированным процессом, имеется поток управления, ожидающий сообщения в вызове mq_receive() или mq_timedreceive(), поступившее сообщение достанется потоку, а процесс не получит никакого уведомления, как если бы очередь осталась пустой. Это очень по-человечески: живое стояние в очереди всегда ценилось выше всяких списков и уведомлений.
На наш взгляд, возможность получать уведомления о том, что очередь сообщений стала непустой, является скорее заплатой, призванной скрыть неполноту функциональности poll() и select(), чем полноценным, практически полезным средством. В стандартизованном интерфейсе отсутствует концептуальная целостность - из соображений симметрии необходимы уведомления о том, что очередь стала неполной и в нее можно отправлять сообщения. Далее, состояние очереди может меняться параллельно с регистрацией и/или получением уведомления, и нет никаких средств, чтобы сделать соответствующую транзакцию атомарной. В результате остается неясным, когда процесс получит уведомление (и получит ли он его вообще), какое сообщение он примет после получения уведомления (и будет ли что принимать) и т.п., хотя, с другой стороны, устраивать конкуренцию за сообщения тоже не обязательно.
Поучительно сопоставить два вида очередей сообщений - только что описанные и те, что были представлены в курсе [1]. Сразу видно, что первые устроены проще и, следовательно, могут быть реализованы эффективнее. Во-первых, структура типа mq_attr гораздо компактнее, чем msqid_ds. Нет и речи о хранении времени последних операций и идентификаторов процессов, их выполнивших. Во-вторых, упрощены производимые проверки. Контролируется максимальный размер одного сообщения, а не суммарный размер сообщений в очереди. Права доступа проверяются при открытии, а не при каждой операции. Только наличие приоритетов вносит элемент сложности, сопоставимый с обработкой типов сообщений, но приоритеты так или иначе должны быть реализованы - либо на уровне ОС, либо в приложении, поэтому лучше этот аспект стандартизовать и поручить операционной системе.
Примером применения очередей сообщений послужит программа, вычисляющая взвешенную сумму целых чисел, генерируемых и отправляемых множеством потоков управления (см. листинг 4.10).
/* * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа принимает сообщения */ /* и суммирует содержащиеся в них целые числа. */ /* Массивы случайных целых чисел */ /* генерируют несколько потоков управления. */ /* Каждый поток использует свою очередь сообщений.*/ /* Используется режим без блокировки, */ /* с уведомлениями о появлении сообщений в очереди*/ /* * * * * * * * * * * * * * * * * * * * * * * * */
#include <stdlib.h> #include <stdio.h> #include <limits.h> #include <fcntl.h> #include <mqueue.h> #include <signal.h> #include <pthread.h> #include <errno.h>
/* Число потоков управления, порождающих случайные числа */ #define PT_N MQ_OPEN_MAX
/* Число сообщений, генерируемых каждым потоком управления */ #define MSG_N 128
/* Длина имени очереди сообщений */ #define MQ_NAME_LENGTH PATH_MAX
/* Количество целых чисел в одном сообщении */ #define MSG_INT_SIZE 32
/* Максимальное число сообщений в очереди */ #define MQ_MSGS_MAX 16
/* Приоритет порождаемого сообщения */ #define prio_rnd (rand () % MQ_PRIO_MAX)
/* Номер сигнала, используемого для уведомлений */ #define SIG_MQ_NOTIFY SIGRTMIN
/* Массив идентификаторов очередей сообщений */ static mqd_t mq_des [PT_N];
/* Массив структур для задания уведомлений */ /* о поступлении сообщений в очереди */ static struct sigevent evnt_mq_notify [PT_N];
/* Мьютекс, используемый для синхронизации */ /* доступа к переменной sum */ static pthread_mutex_t sm_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Общий результат суммирования */ static int sum = 0;
/* * * * * * * * * * * * * * * * * * * * * * * */ /* Функция, вызываемая при получении уведомления*/ /* о том, что очередь сообщений стала непустой. */ /* Номер очереди передается как аргумент */ /* * * * * * * * * * * * * * * * * * * * * * * */ static void msg_arrvd (union sigval pt_nm) { int msg_buf [MSG_INT_SIZE]; mqd_t mqdes; unsigned int msg_prio; int msg_sum = 0; ssize_t msg_size; int i;
mqdes = mq_des [pt_nm.sival_int];
/* Примем и обработаем имеющиеся сообщения, */ /* а затем снова зарегистрируемся на получение */ /* такого же уведомления */ while ((msg_size = mq_receive (mqdes, (char *) msg_buf, MSG_INT_SIZE * sizeof (int), &msg_prio)) > 0) { for (i = 0; i < (msg_size / (signed int) sizeof (int)); i++) { msg_sum += msg_buf [i]; } msg_sum *= msg_prio; }
if ((errno = pthread_mutex_lock (&sm_mutex)) != 0) { perror ("PTHREAD_MUTEX_LOCK"); } sum += msg_sum; if ((errno = pthread_mutex_unlock (&sm_mutex)) != 0) { perror ("PTHREAD_MUTEX_UNLOCK"); }
if (mq_notify (mqdes, &evnt_mq_notify [pt_nm.sival_int]) != 0) { perror ("MQ_NOTIFY"); } }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока, генерирующего числа и */ /* посылающего сообщения. */ /* Аргумент - номер очереди сообщений */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *start_sender (void *pt_nm) {
int msg_buf [MSG_INT_SIZE]; int i, j;
/* Сформируем и пошлем заданное число сообщений */ /* (проверяя, не переполнилась ли очередь)*/ for (j = 0; j < MSG_N; j++) { for (i = 0; i < MSG_INT_SIZE; i++) { msg_buf [i] = rand (); } if (mq_send (mq_des [(int) pt_nm], (char *) msg_buf, MSG_INT_SIZE * sizeof (int), prio_rnd) != 0) { perror ("MQ_SEND"); return (NULL); } }
return (NULL); }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Создание очереди сообщений, */ /* регистрация на получение уведомлений, */ /* создание и ожидание завершения потоков управления */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { /* Массив идентификаторов порождаемых */ /* потоков. Эти потоки будут */ pthread_t pt_mqs [PT_N]; /* генерировать сообщения*/ /* Массив для генерации и хранения */ /* имен очередей сообщений */ char mq_name [PT_N] [MQ_NAME_LENGTH]; struct mq_attr mqattrs; /* Атрибуты создаваемых очередей */ int i;
for (i = 0; i < PT_N; i++) { /* Создадим очереди сообщений */ /* и зарегистрируемся на получение уведомлений */ sprintf (mq_name [i], "g%d", i);
mqattrs.mq_flags = O_NONBLOCK; mqattrs.mq_maxmsg = MQ_MSGS_MAX; mqattrs.mq_msgsize = MSG_INT_SIZE * sizeof (int); mqattrs.mq_curmsgs = 0; if ((mq_des [i] = mq_open (mq_name [i], O_RDWR | O_CREAT | O_NONBLOCK, 0777, &mqattrs)) == (mqd_t) (-1)) { perror ("MQ_OPEN"); return (-1); }
/* Сформируем структуру evnt_mq_notify */ evnt_mq_notify [i].sigev_notify = SIGEV_THREAD; evnt_mq_notify [i].sigev_signo = SIG_MQ_NOTIFY; evnt_mq_notify [i].sigev_value.sival_int = i; evnt_mq_notify [i].sigev_notify_function = msg_arrvd; evnt_mq_notify [i].sigev_notify_attributes = NULL; if (mq_notify (mq_des [i], &evnt_mq_notify [i]) != 0) { perror ("MQ_NOTIFY_MAIN"); return (-1); }
/* Создадим потоки управления */ if ((errno = pthread_create (&pt_mqs [i], NULL, start_sender, (void *) i)) != 0) { perror ("PTHREAD_CREATE"); return (i); } } /* for */
/* Ожидание завершения */ for (i = 0; i < PT_N; i++) { (void) pthread_join (pt_mqs [i], NULL); }
/* Закроем дескрипторы и удалим очереди */ for (i = 0; i < PT_N; i++) { (void) mq_close (mq_des [i]); (void) mq_unlink (mq_name [i]); }
printf ("Общая сумма: %d\n", sum);
if ((errno = pthread_mutex_destroy (&sm_mutex)) != 0) { perror ("PTHREAD_MUTEX_DESTROY"); return (errno); }
return 0; }
Листинг 4.10. Пример программы, использующей очереди сообщений.
Обратим внимание на то, что если в программе для отправки и приема сообщений используются буфера одного размера, он должен равняться значению атрибута mq_msgsize, которое задается при создании очереди. Отметим также применение режима без блокировки, что важно для приложений реального времени.