Очереди сообщений

Зачем нам очереди сообщений, когда у нас уже есть общая память? Это было бы по нескольким причинам, давайте попробуем разбить это на несколько пунктов для упрощения -

  • Как понятно, как только сообщение получено процессом, оно больше не будет доступно для любого другого процесса. В то время как в разделяемой памяти данные доступны для доступа нескольких процессов.

  • Если мы хотим общаться с небольшими форматами сообщений.

  • Данные общей памяти должны быть защищены синхронизацией, когда несколько процессов обмениваются данными одновременно.

  • Частота записи и чтения с использованием разделяемой памяти высока, тогда было бы очень сложно реализовать эту функциональность. Не стоит в отношении использования в подобных случаях.

  • Что если все процессы не нуждаются в доступе к общей памяти, а очень немногие процессы нуждаются в этом, то было бы лучше реализовать с очередями сообщений.

  • Если мы хотим обмениваться данными с различными пакетами данных, скажем, процесс A отправляет тип сообщения 1 для процесса B, тип сообщения 10 для процесса C и тип сообщения 20 для обработки D. В этом случае проще реализовать с очередями сообщений. Чтобы упростить данный тип сообщения как 1, 10, 20, это может быть либо 0, либо + ve, либо –ve, как описано ниже.

  • Конечно, порядок очереди сообщений - FIFO (первый вошел, первый вышел). Первое сообщение, вставленное в очередь, является первым для извлечения.

Использование общей памяти или очередей сообщений зависит от потребностей приложения и эффективности его использования.

Связь с использованием очередей сообщений может происходить следующими способами:

  • Запись в общую память одним процессом и чтение из общей памяти другим процессом. Как мы знаем, чтение может выполняться несколькими процессами.

Очередь сообщений
  • Запись в общую память одним процессом с разными пакетами данных и чтение из нее несколькими процессами, т. Е. В соответствии с типом сообщения.

Многократная очередь сообщений

Увидев определенную информацию об очередях сообщений, теперь пришло время проверить системный вызов (System V), который поддерживает очереди сообщений.

Для осуществления связи с использованием очередей сообщений выполните следующие действия:

Шаг 1. Создайте очередь сообщений или подключитесь к уже существующей очереди сообщений (msgget ())

Шаг 2 - запись в очередь сообщений (msgsnd ())

Шаг 3 - Чтение из очереди сообщений (msgrcv ())

Шаг 4 - Выполнение операций управления в очереди сообщений (msgctl ())

Теперь давайте проверим синтаксис и определенную информацию о вышеупомянутых вызовах.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg)

Этот системный вызов создает или выделяет очередь сообщений System V. Следующие аргументы должны быть переданы -

  • Первый аргумент, ключ, распознает очередь сообщений. Ключ может быть либо произвольным значением, либо ключом, который можно получить из библиотечной функции ftok ().

  • Второй аргумент, shmflg, указывает требуемые флаги очереди сообщений, такие как IPC_CREAT (создание очереди сообщений, если она не существует) или IPC_EXCL (используется с IPC_CREAT для создания очереди сообщений и сбой вызова, если очередь сообщений уже существует). Нужно также передать разрешения.

Примечание. Подробнее о разрешениях см. В предыдущих разделах.

Этот вызов вернул бы действительный идентификатор очереди сообщений (используемый для дальнейших вызовов очереди сообщений) в случае успеха и -1 в случае сбоя. Чтобы узнать причину ошибки, проверьте с помощью переменной errno или функции perror ().

Различные ошибки в отношении этого вызова: EACCESS (разрешение отклонено), EEXIST (очередь уже существует, не может создать), ENOENT (очередь не существует), ENOMEM (недостаточно памяти для создания очереди) и т. Д.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

Этот системный вызов отправляет / добавляет сообщение в очередь сообщений (System V). Следующие аргументы должны быть переданы -

  • Первый аргумент, msgid, распознает очередь сообщений, т. Е. Идентификатор очереди сообщений. Значение идентификатора получено в случае успеха msgget ()

  • Второй аргумент, msgp, это указатель на сообщение, отправленное вызывающей стороне, определенное в структуре следующей формы:

struct msgbuf {
   long mtype;
   char mtext[1];
};

