mqd_t mq_open
#include <mqueue.h> mqd_t mq_open ( const char *name, int oflag, ...); |
Листинг 4.1. Описание функции mq_open(). |
Закрыть окно |
#include <mqueue.h> int mq_getattr ( mqd_t mqdes, struct mq_attr *mqstat); int mq_setattr ( mqd_t mqdes, const struct mq_attr *restrict mqstat, struct mq_attr *restrict omqstat); |
Листинг 4.2. Описание функций mq_getattr() и mq_setattr(). |
Закрыть окно |
#include <mqueue.h> int mq_close (mqd_t mqdes); |
Листинг 4.3. Описание функции mq_close(). |
Закрыть окно |
#include <mqueue.h> int mq_unlink (const char *name); |
Листинг 4.4. Описание функции mq_unlink(). |
Закрыть окно |
#include <mqueue.h> int mq_send ( mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); |
Листинг 4.5. Описание функции mq_send(). |
Закрыть окно |
#include <mqueue.h> #include <time.h> int mq_timedsend ( mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abstime); |
Листинг 4.6. Описание функции mq_timedsend(). |
Закрыть окно |
#include <mqueue.h> ssize_t mq_receive ( mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio_ptr); |
Листинг 4.7. Описание функции mq_receive(). |
Закрыть окно |
#include <mqueue.h> #include <time.h> ssize_t mq_timedreceive ( mqd_t mqdes, char * restrict msg_ptr, size_t msg_len, unsigned *restrict msg_prio_ptr, const struct timespec *restrict abstime); |
Листинг 4.8. Описание функции mq_timedreceive(). |
Закрыть окно |
#include <mqueue.h> int mq_notify ( mqd_t mqdes, const struct sigevent *notification); |
Листинг 4.9. Описание функции mq_notify(). |
Закрыть окно |
/* * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа принимает сообщения */ /* и суммирует содержащиеся в них целые числа. */ /* Массивы случайных целых чисел */ /* генерируют несколько потоков управления. */ /* Каждый поток использует свою очередь сообщений.*/ /* Используется режим без блокировки, */ /* с уведомлениями о появлении сообщений в очереди*/ /* * * * * * * * * * * * * * * * * * * * * * * * */ #include <stdlib.h> #include <stdio.h> #include <limits.h> #include <fcntl.h> #include <mqueue.h> #include <signal.h> #include <pthread.h> #include <errno.h> /* Число потоков управления, порождающих случайные числа */ #define PT_N MQ_OPEN_MAX /* Число сообщений, генерируемых каждым потоком управления */ #define MSG_N 128 /* Длина имени очереди сообщений */ #define MQ_NAME_LENGTH PATH_MAX /* Количество целых чисел в одном сообщении */ #define MSG_INT_SIZE 32 /* Максимальное число сообщений в очереди */ #define MQ_MSGS_MAX 16 /* Приоритет порождаемого сообщения */ #define prio_rnd (rand () % MQ_PRIO_MAX) /* Номер сигнала, используемого для уведомлений */ #define SIG_MQ_NOTIFY SIGRTMIN /* Массив идентификаторов очередей сообщений */ static mqd_t mq_des [PT_N]; /* Массив структур для задания уведомлений */ /* о поступлении сообщений в очереди */ static struct sigevent evnt_mq_notify [PT_N]; /* Мьютекс, используемый для синхронизации */ /* доступа к переменной sum */ static pthread_mutex_t sm_mutex = PTHREAD_MUTEX_INITIALIZER; /* Общий результат суммирования */ static int sum = 0; /* * * * * * * * * * * * * * * * * * * * * * * */ /* Функция, вызываемая при получении уведомления*/ /* о том, что очередь сообщений стала непустой. */ /* Номер очереди передается как аргумент */ /* * * * * * * * * * * * * * * * * * * * * * * */ static void msg_arrvd (union sigval pt_nm) { int msg_buf [MSG_INT_SIZE]; mqd_t mqdes; unsigned int msg_prio; int msg_sum = 0; ssize_t msg_size; int i; mqdes = mq_des [pt_nm.sival_int]; /* Примем и обработаем имеющиеся сообщения, */ /* а затем снова зарегистрируемся на получение */ /* такого же уведомления */ while ((msg_size = mq_receive (mqdes, (char *) msg_buf, MSG_INT_SIZE * sizeof (int), &msg_prio)) > 0) { for (i = 0; i < (msg_size / (signed int) sizeof (int)); i++) { msg_sum += msg_buf [i]; } msg_sum *= msg_prio; } if ((errno = pthread_mutex_lock (&sm_mutex)) != 0) { perror ("PTHREAD_MUTEX_LOCK"); } sum += msg_sum; if ((errno = pthread_mutex_unlock (&sm_mutex)) != 0) { perror ("PTHREAD_MUTEX_UNLOCK"); } if (mq_notify (mqdes, &evnt_mq_notify [pt_nm.sival_int]) != 0) { perror ("MQ_NOTIFY"); } } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока, генерирующего числа и */ /* посылающего сообщения. */ /* Аргумент - номер очереди сообщений */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *start_sender (void *pt_nm) { int msg_buf [MSG_INT_SIZE]; int i, j; /* Сформируем и пошлем заданное число сообщений */ /* (проверяя, не переполнилась ли очередь)*/ for (j = 0; j < MSG_N; j++) { for (i = 0; i < MSG_INT_SIZE; i++) { msg_buf [i] = rand (); } if (mq_send (mq_des [(int) pt_nm], (char *) msg_buf, MSG_INT_SIZE * sizeof (int), prio_rnd) != 0) { perror ("MQ_SEND"); return (NULL); } } return (NULL); } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Создание очереди сообщений, */ /* регистрация на получение уведомлений, */ /* создание и ожидание завершения потоков управления */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { /* Массив идентификаторов порождаемых */ /* потоков. Эти потоки будут */ pthread_t pt_mqs [PT_N]; /* генерировать сообщения*/ /* Массив для генерации и хранения */ /* имен очередей сообщений */ char mq_name [PT_N] [MQ_NAME_LENGTH]; struct mq_attr mqattrs; /* Атрибуты создаваемых очередей */ int i; for (i = 0; i < PT_N; i++) { /* Создадим очереди сообщений */ /* и зарегистрируемся на получение уведомлений */ sprintf (mq_name [i], "g%d", i); mqattrs.mq_flags = O_NONBLOCK; mqattrs.mq_maxmsg = MQ_MSGS_MAX; mqattrs.mq_msgsize = MSG_INT_SIZE * sizeof (int); mqattrs.mq_curmsgs = 0; if ((mq_des [i] = mq_open (mq_name [i], O_RDWR | O_CREAT | O_NONBLOCK, 0777, &mqattrs)) == (mqd_t) (-1)) { perror ("MQ_OPEN"); return (-1); } /* Сформируем структуру evnt_mq_notify */ evnt_mq_notify [i].sigev_notify = SIGEV_THREAD; evnt_mq_notify [i].sigev_signo = SIG_MQ_NOTIFY; evnt_mq_notify [i].sigev_value.sival_int = i; evnt_mq_notify [i].sigev_notify_function = msg_arrvd; evnt_mq_notify [i].sigev_notify_attributes = NULL; if (mq_notify (mq_des [i], &evnt_mq_notify [i]) != 0) { perror ("MQ_NOTIFY_MAIN"); return (-1); } /* Создадим потоки управления */ if ((errno = pthread_create (&pt_mqs [i], NULL, start_sender, (void *) i)) != 0) { perror ("PTHREAD_CREATE"); return (i); } } /* for */ /* Ожидание завершения */ for (i = 0; i < PT_N; i++) { (void) pthread_join (pt_mqs [i], NULL); } /* Закроем дескрипторы и удалим очереди */ for (i = 0; i < PT_N; i++) { (void) mq_close (mq_des [i]); (void) mq_unlink (mq_name [i]); } printf ("Общая сумма: %d\n", sum); if ((errno = pthread_mutex_destroy (&sm_mutex)) != 0) { perror ("PTHREAD_MUTEX_DESTROY"); return (errno); } return 0; } |
Листинг 4.10. Пример программы, использующей очереди сообщений. |
Закрыть окно |
#include <semaphore.h> sem_t *sem_open (const char *name, int oflag, ...); int sem_init (sem_t *sem, int pshared, unsigned value); |
Листинг 4.11. Описание функций sem_open() и sem_init(). |
Закрыть окно |
#include <semaphore.h> int sem_close (sem_t *sem); int sem_unlink (const char *name); int sem_destroy (sem_t *sem); |
Листинг 4.12. Описание функций закрытия и ликвидации семафоров. |
Закрыть окно |
#include <semaphore.h> int sem_wait (sem_t *sem); int sem_trywait (sem_t *sem); |
Листинг 4.13. Описание функций захвата семафоров. |
Закрыть окно |
#include <semaphore.h> #include <time.h> int sem_timedwait (sem_t * restrict sem, const struct timespec *restrict abstime); |
Листинг 4.14. Описание функции захвата семафора с контролем времени ожидания. |
Закрыть окно |
#include <semaphore.h> int sem_post (sem_t *sem); |
Листинг 4.15. Описание функции sem_post(). |
Закрыть окно |
#include <semaphore.h> int sem_getvalue (sem_t * restrict sem, int *restrict sval); |
Листинг 4.16. Описание функции sem_getvalue(). |
Закрыть окно |
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа реализует взаимодействие */ /* поставщик/потребитель (писатель/читатель). */ /* Поставщик генерирует случайные целые числа и помещает их буфер */ /* на один элемент, потребитель извлекает их оттуда и суммирует. */ /* Для синхронизации используются неименованные семафоры*/ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <semaphore.h> #include <pthread.h> #include <errno.h> /* Буфер для хранения генерируемых данных */ static int my_buf; /* Семафор, разрешающий записывать в буфер новые данные */ static sem_t w_sem; /* Семафор, разрешающий читать данные из буфера */ static sem_t r_sem; /* Общий результат суммирования */ static int sum = 0; /* * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока, генерирующего числа */ /* * * * * * * * * * * * * * * * * * * * * * * */ void *start_writer (void *dummy) { while (sem_wait (&w_sem) == 0) { my_buf = rand (); if (sem_post (&r_sem) != 0) { perror ("SEM_POST-R"); return (NULL); } } return (NULL); } /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока, читающего и суммирующего числа */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ void *start_reader (void *dummy) { while (sem_wait (&r_sem) == 0) { sum += my_buf; if (sem_post (&w_sem) != 0) { perror ("SEM_POST-W"); return (NULL); } } return (NULL); } /* * * * * * * * * * * * * * * * * * * * * * * */ /* Инициализация семафоров, */ /* создание и терминирование потоков управления*/ /* * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { pthread_t w_ptid;/* Идентификатор потока-писателя */ pthread_t r_ptid;/* Идентификатор потока-читателя */ /* Инициализируем семафоры. */ /* Семафор записи будет свободен,*/ /* семафор чтения - захвачен */ if (sem_init (&w_sem, 0, 1) == -1) { perror ("SEM_INIT-W"); return (-1); } if (sem_init (&r_sem, 0, 0) == -1) { perror ("SEM_INIT-R"); return (-1); } /* Создадим потоки управления - писателя и читателя */ if ((errno = pthread_create (&w_ptid, NULL, start_writer, NULL)) != 0) { perror ("PTHREAD_CREATE-W"); return (errno); } if ((errno = pthread_create (&r_ptid, NULL, start_reader, NULL)) != 0) { perror ("PTHREAD_CREATE-R"); return (errno); } /* Дадим потокам повыполняться */ sleep (10); /* Терминируем потоки */ (void) pthread_cancel (w_ptid); (void) pthread_cancel (r_ptid); /* Дождемся завершения потоков */ (void) pthread_join (w_ptid, NULL); (void) pthread_join (r_ptid, NULL); /* Ликвидируем семафоры */ if (sem_destroy (&w_sem) != 0) { perror ("SEM_DESTROY-W"); return (-1); } if (sem_destroy (&r_sem) != 0) { perror ("SEM_DESTROY-R"); return (-1); } printf ("Сумма сгенерированных чисел: %d\n", sum); return 0; } |
Листинг 4.17. Пример программы, реализующей взаимодействие поставщик/потребитель. |
Закрыть окно |
/* * * * * * * * * * * * * * * * * * * * * * * */ /* Программа реализует взаимодействие */ /* поставщик/потребитель (писатель/читатель). */ /* Поставщик генерирует случайные целые числа */ /* и помещает их в кольцевой буфер, */ /* потребитель извлекает их оттуда и суммирует.*/ /* Для синхронизации используются */ /* неименованные семафоры */ /* * * * * * * * * * * * * * * * * * * * * * * */ #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <semaphore.h> #include <pthread.h> #include <errno.h> /* Буфер для хранения генерируемых данных */ static int my_buf [BUFSIZ]; /* Индекс, по которому можно записать очередной элемент */ static int w_ind = 0; /* Индекс, по которому можно прочитать очередной элемент */ static int r_ind = 0; /* Семафор, разрешающий записывать в буфер новые данные */ static sem_t w_sem; /* Семафор, разрешающий читать данные из буфера */ static sem_t r_sem; /* Общий результат суммирования */ static int sum = 0; /* * * * * * * * * * * * * * * * * * * * * * * */ /* Стартовая функция потока, генерирующего числа*/ /* * * * * * * * * * * * * * * * * * * * * * * */ void *start_writer (void *dummy) { while (sem_wait (&w_sem) == 0) { my_buf [w_ind] = rand (); w_ind = (w_ind + 1) % BUFSIZ; if (sem_post (&r_sem) != 0) { perror ("SEM_POST-R"); return (NULL); } } return (NULL); } /* * * * * * * * * * * * * * * */ /* Стартовая функция потока, */ /* читающего и суммирующего числа*/ /* * * * * * * * * * * * * * * */ void *start_reader (void *dummy) { while (sem_wait (&r_sem) == 0) { sum += my_buf [r_ind]; r_ind = (r_ind + 1) % BUFSIZ; if (sem_post (&w_sem) != 0) { perror ("SEM_POST-W"); return (NULL); } } return (NULL); } /* * * * * * * * * * * * * * * * * * * * * * * */ /* Инициализация семафоров, */ /* создание и терминирование потоков управления*/ /* * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { pthread_t w_ptid;/* Идентификатор потока-писателя */ pthread_t r_ptid;/* Идентификатор потока-читателя */ /* Инициализируем семафоры. */ /* Семафор записи будет свободен,*/ /* разрешая заполнить весь буфер,*/ /* семафор чтения - захвачен */ if (sem_init (&w_sem, 0, BUFSIZ) == -1) { perror ("SEM_INIT-W"); return (-1); } if (sem_init (&r_sem, 0, 0) == -1) { perror ("SEM_INIT-R"); return (-1); } /* Создадим потоки управления - писателя и читателя */ if ((errno = pthread_create (&w_ptid, NULL, start_writer, NULL)) != 0) { perror ("PTHREAD_CREATE-W"); return (errno); } if ((errno = pthread_create (&r_ptid, NULL, start_reader, NULL)) != 0) { perror ("PTHREAD_CREATE-R"); return (errno); } /* Дадим потокам повыполняться */ sleep (10); /* Терминируем потоки */ (void) pthread_cancel (w_ptid); (void) pthread_cancel (r_ptid); /* Дождемся завершения потоков */ (void) pthread_join (w_ptid, NULL); (void) pthread_join (r_ptid, NULL); /* Ликвидируем семафоры */ if (sem_destroy (&w_sem) != 0) { perror ("SEM_DESTROY-W"); return (-1); } if (sem_destroy (&r_sem) != 0) { perror ("SEM_DESTROY-R"); return (-1); } printf ("Сумма сгенерированных чисел: %d\n", sum); return 0; } |
Листинг 4.18. Пример программы, реализующей взаимодействие поставщик/потребитель с помощью целочисленных семафоров. |
Закрыть окно |
/* Обедающие философы. Многопотоковая реализация с помощью семафоров. Запуск: mudrecSem [-a | -p | -I] [-t число_секунд] имя_философа ... Опции: -t число_секунд - сколько секунд моделируется Стратегии захвата вилок: -a - сначала захватывается вилка с меньшим номером; -p - сначала захватывается нечетная вилка; -I - некорректная (но эффективная) интеллигентная стратегия: во время ожидания уже захваченная вилка кладется. Пример запуска: mudrecSem -p -t 300 A B C D E F G H I J K L M N\ O P Q R S T U V W X Y Z */ static char rcsid[] __attribute__((unused)) = \ "$Id: mudrecSem.c,v 1.2 2004/03/18 10:28:38 sambor Exp $"; #include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <semaphore.h> #include <signal.h> #include <string.h> #include <fcntl.h> #include <limits.h> #include <errno.h> #define max(a,b) ((a)>(b)?(a):(b)) #define min(a,b) ((a)>(b)?(b):(a)) struct mudrec { char *name; int left_fork, right_fork; int eat_time, wait_time, think_time, max_wait_time; int count; pthread_t thread; int private_pFdIn; } *kafedra; /* Глобальные счетчики и логические переменные */ int Stop = 0; /* Признак конца обеда */ /* Различные дескрипторы */ int protokol [2] = {-1, -1}; #define pFdIn (protokol [1]) #define pFdOut (protokol [0]) /* Массив семафоров для синхронизации доступа к вилкам */ sem_t *semFork; /* Разные алгоритмы захвата вилок */ static void get_forks_simple (struct mudrec *this); static void get_forks_odd (struct mudrec *this); static void get_forks_maybe_infinit_time (struct mudrec *this); /* Используемый метод захвата вилок */ void (*get_forks) (struct mudrec *this) = get_forks_simple; /* Возвращение вилок */ static void put_forks (struct mudrec *this); /* * Потоки-философы */ void *filosof (void *arg) { struct mudrec *this = arg; char buffer [LINE_MAX]; int bytes; int private_pFdIn = this->private_pFdIn; while (!Stop) { /* Пора подкрепиться */ { int wait_time, tm = time (NULL); sprintf (buffer, "%s: хочет есть\n", this->name); bytes = write (private_pFdIn, buffer, strlen (buffer)); (*get_forks) (this); wait_time = time (NULL) - tm; this->wait_time += wait_time; this->max_wait_time = max (wait_time, this->max_wait_time); sprintf (buffer,"%s: ждал вилок %d сек\n", this->name, wait_time); bytes = write (private_pFdIn, buffer, strlen (buffer)); } /* Может, обед уже закончился? */ if (Stop) { put_forks (this); break; } /* Ест */ { int eat_time = rand () % 20 + 1; sleep (eat_time); this->eat_time += eat_time; this->count++; sprintf (buffer,"%s: ел %d сек\n", this->name, eat_time); bytes = write (private_pFdIn, buffer, strlen (buffer)); } /* Отдает вилки */ put_forks (this); if (Stop) break; /* Размышляет */ { int think_time = rand () % 10 + 1; sleep (think_time); this->think_time += think_time; } } /* while (!Stop) */ sprintf (buffer,"%s: уходит\n", this->name); bytes = write (private_pFdIn, buffer, strlen (buffer)); close (private_pFdIn); return (NULL); } /* Поток-философ */ /* Кладет вилки одну за другой */ static void put_forks (struct mudrec *this) { sem_post (&semFork [this->left_fork - 1]); sem_post (&semFork [this->right_fork - 1]); } /* Берет вилки по очереди в порядке номеров */ static void get_forks_simple (struct mudrec *this) { int first = min (this->left_fork, this->right_fork); int last = max (this->left_fork, this->right_fork); sem_wait (&semFork [first - 1]); sem_wait (&semFork [last - 1]); } /* Берем сначала нечетную вилку */ /* (если обе нечетные - то с большим номером) */ static void get_forks_odd (struct mudrec *this) { int left = this->left_fork, right = this->right_fork; int first; int last; if ((left & 1) > (right & 1)) { first = left; last = right; } else if ((left & 1) < (right & 1)) { first = right; last = left; } else { first = max (left, right); last = min (left, right); } sem_wait (&semFork [first - 1]); sem_wait (&semFork [last - 1]); } /* Берем вилки по очереди, в произвольном порядке. * Но если вторая вилка не берется сразу, то кладем первую. * То есть философ не расходует вилочное время впустую. */ static void get_forks_maybe_infinit_time (struct mudrec *this) { int left = this->left_fork, right = this->right_fork; for (;;) { sem_wait (&semFork [left - 1]); if (0 == sem_trywait (&semFork [right - 1])) return; sem_post (&semFork [left - 1]); sem_wait (&semFork [right - 1]); if (0 == sem_trywait (&semFork [left - 1])) return; sem_post (&semFork [right - 1]); } } /* Мелкие служебные функции */ static void stop (int dummy) { Stop = 1; } static void usage (char name []) { fprintf (stderr, "Использование: %s [-a | -p | -I] [-t число_секунд] " "имя_философа ...\n", name); exit (1); } /* Точка входа демонстрационной программы */ int main (int argc, char *argv []) { char buffer [LINE_MAX], *p; int i, n, c; int open_room_time = 300; int nMudr; struct sigaction sact; while ((c = getopt (argc, argv, "apIt:")) != -1) { switch (c) { case 'a': get_forks = get_forks_simple; break; case 'p': get_forks = get_forks_odd; break; case 'I': get_forks = get_forks_maybe_infinit_time; break; case 't': open_room_time = strtol (optarg, &p, 0); if (optarg [0] == 0 || *p != 0) usage (argv [0]); break; default : usage (argv [0]); } } nMudr = argc - optind; if (nMudr < 2) usage (argv [0]); /* Меньше двух */ /* философов неинтересно ... */ /* Создание канала для протокола обработки событий */ pipe (protokol); kafedra = calloc (sizeof (struct mudrec), nMudr); /* Зачисление на кафедру */ for (i = 0; i < nMudr; i++, optind++) { kafedra [i].name = argv [optind]; /* Выдадим телефон */ kafedra [i].private_pFdIn = fcntl (pFdIn, F_DUPFD, 0); /* Укажем новичку, какими вилками пользоваться */ kafedra [i].left_fork = i + 1; kafedra [i].right_fork = i + 2; } kafedra [nMudr - 1].right_fork = 1; /* Последний*/ /* пользуется вилкой первого */ /* Зададим реакцию на сигналы и установим будильник */ /* на конец обеда */ sact.sa_handler = stop; (void) sigemptyset (&sact.sa_mask); sact.sa_flags = 0; (void) sigaction (SIGINT, &sact, ( struct sigaction *) NULL); (void) sigaction (SIGALRM, &sact, (struct sigaction *) NULL); alarm (open_room_time); /* Создадим семафоры для охраны вилок */ semFork = calloc (sizeof (sem_t), nMudr); for (i = 0; i < nMudr; i++) { sem_init (&semFork [i], 0, 1 /* На каждое место */ /* по одной вилке */); } /* Философы входят в столовую */ for (i = 0; i < nMudr; i++) pthread_create (&kafedra [i].thread, NULL, &filosof, (void *) &kafedra [i]); /* Выдача сообщений на стандартный вывод и выход */ /* после окончания всех задач */ close (pFdIn); while (1) { n = read (pFdOut, buffer, LINE_MAX); if (n == 0 || (n < 0 && errno != EINTR)) break; for (i = 0; i < n; i++) putchar (buffer [i]); } close (pFdOut); /* Уничтожение семафоров */ for (i = 0; i < nMudr; i++) { sem_destroy (&semFork [i]); } /* Выдача сводной информации */ { int full_eating_time = 0; int full_waiting_time = 0; int full_thinking_time = 0; for (i = 1; i <= nMudr; i++) { struct mudrec *this = &kafedra [i - 1]; full_eating_time += this->eat_time; full_waiting_time += this->wait_time; full_thinking_time += this->think_time; if (this->count > 0) { float count = this->count; float think_time = this->think_time / count; float eat_time = this->eat_time / count; float wait_time = this->wait_time / count; printf ("%s: ел %d раз в среднем: думал=%.1f " "ел=%.1f ждал=%.1f (максимум %d)\n", this->name, this->count, think_time, eat_time, wait_time, this->max_wait_time); } else printf ("%s: не поел\n", this->name); } /* for */ { float total_time = (full_eating_time + full_waiting_time + full_thinking_time) / (float) nMudr; printf("Среднее число одновременно едящих = %.3f\n", full_eating_time / total_time); printf("Среднее число одновременно ждущих = %.3f\n", full_waiting_time / total_time); } } /* Выдача сводной информации */ free (semFork); free (kafedra); /* Сообщим об окончании работы. */ printf ("Конец обеда\n"); return 0; } |
Листинг 4.19. Многопотоковый вариант решения задачи об обедающих философах с использованием семафоров реального времени. |
Закрыть окно |
#include <sys/mman.h> int shm_open (const char *name, int oflag, mode_t mode); int shm_unlink (const char *name); |
Листинг 4.20. Описание функций shm_open() и shm_unlink(). |
Закрыть окно |
#ifndef g_SHM #define g_SHM /* Имя объекта в разделяемой памяти */ #define O_SHM_NAME "/g_o.shm" /* Используемый номер сигнала реального времени */ #define SIG_SHM SIGRTMIN /* Используемые значения сигнала реального времени */ #define SIGVAL_LINE 0 #define SIGVAL_EOF EOF #endif |
Листинг 4.21. Заголовочный файл "g_shm.h" программы, копирующей строки со стандартного ввода на стандартный вывод. |
Закрыть окно |
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа, состоящая из двух процессов, копирует */ /* строки со стандартного ввода на стандартный вывод,*/ /* "прокачивая" их через разделяемый сегмент памяти. */ /* Для синхронизации доступа к разделяемому сегменту */ /* используются сигналы реального времени */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ #include <unistd.h> #include <stdio.h> #include <signal.h> #include <sys/mman.h> #include <fcntl.h> #include <limits.h> #include <sys/wait.h> #include <assert.h> #include "g_shm.h" /* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Создание разделяемого сегмента памяти, */ /* чтение со стандартного ввода и запись строк в сегмент*/ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ int main (void) { int fd_shm; /* Дескриптор объекта в разделяемой памяти*/ FILE *fp; /* Поток для записи в объект */ char line [LINE_MAX]; /* Буфер для копируемых строк */ struct sigaction sact; /* Структура для обработки сигналов */ union sigval sg_val;/* Значение сигнала */ int sg_no; /* Номер принятого сигнала */ pid_t cpid;/* Идентификатор порожденного процесса */ /* Создадим разделяемый сегмент памяти */ if ((fd_shm = shm_open (O_SHM_NAME, O_RDWR | O_CREAT, 0777)) < 0) { perror ("SHM_CREAT"); return (1); } /* Сформируем поток данных по файловому дескриптору */ /* объекта в разделяемой памяти */ assert ((fp = fdopen (fd_shm, "w")) != NULL); /* Отменим буферизацию вывода */ setbuf (fp, NULL); /* Сформируем маску сигналов (блокируем SIG_SHM) */ (void) sigemptyset (&sact.sa_mask); (void) sigaddset (&sact.sa_mask, SIG_SHM); (void) sigprocmask (SIG_BLOCK, &sact.sa_mask, (sigset_t *) NULL); /* Установим для сигнала SIG_SHM флаг SA_SIGINFO */ sact.sa_flags = SA_SIGINFO; sact.sa_sigaction = (void (*) (int, siginfo_t *, void *)) SIG_DFL; (void) sigaction (SIG_SHM, &sact, (struct sigaction *) NULL); /* Подготовительная работа закончена */ switch (cpid = fork ()) { case -1: perror ("FORK"); return (2); case 0: /* Чтение из объекта и выдачу на стандартный */ /* вывод реализуем в порожденном процессе */ if (execl ("./g_r_shm", "g_r_shm", (char *) NULL) < 0) { perror ("EXECL"); return (3); } } /* Чтение со стандартного ввода и запись в объект */ /* возложим на родительский процесс.*/ /* В начальный момент объект в разделяемой памяти*/ /* доступен для записи */ assert (fseek (fp, 0, SEEK_SET) == 0); fputs ("Вводите строки\n", fp); /* Сообщим порожденному процессу,*/ /* что объект в разделяемой памяти заполнен*/ sg_val.sival_int = SIGVAL_LINE; assert (sigqueue (cpid, SIG_SHM, sg_val) == 0); while (fgets (line, sizeof (line), stdin) != NULL) { assert (fseek (fp, 0, SEEK_SET) == 0); /* Дождемся, когда в объект можно будет писать */ if ((sigwait (&sact.sa_mask, &sg_no) != 0) || (sg_no != SIG_SHM)) { return (4); } assert (fputs ("Вы ввели: ", fp) != EOF); assert (fputs (line, fp) != EOF); assert (sigqueue (cpid, SIG_SHM, sg_val) == 0); } /* Сообщим о конце файла */ sg_val.sival_int = SIGVAL_EOF; assert (sigqueue (cpid, SIG_SHM, sg_val) == 0); fclose (fp); (void) wait (NULL); if (shm_unlink (O_SHM_NAME) != 0) { perror ("SHM_UNLINK"); return (7); } return (0); } |
Листинг 4.22. Исходный текст начального процесса двухпроцессной программы, копирующей строки со стандартного ввода на стандартный вывод. |
Закрыть окно |
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Процесс читает строки из объекта в разделяемой памяти */ /* и копирует их на стандартный вывод */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ #include <unistd.h> #include <stdio.h> #include <signal.h> #include <sys/mman.h> #include <fcntl.h> #include <limits.h> #include <assert.h> #include "g_shm.h" /* * * * * * * * * * * * * * * * * * * * */ /* Открытие разделяемого сегмента памяти,*/ /* чтение из сегмента и выдача строк */ /* на стандартный вывод */ /* * * * * * * * * * * * * * * * * * * * */ int main (void) { int fd_shm; /* Дескриптор объекта */ /* в разделяемой памяти */ FILE *fp; /* Поток для чтения из объекта */ char line [LINE_MAX];/* Буфер для копируемых строк */ sigset_t smask; /* Маска ожидаемых сигналов */ siginfo_t sinfo; /* Структура для получения */ /* данных о сигнале */ pid_t ppid; /* Идентификатор родительского */ /* процесса */ /* Откроем разделяемый сегмент памяти */ if ((fd_shm = shm_open (O_SHM_NAME, O_RDONLY, 0777)) < 0) { perror ("SHM_OPEN"); return (1); } /* Сформируем поток по файловому дескриптору объекта */ /* в разделяемой памяти */ assert ((fp = fdopen (fd_shm, "r")) != NULL); /* Отменим буферизацию ввода */ setbuf (fp, NULL); /* Запомним идентификатор родительского процесса */ ppid = getppid (); /* Сформируем маску ожидаемых сигналов (SIG_SHM) */ (void) sigemptyset (&smask); (void) sigaddset (&smask, SIG_SHM); /* Подготовительная работа закончена */ while ((fseek (fp, 0, SEEK_SET) == 0) && /* Дождемся, когда из объекта можно будет читать */ (sigwaitinfo (&smask, &sinfo) == SIG_SHM) && /* И прочитаем строку, а не конец файла */ (sinfo.si_value.sival_int == SIGVAL_LINE)) { (void) fgets (line, sizeof (line), fp); /* Сообщим родительскому процессу, */ /* что данные из объекта извлечены */ assert (kill (ppid, SIG_SHM) == 0); /* Выдадим, наконец, строку на стандартный вывод */ assert (fputs (line, stdout) != EOF); } fclose (fp); return 0; } |
Листинг 4.23. Исходный текст порождаемого процесса (файл g_r_shm.c) двухпроцессной программы, копирующей строки со стандартного ввода на стандартный вывод. |
Закрыть окно |