Функции асинхронного ввода/вывода
Основными операциями асинхронного ввода/вывода являются чтение и запись данных (см. листинг 7.1).
#include <aio.h> int aio_read (struct aiocb *aiocbp); int aio_write (struct aiocb *aiocbp);
Листинг 7.1. Описание функций асинхронного чтения и записи. (html, txt)
Функция aio_read() инициирует запрос на чтение aiocbp->aio_nbytes байт из файла, ассоциированного с дескриптором aiocbp->aio_fildes, начиная с абсолютной позиции aiocbp->aio_offset, в буфер с адресом aiocbp->aio_buf. Возврат из функции произойдет тогда, когда запрос будет принят к исполнению или поставлен в очередь; при этом нормальный результат равен нулю, а значение -1 (в совокупности с errno) свидетельствует об ошибке (например, исчерпаны ресурсы и запрос не удалось поставить в очередь).
Функция aio_write() осуществляет соответствующие действия для записи данных. Если для файлового дескриптора aiocbp->aio_fildes установлен флаг O_APPEND, запись будет производиться в конец файла.
Функция lio_listio() (см. листинг 7.2) позволяет за один вызов инициировать (поставить в очередь) список запросов на чтение и/или запись данных.
#include <aio.h> int lio_listio ( int mode, struct aiocb *restrict const listio [restrict], int nent, struct sigevent *restrict sigev);
Листинг 7.2. Описание функции lio_listio(). (html, txt)
Элементы массива listio [] специфицируют отдельные запросы. В полях aio_lio_opcode указуемых структур типа aiocb могут располагаться значения LIO_READ (запрос на чтение), LIO_WRITE (запрос на запись) или LIO_NOP (пустой запрос). Остальные элементы указуемых структур интерпретируются в соответствии со смыслом запроса (можно считать, что адреса структур передаются в качестве аргументов функциям aio_read() или aio_write()). Число элементов в массиве listio [] задается аргументом nent.
Значение аргумента mode определяет способ обработки списка запросов – синхронный (LIO_WAIT) или асинхронный (LIO_NOWAIT). В первом случае возврат из вызова lio_listio() произойдет только после завершения всех заказанных операций ввода/вывода; при этом аргумент sigev игнорируется.
Во втором случае возврат из lio_listio() произойдет немедленно, а по завершении всех операций ввода/вывода в соответствии со значением аргумента sigev будет сгенерировано асинхронное уведомление.
Нормальный результат выполнения функции lio_listio() равен нулю, но для двух описанных случаев он имеет разный смысл – успешное завершение всех операций ввода/вывода или только успешная постановка запросов в очередь соответственно. Во втором случае часть операций ввода/вывода может завершиться неудачей, и с каждой из них придется разбираться индивидуально. Подобное разбирательство позволяют осуществить функции aio_return() и aio_error() (см. листинг 7.3).
#include <aio.h>
ssize_t aio_return ( struct aiocb *aiocbp);
int aio_error ( const struct aiocb *aiocbp);
Листинг 7.3. Описание функций aio_return() и aio_error(). (html, txt)
Аргументом этих функций служит адрес управляющего блока, который играет в данном случае роль идентификатора запроса. Функция aio_return() выдает значение, которое вернула бы соответствующая функция обычного ввода/вывода (если операция ввода/вывода в момент опроса не была завершена, возвращаемое значение, разумеется, не определено). К идентификатору данного запроса функцию aio_return() можно применить ровно один раз; последующие вызовы aio_return() и aio_error() могут завершиться неудачей. (Конечно, если в той же указуемой структуре типа aiocb сформировать новый запрос и выполнить его, возможность корректного вызова функций aio_return() и aio_error() появляется снова.)
Результатом вызова aio_error() служит значение, которое установила бы в переменную errno соответствующая функция обычного ввода/вывода (или, если операция еще не завершена, результат равен EINPROGRESS). Если операция асинхронного ввода/вывода завершилась успешно, aio_error() вернет нулевой результат. Функцию aio_error() к идентификатору данного запроса можно применять произвольное число раз, и до завершения его выполнения, и после.
Ожидающие обработки запросы на асинхронный ввод/вывод можно аннулировать, воспользовавшись функцией aio_cancel() (см.
листинг 7.4).
#include <aio.h> int aio_cancel (int fildes, struct aiocb *aiocbp);
Листинг 7.4. Описание функции aio_cancel(). (html, txt)
Функция aio_cancel() пытается аннулировать один ( если значение аргумента aiocbp отлично от NULL) или все ожидающие обработки запросы, в которых фигурирует файловый дескриптор fildes. После аннулирования генерируется соответствующее асинхронное уведомление, статус ошибки устанавливается равным ECANCELED, а в качестве возвращаемого значения выдается -1.
Если запрос не удается аннулировать (возможные причины этого, вообще говоря, зависят от реализации), он выполняется стандартным образом.
Результат вызова aio_cancel() равен AIO_CANCELED, если все указанные запросы аннулированы. Если хотя бы один из запросов уже выполняется, он не может быть аннулирован, и тогда в качестве результата возвращается значение AIO_NOTCANCELED (а со статусом запросов, как и для функции lio_listio(), нужно разбираться индивидуально, пользуясь функцией aio_error()). Результат AIO_ALLDONE свидетельствует о том, что выполнение всех указанных запросов не только начато, но и уже завершено. Во всех остальных случаях результат вызова aio_cancel() равен -1.
Какой именно, как обычно, нужно выяснять индивидуально, пользуясь функциями aio_error() и aio_return().
Описываемая далее группа функций (см. листинг 7.6) предназначена для согласования состояния буферизованных и хранимых в долговременной памяти данных, обеспечения целостности данных и файлов. Подобные функции необходимы в системах обработки транзакций, чтобы гарантировать определенное состояние долговременной памяти, даже если система аварийно завершит работу. Один элемент из этой группы – функцию msync() – мы уже рассматривали.
#include <unistd.h> void sync (void);
#include <unistd.h> int fsync (int fildes);
#include <unistd.h> int fdatasync (int fildes);
#include <aio.h> int aio_fsync (int op, struct aiocb *aiocbp);
Листинг 7.6. Описание функций синхронизации оперативной и долговременной памяти.
Функция sync() имеет глобальный характер – она планирует запись в файловые системы всех измененных данных.
Функция fsync() обеспечивает запись измененных данных в файл, заданный дескриптором fildes. Более того, после успешного возврата из функции (с нулевым результатом) окажутся выполненными все стоявшие в очереди к этому файлу запросы на ввод/вывод с гарантией целостности файла.
Функция fdatasync() аналогична fsync(), но гарантируется лишь целостность данных (см. курс [1]).
Функция aio_fsync() осуществляет "асинхронную синхронизацию" файла. Иными словами, порождается запрос, выполнение которого при значении аргумента op, равном O_DSYNC, эквивалентно вызову fdatasync (aiocbp->aio_fildes), а при op, равном O_SYNC, – fsync (aiocbp->aio_fildes). Как и для других операций асинхронного ввода/вывода, адрес управляющего блока может использоваться в качестве аргумента функций aio_error() и aio_return(), а после завершения генерируется асинхронное уведомление в соответствии со значением элемента aiocbp->aio_sigevent. Все остальные поля указуемой структуры типа aiocb игнорируются.
Проиллюстрируем использование функций асинхронного ввода/вывода программой, подсчитывающей суммарное число строк в совокупности файлов (см.
листинг 7.7).
/* * * * * * * * * * * * * * * * * * */ /* Программа подсчитывает суммарное */ /* число строк – в файлах аргументах */ /* командной строки. */ /* Если аргументы отсутствуют или в */ /* качестве имени задан минус, */ /* читается стандартный ввод. */ /* * * * * * * * * * * * * * * * * * */
#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <fcntl.h> #include <signal.h> #include <string.h> #include <aio.h> #include <assert.h> #include <errno.h>
int main (int argc, char *argv []) { /* Указатель на массив */ /* указателей на управляющие */ /* блоки запросов операций */ /* асинхронного чтения */ struct aiocb **lio_ptr; // Общее число строк в файлах long nlines = 0; // Признак повторного указания // стандартного ввода int dup_stdin = 0; int i;
/* Сведем случай с отсутствием */ /* аргументов к общему, */ /* воспользовавшись одним из */ /* немногих стандартизованных */ /* файлов */ if (argc == 1) { argv [0] = "-"; } else { argv [0] = "/dev/null"; }
/* Зарезервируем память, */ /* откроем заданные файлы */ /* и сформируем начальный список */ /* запросов на чтение */ assert ((lio_ptr = (struct aiocb **) malloc (sizeof (struct aiocb *) * argc)) != NULL);
for (i = 0; i < argc; i++) { assert ((lio_ptr [i] = (struct aiocb *) malloc (sizeof (struct aiocb))) != NULL);
if (strcmp (argv [i], "-") == 0) { if (dup_stdin == 0) { lio_ptr [i]->aio_fildes = STDIN_FILENO; dup_stdin = 1; } else { lio_ptr [i]->aio_fildes = fcntl (STDIN_FILENO, F_DUPFD, 0); } } else if ((lio_ptr [i]->aio_fildes = open (argv [i], O_RDONLY)) == -1) { perror ("OPEN"); free (lio_ptr [i]); lio_ptr [i] = NULL; continue; }
lio_ptr [i]->aio_offset = 0; assert ((lio_ptr [i]->aio_buf = malloc(BUFSIZ)) != NULL); lio_ptr [i]->aio_nbytes = BUFSIZ; lio_ptr [i]->aio_reqprio = 0; lio_ptr [i]->aio_sigevent.sigev_notify = SIGEV_NONE; lio_ptr [i]->aio_lio_opcode = LIO_READ; } /* for (i < argc) */
/* Поехали ... */ assert (lio_listio (LIO_NOWAIT, lio_ptr, argc, &lio_ptr [0]->aio_sigevent) == 0);
/* Будем ждать завершения */ /* операций ввода/вывода, */ /* обрабатывать прочитанные */ /* данные и */ /* инициировать новые запросы */ /* на чтение */ while (aio_suspend (( const struct aiocb **) lio_ptr, argc, NULL) == 0) { /* Выясним, какой запрос */ /* и как выполнен */
/* Число недочитанных файлов */ int nreqs = 0;
for (i = 0; i < argc; i++) { if (lio_ptr [i] == NULL) { continue; }
/* Есть обслуживаемые */ /* запросы */ nreqs++;
if (aio_error (lio_ptr [i]) == EINPROGRESS) { continue; } { // Запрос выполнен
// Число прочитанных байт ssize_t nb;
if ((nb = aio_return (lio_ptr [i])) <= 0) { /* Дошли до конца файла*/ /* или чтение */ /* завершилось ошибкой */ (void) close(lio_ptr [i]->aio_fildes); free ((void *) lio_ptr [i]->aio_buf); free (lio_ptr [i]); lio_ptr [i] = NULL; nreqs--; } else { /* Обработаем прочитанные */ /* данные */
/* Текущее начало поиска */ /* перевода строки */ char *p; /* Позиция, где нашли */ /* перевод строки */ char *p1; p = (char *) lio_ptr [i]->aio_buf; while ((p1 = (char *) memchr (p, '\n', nb - (p – (char *) lio_ptr [i]->aio_buf))) != NULL) { nlines++; p = p1 + 1; }
/* Инициируем новый */ /* запрос на чтение */ lio_ptr [i]->aio_offset += nb; (void) aio_read (lio_ptr [i]); } } } /* for (i < argc) */
/* Остались недочитанные */ /* файлы? */ if (nreqs == 0) { break; } } /* while */
printf ("%ld\n", nlines);
return 0; }
Листинг 7.7. Пример программы, использующей функции асинхронного ввода/вывода.
Отметим списочную форму начального представления и ожидания выполнения запросов на асинхронное чтение. Из технических деталей обратим внимание на пустой указатель в качестве элемента массива указателей на структуры типа aiocb (функция lio_listio() игнорирует такие элементы) и в качестве аргумента timeout функции aio_suspend() (означает бесконечное ожидание).
Второй вариант решения той же задачи (см.
листинг 7.8) демонстрирует другой способ уведомления о завершении операции асинхронного ввода/вывода – вызов функции.
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Программа подсчитывает суммарное число строк */ /* в файлах – аргументах командной строки. */ /* Если аргументы отсутствуют или в качестве имени */ /* задан минус, читается стандартный ввод. */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */
#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <fcntl.h> #include <signal.h> #include <string.h> #include <aio.h> #include <pthread.h> #include <assert.h>
/* Указатель на массив указателей на управляющие */ /* блоки запросов операций асинхронного чтения */ static struct aiocb **lio_ptr;
/* Общее число строк в файлах */ static long nlines_total = 0;
/* Переменная условия, на которой ждут */ /* окончания обработки всех файлов */ static pthread_cond_t open_files_cond = PTHREAD_COND_INITIALIZER;
/* Мьютекс, охраняющий доступ */ /* к переменной условия и nlines_total */ static pthread_mutex_t nlines_mutex = PTHREAD_MUTEX_INITIALIZER;
/* * * * * * * * * * * * * * */ /* Функция, вызываемая */ /* при завершении операции */ /* асинхронного ввода/вывода */ /* * * * * * * * * * * * * * */ static void end_of_aio_op (union sigval sg_vl) { int no; /* Номер выполненного запроса */ /* в общем списке */ ssize_t nb; /* Число прочитанных байт */ long nlines = 0; /* Число строк в одной */ /* прочитанной порции */ no = sg_vl.sival_int;
if ((nb = aio_return (lio_ptr [no])) <= 0) { /* Дошли до конца файла */ /* или чтение завершилось ошибкой */ (void) close (lio_ptr [no]->aio_fildes); free ((void *) lio_ptr [no]->aio_buf); free (lio_ptr [no]); lio_ptr [no] = NULL; (void) pthread_cond_signal (&open_files_cond); } else { /* Обработаем прочитанные данные */ char *p; /* Текущее начало поиска перевода строки */ char *p1; /* Позиция, где нашли перевод строки */
p = (char *) lio_ptr [no]->aio_buf; while ((p1 = (char *) memchr (p, '\n', nb - (p – (char *) lio_ptr [no]->aio_buf))) != NULL) { nlines++; p = p1 + 1; }
/* Прибавим локальную сумму к общей */ (void) pthread_mutex_lock (&nlines_mutex); nlines_total += nlines; (void) pthread_mutex_unlock (&nlines_mutex);
/* Инициируем новый запрос на чтение */ lio_ptr [no]->aio_offset += nb; (void) aio_read (lio_ptr [no]); } }
/* * * * * * * * * * * * * * * * * * * * * * * * * * */ /* Функция проверяет, сколько осталось открытых файлов */ /* * * * * * * * * * * * * * * * * * * * * * * * * * */ static int n_open_files (int nfiles) { int nof = 0; /* Число открытых файлов */ int i;
for (i = 0; i < nfiles; i++) { if (lio_ptr [i] != NULL) { nof++; } }
return (nof); }
/* * * * * * * * * * * * * * * * * * * * */ /* Обработка аргументов командной строки, */ /* инициация начальных запросов на чтение, */ /* ожидание завершения обработки файлов, */ /* вывод результатов */ /* * * * * * * * * * * * * * * * * * * * */ int main (int argc, char *argv []) { int dup_stdin = 0; /* Признак повторного указания */ /* стандартного ввода */ int i;
/* Сведем случай с отсутствием аргументов к общему, */ /* воспользовавшись одним из немногих */ /* стандартизованных файлов */ if (argc == 1) { argv [0] = "-"; } else { argv [0] = "/dev/null"; }
/* Зарезервируем память, откроем заданные файлы */ /* и инициируем начальные запросы на чтение */ assert ((lio_ptr = (struct aiocb **) malloc (sizeof (struct aiocb *) * argc)) != NULL);
for (i = 0; i < argc; i++) { assert ((lio_ptr [i] = (struct aiocb *) malloc (sizeof (struct aiocb))) != NULL);
if (strcmp (argv [i], "-") == 0) { if (dup_stdin == 0) { lio_ptr [i]->aio_fildes = STDIN_FILENO; dup_stdin = 1; } else { lio_ptr [i]->aio_fildes = fcntl (STDIN_FILENO, F_DUPFD, 0); } } else if ((lio_ptr [i]->aio_fildes = open (argv [i], O_RDONLY)) == -1) { perror ("OPEN"); free (lio_ptr [i]); lio_ptr [i] = NULL; continue; }
lio_ptr [i]->aio_offset = 0; assert ((lio_ptr [i]->aio_buf = malloc (BUFSIZ)) != NULL); lio_ptr [i]->aio_nbytes = BUFSIZ; lio_ptr [i]->aio_reqprio = 0;
lio_ptr [i]->aio_sigevent.sigev_notify = SIGEV_THREAD; lio_ptr [i]->aio_sigevent.sigev_signo = SIGRTMIN; lio_ptr [i]->aio_sigevent.sigev_value.sival_int = i; lio_ptr [i]->aio_sigevent.sigev_notify_function = end_of_aio_op; lio_ptr [i]->aio_sigevent.sigev_notify_attributes = NULL;
/* Запрос готов, можно отправлять его на выполнение */ (void) aio_read (lio_ptr [i]); } /* for (i < argc) */
/* Дождемся завершения обработки всех указанных */ /* в командной строке файлов */ while (n_open_files (argc) > 0) { (void) pthread_cond_wait (&open_files_cond, &nlines_mutex); }
printf ("%ld\n", nlines_total);
return 0; }
Листинг 7.8. Модифицированный вариант программы, использующей функции асинхронного ввода/вывода.
В модифицированном варианте можно отметить применение переменных условия как средства ожидания завершения "вдвойне асинхронной" обработки файлов (по вводу/выводу и по выполнению функций процессирования прочитанных данных в рамках специально создаваемых потоков управления). Когда очередной файл закрывается, то для генерации уведомления о возможном завершении всей обработки вызывается функция pthread_cond_signal(). Обратим внимание на то, что в этот момент вызывающий поток не является владельцем мьютекса, ассоциированного с переменной условия, но в данном случае это и не требуется.