Переменная mtype используется для связи с различными типами сообщений, подробно объясненными в вызове msgrcv (). Переменная mtext - это массив или другая структура, размер которой указан как msgsz (положительное значение). Если поле mtext не упомянуто, то оно считается сообщением нулевого размера, что разрешено.

  • Третий аргумент, msgsz, это размер сообщения (сообщение должно заканчиваться null символом)

  • Четвертый аргумент, msgflg, указывает на определенные флаги, такие как IPC_NOWAIT (немедленно возвращается, когда в очереди не найдено ни одного сообщения, или MSG_NOERROR (обрезает текст сообщения, если оно больше байта msgsz)

Этот вызов вернет 0 в случае успеха и -1 в случае сбоя. Чтобы узнать причину ошибки, проверьте с помощью переменной errno или функции perror ().

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

Этот системный вызов извлекает сообщение из очереди сообщений (System V). Следующие аргументы должны быть переданы -

  • Первый аргумент, msgid, распознает очередь сообщений, т. Е. Идентификатор очереди сообщений. Значение идентификатора получено в случае успеха msgget ()

  • Второй аргумент, msgp, является указателем сообщения, полученного от вызывающей стороны. Он определяется в структуре следующего вида -

struct msgbuf {
   long mtype;
   char mtext[1];
};

Переменная mtype используется для связи с различными типами сообщений. Переменная mtext - это массив или другая структура, размер которой указан как msgsz (положительное значение). Если поле mtext не упомянуто, то оно считается сообщением нулевого размера, что разрешено.

  • Третий аргумент, msgsz, это размер полученного сообщения (сообщение должно заканчиваться null символом)

  • Пустой аргумент, msgtype, указывает тип сообщения -

    • Если msgtype равен 0 - читает первое полученное сообщение в очереди

    • Если msgtype равен + ve - читает первое сообщение в очереди типа msgtype (если msgtype равно 10, тогда читает только первое сообщение типа 10, даже если другие типы могут находиться в очереди в начале)

    • Если msgtype равен –ve - читает первое сообщение с наименьшим типом, меньшим или равным абсолютному значению типа сообщения (скажем, если msgtype равен -5, то оно читает первое сообщение с типом меньше 5, т.е. тип сообщения от 1 до 5)

  • Пятый аргумент, msgflg, указывает определенные флаги, такие как IPC_NOWAIT (немедленно возвращается, когда в очереди не найдено сообщений или MSG_NOERROR (обрезает текст сообщения, если больше msgsz байтов)

Этот вызов вернул бы число байтов, фактически полученных в массиве mtext в случае успеха, и -1 в случае сбоя. Чтобы узнать причину ошибки, проверьте с помощью переменной errno или функции perror ().

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

Этот системный вызов выполняет операции управления очередью сообщений (System V). Следующие аргументы должны быть переданы -

  • Первый аргумент, msgid, распознает очередь сообщений, т. Е. Идентификатор очереди сообщений. Значение идентификатора получено в случае успеха msgget ()

  • Второй аргумент, cmd, является командой для выполнения требуемой операции управления в очереди сообщений. Допустимые значения для cmd -

IPC_STAT - копирует информацию о текущих значениях каждого члена struct msqid_ds в переданную структуру, указанную в buf. Эта команда требует разрешения на чтение в очереди сообщений.

IPC_SET - устанавливает идентификатор пользователя, идентификатор группы владельца, разрешения и т. Д., На которые указывает структура buf.

IPC_RMID - немедленно удаляет очередь сообщений.

IPC_INFO - возвращает информацию об ограничениях и параметрах очереди сообщений в структуре, на которую указывает buf, которая имеет тип struct msginfo

MSG_INFO - возвращает структуру msginfo, содержащую информацию об используемых системных ресурсах в очереди сообщений.

  • Третий аргумент, buf, это указатель на структуру очереди сообщений с именем struct msqid_ds. Значения этой структуры будут использоваться для набора или получения согласно cmd.

Этот вызов вернет значение в зависимости от переданной команды. Успешное выполнение IPC_INFO и MSG_INFO или MSG_STAT возвращает индекс или идентификатор очереди сообщений или 0 для других операций и -1 в случае сбоя. Чтобы узнать причину ошибки, проверьте с помощью переменной errno или функции perror ().

Посмотрев основную информацию и системные вызовы относительно очередей сообщений, теперь пришло время проверить с помощью программы.

Давайте посмотрим описание, прежде чем смотреть на программу -

Шаг 1. Создайте два процесса: один для отправки в очередь сообщений (msgq_send.c), а другой - для извлечения из очереди сообщений (msgq_recv.c)

Шаг 2 - Создание ключа с помощью функции ftok (). Для этого изначально создается файл msgq.txt, чтобы получить уникальный ключ.

Шаг 3 - Процесс отправки выполняет следующее.

  • Читает строку ввода от пользователя

  • Удаляет новую строку, если она существует

  • Посылает в очередь сообщений

  • Повторяет процесс до конца ввода (CTRL + D)

  • Как только окончание ввода получено, отправляет сообщение «конец», чтобы обозначить конец процесса

Шаг 4 - В процессе получения выполняется следующее.

  • Читает сообщение из очереди
  • Отображает вывод
  • Если полученное сообщение «конец», завершает процесс и выходит

Для упрощения мы не используем тип сообщения для этого образца. Кроме того, один процесс записывает в очередь, а другой процесс читает из очереди. Это может быть расширено по мере необходимости, т.е. в идеале один процесс записывает в очередь, а несколько процессов считывают из очереди.

Теперь давайте проверим процесс (отправка сообщения в очередь) - Файл: msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */
   
   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL ) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");
   
   if (msgctl(msqid, IPC_RMID, NULL ) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

Шаги компиляции и выполнения

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

Ниже приведен код процесса получения сообщений (извлечение сообщения из очереди) - Файл: msgq_recv.c

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");
   
   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

Шаги компиляции и выполнения

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.