Добавление и исключение менеджера из кластера
Рассмотрим два способа включения в кластер менеджера, расположенного на компьютере с операционной системой UNIX. Имя менеджера очередей – QM_HPUX, IP адрес – 198.32.100.16, порт для службы «Listener» - 1421. Для включения в кластер данного менеджера очередей и создания необходимых кластерных каналов выполним команду из командной строки на компьютере с операционной системой UNIX
runmqsc QM_HPUX
и далее:
включаем менеджер очередей QM_HPUX в кластер THUNDER командой alter qmgr repos('THUNDER')создаем кластерный канал receiver
define channel('TO_QM_HPUX') + chltype(clusrcvr) conname('198.32.100.16(1421)') + cluster('THUNDER') создаем кластерный канал sender
define channel('TO_QM_Win2000_REP') + chltype(clussdr) conname('198.32.100.26(1415)') + cluster('THUNDER') создаем кластерную очередь HPUX.CQ
define qlocal('HPUX.CQ') cluster('THUNDER')обновляем информацию о кластерных объектах refresh cluster('THUNDER')проверяем доступность очереди Win2000.CQ DISPLAY QCLUSTER('Win2000.CQ')
В результате последней команды мы получаем информацию о кластерной очереди 'Win2000.CQ' менеджера QM_Win2000, что говорит о доступности информации о кластерных объектах кластера 'THUNDER':
DESCR(WebSphere MQ Default Local Queue) CLUSTER(THUNDER) CLUSQMGR(QM_Win2000) CLUSDATE(2004-10-05) ALTDATE(2004-10-05) CLUSQT(QLOCAL) PUT(ENABLED) DEFPSIST(NO) QUEUE(Win2000.CQ) QMID(QM_Win2000_2004-10-05_15.42.55) CLUSTIME(17.36.17) ALTTIME(15.49.29) TYPE(QCLUSTER) DEFPRTY(0) DEFBIND(OPEN) выходим из командного процессора WebSphere MQ end
Существует способ добавления менеджера в кластер с помощью WebSphere MQ Explorer в графическом режиме. Для этого следует:
Подключиться к удаленному менеджеру очередей QM_HPUX (IP адрес – 198.32.100.16, порт для службы «Listener» - 1421) через WebSphere MQ Explorer.После появления его в левой панели, вызвать на нем контекстное меню и выполнить пункт «Join Cluster».Ввести имя кластера THUNDER (рис. 6.8)
увеличить изображение
Рис. 6.8. Ввод имени кластера.
Ввести имя менеджера очередей, являющегося репозиторием для кластера THUNDER, например QM_Win2000_REP (рис. 6.9).
увеличить изображение
Рис. 6.9. Выбор менеджера очередей, являющегося репозиторием.
Ввести имя кластерного канала receiver TO_QM_HPUX и IP адрес с указанием номера порта для службы «Listener» 192.32.100.16(1421) для менеджера очередей QM_HPUX (рис. 6.10).
увеличить изображение
Рис. 6.10. Ввод имени кластерного receiver канала и IP адреса менеджера QM_HPUX.
Ввести имя кластерного канала receiver TO_QM_Win2000_REP и IP адрес с указанием номера порта для службы «Listener» 192.32.100.26(1415) для менеджера очередей QM_Win2000_REP (рис. 6.11).
увеличить изображение
Рис. 6.11. Ввод имени кластерного receiver канала и IP адреса менеджера QM_Win2000_REP.
После нажатия кнопки «Далее» выводится суммарная информация об объектах, которые будут созданы. Далее при нажатии кнопки «Готово» менеджер QM_HPUX будет включен в кластер THUNDER, на менеджере QM_HPUX будет создана пара кластерных каналов TO_QM_Win2000_REP (sender) и TO_QM_HPUX (receiver), а на менеджере QM_Win2000_REP будет создан кластерный канал sender TO_QM_HPUX.
Кластер WebSphere MQ
При создании систем передачи данных, нередко возникают ситуации, когда некоторые серверы недоступны в течение какого-то времени или работают посменно. Кластер WebSphere MQ это объединение менеджеров очередей, которые могут располагаться на различных платформах. При правильной настройке объектов кластера, приложения помещают сообщения в кластерные очереди, даже если один из менеджеров очередей становится недоступным, а после возобновления связи с ним приложения могут забирать сообщения из локальных кластерных очередей. Каждый менеджер может иметь очереди, доступные для других менеджеров кластера без использования удаленных (remote) и трансмиссионных (transmission) очередей и каналов. При включении менеджера в кластер автоматически создаются кластерные каналыsender и receiver на каждом таком менеджере, причем receiver - канал у каждого менеджера может быть один. В процессе передачи данных используются системные очереди SYSTEM.CLUSTER.REPOSITORY.QUEUE и SYSTEM.CLUSTER.TRANSMIT.QUEUE, которые были созданы еще в процессе установки WebSphere MQ. Менеджеры очередей в кластере могут исполнять роль, как клиентов, так и серверов. Серверы делают доступными очереди для членов кластера, а также для приложений, которые управляют процессами передачи сообщений и генерируют ответные сообщения. Клиенты могут помещать сообщения в кластерные очереди на любых менеджерах и также получать ответные сообщения, но только из кластерных очередей, находящихся на локальном менеджере. Менеджеры очередей кластера доставляют сообщения в нужную очередь и обмениваются информацией о кластерных объектах, несмотря на то, что обычно клиенты, расположенные на разных платформах не могут установить соединение друг с другом.
Объектами кластера могут быть менеджеры очередей, очереди и каналы. Информация обо всех объектах кластера называется репозиторием. Часть ее хранится в системной очереди SYSTEM.CLUSTER.REPOSITORY.QUEUE и обновляется с помощью SYSTEM.CLUSTER.COMMAND.QUEUE и встроенных в WebSphere MQ механизмов репликации. Репозиторий может быть полным (full) и частичным (partial).
Информация между репозиториями менеджеров передается с помощью кластерных каналов sender и receiver. Она включает в себя как собственно сообщения, так и любую информацию об изменении статуса менеджера или о добавлении/удалении объектов в кластер. Кластерный канал receiver принимает информацию от других менеджеров. На каждом менеджере необходимо иметь как минимум один кластерный канал receive. Все сообщения передаются через SYSTEM.CLUSTER.TRANSMIT.QUEUE. Если один из менеджеров кластера перестанет быть доступным, то сообщения, предназначенные для его очередей, останутся в этой очереди на соответствующем менеджере.
Один менеджер очередей может быть включен во множество кластеров. То же самое относится к очередям. В данном случае создается объект WebSphere MQ именуемый NAMELIST.
Рассмотрим процесс создания кластера и включения в него менеджеров на разных платформах. Создадим кластер из существующих на одном компьютере (IP адрес 198.32.100.26) менеджеров очередей QM_Win2000 (менеджер по умолчанию) и QM_Win2000_REP (порт для listener – 1415). Процесс состоит из следующих шагов [12].
Вызвать контекстное меню WebSphere MQ Explorer на группе Clusters правой кнопкой мыши. Выбрать Create, далее Cluster. На экране появится информационная форма «Create Cluster Wizard» (рис. 6.1), говорящая о том, что Wizard поможет вам создать новый кластер для менеджеров очередей, которые еще не являются репозиториями для других кластеров, а также о необходимости выполнить следующие шаги: ввести имя кластера;ввести имя менеджера очередей, который будет выступать в роли первого репозитория;ввести имя менеджера очередей, который будет выступать в роли второго репозитория;ввести или оставить по умолчанию имя receiver канала для первого репозитория;ввести или оставить по умолчанию имя receiver канала для второго репозитория.
увеличить изображение
Рис. 6.1. Create Cluster Wizard
После нажатия на кнопку «Далее» появится следующая форма (рис. 6.2), в которой надо ввести имя кластера.
увеличить изображение
Рис. 6.2. Ввод имени кластера
В следующей форме (рис. 6.3) вводим имя менеджера очередей для первого репозитория. Поскольку оба менеджера находятся на одном компьютере, и запуск процесса создания кластера был выполнен из WebSphere MQ Explorer этого же компьютера, то устанавливаем флажок на «Local (on this computer)». Очевидно, что менеджер очередей для первого репозитория может быть расположен и на удаленном компьютере. В таком случае флажок должен быть установлен на «Remote (on another computer)», и введены имя удаленного менеджера и IP адрес с указанием номера порта для службы Listener удаленного компьютера, на котором, собственно и установлен менеджер очередей. Отметим, что нет разницы, имя какого менеджера будет введено первым. Единственное, надо иметь в виду то, что менеджер не должен являться членом и репозиторием для другого кластера.
увеличить изображение
Рис. 6.3. Ввод имени менеджера для первого репозитория
увеличить изображение
Рис. 6.5. Ввод имени receiver канала для второго менеджера QM_Win2000
Далее выводится суммарная информация о конфигурации кластерных объектов, которую можно распечатать, а при нажатии клавиши «Готово» создается кластер и пара кластерных каналов на обоих менеджерах. Убедиться в этом можно, увидев в WebSphere MQ Explorer (рис. 6.6) в группе Clusters кластер THUNDER, в который входят менеджеры очередей QM_Win2000 и QM_Win2000_REP, а менеджер QM_Win2000_REP имеет кластерный канал sender TO_QM_Win2000 и кластерный канал receiver TO_QM_Win2000_REP.
увеличить изображение
Рис. 6.6. WebSphere MQ Explorer, показывающий кластер THUNDER
Следует сказать, что кластерные каналы могут использоваться как обычные для передачи сообщений между менеджерами очередей. Так, создав необходимые объекты на удаленном менеджере, не включенном в кластер можно использовать имя кластерного канала receiver для создания sender канала, и наоборот. Использовать эту возможность не рекомендуется, так как для четкости построения потоков передачи данных целесообразно использовать для каждого потока свои объекты WebSphere MQ, дифференцируя количество потоков с количеством и размером сообщений в каждом потоке. Подробнее на вопросах производительности мы остановимся в лекции 7.
Таким образом, создав объекты WebSphere MQ (очереди и каналы) на одном менеджере можно видеть их «отображение» на другом, управление очередями становится доступным как на одном, так и на другом менеджере. При создании очередей теперь необходимо указывать, в зависимости от их назначения, доступна ли она кластеру и какому именно. При создании очередей через WebSphere MQ Explorer первый вопрос задается сразу после ввода имени очереди и нажатии на кнопку «Ok». При положительном ответе форма создания очереди переходит на закладку «Cluster» и предлагает выбрать имя доступного кластера. Отметим тот факт, что при создании кластерных очередей директории для них не создаются, как это было в отношении локальных очередей. Вся информация будет находиться в SYSTEM.CLUSTER.REPOSITORY.QUEUE и будет передаваться в такую же очередь на менеджеры, включенные в кластер.
Рассмотрим пример передачи сообщений в кластере. Создадим локальную очередь с именем Win2000.CQ (CQ – cluster queue) на менеджере QM_Win2000:
runmqsc QM_Win2000 define qlocal('Win2000.CQ') cluster('THUNDER') refresh cluster('THUNDER') end
Создадим локальную очередь с именем Win2000_REP.CQ на менеджере QM_Win2000_REP:
runmqsc QM_Win2000_REP define qlocal('Win2000_REP.CQ') cluster('THUNDER') refresh cluster('THUNDER') end
Поместив тестовое сообщение в очередь Win2000_REP.CQ с помощью контекстного меню WebSphere MQ Explorer (рис. 6.7) на менеджере очередей QM_Win2000 можно его увидеть на менеджере QM_Win2000_REP. И наоборот, поместив тестовое сообщение в очередь Win2000.CQ на менеджере очередей QM_Win2000_REP можно его увидеть на менеджере QM_Win2000.
увеличить изображение
Рис. 6.7. Помещение тестового сообщения в удаленную кластерную очередь.
WebSphere MQ под управлением MSCS
В данной части лекции мы подразумеваем, что читатель хорошо знаком с работой Microsoft Cluster Server (MSCS). Для установки WebSphere MQ на кластер NT система должна удовлетворять следующим требованиям:
Windows NT4 Enterprise Edition with Service Pack 6a или более поздним,Microsoft Cluster Server (MSCS),
или
Windows 2000 Advanced Server,Microsoft Cluster Server (MSCS).
Процедура установки WebSphere MQ и помещение менеджеров под контроль MSCS описывается следующими шагами:
MSCS должен быть установлен и стартован.Установить WebSphere MQ на каждом сервере. Создать менеджер очередей. Рекомендуется сменить кодовую страницу на 1251.Закрыть MSCS Cluster Administrator и WebSphere MQ Explorer на каждом сервере (менеджер очередей не останавливать).Зарегистрировать новый ресурс «IBM WebSphere MQ MSCS» с помощью команды haregtyp /r.Выполнить пункт 4 на другом сервере кластера.Проверить наличие нового ResourceType с именем «IBM WebSphere MQ MSCS» запустив MSCS Cluster Administrator и нажав '+' рядом с именем кластера.Создать необходимые объекты (очереди, каналы и пр.) WebSphere MQ на «активном сервере».Остановить Queue Manager.Создать на кластерном диске (допустим, что кластерный диск имеет название E:) каталоги WebSphere MQ и WebSphere MQ\log.Выполнить команду для переноса Queue Manager в кластер hamvmqm /m qmname /dd e:\WebSphere MQ /ld e:\WebSphere MQ\log
где qmname – имя менеджера очередей.
Стартовать менеджер и проверить его работоспособность, создав очередь, поместив в нее тестовое сообщение, просмотрев его и удалив очередь.Установить тип запуска сервиса IBM MQSeries в «Manual».Остановить Queue Manager.Запустить MSCS Cluster Administrator.Создать группу в MSCS, которая будет содержать все необходимые ресурсы менеджера очередей.Создать в группе ресурс типа «Physical Disk» для кластерного диска (E:). Зависимых (Resource dependencies) ресурсов не указывать.Создать IP ресурс, в котором указать «свободный» IP адрес. Этот адрес будет использоваться другими менеджерами или клиентами для установления соединения с «виртуальным» менеджером очередей. Создать ресурс типа «IBM WebSphere MQ MSCS». В процессе создания данного ресурса используется Wizard (мастер построения), при работе с которым необходимо ввести следующие параметры: Name – имя для идентификации менеджера очередей;Add to group – добавить в группу, созданную в п.14;Run in a separate Resource Monitor – данную опцию можно не использовать;Possible owners – добавить обе части (node) кластера;Dependencies – добавить ресурс для кластерного диска и ресурс для IP;Parameters – QueueManagerName (добавить имя менеджера очередей); PostOnlineCommand (команда, которая может быть выполнена, когда менеджер очередей перейдет из состояния online в offline); PreOfflineCommand (команда, которая может быть выполнена, когда менеджер очередей перейдет из состояния offline в online).
На этом процесс переноса менеджера под управление MSCS можно считать завершенным. Остается только проверить работоспособность менеджера, проимитировав сбой с помощью команды MSCS «Initiate Failure», вызываемой с помощью контекстного меню группы, в которую входит менеджер очередей. Кратко о преимуществах работы WebSphere MQ под управлением MSCS. Очевидно, что это делает работу WebSphere MQ исключительно надежной в целом, если по тем или иным причинам один сервер кластера выйдет из строя или временно будет не доступен.
Прежде чем деинсталлировать WebSphere MQ необходимо вывести менеджер очередей из под контроля MSCS. Для этого нужно сначала перевести ресурс менеджера в offline, а затем уничтожить все ресурсы. Уничтожение ресурсов (кластерный диск, IP адрес, IBM WebSphere MQ MSCS) не приведет к удалению менеджера очередей. Далее выполнить команду haregtyp /u. Рекомендуется сохранить все объекты менеджера (например с помощью программы saveqmgr, описанной в лекции 5), затем удалить менеджер, создать его заново и восстановить все объекты.
В заключение лекции можно сказать, что мы рассмотрели работу WebSphere MQ в самом кластере WebSphere MQ и под управлением кластера MSCS. Главное отличие состоит в том, что при работе с кластером MSCS «виртуальный» менеджер кластера всегда доступен, если даже один из серверов выходит из строя. Управление объектами WebSphere MQ остается точно таким же, как и при работе с локальным менеджером, и никаких преимуществ в управлении и настройке мы не получаем. В случае использования кластера WebSphere MQ наяву очевидные преимущества, связанные с отсутствием обязательного создания и настройки трансмиссионных (transmission) и удаленных (remote) очередей и каналов. Но если возникают проблемы на одном из менеджеров кластера WebSphere MQ, то считывание сообщений из локальных кластерных очередей менеджера становится проблематичным до восстановления его работоспособности.
Дополнительные средства администрирования
В каждом случае при возникновении ошибок WebSphere MQ дает ее номер.
С помощью документации [7], [13] или программы WebSphere MQ Messages and Codes Helper, доступной по адресу ftp://ftp.software.ibm.com/software/integration/support/supportpacs/individual/ma0k.zip, можно выяснить суть ошибки и исправить ее. Внешний вид программы представлен на рис.7.2.
увеличить изображение
Рис. 7.2. Внешний вид программы WebSphere MQ Messages and Codes Helper
Кроме расшифровки кода самой ошибки, данная программа дает варианты причин, по которым могла произойти ошибка и методы устранения ее.
Помимо вышеуказанной программы существует множество программ, облегчающих работу с объектами менеджеров очередей. Рассмотрим программу WebSphere MQ Administrator (Support Pac MO71). Программа имеет графический интерфейс и позволяет производить любые действия с объектами как локальных, так и удаленных менеджеров. Внешний вид программы представлен на рис.7.3.
увеличить изображение
Рис. 7.3. Внешний вид программы WebSphere MQ Administrator
Добавить в список доступных менеджеров для управления новый удаленный менеджер можно, выполнив пункт меню File => Add Location. На экране появится форма, изображенная на рис.7.4.
увеличить изображение
Рис. 7.4. Форма добавления удаленного менеджера очередей
В поле Location нужно ввести имя отображения в списке менеджеров, доступных для управления, а в поле Queue Manager - имя удаленного менеджера, выставить флажок в поле Client и нажать кнопку Configured.
В открывшейся форме (рис.7.5) в поле Channel Name нужно ввести имя канала типа Server Connection или Client Connection (данный канал должен быть создан на удаленном менеджере), с помощью которого будет осуществляться подключение к менеджеру, IP адрес с указанием номера порта для службы listener и нажать кнопку Ok.
увеличить изображение
Рис. 7.5. Конфигурация параметров подключения к удаленному менеджеру
После того, как форма закроется, нажать кнопку Add в предыдущей форме. Подключаемый удаленный менеджер должен отобразиться в форме, показанной на рис.7.3 под именем QM_HPUX. Рассмотрим пример, показывающий как можно оперировать с сообщениями, находящимися в очереди на этом удаленном менеджере. Допустим, нужно переложить второе сообщение из очереди Win2000_HPUX.Q в очередь TEMP.Q. Для выполнения операций просмотра, удаления или перемещения сообщений нужно выполнить следующие действия.
Поместив курсор на менеджер QM_HPUX выполнить пункт меню Commands => Queue List.
В открывшейся форме (рис.7.6) при нажатии на кнопку Refresh появится список очередей менеджера QM_HPUX.
Выбрав очередь Win2000_HPUX.Q нажать кнопку Browse и в открывшейся форме (рис. 7.7) нажать Refresh.
Поместить курсор на второе сообщение, в поле Target Queue ввести имя очереди, в которую нужно переместить сообщение (в нашем случае это TEMP.Q) и нажать кнопку Move. После этого второе сообщение будет перемещено в очередь TEMP.Q. Соответственно для копирования сообщения нужно нажать на кнопку Copy, для удаления – Delete. Кроме этого, данная программа в отличие от Message Browser утилиты WebSphere MQ, позволяет просматривать до 10000 сообщений, причем на экран можно вывести полный текст сообщения с помощью кнопки Open, а с помощью кнопки Detail свойства сообщения. Вставить сообщение между другими сообщениями в очереди нельзя.
увеличить изображение
Рис. 7.6. Список очередей менеджера QM_HPUX
увеличить изображение
Рис. 7.7. Список сообщений очереди Win2000_HPUX.Q
Для этого следует переместить все сообщения после требуемого в другую очередь, затем поместить нужное сообщение и вернуть все перемещенные обратно. Несомненным преимуществом программы является возможность удаления сообщений даже если очередь эксклюзивно открыта другим приложением. Отображение списка объектов в данной программе статично, то есть для получения списка объектов в данный момент времени следует нажимать кнопку Refresh. Последняя версия программы доступна по адресу: ftp://ftp.software.ibm.com/software/integration/support/supportpacs/individual/mo71.zip.
Существуют программы, позволяющие перемещать текст сообщения в файл и наоборот, текст, содержащийся в файле помещать в тело сообщения. Примером такой программы может служить программа rfhutil, являющаяся частью Support Pac IH03 (IBM WebSphere Business Integration Message Broker display, test and performance utilities). Программа доступна по адресу: ftp://ftp.software.ibm.com/software/integration/support/supportpacs/individual/ih03.zip.
Внешний вид программы представлен на рис.7.8. Данная программа может быть полезна в случае, когда надо исправить текст внутри сообщения, находящегося в очереди. Для этого можно переместить текст сообщения в файл, исправить его, а затем положить обратно в очередь. Следует учесть, что данная программа при считывании сообщения удаляет его из очереди. Поэтому, во избежание потери текста сообщения, желательно скопировать его в какую-нибудь промежуточную очередь с помощью программы WebSphere MQ Administrator (mqmonntp) и оперировать с ним.
увеличить изображение
Рис. 7.8. Внешний вид программы rfhutil
Остановимся еще на одной важной программе saveqmgr, позволяющей записывать все объекты менеджера очередей в файл в формате команд MQSC для командного процессора runmqsc. Синтаксис команды выглядит следующим образом:
saveqmgr -m[-r] QmgrName –f FileName -h -v -s -i
где:
-m – создается скрипт для локального менеджера.
-r - создается скрипт для удаленного менеджера. Для подключения к удаленному менеджеру достаточно создать трансмиссионные очереди и каналы в обе стороны.
QmgrName – имя менеджера.
–f FileName – имя файла, в который будет записан скрипт. По умолчанию saveqmgr.tst.
-h – выводит справочную информацию на экран.
-v – указывает в какой версии MQSC нужно формировать команды для создания объектов. Может принимать значения 1, 2, 5, 51, 52 или 53.
-s – не создает команды MQSC для объектов, начинающихся на SYSTEM*.
-i – пропускает создание команд для поврежденных объектов.
Используя созданный данной командой скрипт-файл, можно легко восстановить объекты менеджера очередей в случае переустановки системы, если состояние объектов менеджера не удается восстановить, например, при поломке жесткого диска. Рекомендуется при создании или изменении свойств какого-нибудь объекта, но не реже раза в неделю использовать данную программу для сохранения информации объектов для каждого менеджера очередей. Данная программа доступна по адресу ftp://ftp.software.ibm.com/software/integration/support/supportpacs/individual/ms03.zip
Очередь недоставленных сообщений и восстановление данных
Практически все события, за исключением обработки non persistent сообщений, фиксируются в различных лог файлах WebSphere MQ. Существует два вида лог файлов. В первый записываются сообщения об ошибках, а во второй все изменения состояния объектов менеджера, включая обработанные persistent сообщения. Файл ошибок имеет формат, который легко прочитать с помощью любого текстового редактора. В него записываются события, связанные со стартом или остановкой менеджера и каналов, ошибки установки соединения, ошибки приема или отправки сообщений, например с некорректным форматом или ошибки конвертации, связанные с кодовой страницей. Одним словом, все события, связанные с обеспечением деятельности всех менеджеров очередей, созданных на данном компьютере. Расположение этих файлов задается при первоначальной установке WebSphere MQ. Например, для платформы Windows по умолчанию они расположены в каталоге C:\Program Files\IBM\WebSphere MQ\Errors и имеют название amqerrXX.LOG, где ХХ – номер файла. Второй тип файлов имеет специальный формат и представляет собой журнал изменений свойств объектов. Кроме этого в него записываются все persistent сообщения, обработанные менеджером очередей. Для каждого локального менеджера создаются свои файлы. Например, для менеджера очередей QM_Win2000 на платформе Windows они расположены в C:\Program Files\IBM\WebSphere MQ\log\QM_Win2000\active\ и имеют имя s000000X.log, где X – номер файла. Напомним, что существует два вида логирования сообщений и изменений свойств объектов – линейный и круговой. Линейный вид логирования, в отличие от кругового, позволяет как записывать, так и восстанавливать состояние менеджера в определенный момент времени. Запись образа объектов менеджера производится с помощью команды rcdmqobj. Синтаксис команды имеет вид:
rcdmqobj -m QmgrName -t ObjectType GenericObjName
где:
QmgrName – имя менеджера.
ObjectType – тип объекта. Может принимать значения nl или namelist для списка кластеров, prcs или process для процессов, q или queue для всех типов очередей, ql или qlocal для локальных очередей, qa или qalias для alias очередей, qr или qremote для удаленных локальных очередей, qm или qmodel для модельных очередей, qmgr для менеджера, * или all для всех объектов.
GenericObjName – имя объекта.
Встречаются ситуации, когда необходимо удалить файл очереди, содержащий сообщения. Это необходимо, например, при заполнении свободного пространства дисковой системы (тома), выделенной под WebSphere MQ на UNIX платформах. В этом случае WebSphere MQ сообщит, что объект данной очереди поврежден. Восстановить поврежденные как сознательно, так и в результате сбоя дисковой системы, объекты при условии линейного логирования, можно с помощью команды rcrmqobj. Синтаксис команды имеет вид:
rcrmqobj -m QmgrName -t ObjectType GenericObjName
где:
QmgrName – имя менеджера;
ObjectType – тип объекта;
GenericObjName – имя объекта.
В результате выполнения этой команды восстанавливаются не только очереди, но и persistent сообщения в очередях. Это возможно благодаря тому, что для каждого объекта менеджера ведется запись изменений его состояния, и так называемые точки checkpoint. Команда rcrmqobj повторяет все события, которые были с поврежденным объектом от последнего checkpoint до конца лог файла. Поскольку not persistent сообщения в файл не записываются, то они и не восстанавливаются.
При использовании кругового логирования поврежденные объекты следует удалить, а затем создать заново.
Кроме технических ошибок бывают еще и логические ошибки, связанные с неправильными настройками объектов. Если в локальной удаленной очереди указано неверное имя удаленного менеджера или очереди назначения, то сообщения все равно будут доставлены на менеджер, расположенный по адресу, указанному в канале отправителе, который использует соответствующую трансмиссионную очередь. Для обработки ошибочных или недоставленных сообщений существует специальная очередь недоставленных сообщений, которая указывается в атрибуте менеджера Dead-letter Queue в закладке Extended. Изменим пример из лекции 5, в котором при создании на менеджере QM_Win2000 локальной удаленной очереди Win2000_HPUX.RQ будет неправильно указана очередь, в которую нужно доставить сообщения:
ALTER QREMOTE ('Win2000_HPUX.RQ') + XMITQ('Win2000_HPUX.TQ') + RNAME('Win2000_AIX.Q') + RQMNAME('QM_HPUX')
Очереди назначения Win2000_AIX. Q на менеджере QM_HPUX не существует, однако сообщение все равно будет доставлено на этот менеджер в очередь DEAD_LETTER. Его можно просмотреть с помощью WebSphere MQ Explorer (рис. 7.1).
Просмотрев свойства сообщения, в закладке Dead-Letter Header можно увидеть код ошибки 2085, который говорит о том, что в заголовке сообщения существует неизвестный объект. В поле Destination Queue можно увидеть, что очередью назначения является несуществующая очередь Win2000_AIX.Q. Из данной ситуации есть три выхода:
Создать очередь Win2000_AIX.Q на менеджере QM_HPUX и перенаправить сообщение из DEAD_LETTER в нее с помощью команды:
runmqdlq DEAD_LETTER QM_Win2000
Далее ввести команду:
ACTION(RETRY)
и нажать <Ctrl+d> (для платформы Windows <Ctrl+z> <Enter> и еще раз <Ctrl+z> <Enter>) и выйти из команды runmqdlq нажав <Ctrl+c>. В результате выполнения данной команды WebSphere MQ попытается заново инициировать помещение сообщения с указанными атрибутами из очереди DEAD_LETTER. Этот способ следует применять в случае переполнения существующей очереди назначения. Напомним, что если количество
увеличить изображение
Рис. 7.1. Свойства недоставленного сообщения
сообщений в очереди превышает ее атрибут Maximum Queue Depth, то вновь поступающие сообщения также будут помещаться в очередь недоставленных сообщений.
Перенаправить сообщение в другую очередь, например в Win2000_HPUX.Q. В данном случае синтаксис команды runmqdlq будет выглядеть следующим образом:
runmqdlq DEAD_LETTER QM_Win2000
Далее ввести:
ACTION(FWD) FWDQ('Win2000_HPUX.Q') HEADER(NO)
и нажать Ctrl+d. Сообщения из очереди DEAD_LETTER будут помещены в очередь Win2000_HPUX.Q.
Удалить сообщение из DEAD_LETTER, исправить свойства локальной удаленной очереди Win2000_HPUX.RQ на менеджере QM_Win2000:
ALTER QREMOTE ('Win2000_HPUX.RQ') RNAME('Win2000_HPUX.Q')
и послать сообщение заново.
Второй способ можно использовать, когда переполнена как очередь назначения, так и очередь недоставленных сообщений. Если это не удается, то можно поступить следующим образом. Назначить в качестве очереди недоставленных сообщений новую очередь и перестартовать менеджер. Вновь поступающие сообщения будут помещаться уже в нее. Однако следует не допускать переполнения очередей недоставленных сообщений. Кроме этого, следует отметить, что простое перекладывание сообщения из очереди недоставленных сообщений в очередь назначения не даст нужного результата, поскольку недоставленные сообщения имеют свой собственный (MQDEAD) формат.
Иногда встречается случай, когда после перезагрузки менеджера очередей пропадают сообщения из трансмиссионной (transmission) очереди, несмотря на то, что ее атрибут Default Persistence установлен в persistent. В таком случае следует проверить этот атрибут в соответствующей локальной удаленной (remote) очереди и также установить его в persistent.
Перенаправление потоков
Потребность в этом приеме может возникнуть когда необходимо посылать "пачки" сообщений, например, с QM_AS400 на QM_HPUX. Доступ к запуску приложений на AS400 временно закрыт. Но WebSphere MQ работает. Самое простое решение (рис.7.11) с сервера WindowsNT направляем необходимую "пачку" сообщений на AS400 в локальную удаленную очередь (remote queue), нацеленную на HP_UNIX. Этот же прием может использоваться, если доступ к менеджеру QM_HPUX открыт только с определенного IP адреса.
Рис. 7.11. Перенаправление потоков
"Вечный двигатель"
Этот прием весьма полезен при тестировании интерфейсов для проверки стабильности и надежности работы программ. Схема "вечного двигателя" изображена на рис. 7.12. В очереди Remote Queue rq1 на сервере WindowsNT в качестве параметра Remote Queue Name указывается rq2, а на сервере HP_UNIX в очереди Remote Queue rq2 соответственно rq1.
Рис. 7.12. "Вечный двигатель"
И еще один прием, точнее святая обязанность администратора WebSphere MQ, это документирование интерфейсов. Весьма удобно иметь документированные интерфейсы в графическом виде, и программных средств для этого более чем достаточно (Visio, Bpwin и т.д.). Но по мере сложности интерфейсов их графическое представление становится не наглядным. В этом случае может быть рекомендован Excel, с помощью которого в колонках таблиц прописываются параметры настройки интерфейсов.
По мере возрастания количества интерфейсов, менеджеров и их объектов может возникнуть ситуация, когда администратор WebSphere MQ по названию объекта не сможет определить его сущность. Во избежание этого рекомендуется с самого начала разработать некоторые правила, по которым следует называть все объекты на разных менеджерах. Например, если простые локальные очереди (local queue) будут оканчиваться на .Q, трансмиссионные (transmission queue) на .TQ, локальные удаленные (remote queue) на .RQ, каналы на .CH, процессы на .P и так далее, то по окончанию сразу можно определить тип объекта. То же касается направления передачи и сущности передаваемых сообщений. Например, надо передать курсы валют из системы ABS1 в систему ABS2.
Создаваемые объекты в системе ABS1 можно назвать:
ABS1_ ABS2_CV.RQ – локальная удаленная очередь;
ABS1_ ABS2_CV.TQ – локальная трансмиссионная очередь;
ABS1_ ABS2_CV.CH – канал отправитель.
В системе ABS2:
ABS1_ ABS2_CV.Q – локальная очередь;
ABS1_ ABS2_CV.CH – канал получатель.
Кроме всего прочего, если производительность интерфейса передачи данных позволяет помимо обработки сообщений еще и создавать файлы журнала прохождения и обработки сообщений на каждом участке, то наличие таких файлов, содержащих как минимум время и мнемонику сообщений, а как максимум и их текст, существенно облегчает работу по поиску и устранению неисправностей.
Вопросы производительности рассматривались в той или иной мере в предыдущих лекциях. Подводя итог, можно разделить проблемы производительности на две группы, одна из которых связана со скоростью передачи данных в сети, а другая, связана со скоростью помещения и извлечения сообщений из очередей.
В первой группе решающую роль играет собственно, сетевое оборудование и качество линий связи. Скорость передачи данных от менеджера до менеджера сопоставима со скоростью передачи данных в сети. Ко второй группе можно отнести следующие факторы.
Серверное оборудование – чем мощнее сервера, тем быстрее будет работать программное обеспечение.
Размеры баз данных – чем больше база, тем дольше выполняются запросы.
Алгоритмы обработки сообщений – если требуется просто вставить запись в таблицу, то это может быть относительно высокая скорость, а если требуется выполнение операторов SELECT и проверка условий, на основе которых принимается решение о добавлении или изменении записи, то скорость обработки сообщений уменьшается.
Программное обеспечение – корректно написанная программа работает быстрее, к тому же выбор языка программирования играет важную роль. Замечено, что один и тот же алгоритм, реализованный на C и Visual Basic, дает существенно разные результаты. Например, простое перекладывание persistent сообщений размером 1Кб из очереди в очередь на одном и том же менеджере (NT платформа, процессор Pentium IV с тактовой частотой 1.8 ГГц) с помощью программы, написанной на С, дает результат 400 сообщений в секунду, а с помощью программы на Visual Basic только 140.
Тип хранения сообщений в очереди – сообщения persistent обрабатываются медленнее, чем not persistent. Not persistent сообщения из предыдущего примера перекладывались со скоростями 1000 и 200 сообщений в секунду, соответственно.
Наличие механизмов шифрования и сертификации также влияет на скорость обработки сообщений. Так использование SSL механизма может отнять до 10% производительности в зависимости от длины ключа и алгоритма шифрования данных.
Вопросы производительности
Кроме выяснения причин ошибок передачи и их устранения, администратор WebSphere MQ должен планировать и рассчитывать схемы передачи данных от одной платформы до другой. Рассмотрим простой пример передачи сообщения, содержащего информацию из строки таблицы из одной базы данных DataBase1 в другую базу DataBase2, расположенную на другой платформе. Интерфейс передачи данных можно разбить на три части (рис. 7.9).
Рис. 7.9. Схема интерфейса передачи данных
Первая часть – программа А, помещающая сообщения в исходящую очередь RQ1, вторая – передача сообщений от одного менеджера очередей к другому в очередь назначения Q2, третья – программа B, помещающая сообщения из очереди назначения Q2 в таблицу базы данных.
В данной схеме нужно учитывать, что скорость разбора сообщений из очереди назначения Q2 и помещения их в DataBase2 (участок 3) должна быть больше скорости поступления сообщений в эту очередь. Если же сообщения генерируются в исходящую очередь из базы DataBase1 не линейно, то необходимо учитывать характер нелинейности и возможные "узкие" места интерфейса. В любом случае не следует допускать переполнения очередей.
Часто встречаются задачи, в которых требуется при изменении информации в одной системе передать произошедшие изменения в другие системы, причем формат изменяемых данных на разных платформах может иметь различный вид. При этом обработку информации целесообразно сосредоточить в одном месте, так как алгоритмы преобразования данных из DataBase1 в DataBase2, DataBase3 и т.д. очень похожи и имеют много общего, а часто являются подмножествами друг друга. В этом случае мы имеем интерфейс передачи данных с централизованной обработкой сообщений, схема которого показана на рис.7.10.
Рис. 7.10. Схема интерфейса с централизованной обработкой сообщений
В качестве инструментального средства для центра обработки сообщений рекомендуется Брокер сообщений WebSphere BI Message Broker. Это достаточно эффективный инструмент, являющийся интегрированной средой для визуального проектирования программ обработки сообщений. При проектировании более сложных, разветвленных потоков передачи и распространения информации, которая на пути от источника к приемнику может меняться и дополняться, следует также учитывать производительность всех точек входа и выхода сообщений и соизмерять скорости выгрузки сообщений со скоростью их передачи и обработки.
Приведем еще несколько приемов настройки и тестирования интерфейсов.
Дополнительные функции WebSphere MQ
Одним из важнейших приемов прикладного программирования для WebSphere MQ является использование транзакций, аналогичных транзакциям в базах данных. С помощью функций MQBEGIN, MQCMIT, MQBACK можно открыть транзакцию, завершить транзакцию успешно и откатить транзакцию, соответственно. В этом механизме прослеживается полная аналогия с транзакциями в базах данных. Программирование с использованием WebSphere MQ транзакций позволяет создавать надежные программы.
MQBEGIN - функция, которая открывает WebSphere MQ транзакцию, координирует работу менеджера очередей и может использовать внешние ресурсы менеджера.
Синтаксис:
MQBEGIN (Hconn, BeginOptions, CompCode, Reason)
где:
Hconn | - | идентификатор связи (connection handle) с менеджером очередей |
BeginOptions | - | опции MQBEGIN |
CompCode | - | код завершения |
Reason | - | код ошибки, детализирующий код завершения. |
Здесь BeginOptions дает ссылку на структуру MQBO (Begin options), поля которой показаны в таблице 8.6
StrucId | MQCHAR4 | Идентификатор структуры | MQBO_STRUC_ID | 'BObb' |
Version | MQLONG | Номер версии структуры | MQBO_VERSION_1 | 1 |
Options | MQLONG | Опции для управления MQBO | MQBO_NONE | 0 |
На языке С макропеременная MQBO_DEFAULT содержит значения, приведенные в табл.8.6, и может быть использована в тексте следующим образом: MQBO MyBO = {MQBO_DEFAULT};
MQCMIT – функция, которая указывает на то, что все сообщения, прочитанные и записанные с момента открытия транзакции, становятся постоянными, то есть транзакция успешно завершена. Сообщения, помещенные в очередь как часть блока сообщений, становятся доступными всем приложениям. Сообщения, прочитанные из очереди как часть блока сообщений, удаляются из очереди.
Синтаксис:
MQCMIT (Hconn, CompCode, Reason)
Комментарии в данном случае излишни.
MQBACK - функция, которая указывает на то, что всем сообщениям, прочитанным и записанным с момента открытия транзакции, дается задний ход, то есть производится откат транзакции. Все сообщения, помещенные в очередь, удаляются из нее. Все сообщения, прочитанные из очереди, восстанавливаются в очереди (становятся доступными). Как правило, при чтении сообщения удаляются из очереди или помечаются как транзакционные и становятся не доступными (uncommitted messages), если прошла команда MQBEGIN.
Синтаксис:
MQBACK (Hconn, CompCode, Reason)
Работа функций MQBEGIN, MQCMIT и MQBACK будет на примере продемонстрирована в лекции 9.
Следующие группы функций позволяют считывать и модифицировать атрибуты WebSphere MQ объектов, а также предоставляют дополнительный сервис.
MQINQ – функция, которая возвращает массив цифр и множество символьных строк, содержащих параметры объекта. В качестве объектов могут выступать очередь, процесс, именованный список (namelist), менеджер очередей.
Синтаксис:
MQINQ (Hconn, Hobj, SelectorCount, Selectors, IntAttrCount, IntAttrs, CharAttrLength, CharAttrs, CompCode, Reason)
где:
Hconn | - | идентификатор связи с менеджером очередей, полученный от MQCONN |
Hobj | - | идентификатор объекта, полученный от MQOPEN |
SelectorCount | - | Счетчик атрибутов, которые должны быть извлечены (от 0 до 256) |
Selectors | - | Массив значений атрибутов, т.е. чисел или символов, которые должны быть извлечены |
IntAttrCount | - | Счетчик цифровых атрибутов |
IntAttrs | - | Массив цифровых атрибутов |
CharAttrLength | - | Длина буфера символьных атрибутов |
CharAttrs | - | Буфер значений символьных атрибутов |
CompCode | - | код завершения |
Reason | - | код ошибки, детализирующий код завершения. |
Комментарии. Буфер CharAttrs должен содержать значения символьных атрибутов для записи в том же порядке, как перечислены атрибуты в Selectors. Имена устанавливаемых атрибутов приведены в главе "MQSET – Set object attributes" [14].
MQCONNХ – функция, которая обеспечивает подключение приложения к менеджеру очередей.
Синтаксис:
MQCONNХ (QMgrName, ConnectOpts, Hconn, CompCode, Reason)
Функция MQCONNХ отличается от MQCONN наличием параметра ConnectOpts для контроля процесса подключения к менеджеру очередей и позволяет установить дополнительные опции. Опции MQCNO_STANDARD_BINDING и MQCNO_FASTPATH_BINDING служат для стандартного или быстрого установления связи. Опции MQCNO_HANDLE_SHARE_NONE, MQCNO_HANDLE_SHARE_BLOCK, MQCNO_HANDLE_SHARE_NO_BLOCK позволяют осуществлять управление идентификаторами связи при работе с разными процессами (threads).
Опции SSLConfigPtr и SSLConfigOffset используются, когда приложение осуществляет вызов MQCONNХ через WebSphere MQ клиента по протоколу TCP/IP. Более подробно эти опции описаны в главе "MQCNO – Connect options" [14].
MQPUT1 – функция, которая помещает одно сообщение в очередь, при этом очередь не должна быть открыта. Функция MQPUT1 по сравнению с MQPUT не требует команды MQOPEN и MQCLOSE.
Синтаксис:
MQPUT1 (Hconn, ObjDesc, MsgDesc, PutMsgOpts, BufferLength, Buffer, CompCode, Reason)
Как следствие такого определения функции вместо параметра Hobj для MQPUT в MQPUT1 используется параметр ObjDesc, как и в функции MQOPEN.
На этом обзор основных и дополнительных функций WebSphere MQ закончен. Теперь можно начинать писать первые приложения для WebSphere MQ, что и будет сделано в следующей лекции.
В заключении следует отметить, что разработчикам приложений необходимо выбирать язык программирования исходя из решаемой задачи и требований на создаваемое ПО. Если приложение должно работать с WebSphere MQ в режиме промышленной эксплуатации при достаточно высоких требованиях на производительность, то рекомендуется использовать С, С++. Если приложение работает с WebSphere MQ в тестовом режиме без ограничений на производительность, то вполне подойдут Visual Basic 6.0 или Power Builder 6.0. Известны случаи, когда приложение, написанное на Visual Basic 6.0 или Power Builder 6.0 и работающее постоянно с WebSphere MQ, начинало наращивать используемую оперативную память, и разработчики этих программ не могли понять причину ошибки и устранить ее. Приложение, написанное на С++ и реализующее тот же алгоритм, не имеет подобного дефекта. Красивые интерфейсы при работе с WebSphere MQ требуются не так часто. Кроме того, следует отметить, что приложение на C/С++ обеспечивает обработку более 400 Persistent сообщений/секунду при длине сообщения 1Кбайт, в то время как приложение с тем же алгоритмом обработки на Visual Basic - не более 150 сообщений/секунду на компьютере INTEL Pentium 1.8Ггц. Все это объясняет тот факт, что в данной книге примеры программ даются на языке С.
© 2003-2007 INTUIT.ru. Все права защищены. |
Общие сведения о разработке приложений для WebSphere MQ
В большинстве приложений, работающих с WebSphere MQ, решаются такие задачи как: чтение сообщений из базы данных (БД) и запись их в очередь; чтение сообщений из очереди и запись их в БД; и то и другое одновременно. В более редких случаях, например, для задач мониторинга осуществляется чтение параметров объектов WebSphere MQ в частности Current Depth, Channel Status, Message Count, Last Message Date/Time и т.п.
Для задач чтения/записи сообщений в очередь ведется работа непосредственно с сообщениями. На рис.8.1 показана структура одного из таких сообщений.
Рис. 8.1. Структура сообщения
Заголовок сообщения (Message Header) несет в себе служебную информацию: идентификационный номер сообщения (Message ID); идентификатор пользователя, пославшего сообщение; формат, длину, кодировку, тип сообщения, дату и время отправки и т.д. Данные в сообщении (Message Data) на уровне приложений разделяются на поля. В спецификации приложений записывается структура сообщения, например, в табл.8.1.
Name | 50 | Для заполнения поля Account_Name таблицы Account |
Account | 32 | Для заполнения поля Account таблицы Account |
ClientNo | 32 | Для заполнения поля ClientNo таблицы Account |
Detail | 4 | Для заполнения поля Detail таблицы Account |
Account_Date | 10 | Дата в формате 'YYYY-MM-DD' для заполнения поля Account_Date таблицы Account |
Account_Time | 8 | Время в формате 'HH-MM-SS' для заполнения поля Account_Time таблицы Account |
Comment | 150 | Для заполнения поля Comment таблицы Account |
В теле сообщения поля не имеют имен и идут в порядке перечисления в таблице 8.1. без разделителей (разделители могут быть использованы, но тогда длина сообщения увеличится). Числа (Integer, Long) записываются в символьном формате.
По WebSphere MQ интерфейсу (канал и соответствующие очереди) могут передаваться сообщения одного или нескольких типов, например, информация о клиентах, счетах и проводках. Для обработки сообщения большое значение имеет поле заголовка MSGTYPE, определяющее тип сообщения. Каждому типу сообщения соответствует своя структура данных сообщения (Message Data). По типу сообщения программа определяет, по какой ветви программы осуществлять разбор сообщения. С одной стороны, не целесообразно увеличивать число интерфейсов и, соответственно, число WebSphere MQ объектов и обрабатывающих программ, однако практические неудобства могут сказаться, если число WebSphere MQ объектов перевалит за несколько сотен и они все будут использоваться одновременно на недостаточно мощной технике. С другой стороны, программирование и внедрение программ облегчается, если на один тип сообщения создается один интерфейс и одна программа. Оптимальное решение следует искать где-то посередине между этими двумя подходами. К вопросам унификации интерфейсов следует вернуться (это будет сделано при рассмотрении одновременной работы WebSphere MQ и баз данных), так как создание и сопровождение нескольких сотен похожих программ для работы WebSphere MQ может вызвать серьезные затруднения.
Основу для программирования приложений, работающих с WebSphere MQ, предоставляет интерфейс очередей сообщений MQI (Message Queue Interface).
Приложения для работы с WebSphere MQ, создаваемые пользователем, могут использовать следующие группы функций MQI:
MQCONN, MQCONNХ и MQDISC. Эти функции обеспечивают подключение приложения к менеджеру очередей и отключение его.MQOPEN и MQCLOSE функции открывают и закрывают подключение к очередям, с которыми работает приложение.MQPUT и MQPUT1 функции обеспечивают помещение сообщений в очередь.MQGET функция поддерживает просмотр, извлечение и удаление сообщений из очереди.MQINQ функция позволяет запросить атрибуты WebSphere MQ объекта.MQSET функция устанавливает атрибуты очереди, но атрибуты других типов WebSphere MQ объектов не могут быть изменены.MQBEGIN, MQCMIT, MQBACK. Эти функции обеспечивают работу с WebSphere MQ транзакциями (открытие транзакции, закрытие и "откат" транзакции).
Таким образом, обобщенная структура программы для работы с WebSphere MQ на уровне блоков может быть представлена в виде следующей последовательности псевдокода:
Блок 1 MQCONN |
Блок 2 MQOPEN |
Блок 3 MQBEGIN |
Блок 4 MQGET |
Блок 5 SQL UPDATE, SQL SELECT |
Блок 6 MQPUT |
Блок 7 Если нет ошибок - MQCMIT, в противном случае MQBACK |
Блок 8 MQCLOSE |
Блок 9 MQDISC |
Для программирования приложений, работающих с WebSphere MQ, предлагается инструментарий на различных языках: C (для всех платформ), C++ (для большинства операционных систем), Visual Basic (для систем Windows), COBOL, Assembler (для мэйн-фреймов ИБМ с операционной системой z/OS), RPG, PL/I (для систем с z/OS, OS/2 Warp, VSE/ESA, Windows), TAL (для систем с Compaq NonStop Kernel) и другие средства.
В WebSphere MQ для приложений на C++ в среде Windows следует редактировать (линковать) разрабатываемую программу с библиотекой MQI в дополнение к библиотекам операционной системы:
mqm.Lib для WebSphere MQ Server для 32-bit Cmqic.Lib для WebSphere MQ Client для 16-bit Cmqic32.Lib для WebSphere MQ Client для 32-bit C
В качестве заголовочных файлов следует использовать cmqc.h или cmqcfc.h
Библиотека MQI обеспечивает реализацию функций WebSphere MQ. Список библиотек на других платформах, необходимых для разработки приложений на основе WMQ, можно найти в документации.
Следует упомянуть об интерфейсе приложений для передачи сообщений AMI (Application Messaging Interface), который является более простым и высокоуровневым интерфейсом, чем MQI. И хотя AMI имеет некоторые ограничения по сравнению с MQI, его функции достаточно эффективны для большинства пользователей. AMI поддерживает два типа моделей для программирования приложений: точка-точка, издатель-подписчик. Интерфейс AMI существует для языков C, C++ и Java, работающих в операционных системах: OS/400, AIX, HP-UX, Solaris, Microsoft Windows и z/OS.
В дальнейшем при описании функций интерфейса MQI будут использованы общие для всех функций два типа параметров: идентификатор (handle) и код возврата (return сode). При подключении к менеджеру очередей должен быть создан уникальный идентификатор этого менеджера для данного приложения, который называется идентификатором связи Hconn (connection handle). Идентификатор Hconn возвращается функциями MQCONN или MQCONNХ и передается во все другие функции как входной параметр. При работе с объектом WebSphere MQ должен также существовать уникальный идентификатор, называемый идентификатором объекта (object Handle). Этот идентификатор определяется функций MQOPEN при открытии объекта и возвращается как Hobj. Программа передает идентификатор объекта как входной параметр при вызове функций MQPUT, MQGET, MQINQ, MQSET или MQCLOSE.
Код завершения (completion code) и код ошибки (reason code) возвращаются как выходные параметры в каждой функции. Совместно они называются кодами возврата (return codes) и показывают результат выполнения функции. Код завершения возвращается либо как MQCC_OK или MQCC_FAILED, отображая успешное или ошибочное выполнение функции, соответственно. Иногда возвращается промежуточное значение MQCC_WARNING как предупреждение о неполном завершении. MQCC_OK всегда соответствует Reason code = 0. MQCC_WARNING может сопутствовать, например, Reason code = 2002, это говорит о том, что приложение уже подключено. MQCC_FAILED обязательно имеет детализацию, например: Reason code = 2058 - менеджер с данным именем неизвестен или недоступен, Reason code = 2035 - нет прав доступа и т.д. Программа может и должна использовать код ошибки в процессе обработки. Например, при определенном коде ошибки программа может выдать сообщение пользователю с предложением изменить входные данные и после этого повторить вызов функции либо вернуть сообщение пользователю. Коды возврата подробно описаны в книге WebSphere MQ Messages [7].
Основные функции WebSphere MQ
MQCONN - функция подключения приложения к менеджеру очередей.
Синтаксис:
MQCONN (QmgrName, Hconn, CompCode, Reason)
где:
QmgrNam | - | имя менеджера очередей, к которому производиться подключение (латинские буквы, цифры, символы "_", "/", ".", "%" ). |
Hconn | - | идентификатор связи (connection handle) с менеджером очередей |
CompCode | - | код завершения, принимающий одно из трех значений: MQCC_OK, MQCC_WARNING, MQCC_FAILED |
Reason | - | код ошибки, детализирующий код завершения. |
Результат работы функции – установление связи с менеджером очередей и возвращение уникального идентификатора связи Hconn с менеджером. Имя QmgrNam может быть опущено (строка со значением Null или пробел), тогда обращение к менеджеру очередей на данном компьютере происходит по умолчанию. Одно из основных назначений функции – проверка авторизации пользователя (приложение работает под определенным пользователем с идентификатором userid, который может быть не авторизован для работы с данным менеджером или его объектами).
MQOPEN – функция, открывающая подключение к очередям, с которыми работает приложение.
Синтаксис:
MQOPEN (Hconn, ObjDesc, Options, Hobj, CompCode, Reason)
где:
Hconn | - | идентификатор связи (connection handle) с менеджером очередей |
ObjDesc | - | описание объекта MQOD |
Options | - | опции объекта MQOO |
Hobj | - | идентификатор связи с объектом |
CompCode | - | код завершения, принимающий одно из трех значений: MQCC_OK – успешное завершение, MQCC_WARNING – предупреждение, MQCC_FAILED – ошибочный вызов |
Reason | - | код ошибки, детализирующий код завершения. |
Результат работы функции – возвращение уникального идентификатора связи Hobj с "открытым" объектом WebSphere MQ, то есть очередью, с которой установлена связь. Описание объекта MQOD – это ссылка на структуру объекта из библиотеки WebSphere MQ. Структура MQOD представлена в таблице 8.2.
Опции объекта (переменная MQLONG): MQOO_BROWSE* – просмотр объекта, MQOO_INPUT* – объект открыт для помещения сообщений, MQOO_OUTPUT – объект открыт для извлечения сообщений, MQOO_INQUIRE - объект открыт для извлечения атрибутов, MQOO_SET - объект открыт для изменения атрибутов и др. Опции объекта со звездочками задаются, как правило, в виде развернутых констант:
MQOO_INPUT_AS_Q_DEF - открытие очереди на основе ее определения;MQOO_INPUT_SHARED - открытие очереди для одновременного доступа нескольких приложений;MQOO_INPUT_EXCLUSIVE - открытие очереди для эксклюзивного доступа одному приложению;MQOO_BROWSE - открытие очереди для просмотра/чтения сообщений с возможностью дальнейшего использования детализирующих опций MQGMO_BROWSE_FIRST, MQGMO_BROWSE_NEXT, MQGMO_BROWSE_MSG_UNDER_CURSOR функции MQGET;MQOO_OUTPUT - открытие очереди для записи сообщений.
Описание всех опций MQOO дано в главе "MQOPEN – Open object" [14] и объекта MQOD - в главе "MQOD – Object descriptor" [14]. Как правило, значений опций по умолчанию для MQOPEN вполне достаточно для программирования стандартных приложений для WebSphere MQ.
StrucId | MQCHAR4 | MQOD_STRUC_ID | 'ODbb' |
Version | MQLONG | MQOD_VERSION_1 | 1 |
ObjectType | MQLONG | MQOT_Q | 1 |
ObjectName | MQCHAR48 | Нет | Строка со значением Null или пробел |
ObjectQMgrName | MQCHAR48 | Нет | Строка со значением Null или пробел |
DynamicQName | MQCHAR48 | Нет | 'CSQ.*' на z/OS; 'AMQ.*' в противном случае |
AlternateUserId | MQCHAR12 | Нет | Строка со значением Null или пробел |
RecsPresent | MQLONG | Нет | 0 |
KnownDestCount | MQLONG | Нет | 0 |
UnknownDestCount | MQLONG | Нет | 0 |
InvalidDestCount | MQLONG | Нет | 0 |
ObjectRecOffset | MQLONG | Нет | 0 |
ResponseRecOffset | MQLONG | Нет | 0 |
ObjectRecPtr None | MQPTR | Нет | Указатель со значением Null |
ResponseRecPtr | MQPTR | Нет | Указатель со значением Null |
AlternateSecurityId | MQBYTE40 | MQSID_NONE | Nulls |
ResolvedQName | MQCHAR48 | Нет | Строка со значением Null или пробел |
ResolvedQMgrName | MQCHAR48 | Нет | Строка со значением Null или пробел |
Hconn и Hobj – это идентификаторы, полученные от MQCONN и MQOPEN, соответственно.
Описание сообщения MQMD – это ссылка на структуру объекта из библиотеки WebSphere MQ. Эта структура записывается следующим образом (табл.8.3).
StrucId | MQCHAR4 | MQMD_STRUC_ID | 'MDbb' |
Version | MQLONG | MQMD_VERSION_1 | 1 |
Report | MQLONG | MQRO_NONE | 0 |
MsgType | MQLONG | MQMT_DATAGRAM | 8 |
Expiry | MQLONG | MQEI_UNLIMITED | MQEI_UNLIMITED |
Feedback | MQLONG | MQFB_NONE | 0 |
Encoding | MQLONG | MQENC_NATIVE | В зависимости от среды |
CodedCharSetId | MQLONG | MQCCSI_Q_MGR | 0 |
Format | MQCHAR8 | MQFMT_NONE | Пробел |
Priority | MQLONG | MQPRI_PRIORITY_AS_Q_DEF | -1 |
Persistence | MQLONG | MQPER_PERSISTENCE_AS_Q_DEF | 2 |
MsgId | MQBYTE24 | MQMI_NONE | Nulls |
CorrelId | MQBYTE24 | MQCI_NONE | Nulls |
BackoutCount | MQLONG | Нет | 0 |
ReplyToQ | MQCHAR48 | Нет | Строка со значением Null или пробел |
ReplyToQMgr | MQCHAR48 | Нет | Строка со значением Null или пробел |
UserIdentifier | MQCHAR12 | Нет | Строка со значением Null или пробел |
AccountingToken | MQBYTE32 | MQACT_NONE | Nulls |
ApplIdentityData | MQBYTE32 | Нет | Строка со значением Null или пробел |
PutApplType | MQLONG | MQAT_NO_CONTEXT | 0 |
PutApplName | MQCHAR28 | Нет | Строка со значением Null или пробел |
PutDate | MQCHAR8 | Нет | Строка со значением Null или пробел |
PutTime | MQCHAR8 | Нет | Строка со значением Null или пробел |
ApplOriginData | MQCHAR4 | Нет | Строка со значением Null или пробел |
GroupId | MQBYTE24 | MQGI_NONE | Nulls |
MsgSeqNumber | MQLONG | Нет | 1 |
Offset | MQLONG | Нет | 0 |
MsgFlags | MQLONG | MQMF_NONE | 0 |
OriginalLength | MQLONG | MQOL_UNDEFINED | -1 |
GetMsgOpts – опции для функции MQGET - MQGMO (Get-message options), поля структуры которой приведены в таблице 8.4. Наиболее часто используемые опции для управления MQGET:
MQGMO_WAIT - определяет время ожидания функцией поступления новых сообщений в зависимости от значения в WaitInterval, заданногов мсек. MQGMO_NO_WAIT немедленно возвращает управление, если нет больше сообщений в очереди.MQGMO_BROWSE_FIRST - определяет, что читается первое сообщение в очереди.MQGMO_BROWSE_NEXT - определяет, что читается сообщение из текущей позиции.MQGMO_BROWSE_MSG_UNDER_CURSOR - определяет, что читается сообщение под курсором.MQGMO_LOGICAL_ORDER - определяет, что сообщения читаются в логическом порядке. Если опция опущена, то сообщения читаются в физическом порядке.MQGMO_FAIL_IF_QUIESCING - выдает ошибку, если менеджер не доступен.MQGMO_SYNCPOINT (MQGMO_NO_SYNCPOINT) - означает установку (отмену установки) контрольной точки (syncpoint control) на данном сообщении.MQGMO_ACCEPT_TRUNCATED_MSG - указывает, что допускается отсечение данных сообщения, например, если DataLength для реального сообщения больше BufferLength.
Все опции MQGMO даны в главе "MQGET – Get message" [14].
BufferLength – длина в байтах области буфера Buffer, в который считывается сообщение (переменная типа MQLONG). Максимальная длина сообщений 100Мбт, длина по умолчанию 4Мбт, реальная длина большинства сообщений не более 10Кбт.
Buffer - буфер, в который считывается сообщение.
DataLength – длина сообщения в байтах (переменная MQLONG).
Если DataLength для реального сообщения больше BufferLength, то часть сообщения может быть потеряна в зависимости от опции MQGMO_ACCEPT_TRUNCATED_MSG.
CompCode, Reason – это стандартные возвращаемые параметры, упомянутые выше и не требующие детальных пояснений.
StrucId | MQCHAR4 | Идентификатор структуры | MQGMO_STRUC_ID | 'GMOb' |
Version | MQLONG | Номер версии структуры | MQGMO_VERSION_1 | 1 |
Options | MQLONG | Опции для управления MQGET | MQGMO_NO_WAIT | 0 |
WaitInterval | MQLONG | Интервал ожидания (Wait interval) WaitInterval | None | 0 |
Signal1 | MQLONG | Сигнал | Нет | Указатель Null на z/OS; 0 в ост. случаях |
Signal2 | MQLONG | Идентификатор сигнала | Нет | 0 |
ResolvedQName | MQCHAR48 | Разрешенное имя очереди назначения (destination queue) | Нет | Строка string или пробел |
MatchOptions | MQLONG | Опции управления критериями выбора, используемыми MQGET | MQMO_MATCH_MSG_ID + MQMO_MATCH_CORREL_ID | 3 |
GroupStatus | MQCHAR | Флаг, индицирующий, что извлеченное сообщение находиться в группе сообщений | MQGS_NOT_IN_GROUP | 'b' |
SegmentStatus | MQCHAR | Флаг, индицирующий, что извлеченное сообщение является сегментом логического сообщения | MQSS_NOT_A_SEGMENT | 'b' |
Segmentation | MQCHAR | Флаг, индицирующий, что допускается дальнейшая сегментация для извлеченного сообщения | MQSEG_INHIBITED | 'b' |
Reserved1 | MQCHAR | Резервное | Нет | 'b' |
MsgToken | MQBYTE16 | Маркер сообщения (Message token) | MQMTOK_NONE | Nulls |
ReturnedLength | MQLONG | Возвращаемая длина сообщения в байтах | MQRL_UNDEFINED | -1 |
Синтаксис:
MQPUT (Hconn, Hobj, MsgDesc, PutMsgOpts, BufferLength, Buffer, CompCode, Reason)
где:
Hconn | - | идентификатор связи с менеджером очередей, полученный от MQCONN |
Hobj | - | идентификатор объекта, полученный от MQOPEN |
MsgDesc | - | описание сообщения MQMD |
PutMsgOpts | - | опции MQPMO для записи сообщений |
BufferLength | - | длина буфера Buffer, откуда пишется сообщение. Значение 0 является действительным и показывает, что сообщение не содержит данных. |
Buffer | - | буфер, из которого пишется сообщение |
CompCode | - | код завершения, принимающий одно из трех значений: MQCC_OK, MQCC_WARNING, MQCC_FAILED |
Reason | - | код ошибки, детализирующий код завершения. |
StrucId | MQCHAR4 | Идентификатор структуры | MQPMO_STRUC_ID | 'PMOb' |
Version | MQLONG | Номер версии структуры | MQPMO_VERSION_1 | 1 |
Options | MQLONG | Опции для управления MQPUT и MQPUT1 | MQPMO_NONE | 0 |
Timeout | MQLONG | Зарезервировано | Нет | -1 |
Context | MQHOBJ | Идентификатор объекта входной очереди | Нет | 0 |
KnownDestCount | MQLONG | Число сообщений, посланное успешно в локальную очередь | Нет | 0 |
UnknownDestCount | MQLONG | Число сообщений, посланное успешно в удаленную очередь | Нет | 0 |
InvalidDestCount | MQLONG | Число сообщений, которые возможно не посланы | Нет | 0 |
ResolvedQName | MQCHAR48 | Разрешенное имя очереди назначения | Нет | Строка Null или пробел |
ResolvedQMgrName | MQCHAR48 | Разрешенное имя менеджера назначения | Нет | Строка Null или пробел |
RecsPresent | MQLONG | Число записей помещенных сообщений или ответных записей в настоящее время | Нет | 0 |
PutMsgRecFields | MQLONG | Флаг, индицирующий, что MQPMR поле присутствует | MQPMRF_NONE | 0 |
PutMsgRecOffset | MQLONG | Погашение записи первого помещенного сообщения с момента старта MQPMO | Нет | 0 |
ResponseRecOffset | MQLONG | Погашение записи первого ответа с момента старта MQPMO | Нет | 0 |
PutMsgRecPtr | MQPTR | Адрес записи первого помещенного в очередь сообщения | Нет | Указатель Null |
ResponseRecPtr | MQPTR | Адрес записи первого ответа | Нет | Указатель Null |
Среди опций для управления MQPUT следует назвать:
MQPMO_NEW_MSG_ID - генерирует новый идентификатор сообщенияMQPMO_NEW_CORREL_ID - генерирует новый корреляционный идентификатор и заменяет поле CorrelId в опции MQMD этим идентификатором.MQPMO_LOGICAL_ORDER - определяет, что сообщения в группах и сегментах пишутся в логическом порядке.MQPMO_FAIL_IF_QUIESCING - выдает ошибку, если менеджер не доступен.MQPMO_SYNCPOINT (MQPMO_NO_SYNCPOINT) - означает установку (отмену установки) контрольной точки (syncpoint control) на данном сообщении.MQ_MSG_HEADER_LENGTH - определяется для очереди передачи (transmission queue)
Полный список опций MQPMO дан в главе "MQPUT – Put message" [14].
Функция MQPUT может положить сообщение как в локальную (local queue), так и в удаленную очередь (remote queue). MQGET считывает сообщения только из локальной очереди локального менеджера очередей, но не может читать сообщения на удаленном менеджере.
MQCLOSE – функция, закрывающая подключение к очереди, с которой работает приложение.
Синтаксис:
MQCLOSE (Hconn, Hobj, Options, CompCode, Reason)
где:
Hconn | - | идентификатор связи (connection handle) с менеджером очередей |
Hobj | - | идентификатор связи с объектом |
ObjDesc | - | описание объекта MQOD |
Options | - | опции объекта |
CompCode | - | код завершения |
Reason | - | код ошибки, детализирующий код завершения. |
Синтаксис:
MQDISC (Hconn, CompCode, Reason)
где:
Hconn | - | идентификатор связи (connection handle) с менеджером очередей |
CompCode | - | код завершения |
Reason | - | код ошибки |
Для работы приложений в условиях промышленной эксплуатации необходимо использовать дополнительные функции WebSphere MQ.
Программа rewriter (модель "один к одному")
Первая программа будет достаточно простая и реализует так называемую модель "один к одному" или "точка-точка". Эта программа предназначена для чтения сообщений из очереди 1, записи их в очередь 2 и лог-файл на диске. Эта программа имеет практическое значение. Достаточно часто необходимо иметь файл переданных сообщений за определенный период времени, чтобы быстро ответить на вопрос "Было ли передано сообщение с такими идентификационными параметрами в теле сообщения:…"? WebSphere MQ сохраняет persistent сообщения на диске, но эти лог-файлы малопонятны, предназначены для восстановления сообщений при сбоях и достаточно быстро перезаписываются менеджерами очередей при значение параметра logging = circular (по умолчанию) и больших потоках сообщений (logging = linear рекомендуется только для систем промышленной эксплуатации и в этом случае администратор WebSphere MQ должен заботиться о том, чтобы лог-файлы не "замусорили" весь жесткий диск). Поэтому наша программа может быть достаточно полезной.
Автору приходилось сталкиваться с "плохим" стилем программирования, когда параметры программы "зашиваются" в текст. Даже в учебных курсах этого следует избегать, несмотря на некоторое усложнение программ. В наших программах мы будем использовать простые файлы инициализации, чтобы избежать этой ошибки. Назовем нашу программу rewriter.exe и файл инициализации rewriter.ini, в котором 1-я строка – имя очереди для чтения, 2-я строка – имя очереди для записи, 3-я строка – имя лог-файла, как показано ниже.
QUEUE_INPUT QUEUE_OUTPUT C:\TEMP\rewriter.log
Разрабатываемая программа может быть представлена в следующей последовательности псевдокода:
MQCONN MQOPEN --> цикл чтения сообщений | (на основе gmo.WaitInterval): | MQGET | MQPUT |-- конец цикла MQCLOSE MQDISC
Ниже приводится листинг программы rewriter.cpp для Microsoft Visual C++ ver.6.0. Не забудьте добавить mqm.Lib в Project => Settings => Link и обратиться к документации [15], [16], [17] в случае проблем с программированием.
Листинг 9.1. Rewriter C program pass messages to output queue (html, txt)
В данной версии мы выходим из цикла программы по опции gmo.WaitInterval = 3000, когда ожидаем сообщение в очереди в течении 3 сек, а его там нет (опция gmo.WaitInterval работает быстрее, чем если бы мы опрашивали очередь по собственному временному циклу). Другой вариант программы может быть таким. Задаем gmo.WaitInterval = MQWI_UNLIMITED; что соответствует gmo.WaitInterval= -1. Программа будет крутиться "бесконечно" до тех пор, пока мы не остановим её принудительно, например, нажатием клавиш CNTRL_C (стандартный останов). В этом случае нужно добавить обработчик прерываний по нажатию CNTRL_C потому, что при таком выходе объекты очереди останутся не закрытыми и идентификаторы объектов окажутся "зависшими" в виртуальной памяти компьютера. А это может привести к тому, что при повторном запуске программы эти "зависшие" идентификаторы будут мешать нормальному функционированию программы. Во втором варианте открытие и закрытие лог-файла необходимо также делать в обработчике прерываний или после каждой команды MQPUT, в противном случае лог-файл не будет формироваться. Следует отметить, что размер массива buffer ограничивает длину сообщения 8Кб и при появлении сообщений большей длины следует увеличить размер буфера.
Программа rewriter.exe работает достаточно быстро и сравнительные скорости работы данного алгоритма при длине сообщения 1Кб на компьютере INTEL Pentium 1.8Ггц приведены в таблице ниже.
С++ | 1000 сооб/сек | 400 сооб/сек |
Visual Basic 6.0 | 200 сооб/сек | 140 сооб/сек |
Возвращаясь к вопросу о стилях программирования, следует отметить, что обработка кода ошибки является обязательным атрибутом качественного программирования и об этом не следует забывать. В нашей программе дается предупредительное сообщение и делается выход из программы. Если этого не сделать, то простая описка в rewriter.ini файле приведет к зависанию программы и мучительному поиску причин такого зависания, не говоря о других более сложных ситуациях, например, когда очередь открыта эксклюзивно другим приложением.
Для версии программы gmo.WaitInterval = MQWI_UNLIMITED полезно сделать вывод на экран передаваемых сообщений, чтобы наблюдать динамику работы созданного интерфейса. Таких улучшений может быть достаточно много и мы рассмотрим две достаточно полезные модификации.
Программа rewriter может вызываться как MQSeries-триггер. Для этого параметры можно задать следующим образом. Входная очередь – это очередь, на которой определен триггеринг. Выходная очередь – это User Data в триггерном процессе и имя лог файла – это Environment Date в триггерном процессе. В этом случае код в начале программы будет такой.
/* Код для вызова rewriter.exe как MQSeries-триггер */ int main(int argc, char **argv) { if (argc > 1) {trig = (MQTMC2*)argv[1]; strncpy(odG.ObjectName, trig->QName, MQ_Q_NAME_LENGTH); strncpy(queue1, trig->QName, MQ_Q_NAME_LENGTH); strncpy(QManager, trig->QMgrName, MQ_Q_MGR_NAME_LENGTH); strncpy(odP.ObjectName, trig->UserData, MQ_PROCESS_USER_DATA_LENGTH); strncpy(queue2, trig->UserData, MQ_PROCESS_USER_DATA_LENGTH); strncpy(logfilename2, trig->EnvData, 48); }
Возможная модификация этого варианта - программа rewriter может вызываться с передачей параметров через командную строку и эту модификацию читатель может проделать самостоятельно.
Программа rewriter может быть модифицирована в программу разветвитель mqsplitter.exe: чтение сообщений из очереди 1 и запись их в очередь 2, в очередь 3 и лог-файл на диске.
Можно сделать программу mqsplitter.exe на разных языках, например, на Visual Basic 6.0 с интерфейсом, показанным на рис.9.1, и сравнить производительность программ на разных языках, реализующих один и тот же алгоритм. Такая задача будет хорошим лабораторным практикумом.
увеличить изображение
Рис. 9.1. Интерфейс программы mqsplitter на VB6
Модификацию программы mqsplitter.exe читателю предлагается сделать самостоятельно и одновременно проверить идею создания "вечного двигателя". Для создания "вечного двигателя" понадобиться изменить исходный mqsplitter.ini файл следующим образом:
QUEUE_INPUT |
QUEUE_OUTPUT1 |
QUEUE_OUTPUT2 |
C:\TEMP\ mqsplitter.log |
Программа rewriter.exe работает достаточно быстро и сравнительные скорости работы данного алгоритма при длине сообщения 1Кб на компьютере INTEL Pentium 1.8Ггц приведены в таблице ниже.
С++ | 1000 сооб/сек | 400 сооб/сек |
Visual Basic 6.0 | 200 сооб/сек | 140 сооб/сек |
Возвращаясь к вопросу о стилях программирования, следует отметить, что обработка кода ошибки является обязательным атрибутом качественного программирования и об этом не следует забывать. В нашей программе дается предупредительное сообщение и делается выход из программы. Если этого не сделать, то простая описка в rewriter.ini файле приведет к зависанию программы и мучительному поиску причин такого зависания, не говоря о других более сложных ситуациях, например, когда очередь открыта эксклюзивно другим приложением.
Для версии программы gmo.WaitInterval = MQWI_UNLIMITED полезно сделать вывод на экран передаваемых сообщений, чтобы наблюдать динамику работы созданного интерфейса. Таких улучшений может быть достаточно много и мы рассмотрим две достаточно полезные модификации.
Программа rewriter может вызываться как MQSeries-триггер. Для этого параметры можно задать следующим образом. Входная очередь – это очередь, на которой определен триггеринг. Выходная очередь – это User Data в триггерном процессе и имя лог файла – это Environment Date в триггерном процессе. В этом случае код в начале программы будет такой.
/* Код для вызова rewriter.exe как MQSeries-триггер */ int main(int argc, char **argv) { if (argc > 1) {trig = (MQTMC2*)argv[1]; strncpy(odG.ObjectName, trig->QName, MQ_Q_NAME_LENGTH); strncpy(queue1, trig->QName, MQ_Q_NAME_LENGTH); strncpy(QManager, trig->QMgrName, MQ_Q_MGR_NAME_LENGTH); strncpy(odP.ObjectName, trig->UserData, MQ_PROCESS_USER_DATA_LENGTH); strncpy(queue2, trig->UserData, MQ_PROCESS_USER_DATA_LENGTH); strncpy(logfilename2, trig->EnvData, 48); }
Возможная модификация этого варианта - программа rewriter может вызываться с передачей параметров через командную строку и эту модификацию читатель может проделать самостоятельно.
Программа rewriter может быть модифицирована в программу разветвитель mqsplitter.exe: чтение сообщений из очереди 1 и запись их в очередь 2, в очередь 3 и лог-файл на диске.
Можно сделать программу mqsplitter.exe на разных языках, например, на Visual Basic 6.0 с интерфейсом, показанным на рис.9.1, и сравнить производительность программ на разных языках, реализующих один и тот же алгоритм. Такая задача будет хорошим лабораторным практикумом.
увеличить изображение
Рис. 9.1. Интерфейс программы mqsplitter на VB6
Модификацию программы mqsplitter.exe читателю предлагается сделать самостоятельно и одновременно проверить идею создания "вечного двигателя". Для создания "вечного двигателя" понадобиться изменить исходный mqsplitter.ini файл следующим образом:
QUEUE_INPUT |
QUEUE_OUTPUT1 |
QUEUE_OUTPUT2 |
C:\TEMP\ mqsplitter.log |
Программирование транзакций
Сообщения WebSphere MQ могут быть четырех типов:
Datagram - простое сообщение, не требующее ответа; Request - сообщение-запрос, которое ожидает сообщение-ответ (reply message); Reply - сообщение-ответ на сообщение-запрос; Report - сообщение, которое описывает такое событие, как появление ошибки.
Наша очередная задача: на сервере 1 прочитать сообщение из входной очереди, положить её в очередь для отправки на сервер 2 как сообщение-запрос и дождаться прихода сообщения-ответа, как это показано на рис.9.2. Все это необходимо оформить в виде транзакции, для которой будет осуществляться откат в случае неполучения сообщения-ответа в течении 10 сек. Эта задача может использоваться в практических целях при нестабильной работе каналов, например выделенных. Наше приложение при откате транзакции может попытаться перенаправить сообщений из входной очереди – но это уже другая задача.
Рис. 9.2. Структура объектов WebSphere MQ
Итак, последовательность псевдокода представляется следующим образом (обратите внимание на блок 5 и опции MQMD):
Блок 1 MQCONN Блок 2 MQOPEN Блок 3 MQBEGIN Блок 4 MQGET (Input_queue) Блок 5 MQPUT (Output_queue, MQMD.MsgType = MQMT_REQUEST, MQMD.ReplyToQ = Reply_queue) Блок 6 MQGET (Reply_queue) Блок 7 If Reply time < 10 sec then MQCMIT else MQBACK; Блок 8 MQCLOSE Блок 9 MQDISC
Назовем нашу программу transmit.exe и файл инициализации transmit.ini, в котором 1-я строка – имя очереди для чтения, 2-я строка – имя очереди для записи, 3-я строка – имя очереди для ответа, 4-я строка – время ожидания ответа Reply_time = 3000мсек, как показано ниже.
QUEUE_INPUT QUEUE_OUTPUT QUEUE_REPLY 3000
Тип очереди Output_queue – remote queue и эта очередь настроена для отправки сообщений на сервер 2. На сервере 2 также выполнены соответствующие настройки и при нормальной работе каналов транзакция будет совершаться успешно. Отметим также, что сообщение-ответ формируется на сервере 2 средствами другого приложения на этом сервере. В случае остановки любого канала, которую мы произведем для отладки программы, будет происходить откат транзакции. В данной версии в начале программы производится извлечение параметров из ini-файла. Такую программу полезно также иметь в виде триггера и читателю предлагается самостоятельно модифицировать программу для считывания параметров триггера из очереди, на которую он навешивается.
Ниже приводится листинг программы transmit.cpp для Microsoft Visual C++ ver.6.0. Для каждого сообщения MsgId и the CorrelId создаются как уникальные (MSGID= MQMI_NONE и CORRELID= MQCI_NONE) и об этом подробнее в лекции 11.
Листинг 9.2. Программа transmit.cpp для Microsoft Visual C++ ver.6.0. (html, txt)
По тексту программы следует дать комментарии. Наличие опции
gmo.Options = MQGMO_SYNCPOINT;
подразумевает, что команда MQBEGIN может не указываться. Операторы
md.MsgType = MQMT_REQUEST; strncpy(md.ReplyToQ, queue_reply, MQ_Q_NAME_LENGTH);
определяют тип сообщения REQUEST и очередь ответа, заданную в QUEUE_REPLY.
На очередь QUEUE_OUTPUT (или на удаленную очередь на другом менеджере) должна быть навешена программа-триггер, который возвращает сообщения типа Reply. Если Reply-сообщение поступает в очередь QUEUE_REPLY, то транзакция завершается успешно, в противном случае производится откат транзакции и сообщение восстанавливается в очереди QUEUE_INPUT. Reply-сообщение должно иметь идентификатор CorrelId такой же, как и MsgId исходного сообщения. В данной версии программы в целях упрощения отладки не проверяется это условие и читателю предлагается самостоятельно дописать этот фрагмент кода после отладки текущей версии программы. Работа с MsgId и CorrelId будет рассмотрена подробнее в лекции 11.
Программу-триггер, которая "навешивается" на очередь QUEUE_OUTPUT (или на удаленную очередь) для формирования Reply-сообщения (md.MsgType = MQMT_REPLY;), читателю также предлагается сделать самостоятельно.
На данном примере мы познакомились с WebSphere MQ транзакциями, являющимися основой создания надежных программ для передачи сообщений. Если сообщение приходит на сервер в очередь, то программа опроса очереди открывает внешнюю транзакцию для работы с WebSphere MQ и передает управление подпрограмме записи сообщения в базу данных, которая открывает внутреннюю транзакцию для работы с базой данных (БД). Если сообщение уходит из базы данных, то открывается внешняя транзакция работы с БД, далее открывается внутренняя транзакцию для работы с WebSphere MQ и идет помещение сообщения в очередь, из которой это сообщение "улетает" на другой сервер. Завершение транзакций и откат транзакций обоих типов осуществляется взаимосвязанно. Это и есть правильный стиль интеграции приложений на основе WebSphere MQ.
O_options = MQOO_BROWSE + MQOO_INPUT_SHARED ; MQOPEN(Hcon, &odG, O_options, &Hobj, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQOPEN % s ended with reason code %ld\n", queue_input, Reason); } if (CompCode == MQCC_FAILED) { exit(Reason); }
O_options = MQOO_BROWSE + MQOO_INPUT_SHARED ; MQOPEN(Hcon, &odR, O_options, &Hrep, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQOPEN %s ended with reason code %ld\n", queue_reply, Reason); } if (CompCode == MQCC_FAILED) { exit(Reason); }
O_options = MQOO_OUTPUT ; MQOPEN(Hcon, &odP, O_options, &Hout, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQOPEN %s ended with reason code %ld\n", queue_output, Reason); } if (CompCode == MQCC_FAILED) { exit(Reason); }
while (CompCode == MQCC_OK) { buflen = sizeof(buffer) - 1; memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId)); memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId)); gmo.Options = MQGMO_ACCEPT_TRUNCATED_MSG + MQGMO_WAIT + MQGMO_SYNCPOINT; gmo.WaitInterval = 3000 ;
MQBEGIN (Hcon, &mbo, &CompCode, &Reason); MQGET(Hcon, Hobj, &md, &gmo, buflen, buffer, &messlen, &CompCode, &Reason); //if (Reason != MQRC_NONE) { printf("MQGET from %s ended with reason code %ld\n", queue_input, Reason); }
if ((CompCode == MQCC_OK) || (CompCode == MQCC_WARNING)) { buffer[messlen] = '\0'; /* заносим символ конец строки в буфер с прочитанным сообщением */ buflen = messlen; md.MsgType = MQMT_REQUEST; md.Report = MQRO_EXCEPTION_WITH_DATA; strncpy(md.ReplyToQ, queue_reply, MQ_Q_NAME_LENGTH); memcpy(md.Format, MQFMT_STRING, MQ_FORMAT_LENGTH);
MQPUT(Hcon, Hout, &md, &pmo, buflen, buffer, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQPUT to %s ended ended unsuccessfully with reason code %ld CompCode %ld\n", queue_output, Reason, CompCode ); MQBACK( Hcon, &CompCode, &Reason ) ; CompCode = MQCC_FAILED ; } else {
while (CompCode != MQCC_FAILED) { /** осуществляется проверка queue_reply **/ gmo.Options = MQGMO_ACCEPT_TRUNCATED_MSG + MQGMO_WAIT ; gmo.WaitInterval = 3000 ; memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId)); memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId)); MQGET(Hcon, Hrep, &md, &gmo, buflen, buffer, &replylen, &CompCode, &Reason); if (CompCode != MQCC_FAILED) { if (md.MsgType == MQMT_REPLY) /* report feedback */ { printf("Transaction % s=> %s successfully: %s\n", queue_input, queue_output, buffer); MQCMIT( Hcon, &CompCode, &Reason ) ; } else { printf("Transaction % s=> %s successfully, REPLY message not deliver, reason code %ld CompCode %ld\n", queue_input, queue_output, queue_reply, Reason, CompCode ); MQBACK( Hcon, &CompCode, &Reason ) ; CompCode = MQCC_FAILED ; } }
if (Reason == MQRC_NO_MSG_AVAILABLE) { printf("Transaction % s=> % s UNsuccessfully, REPLY message not deliver\n", queue_input, queue_output ); MQBACK( Hcon, &CompCode, &Reason ) ; CompCode = MQCC_FAILED ; } } } } } C_options = 0; MQCLOSE(Hcon, &Hobj, C_options, &CompCode, &Reason); if (Reason != MQRC_NONE){printf("MQCLOSE %s ended with reason code %ld\n", queue_input, Reason); } MQCLOSE(Hcon, &Hout, C_options, &CompCode, &Reason); if (Reason != MQRC_NONE){printf("MQCLOSE %s ended with reason code %ld\n", queue_output, Reason); } MQCLOSE(Hcon, &Hrep, C_options, &CompCode, &Reason); if (Reason != MQRC_NONE){printf("MQCLOSE %s ended with reason code %ld\n", queue_reply, Reason); }
MQDISC(&Hcon, &CompCode, &Reason); if (Reason != MQRC_NONE){ printf("MQDISC ended with reason code %ld\n", Reason); } return(0); }
Листинг 9.2. Программа transmit.cpp для Microsoft Visual C++ ver.6.0.
По тексту программы следует дать комментарии. Наличие опции
gmo.Options = MQGMO_SYNCPOINT;
подразумевает, что команда MQBEGIN может не указываться. Операторы
md.MsgType = MQMT_REQUEST; strncpy(md.ReplyToQ, queue_reply, MQ_Q_NAME_LENGTH);
определяют тип сообщения REQUEST и очередь ответа, заданную в QUEUE_REPLY.
На очередь QUEUE_OUTPUT (или на удаленную очередь на другом менеджере) должна быть навешена программа-триггер, который возвращает сообщения типа Reply. Если Reply-сообщение поступает в очередь QUEUE_REPLY, то транзакция завершается успешно, в противном случае производится откат транзакции и сообщение восстанавливается в очереди QUEUE_INPUT. Reply-сообщение должно иметь идентификатор CorrelId такой же, как и MsgId исходного сообщения. В данной версии программы в целях упрощения отладки не проверяется это условие и читателю предлагается самостоятельно дописать этот фрагмент кода после отладки текущей версии программы. Работа с MsgId и CorrelId будет рассмотрена подробнее в лекции 11.
Программу-триггер, которая "навешивается" на очередь QUEUE_OUTPUT ( или на удаленную очередь) для формирования Reply-сообщения (md.MsgType = MQMT_REPLY;), читателю также предлагается сделать самостоятельно.
На данном примере мы познакомились с WebSphere MQ транзакциями, являющимися основой создания надежных программ для передачи сообщений. Если сообщение приходит на сервер в очередь, то программа опроса очереди открывает внешнюю транзакцию для работы с WebSphere MQ и передает управление подпрограмме записи сообщения в базу данных, которая открывает внутреннюю транзакцию для работы с базой данных (БД). Если сообщение уходит из базы данных, то открывается внешняя транзакция работы с БД, далее открывается внутренняя транзакцию для работы с WebSphere MQ и идет помещение сообщения в очередь, из которой это сообщение "улетает" на другой сервер. Завершение транзакций и откат транзакций обоих типов осуществляется взаимосвязанно. Это и есть правильный стиль интеграции приложений на основе WebSphere MQ.
Списки распространения ( модель "один ко многим" )
Использование механизма списков распространения (Distribution List) или так называемой модели "один ко многим" требуется, например, в случае рассылки большому количеству клиентов постоянно меняющейся информации (котировки акций, курсы валют, новости и т.п.). Этот механизм позволяет одной командой MQOPEN открыть множество очередей и одной командой MQPUT положить сообщения в эти очереди. После открытия очередей возвращается один уникальный идентификатор объекта и MQPUT помещает сообщения во все эти очереди, используя этот единственный идентификатор.
В версии WebSphere MQ 5.1 и выше object descriptor (MQOD) содержит поля, которые используются для списков распространения. Поле Object Descriptor RecsPresent содержит число Object Records (MQORs) и если оно больше чем 0, то это означает, что должен быть использован список распространения.
Рассмотрим этот механизм на примере задачи, когда WebSphere MQ server помещает сообщения в N очередей, как показано на рис.9.3. Эти сообщения могут дальше уходить через remote queue или их может забирать WebSphere MQ client с заданной периодичностью. Назовем нашу программу distlist.exe, файл с текстом сообщения distlist.dat и файл инициализации distlist.ini, в котором 1-я строка – имя менеджера, 2-я и последующие строки – имена очередей, как показано ниже.
QM_ ALFA Queue_ Moscow Queue_ Kiev Queue_ Alma-Ata Queue_ SPetersburg Queue_ Novosibirsk Queue_ Saratov
//last string must be blank
Рис. 9.3. Механизм Distribution List для WebSphere MQ
Ниже приводится листинг программы distlist.cpp для Microsoft Visual C++ ver.6.0.
Листинг 9.3. Программа distlist.cpp для Microsoft Visual C++ ver.6.0. (html, txt)
В завершение раздела можно сказать, что время работы механизма Distribution List для WebSphere MQ с 200, 400, 600 и т.д. очередями не зависит от производительности компьютера и не сильно зависит от количества очередей (для Notpersistent queue, persistent queue не целесообразно использовать для данной задачи ). Это наглядно видно из следующей таблицы, отражающей время работы distlist (сек) в зависимости от оперативной памяти (ОП) компьютера: 512Мбт и 1Гбт.
static void print_usage(void); static void print_responses( char * comment, PMQRR pRR, MQLONG NumQueues, PMQOR pOR);
int main(int argc, char **argv) { typedef enum {False, True} Bool; MQOD od = {MQOD_DEFAULT}; /* Object Descriptor */ MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */ MQPMO pmo = {MQPMO_DEFAULT}; /* put message options */ MQHCONN Hcon; /* connection handle */ MQHOBJ Hobj; /* object handle */ MQLONG O_options; /* MQOPEN options */ MQLONG C_options; /* MQCLOSE options */ MQLONG CompCode; /* completion code */ MQLONG OpenCode; /* MQOPEN completion code */ MQLONG Reason; /* reason code */ MQCHAR48 QManager; /* queue manager name */ MQLONG buflen; /* buffer length */ char buffer[101]; /* message buffer */ MQLONG Index ; /* Index into list of queues */ MQLONG NumQueues ; /* Number of queues */ PMQRR pRR=NULL; /* Pointer to response records */ PMQOR pOR=NULL; /* Pointer to object records */ Bool DisconnectRequired=False;/* Already connected switch */ Bool Connected=False; /* Connect succeeded switch */
typedef struct { MQBYTE24 MsgId; MQBYTE24 CorrelId; } PutMsgRec, *pPutMsgRec; pPutMsgRec pPMR=NULL; /* Pointer to put msg records */ MQLONG PutMsgRecFields=MQPMRF_MSG_ID | MQPMRF_CORREL_ID;
/* Open ini file and setting value */ if ( (fptr=fopen ("distlist.ini","r" )) == NULL ) {printf("Cannot open distlist.ini file" ); print_usage(); exit(1); } else{ fgets(QManager, 48, fptr); queuenamelen = strlen(QManager) - 1; QManager[queuenamelen] = ' '; NumQueues = 0; while (queuenamelen != 0) { fgets(queue[NumQueues], 48, fptr); queuenamelen = strlen(queue[NumQueues]) - 1; queue[NumQueues][queuenamelen] = ' '; NumQueues++; } } fclose (fptr); --NumQueues; /* NumQueues - Number of Queue name */ /* Allocate response records, object records and put message records */ pRR = (PMQRR)malloc( NumQueues * sizeof(MQRR)); pOR = (PMQOR)malloc( NumQueues * sizeof(MQOR)); pPMR = (pPutMsgRec)malloc( NumQueues * sizeof(PutMsgRec));
if((NULL == pRR) || (NULL == pOR) || (NULL == pPMR)) { printf("%s(%d) malloc failed\n", __FILE__, __LINE__); exit(4); }
/* Use parameters as the name of the target queues */
for( Index = 0 ; Index < NumQueues ; Index ++) { strncpy( (pOR+Index)->ObjectName, queue[Index], (size_t)MQ_Q_NAME_LENGTH); strncpy( (pOR+Index)->ObjectQMgrName, QManager, (size_t)MQ_Q_MGR_NAME_LENGTH); }
for( Index = 0 ; Index < NumQueues ; Index ++) { MQCONN((pOR+Index)->ObjectQMgrName, &Hcon, &((pRR+Index)->CompCode), &((pRR+Index)->Reason)); if ((pRR+Index)->CompCode == MQCC_FAILED) { continue; } if ((pRR+Index)->CompCode == MQCC_OK) { DisconnectRequired = True ; } Connected = True; break ; } /* Print any non zero responses */ print_responses("MQCONN", pRR, Index, pOR);
/* Print If failed to connect to queue manager then exit. */ if( False == Connected ) { printf("Unable to connect to queue manager\n"); exit(3) ; }
if ( (fp=fopen ("distlist.dat","r" )) == NULL ) {printf("Cannot open distlist.dat file" ); exit(2); } else{ fgets(buffer, 100, fptr); buflen = (MQLONG)strlen(buffer); /* length without null */ if (buffer[buflen-1] == '\n') /* last char is a new-line */ { buffer[buflen-1] = '\0'; /* replace new-line with null */ --buflen; /* reduce buffer length */ } } fclose (fp);
tmr = time(NULL); strcpy ( buf, ctime(&tmr)); buf[strlen(buf)-5]=0; printf("Distlist start send message to list queue %s\n", buf);
/* Open the target message queue for output */ od.Version = MQOD_VERSION_2 ; od.RecsPresent = NumQueues ; od.ObjectRecPtr = pOR; od.ResponseRecPtr = pRR ; O_options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;
MQOPEN(Hcon, &od, O_options, &Hobj, &OpenCode, &Reason); if (Reason == MQRC_MULTIPLE_REASONS) { print_responses("MQOPEN", pRR, NumQueues, pOR); } else { if (Reason != MQRC_NONE) { printf("MQOPEN returned CompCode=%d, Reason=%d\n", OpenCode, Reason); } }
/* Read message from the file /* Loop until null line or end of file, or there is a failure */ CompCode = OpenCode; /* use MQOPEN result for initial test */
Технологические вопросы интеграции приложений
Задачи интеграции приложений возникают достаточно часто в современных корпоративных системах. В качестве примера можно привести такую задачу. В московском динамично развивающемся банке принято решение о переходе с наиболее популярной в России банковской системы компании «Диасофт» на западную банковскую систему T24 компании TEMENOS, получившую широкое распространение в мире и в России за последние годы. Причины, побудившие к этому переходу, могут быть следующие:
Необходимость иметь западную банковскую отчетность. Возможность работы клиентов через Интернет (интернет-банкинг). Гибкость и адаптивность к изменениям в области банковского законодательства. Передовые программно-аппаратные решения и наилучшие показатели по критерию цена/(качество + производительность).
В этой задаче для нас интересна технология такого перехода. Совершенно очевидно, что переход от одной автоматизированной банковской системы (АБС) к другой не может пройти за один день или даже за один месяц. Этот переход будет идти несколько месяцев, а возможно, один или два года. В первую очередь это зависит от используемых технологий и системных интеграторов. Кроме этого, должен быть обучен персонал, а сам переход должен быть тщательно протестирован и осуществляться по подсистемам. Проект перехода на новую АБС составляется по подсистемам, в каждой из которых выделяются свои группы задач.
В качестве конкретной задачи рассмотрим создание интерфейса по передаче клиентов из системы «Диасофт» (АБС1) в систему T24 (АБС2). АБС1 функционирует под Windows на основе базы данных SQL Server. В АБС2 предполагается функционирование под UNIX (HP_UX) на основе базы данных ORACLE. Известна структуры данных (таблицы client) в АБС1 и АБС2. Требуется создать интерфейс: клиенты АБС1 => WebSphere MQ => клиенты АБС2. WebSphere MQ идеально подходит для решения такого класса задач по межплатформенной передаче данных, являясь мировым лидером среди транспортных систем.
Существует разные варианты решения поставленной задачи:
Создать две программы обработчика, работающих на платформах АБС1 и АБС2 для отправки и приема сообщений через WebSphere MQ. Использовать WebSphere Business Integration Message Broker (сокращенно WebSphere BI Broker) или, иначе называемый, WebSphere MQ Integrator. Использовать заложенные в T24 средства интеграции с WebSphere MQ.
Первый путь ясен в технической реализации. С одной стороны при каждом обновлении таблицы client в АБС1 программа-обработчик срабатывает как триггер базы данных и помещает результаты оператора update в очередь, они приходят на АБС2, где своя программа-обработчик срабатывает как триггер очереди и помещает сообщение в таблицу client АБС2. WebSphere MQ гарантирует доставку сообщений. Разработчикам приложений (программ-обработчиков) остается позаботиться 1) о преобразовании форматов АБС1 в АБС2 в одной из программ-обработчиков, например, на платформе Windows; 2) о надежности такой передачи с учетом механизмов транзакционности в WebSphere MQ и в базах данных одновременно. Ведь в банковских системах ничего не должно и не может пропасть! Укрупненная блок-схема программы-обработчика в АБС2 для такого надежного транзакционного взаимодействия выглядит следующим образом.
Блок 1 MQCONN; MQOPEN; Блок 2 MQBEGIN; MQGET; Блок 3 Begin Tran Блок 4 UPDATE CLIENT SET ... WHERE ... Блок 5 If Error = 0 then Commit Tran else Rollback Tran; Блок 6 If Error = 0 then MQCMIT else MQBACK; Блок 7 MQCLOSE; MQDISC;
В этой программе транзакция WebSphere MQ является внешней по отношению к транзакции базы данных. В программе-обработчике для АБС1 наоборот транзакция WebSphere MQ будет внутренней по отношению к транзакции базы данных. Таким образом, реализация интерфейса по первому варианту не вызывает технических проблем и в следующем разделе мы рассмотрим пример на программирование транзакций для WebSphere MQ. Остается отметить один важный момент. Первый вариант не перспективен при создании нескольких десятков интерфейсов и более. Во-первых, затраты на разработку возрастут по сравнению со вторым и третьим вариантами, когда используются специализированные средства. Во-вторых, сопровождение нескольких десятков разных программ, написанных разными программистами, становиться весьма серьезной задачей, а их модификация после увольнения авторов программ или прекращения с ними договорных отношений может оказаться неразрешимой проблемой. К сожалению, жизнь такова, что программисты увольняются, программы интерфейсов живут по несколько лет и их требуется модифицировать. Поэтому нужно очень серьезно подумать в самом начале интеграционного проекта, какой выбрать путь для реализации интерфейсов. Если будет создано несколько интерфейсов по варианту 1, то переделывать их по варианту 2 – задача малоприятная и неблагодарная.
Тем не менее, при небольшом числе интерфейсов рекомендуется вариант 1, как наиболее экономический, к рассмотрению которого мы и переходим. WebSphere BI Broker будет рассмотрен в лекции 12. К сожалению, рассмотрение средств интеграции с WebSphere MQ в системе T24 (вариант 3) выходит за пределы данного курса лекций.
MQLONG CompCode; MQLONG Reason; MQOD
/*************************************************************************************/ /* Имя программы: AMQSGAMA */ /* Описание: Основанная на модели Publish/Subscribe программа */ /* моделирует результаты футбольного матча и */ /* отправляет их от издателя к брокеру */ /* Statement: Licensed Materials - Property of IBM */ /* SupportPac MA0E */ /* (C) Copyright IBM Corp. 1999 */ /*************************************************************************************/ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <time.h> #include <cmqc.h> /* MQI */ #include <cmqpsc.h> /* MQI Publish/Subscribe */ #include <windows.h> #if MQAT_DEFAULT == MQAT_WINDOWS_NT #define msSleep(time) \ Sleep(time) #elif MQAT_DEFAULT == MQAT_UNIX #define msSleep(time) \ { \ struct timeval tval; \ tval.tv_sec = time / 1000; \ tval.tv_usec = (time % 1000) * 1000; \ select(0, NULL, NULL, NULL, &tval); \ } #endif #define STREAM "SAMPLE.BROKER.RESULTS.STREAM" #define TOPIC_PREFIX "Sport/Soccer/Event/" #define MATCH_STARTED "MatchStarted" #define MATCH_ENDED "MatchEnded" #define SCORE_UPDATE "ScoreUpdate" #define MATCH_LENGTH 30000 /* 30 Second match length */ #define REAL_TIME_RATIO 333 #define AVERAGE_NUM_OF_GOALS 5 #define DEFAULT_MESSAGE_SIZE 512 /* Maximum buffer size for a message */ static const MQRFH DefaultMQRFH = {MQRFH_DEFAULT}; typedef struct { MQCHAR32 Team1; MQCHAR32 Team2; } Match_Teams, *pMatch_Teams; void BuildMQRFHeader( PMQBYTE pStart , PMQLONG pDataLength , MQCHAR TopicType[] ); void PutPublication( MQHCONN hConn , MQHOBJ hObj , PMQBYTE pMessage , MQLONG messageLength , PMQLONG pCompCode , PMQLONG pReason ); int main(int argc, char **argv) { MQHCONN hConn = MQHC_UNUSABLE_HCONN; MQHOBJ hObj = MQHO_UNUSABLE_HOBJ; MQLONG CompCode; MQLONG Reason; MQOD od = { MQOD_DEFAULT }; MQLONG Options; PMQBYTE pMessageBlock = NULL; MQLONG messageLength; MQLONG timeRemaining; MQLONG delay; PMQCHAR pScoringTeam; pMatch_Teams pTeams; MQCHAR32 team1; MQCHAR32 team2; char QMName[MQ_Q_MGR_NAME_LENGTH+1] = ""; MQLONG randomNumber; MQLONG ConnReason; /* Проверка аргументов программы */ if( (argc < 3)||(argc > 4)||(strlen(argv[1]) > 31)||(strlen(argv[2]) > 31) ) { printf("Usage: amqsgam team1 team2 <QManager>\n"); printf(" Maximum 31 characters per team name,\n"); printf(" no spaces or '\"' characters allowed.\n"); exit(0); } else { strcpy(team1, argv[1]); strcpy(team2, argv[2]); } /* Использовать default queue manager или заданный в зависимости от наличия аргумена */ if (argc > 3) strcpy(QMName, argv[3]); MQCONN( QMName, &hConn, &CompCode, &ConnReason ); if( CompCode == MQCC_FAILED ) { printf("MQCONN failed with CompCode %d and Reason %d\n", CompCode, ConnReason); } else if( ConnReason == MQRC_ALREADY_CONNECTED ) { CompCode = MQCC_OK; } if( CompCode == MQCC_OK ) { strncpy(od.ObjectName, STREAM, (size_t)MQ_Q_NAME_LENGTH); Options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING; MQOPEN( hConn, &od, Options, &hObj, &CompCode, &Reason ); if( CompCode != MQCC_OK ) { printf("MQOPEN failed to open \"%s\"\nwith CompCode %d and Reason %d\n", od.ObjectName, CompCode, Reason); } } if( CompCode == MQCC_OK ) { srand( (unsigned)(time( NULL )) + (unsigned)(team1[0] + team2[(strlen(team2) - 1)]) ); timeRemaining = MATCH_LENGTH; messageLength = DEFAULT_MESSAGE_SIZE; pMessageBlock = (PMQBYTE)malloc(messageLength); if( pMessageBlock == NULL ) { printf("Unable to allocate storage\n"); } else { if( CompCode == MQCC_OK ) { /* создание MQRFH для публикации о начале матча */ BuildMQRFHeader( pMessageBlock, &messageLength, MATCH_STARTED ); pTeams = (pMatch_Teams)(pMessageBlock + messageLength); strcpy(pTeams->Team1, team1); strcpy(pTeams->Team2, team2); messageLength += sizeof(Match_Teams); printf("Match between %s and %s\n", team1, team2); /* помещение сообщения (публикации) в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) { printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason); } else { /* Моделирование попытки забить один из 5 голов (каждые 30 сек) */ while( (timeRemaining > 0)&&(CompCode == MQCC_OK) ) { randomNumber = rand(); delay = REAL_TIME_RATIO + ( (randomNumber * MATCH_LENGTH) / (RAND_MAX * AVERAGE_NUM_OF_GOALS)); if( delay > timeRemaining ) delay = timeRemaining; msSleep(delay); timeRemaining -= delay; if( timeRemaining > 0 ) { if( (randomNumber % 2) == 0 ) /* Шанс забить гол - 50 процентов */ { messageLength = DEFAULT_MESSAGE_SIZE; BuildMQRFHeader( pMessageBlock , &messageLength, SCORE_UPDATE ); printf("GOAL! "); pScoringTeam = (PMQCHAR)pMessageBlock + messageLength; if( rand() < (RAND_MAX/2) ) { strcpy(pScoringTeam, team1); printf(team1); } else { strcpy(pScoringTeam, team2); printf(team2); } printf(" scores after %d minutes\n", ((MATCH_LENGTH - timeRemaining)/REAL_TIME_RATIO)); messageLength += sizeof(MQCHAR32); /* помещение сообщения о забитом голе в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason); } } } /* конец цикла по времени окончания матча ( timeRemaining ) */ if( CompCode == MQCC_OK ) { printf("Full time\n"); messageLength = DEFAULT_MESSAGE_SIZE; BuildMQRFHeader( pMessageBlock , &messageLength , MATCH_ENDED ); pTeams = (pMatch_Teams)(pMessageBlock + messageLength); strcpy(pTeams->Team1, team1); strcpy(pTeams->Team2, team2); messageLength += sizeof(Match_Teams); /* помещение сообщения о конце матча в очередь потока */ PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason); } } } free( pMessageBlock ); } /* end of else (pMessageBlock != NULL) */ } if( hObj != MQHO_UNUSABLE_HOBJ ) { MQCLOSE( hConn , &hObj, MQCO_NONE, &CompCode , &Reason ); if( CompCode != MQCC_OK ) printf("MQCLOSE failed with CompCode %d and Reason %d\n", CompCode, Reason); } if( (hConn != MQHC_UNUSABLE_HCONN) &&(ConnReason != MQRC_ALREADY_CONNECTED) ) { MQDISC( &hConn, &CompCode, &Reason ); if( CompCode != MQCC_OK ) printf("MQDISC failed with CompCode %d and Reason %d\n", CompCode, Reason); } return(0); } /* end of main Function */ /* Function Name : BuildMQRFHeader */ void BuildMQRFHeader( PMQBYTE pStart, PMQLONG pDataLength, MQCHAR TopicType[] ) { PMQRFH pRFHeader = (PMQRFH)pStart; PMQCHAR pNameValueString; memset((PMQBYTE)pStart, 0, *pDataLength); memcpy( pRFHeader, &DefaultMQRFH, (size_t)MQRFH_STRUC_LENGTH_FIXED); memcpy( pRFHeader->Format, MQFMT_STRING, (size_t)MQ_FORMAT_LENGTH); pRFHeader->CodedCharSetId = MQCCSI_INHERIT; pNameValueString = (MQCHAR *)pRFHeader + MQRFH_STRUC_LENGTH_FIXED; strcpy(pNameValueString, MQPS_COMMAND_B); strcat(pNameValueString, MQPS_PUBLISH); strcat(pNameValueString, MQPS_PUBLICATION_OPTIONS_B); strcat(pNameValueString, MQPS_NO_REGISTRATION); strcat(pNameValueString, MQPS_TOPIC_B); strcat(pNameValueString, TOPIC_PREFIX); strcat(pNameValueString, TopicType); *pDataLength = MQRFH_STRUC_LENGTH_FIXED + ((strlen(pNameValueString)+15)/16)*16; pRFHeader->StrucLength = *pDataLength; } /* Function Name : PutPublication */ void PutPublication( MQHCONN hConn, MQHOBJ hObj, PMQBYTE pMessage, MQLONG messageLength, PMQLONG pCompCode, PMQLONG pReason ) { MQPMO pmo = { MQPMO_DEFAULT }; MQMD md = { MQMD_DEFAULT }; memcpy(md.Format, MQFMT_RF_HEADER, (size_t)MQ_FORMAT_LENGTH); md.MsgType = MQMT_DATAGRAM; md.Persistence = MQPER_PERSISTENT; pmo.Options |= MQPMO_NEW_MSG_ID; MQPUT( hConn, hObj, &md, &pmo, messageLength, pMessage, pCompCode, pReason ); } |
Листинг 10.1. Программа amqsgama |
Закрыть окно |
MQCHAR32 team1;
MQCHAR32 team2;
char QMName[MQ_Q_MGR_NAME_LENGTH+1] = "";
MQLONG randomNumber;
MQLONG ConnReason;
/* Проверка аргументов программы */
if( (argc < 3)||(argc > 4)||(strlen(argv[1]) > 31)||(strlen(argv[2]) > 31) )
{
printf("Usage: amqsgam team1 team2 \n");
printf(" Maximum 31 characters per team name,\n");
printf(" no spaces or '\"' characters allowed.\n");
exit(0);
}
else
{
strcpy(team1, argv[1]);
strcpy(team2, argv[2]);
}
/* Использовать default queue manager или заданный в зависимости от наличия аргумена */
if (argc > 3) strcpy(QMName, argv[3]);
MQCONN( QMName, &hConn, &CompCode, &ConnReason );
if( CompCode == MQCC_FAILED )
{
printf("MQCONN failed with CompCode %d and Reason %d\n", CompCode, ConnReason);
}
else if( ConnReason == MQRC_ALREADY_CONNECTED )
{
CompCode = MQCC_OK;
}
if( CompCode == MQCC_OK )
{
strncpy(od.ObjectName, STREAM, (size_t)MQ_Q_NAME_LENGTH);
Options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING;
MQOPEN( hConn, &od, Options, &hObj, &CompCode, &Reason );
if( CompCode != MQCC_OK )
{
printf("MQOPEN failed to open \"%s\"\nwith CompCode %d and Reason %d\n",
od.ObjectName, CompCode, Reason);
}
}
if( CompCode == MQCC_OK )
{
srand( (unsigned)(time( NULL ))
+ (unsigned)(team1[0] + team2[(strlen(team2) - 1)]) );
timeRemaining = MATCH_LENGTH;
messageLength = DEFAULT_MESSAGE_SIZE;
pMessageBlock = (PMQBYTE)malloc(messageLength);
if( pMessageBlock == NULL )
{
printf("Unable to allocate storage\n");
}
else
{
if( CompCode == MQCC_OK )
{
/* создание MQRFH для публикации о начале матча */
BuildMQRFHeader( pMessageBlock, &messageLength, MATCH_STARTED );
pTeams = (pMatch_Teams)(pMessageBlock + messageLength);
strcpy(pTeams->Team1, team1);
strcpy(pTeams->Team2, team2);
messageLength += sizeof(Match_Teams);
printf("Match between %s and %s\n", team1, team2);
/* помещение сообщения (публикации) в очередь потока */
PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );
if( CompCode != MQCC_OK )
{
printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
else
{
/* Моделирование попытки забить один из 5 голов (каждые 30 сек) */
while( (timeRemaining > 0)&&(CompCode == MQCC_OK) )
{
randomNumber = rand();
delay = REAL_TIME_RATIO
+ ( (randomNumber * MATCH_LENGTH)
/ (RAND_MAX * AVERAGE_NUM_OF_GOALS));
if( delay > timeRemaining ) delay = timeRemaining;
msSleep(delay);
timeRemaining -= delay;
if( timeRemaining > 0 )
{
if( (randomNumber % 2) == 0 ) /* Шанс забить гол - 50 процентов */
{
messageLength = DEFAULT_MESSAGE_SIZE;
BuildMQRFHeader( pMessageBlock , &messageLength, SCORE_UPDATE );
printf("GOAL! ");
pScoringTeam = (PMQCHAR)pMessageBlock + messageLength;
if( rand() < (RAND_MAX/2) )
{
strcpy(pScoringTeam, team1);
printf(team1);
}
else
{
strcpy(pScoringTeam, team2);
printf(team2);
}
printf(" scores after %d minutes\n", ((MATCH_LENGTH - timeRemaining)/REAL_TIME_RATIO));
messageLength += sizeof(MQCHAR32);
/* помещение сообщения о забитом голе в очередь потока */
PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );
if( CompCode != MQCC_OK )
printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
}
} /* конец цикла по времени окончания матча ( timeRemaining ) */
if( CompCode == MQCC_OK )
{
printf("Full time\n");
messageLength = DEFAULT_MESSAGE_SIZE;
BuildMQRFHeader( pMessageBlock , &messageLength , MATCH_ENDED );
pTeams = (pMatch_Teams)(pMessageBlock + messageLength);
strcpy(pTeams->Team1, team1);
strcpy(pTeams->Team2, team2);
messageLength += sizeof(Match_Teams);
/* помещение сообщения о конце матча в очередь потока */
PutPublication( hConn, hObj, pMessageBlock, messageLength, &CompCode, &Reason );
if( CompCode != MQCC_OK )
printf("MQPUT failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
}
}
free( pMessageBlock );
} /* end of else (pMessageBlock != NULL) */
}
if( hObj != MQHO_UNUSABLE_HOBJ )
{
MQCLOSE( hConn , &hObj, MQCO_NONE, &CompCode , &Reason );
if( CompCode != MQCC_OK )
printf("MQCLOSE failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
if( (hConn != MQHC_UNUSABLE_HCONN) &&(ConnReason != MQRC_ALREADY_CONNECTED) )
{
MQDISC( &hConn, &CompCode, &Reason );
if( CompCode != MQCC_OK )
printf("MQDISC failed with CompCode %d and Reason %d\n", CompCode, Reason);
}
return(0);
}
/* end of main Function */
/* Function Name : BuildMQRFHeader */
void BuildMQRFHeader( PMQBYTE pStart, PMQLONG pDataLength, MQCHAR TopicType[] )
{
PMQRFH pRFHeader = (PMQRFH)pStart;
PMQCHAR pNameValueString;
memset((PMQBYTE)pStart, 0, *pDataLength);
memcpy( pRFHeader, &DefaultMQRFH, (size_t)MQRFH_STRUC_LENGTH_FIXED);
memcpy( pRFHeader->Format, MQFMT_STRING, (size_t)MQ_FORMAT_LENGTH);
pRFHeader->CodedCharSetId = MQCCSI_INHERIT;
pNameValueString = (MQCHAR *)pRFHeader + MQRFH_STRUC_LENGTH_FIXED;
strcpy(pNameValueString, MQPS_COMMAND_B);
strcat(pNameValueString, MQPS_PUBLISH);
strcat(pNameValueString, MQPS_PUBLICATION_OPTIONS_B);
strcat(pNameValueString, MQPS_NO_REGISTRATION);
strcat(pNameValueString, MQPS_TOPIC_B);
strcat(pNameValueString, TOPIC_PREFIX);
strcat(pNameValueString, TopicType);
*pDataLength = MQRFH_STRUC_LENGTH_FIXED + ((strlen(pNameValueString)+15)/16)*16;
pRFHeader->StrucLength = *pDataLength;
}
/* Function Name : PutPublication */
void PutPublication( MQHCONN hConn, MQHOBJ hObj, PMQBYTE pMessage,
MQLONG messageLength, PMQLONG pCompCode, PMQLONG pReason )
{
MQPMO pmo = { MQPMO_DEFAULT };
MQMD md = { MQMD_DEFAULT };
memcpy(md.Format, MQFMT_RF_HEADER, (size_t)MQ_FORMAT_LENGTH);
md.MsgType = MQMT_DATAGRAM;
md.Persistence = MQPER_PERSISTENT;
pmo.Options |= MQPMO_NEW_MSG_ID;
MQPUT( hConn, hObj, &md, &pmo, messageLength, pMessage, pCompCode, pReason );
}
Общие сведения о модели публикация-подписка
Механизм публикация-подписка (Publish/Subscribe) позволяет поставлять информацию от поставщика к потребителю. Эта модель стала особенно популярной в последние годы благодаря тому, что часто меняющаяся информация может поставляться постоянно многим получателям. Одним из типичных примеров такой информации являются данные на рынке акций и валют. В такой модели издателю необязатеьно знать о местонахождении получателя и наоборот. В модели Request/Reply движение информации начинается по запросу потребителя (клиента). В модели Publish/Subscribe движение информации начинается по мере ее появления и поступления от поставщика.
Поставщика информации принято называть издатель (publisher). издатель предлагает информацию на определенные темы. Потребитель информации называется подписчиком (subscriber), он подписывается на получение информации на определенные темы. Информация, которую получает один подписчик, может передаваться другим подписчикам.
Информация, посылаемая как сообщение, характеризуется темой. издатель посылает информацию только на те темы, на которые сделана подписка. Взаимодействие между издателями и подписчиками контролируется посредником или брокером (broker). Взаимосвязанные темы группируются совместно в потоки (stream). издатель может применять потоки для ограничения диапазонов тем издателей и подписчиков. Через брокера идет поток, который использует все темы. На рис.10.1 представлена схема, иллюстрирующая взаимодействие брокеров, издателей и подписчиков.
Рис. 10.1. Схема взаимодействия брокеров, издателей и подписчиков
Общий подход работы механизма Publish/Subscribe лучше всего иллюстрируется на примере трех видов сценариев, которые реализуются на основе командных сообщений.
Сценарий издатель-брокер.
издатель регистрирует свое намерение публиковать информацию по определенным темам.издатель посылает сообщение-публикацию брокеру, содержащее дату публикации. Сообщение может быть перенаправлено брокером непосредственно подписчикам, или может храниться у брокера, пока его не востребует подписчик.издатель может послать сообщение брокеру, чтобы хранящаяся у него публикация была удалена.издатель может отказаться от регистрации у брокера, когда он заканчивает отправку сообщений на определенную тему.
Сценарий подписчик-брокеру.
подписчик регистрируется у брокера, определяя темы, которые его интересуют.брокер начинает посылать подписчику публикации на заданные темы. подписчик может потребовать выдать публикации, хранящиеся у брокера.
Сценарий брокер-брокер реализуется в виде следующих взаимодействий
брокеры могут обмениваться регистрациями подписчиков и прекращать регистрации.брокеры могут обмениваться публикациями и требованиями на удаление публикаций.брокеры могут обмениваться информацией о самих себе.
Механизм Publish/Subscribe поддерживает брокеров на основе функций WebSphere MQ на платформах для AIX, HP-UX, Linux, Windows NT, Microsoft Windows 2000 и Sun Solaris. Допустимо иметь по одному брокеру на менеджере очередей. брокер имеет то же самое имя, что и менеджер. Приложения могут писаться на основе стандартных приемов программирования с использованием технологий Message Queue Interface (MQI) или Application Messaging Interface (AMI). Издатели и подписчики могут быть на любых платформах, на которых поддерживается WebSphere MQ, например, издатель - на OS/390 и подписчик - на OS/2. брокеры могут объединяться в сеть, в которой есть корневой брокер, брокеры-родители и брокеры-дети.
Для работы с механизмом Publish/Subscribe существует несколько основных функций или, иначе называемых, командных сообщений, в которых помещается команда брокеру. Синтаксис этих функций имеет простые правила при определении темы. Тема задается строкой длиной не более 256 байт без пробелов и двойных кавычек ("). Тема и подтемы разделяются символом "/" (без пробелов), например, для рынка акций формат темы задается следующим образом:
Регион/СекторРынка/Компания
Конкретные темы выглядят следующим образом:
NewYork/InformationTechnology/IBM
Этот синтаксис разрешает в строках иметь символы:
* - означающий ноль или любое количество произвольных символов; ? – означающий один произвольный символ.
Данный синтаксис позволит в дальнейшем определить, например, такие темы для подписки на рынке акций как:
* получение всех цен на акции со всех рынков мира; London/* получение всех цен на акции с Лондонской биржи; NewYork/Banks/* получение цен акций всех нью-йоркских банков; */*/IBM получение всех цен на акции компании ИБМ на всех рынках мира.
Символ % позволяет использовать специальные символы * и ? в строках, например, 'ABC%*D' означает 'ABC*D'.
В числе основных функций для работы с механизмом Publish/Subscribe (MQPSCommand) следует назвать:
RegPub | Регистрация издателя (Register Publisher) |
RegSub | Регистрация подписчика (Register Subscriber) |
Publish | Публикация |
ReqUpdate | Запрос от издателя к брокеру на обновление публикации на заданную тему (Request Update) |
DeletePub | Удаление публикации (Delete Publication) |
DeregPub | Отказ от регистрации издателя (Deregister Publisher) |
DeregSub | Отказ от регистрации подписчика (Deregister Subscriber) |
Для осуществления работы издателя и подписчика используется формат MQRFH (Rules and Formatting Header - RF Header) WebSphere MQ сообщения. формат MQRFH представляет собой структуру заголовка переменной длины, в которую приложение размещает данные в виде префикса при публикации. В эту структуру входят следующие переменные:
typedef struct tagMQRFH { MQCHAR4 StrucId; /* Идентификатор структуры */ MQLONG Version; /* Номер версии структуры */ MQLONG StrucLength; /* Общая длина MQRFH */ MQLONG Encoding; /* Data encoding */ MQLONG CodedCharSetId; /* Идентификатор множества перекодировки */ MQCHAR8 Format; /* Имя формата */ MQLONG Flags; /* Флаги */ } MQRFH;
Этот заголовок также включает строку NameValueString, в которой приложение публикации или подписки помещает команды, которые должен выполнить брокер. Поле StrucLength в заголовке определяет длину структуры заголовка, включительно с переменной длины NameValueString в конце структуры. Поля Encoding, CodedCharSetId и Format описывают структуру данных в заголовке публикации.
Примеры работы механизмов публикация-подписка
Теперь после знакомства с технологией публикация-подписка следует рассмотреть работу модели Publish/Subscribe на простых примерах. Для этого понадобиться инсталлировать SupportPacs MA0C: WebSphere MQ (WebSphere MQ) Publish/Subscribe с сайта ИБМ:http://www-306.ibm.com/software/integration/support/supportpacs/category.html
После этого можно стартовать брокер на менеджере очередей командой:
strmqbrk -m QMgrName
Для отображения состояния брокера можно использовать команду dspmqbrk:
dspmqbrk -m QMgrName
В ответ появиться следующее сообщение:
WebSphere MQ message broker for queue manager QMgrName running
Теперь брокер готов получать команды от издателей и подписчиков.
На каждом менеджере может быть стартован только один брокер.
В менеджере есть необходимые системные очереди, которые можно увидеть командой
runmqsc QMgrName display qlocal(SYSTEM.BROKER.*) end
Следует сразу отметить, что завершение работы брокера осуществляется командой endmqbrk перед окончанием работы менеджера: endmqbrk -m QMgrName.
Работу издателя можно продемонстрировать с помощью программы amqsgama, предложенной в SupportPacs MA0C в качестве теста. Эта программа издателя из перечня спортивных тем для подписки (табл.10.1) помещает на брокер сообщения о футбольных матчах (Sport/Soccer/Event/ - в таблице данная тема выделена курсивом) и проверяет ответы брокера.
спорт/футбол/* спорт/теннис/* спорт/баскетбол/* |
спорт/футбол/расписание игр спорт/футбол/события спорт/футбол/обзоры |
Формат запуска программы:
amqsgama TeamName1 TeamName2 QMgrName
Результаты работы программы, моделирующей случайным образом забивание голов той или иной командой, выглядит следующим образом (рис. 10.3):
Рис. 10.3. Результаты работы программы издателя
Для работы программы необходимо создание очереди: SAMPLE.BROKER.RESULTS.STREAM.
Именно в эту очередь поступают сообщения от издателя. Необходимо также, чтобы был запущен брокер. Программа подписчика должна стартовать раньше, чем программа издателя amqsgama, чтобы отобразить результаты игры полностью. Все используемые функции в программе служат для подключения к менеджеру брокера и публикации событий о начале матча, окончании матча и забивание гола.
Блочная структура программы выглядит следующим образом.
Подключение к менеджеру брокера (MQCONN) Открытие очереди потока брокера (MQOPEN) Инициализация таймера матча Генерация MQRFH для публикации события о начале матча Добавление имен команд в данные Помещение публикации в очередь потоков Начало цикла по времени матча: засыпание на случайный период попытка забить гол (50% вероятность) генерация публикации о забитом голе (RFH для ScoreUpdate) случайный выбор команды, забившей гол добавление имени команды в данные для публикации помещение публикации в очередь потоков Окончание цикла по времени матча Генерация MQRFH для публикации события о конце матча Добавление имен команд для публикации Помещение публикации в очередь потоков Закрытие очереди потока брокера (MQCLOSE) Отключение от менеджера брокера (MQDISC)
Программа amqsgama имеет следующий код:
Листинг 10.1. Программа amqsgama (html, txt)
В качестве комментария следует отметить, что функция BuildMQRFHeader формирует значения по умолчанию для заголовка MQRFH, устанавливает параметры format и CCSID пользовательских данных. В строку NameValueString добавляются команды, тема и опции для публикации и она выравнивается на 16-ти байтовую границу. StrucLength в MQRFH устанавливается как общая длина. Входными параметрами функции являются pStart – начало блока сообщения, TopicType[] – строка с именем темы. Входным и выходным параметром одновременно является pDataLength – размер блока сообщения при входе и размер выходного блока информации.
Функция PutPublication формирует сообщение для вывода в очередь брокера с помощью команды MQPUT. Входными параметрами функции являются hConn – идентификатор менеджера для команды MQHCONN, hObj – идентификатор очереди, pMessage – идентификатор на начало блока сообщения, messageLength – длина данных в сообщении. Выходными параметрами функции являются pCompCode и pReason – коды завершения команды MQPUT.
Работу подписчика можно продемонстрировать с помощью программы amqsresa из состава SupportPacs MA0C, которая подписывается у брокера на заданную тему (футбол) и получает сообщения от брокера. Формат запуска программы:
amqsresa QMgrName
где QmgrName – имя менеджера очередей, на котором запущен брокер.
Необходимая тема подписки (TOPIC = "Sport/Soccer/*") и очереди для подписчика заданы в теле программы (эти параметры рекомендуется выносить в командную строку или файл инициализации для создания универсальных программ – примеч. автора).
Для работы программы необходимо создание очередей:
runmqsc QMgrName define qlocal(SAMPLE.BROKER.RESULTS.STREAM) define qlocal(RESULTS.SERVICE.SAMPLE.QUEUE) define qlocal(SYSTEM.BROKER.CONTROL.QUEUE) end
Результаты работы программы amqsresa, получающей сообщения от брокера, выглядят следующим образом (рис. 10.4):
Рис. 10.4. Результаты работы программы подписчика
Объем программы подписчика amqsresa более 2000 строк (в том числе комментариев – 40%) и это не позволяет привести ее в данной лекции. Стоит ограничиться лишь кратким алгоритмом.
Подключение к менеджеру брокера (MQCONN ) Открытие очереди потока брокера и подписчика (MQOPEN ) Генерация MQRFH и подписка на все события Ожидание появления сообщений в очереди подписчика (до 3 минут) Извлечение из очереди MQGET всех публикаций Отбор публикации по теме "Sport/Soccer/*” Обработка и отображение результатов публикации в зависимости от событий (начало матча, конец матча, изменение счета) Выход из цикла по концу матча или по таймеру Закрытие очереди потока брокера и подписчика (MQCLOSE) Отключение от менеджера брокера (MQDISC)
В комментарии к алгоритму следует отметить, что подписка на все события (в алгоритме выделено курсивом) вряд ли объяснима, скорее это сделано в учебных целях. подписчику нет необходимости получать все публикации, поэтому в команде регистрации подписчика (RegSub) целесообразно осуществлять подписку сразу на заданную тему.
В заключение лекции можно привести график времени доставки публикации (мсек) в зависимости от количества подписчиков (рис.10.5), полученный на компьютере RISC/6000, 200MHz, 1 GB RAM с операционной системой AIX 4.3.0 [20]. Этот график показывает, что механизм Publish/Subscribe обеспечивает более высокую производительность, чем приложения, созданные на основе классического подхода с помощью MQI интерфейса и Distribution List.
Рис. 10.5. Зависимость времени доставки публикации от количества подписчиков
Блочная структура программы выглядит следующим образом.
Подключение к менеджеру брокера (MQCONN) Открытие очереди потока брокера (MQOPEN) Инициализация таймера матча Генерация MQRFH для публикации события о начале матча Добавление имен команд в данные Помещение публикации в очередь потоков Начало цикла по времени матча: засыпание на случайный период попытка забить гол (50% вероятность) генерация публикации о забитом голе (RFH для ScoreUpdate) случайный выбор команды, забившей гол добавление имени команды в данные для публикации помещение публикации в очередь потоков Окончание цикла по времени матча Генерация MQRFH для публикации события о конце матча Добавление имен команд для публикации Помещение публикации в очередь потоков Закрытие очереди потока брокера (MQCLOSE) Отключение от менеджера брокера (MQDISC)
Программа amqsgama имеет следующий код:
/*************************************************************************************/ /* Имя программы: AMQSGAMA */ /* Описание: Основанная на модели Publish/Subscribe программа */ /* моделирует результаты футбольного матча и */ /* отправляет их от издателя к брокеру */ /* Statement: Licensed Materials - Property of IBM */ /* SupportPac MA0E */ /* (C) Copyright IBM Corp. 1999 */ /*************************************************************************************/ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <time.h> #include <cmqc.h> /* MQI */ #include <cmqpsc.h> /* MQI Publish/Subscribe */ #include <windows.h> #if MQAT_DEFAULT == MQAT_WINDOWS_NT #define msSleep(time) \ Sleep(time) #elif MQAT_DEFAULT == MQAT_UNIX #define msSleep(time) \ { \ struct timeval tval; \ tval.tv_sec = time / 1000; \ tval.tv_usec = (time % 1000) * 1000; \ select(0, NULL, NULL, NULL, &tval); \ } #endif #define STREAM "SAMPLE.BROKER.RESULTS.STREAM" #define TOPIC_PREFIX "Sport/Soccer/Event/" #define MATCH_STARTED "MatchStarted" #define MATCH_ENDED "MatchEnded" #define SCORE_UPDATE "ScoreUpdate" #define MATCH_LENGTH 30000 /* 30 Second match length */ #define REAL_TIME_RATIO 333 #define AVERAGE_NUM_OF_GOALS 5 #define DEFAULT_MESSAGE_SIZE 512 /* Maximum buffer size for a message */
static const MQRFH DefaultMQRFH = {MQRFH_DEFAULT}; typedef struct { MQCHAR32 Team1; MQCHAR32 Team2; } Match_Teams, *pMatch_Teams; void BuildMQRFHeader( PMQBYTE pStart , PMQLONG pDataLength , MQCHAR TopicType[] ); void PutPublication( MQHCONN hConn , MQHOBJ hObj , PMQBYTE pMessage , MQLONG messageLength , PMQLONG pCompCode , PMQLONG pReason );
int main(int argc, char **argv) { MQHCONN hConn = MQHC_UNUSABLE_HCONN; MQHOBJ hObj = MQHO_UNUSABLE_HOBJ; MQLONG CompCode; MQLONG Reason; MQOD od = { MQOD_DEFAULT }; MQLONG Options; PMQBYTE pMessageBlock = NULL; MQLONG messageLength; MQLONG timeRemaining; MQLONG delay; PMQCHAR pScoringTeam; pMatch_Teams pTeams; MQCHAR32 team1; MQCHAR32 team2; char QMName[MQ_Q_MGR_NAME_LENGTH+1] = ""; MQLONG randomNumber; MQLONG ConnReason;
/* Проверка аргументов программы */ if( (argc < 3)||(argc > 4)||(strlen(argv[1]) > 31)||(strlen(argv[2]) > 31) ) { printf("Usage: amqsgam team1 team2 <QManager>\n"); printf(" Maximum 31 characters per team name,\n"); printf(" no spaces or '\"' characters allowed.\n"); exit(0); } else { strcpy(team1, argv[1]); strcpy(team2, argv[2]); } /* Использовать default queue manager или заданный в зависимости от наличия аргумена */ if (argc > 3) strcpy(QMName, argv[3]); MQCONN( QMName, &hConn, &CompCode, &ConnReason ); if( CompCode == MQCC_FAILED ) { printf("MQCONN failed with CompCode %d and Reason %d\n", CompCode, ConnReason); } else if( ConnReason == MQRC_ALREADY_CONNECTED ) { CompCode = MQCC_OK; } if( CompCode == MQCC_OK ) { strncpy(od.ObjectName, STREAM, (size_t)MQ_Q_NAME_LENGTH); Options = MQOO_OUTPUT + MQOO_FAIL_IF_QUIESCING; MQOPEN( hConn, &od, Options, &hObj, &CompCode, &Reason ); if( CompCode != MQCC_OK ) { printf("MQOPEN failed to open \"%s\"\nwith CompCode %d and Reason %d\n", od.ObjectName, CompCode, Reason); } }
if( CompCode == MQCC_OK ) { srand( (unsigned)(time( NULL )) + (unsigned)(team1[0] + team2[(strlen(team2) - 1)]) ); timeRemaining = MATCH_LENGTH; messageLength = DEFAULT_MESSAGE_SIZE; pMessageBlock = (PMQBYTE)malloc(messageLength); if( pMessageBlock == NULL ) { printf("Unable to allocate storage\n"); } else { if( CompCode == MQCC_OK ) {
Технология разработки приложений для модели публикация-подписка
Задача создания собственных приложений Издателя и подписчика рассматривается на следующей упрощенной модели: один издатель, один брокер и один подписчик (рис.10.2). Будет использоваться формат сообщений WebSphere MQ – MQRFH и классический MQI интерфейс. Для создания приложений необходимо иметь руководство по командам и опциям Publish/Subscribe, а также некоторые знания по программированию на С.
Рис. 10.2. Модель Издатель/Подписчик
Приложение Издателя должно публиковать символьные строки, задаваемые пользователем на определенные темы. Следовательно, такая программа с именем publisher может иметь следующий формат запуска:
publisher Topic Command QMgrName PubQueue
где Topic – тема публикации, символьная строка длиной не более 256 байт; Command – команды Издателя для брокера (RegPub, Publish, ReqUpdate, DeletePub, DeregPub), PubQueue - очередь издателя. Текст публикации вводится в командном окне с запущенной программой.
Приложение подписчика должно иметь возможность зарегистрироваться (подписаться) на заданные темы и отображать результаты полученных публикаций. Программа подписчика с именем subscriber может иметь формат запуска:
subscriber Topic QMgrName SubQueue
где Topic – тема подписки, SubQueue - очередь подписчика. Текст публикации на тему Topic отображается в командном окне с запущенной программой subscriber.
Для работы приложений требуется создание очередей: одна у Издателя и одна у подписчика, наименования которых, соответственно, Publisher_queue и Subscriber_queue. издатель будет получать сообщения от брокера через Publisher_queue. подписчик - регистрировать темы и получать публикации через Subscriber_queue. Создание этих очередей осуществляется простой командой runmqsc:
runmqsc QMgrName define qlocal(имя очереди) end
Следует рассмотреть потоки сообщений и то, как сообщения по определенной теме находят своего подписчика.
подписчик посылает заявки на подписку в управляющую очередь брокера: SYSTEM.BROKER.CONTROL.QUEUE, содержащие подписку на определенную тему, например, TestTopic. брокер читает это сообщение и запоминает детали подписки, такие как тема и очередь, в которой подписчик хочет получать сообщения. Потоки являются средством группирования разных тем. Состояния брокера и заявок на подписку запоминаются во внутренних очередях брокера: SYSTEM.BROKER.*.Приложение-издатель публикует информацию по определенной теме, например, по TestTopic и оно попадает в потоковую очередь брокера. По умолчанию это очередь:
SYSTEM.BROKER.DEFAULT.STREAM
Сообщение о публикации читается брокером и направляется в очередь подписчика. Приложение подписчика читает публикацию из очереди подписчика.
Сообщения публикация-подписка имеет время жизни, по истечению которого они удаляются. Сообщения публикация-подписка являются постоянными сообщениями (persistent) и восстанавливаются после перезагрузки менеджера и/или брокера.
Также как и сообщения WebSphere MQ потоки информации от издателя к подписчику являются асинхронными. Если брокер не работает, то публикация будет оставаться в потоковой очереди до тех пор, пока брокер не стартует. Если приложение подписчика не активно, то публикация будет оставаться в очереди подписчика.
Основные шаги, из которых складывается создание Приложения издателя. Таких шагов для программы publisher будет 7.
Шаг 1. Для того чтобы положить сообщение в очередь потоков брокера, необходимо подключиться к менеджеру брокера и открыть очередь командой MQOPEN для помещения сообщений.
Шаг 2. Для публикации необходимо сформировать MQRFH структуру (см. руководство по WebSphere MQ Publish/Subscribe). Все поля этой структуры должны иметь определенные значения. Некоторые значения MQRFH должны быть изменены по сравнению со значениями по умолчанию перед публикацией или в тот момент, когда мы определим значения этих полей.
Шаг 3. Сразу за структурой MQRFH должна следовать символьная строка NameValueString. Указатель pNameValueString определяет начальную позицию NameValueString в теле сообщения.
Шаг 4. Содержимое строки NameValueString должно включать всю необходимую информацию о публикации и всю необходимую последовательность команд для формирования непрерывной символьной строки нашей публикации. Содержимое строки NameValueString позволяет брокеру определить Publish/Subscribe команды и порядок их обработки.
Шаг 5. Поле StrucLength должно содержать длину MQRFH структуры и сопровождающей его строки NameValueString. Длина MQRFH фиксирована и длина NameValueString – переменная. Значение StrucLength позволяет не применять разделитель конца строки в NameValueString, хотя и он может быть использован, если это необходимо приложению-подписчику. MQRFH и NameValueString выравниваются на границе 16 байт автоматически.
Шаг 6. Данные о публикации ( в данном случае строка данных для публикации) помещаются сразу после MQFRH и NameValueString структуры, указатель pUserData должен быть установлен на начальную позицию строки введенных данных о публикации.
Шаг 7. Осуществляется процедура тестирования работы программы. Программа компилируется, устраняются ошибки компиляции. Она вызывается, например, командой:
publisher TestTopic Publish QM_broker Publisher_queue
В командном окне программы publisher вводится сообщение по теме: Hello
На этом этапе еще нет приложения подписчика, поэтому необходимо:
проверить, нет ли сообщений об ошибках от брокера в очереди Publisher_queue и исправить это; есть ли успешные сообщения от брокера;посмотреть сгенерированное сообщение о публикации в очереди SYSTEM.DEFAULT.LOCAL.STREAM, используя MQ explorer или команду amqsbcg и убедиться, что в формате сообщения о публикации нет ошибок.
Основные шаги, из которых складывается создание Приложения подписчика.
Шаг 1. Прежде всего, командой MQOPEN необходимо открыть очередь брокера для того, чтобы затем положить командное сообщение в соответствующую очередь.
Шаг 2. На этом шаге необходимо зарегистрировать интересы в виде темы и послать брокеру запрос на регистрацию подписки. Поскольку команды из основной программы будут посылаться брокеру довольно часто, необходимо разработать функцию SendBrokerCommand, одним из аргументов которой является командная строка для помещения в NameValueString командного сообщения. Эта функция SendBrokerCommand должна быть аналогичной по коду, как и для приложения Издателя.
Шаг 3. Теперь, когда подписчик зарегистрирован, можно ждать поступление публикаций в очередь подписчика, и как только они поступят, их можно прочесть командой MQGET. Первое, что необходимо сделать при получении сообщения, это проверить, что оно в нужном формате MQRFH.
Шаг 4. После распознавания формата MQRFH сообщения, можно выделить порцию наиболее интересного сообщения. В данном случае нет необходимости смотреть на строку NameValueString так как интересуют данные, которые следуют за этой строкой, они задаются указателем pUserData и могут быть напечатаны на экране или выведены в файл.
Шаг 5. На этом последнем шаге необходимо отказаться от регистрации и, как это делалось раньше на шаге 2, вызвать функцию SendBrokerCommand, добавив соответствующую команду для отказа от регистрации. Теперь, так же как и раньше, надо скомпилировать код и затем исправить ошибки компиляции. После этого программа готова для работы, ее можно выполнить:
subscriber TestTopic QM_broker Subscriber_queue
Этот запуск должен отобразить информацию об успешной регистрации у брокера. Если сообщение об успешной регистрации не поступило, то следует посмотреть сообщение об ошибке и исправить ее. После того как прошла успешная регистрация, можно попробовать получить сообщение по подписке. Для этого запустить программу publisher на тему TestTopic с сообщением, например, PublisherReady. В результате, в программе-подписчике должна появиться информация: PublisherReady.
Если этого не произойдет, то необходимо найти ошибку.
Получив успешно одну публикацию от одного издателя, можно расширить эксперимент и попробовать запустить много программ-издателей и программ-подписчиков. Каждой из них понадобиться своя очередь. Если сделать публикации на разные темы, можно ожидать получение определенных сообщений на экранах подписчиков, подписавшихся на соответствующие темы.
Channel-exit программы
Channel - exits программы выполняются канальными агентами. Эти программы первыми и последними встречают и провожают сообщения. Именно поэтому их чаще всего используют для шифрования, проверки идентификации и аутентификации сообщений в целях безопасности.
Иногда возникает потребность применять Channel- exits программы, например, для того чтобы проследить прохождение сообщений и выполнить их логирование, выявить возникшие проблемы. Допустим, что стандартное ПО этого не делает, "вклиниваться" в него не представляется возможным, так как все входящие сообщения считывает из очереди программа-обработчик. В этом случае можно написать простую Channel- exits программу в виде библиотечной функции для UNIX или как DLL (Dynamic Linl Library) для Windows. Программе следует присвоить имя, к примеру, mqexit.dll с точкой входа MsgExit. Для работы программы, ее надо прописать в том канале, через который идут сообщения:
Message Exit Name: | mqexit(MsgExit) |
Message Exit Data: | \mydir\mqexit.ini |
С помощью программы RUNMQSC это может быть сделано командой:
alter chl(mychannel) chltype(mychannel_type) msgexit(' mqexit(MsgExit)') msgdata('\mydir\mqexit.ini')
Программа mqexit.dll и настроечный файл mqexit.ini помещаются в стандартной директории (mydir ) для Channel exits программ (для Windows это C:\Program Files\IBM\WebSphere MQ\Exits\, для UNIX HP_UX это /var/mqm/exits/, аналогичные пути существуют для других платформ).
Настроечный файл mqexit.ini может содержать имя лог файла, имя буферного файла для записи входящего сообщения; имя очереди, в которую перекладывается содержимое буферного файла; имя менеджера, ведь менеджеров может быть несколько на одном сервере.
В учебных целях стоит упростить задачу, не использовать mqexit.ini-файл и записать имя лог-файла непосредственно в Message Exit Data (например, C:\TEMP\mqexit.log), а с сообщениями пусть разбирается прежняя программа-обработчик, читающая входящие сообщения из очереди. При этом предполагается, что используется менеджер очередей по умолчанию. В этом случае программа mqexit.dll для Windows выглядит следующим образом.
Листинг 11.1. Листинг программы mqexit.c (html, txt)
В качестве комментария к программе надо отметить, что обязательным является соблюдение трех рекомендаций для Windows из документа [8]
- Intercommunication.
Начало программы следует определить точками входа MQStart и ChannelExit программы mqexit.c:
#include <cmqc.h> #include <cmqxc.h> void MQStart() {;} /* dummy entry point - for consistency only */ void MQENTRY ChannelExit ( PMQCXP pChannelExitParms, PMQCD pChannelDefinition, PMQLONG pDataLength, PMQLONG pAgentBufferLength, PMQVOID pAgentBuffer, PMQLONG pExitBufferLength, PMQPTR pExitBufferAddr) { ... Insert code here } При компиляции добавить в проект как исходный файл MQMVX.LIB. В настройках проекта для генерации C/C++ кода изменить выпадающее меню с "Use Run-Time Library" на "Multithreaded" для "Multithreaded using DLL".Добавить в проект свой .DEF файл (mqexit.def):
LIBRARY mqexit DESCRIPTION 'Provides Retry and Channel exits' HEAPSIZE 4096 STACKSIZE 8192 EXPORTS ChannelExit
И после этого проходящие по каналу сообщения начинают записываться в файл, определенный в Message Exit Data, включая заголовок и данные сообщения. Скорость прохождения сообщений с использованием Channel exits программ уменьшится за счет команд записи на диск. Читателю предлагается исследовать самостоятельно во сколько раз измениться скорость работы программы и найти пути её повышения.
В заключение лекции следует отметить, что остались механизмы, которые не нашли отражение в этой книге и могли бы представлять интерес для читателя-программиста, а именно:
Разработка программ на основе интерфейсов JMS и AMI [21];PCF - команды (Programmable Command Format) и работа с ними;встроенные средства мониторинга событий WebSphere MQ.
С этими вопросами читателю предлагается ознакомиться самостоятельно по документации.
Ключевым моментом интеграции приложений является использование WebSphere Business Integration Message Broker®, сокращенно WBI Message Broker или просто WBI Broker (ранее - IBM MQSeries Integrator®) для управления и преобразования потоков сообщений. Этот продукт компании IBM будет рассмотрен далее.
/*------------------------*/ /* remove space from data */ /*------------------------*/ for ( i = MQ_EXIT_DATA_LENGTH - 1; i > 0; i-- ) { if ( ini_filename??(i??) != ' ' && ini_filename??(i??) ) break; ini_filename??(i??) = 0; } } getINI(); void writer(long *lngth, char *bf) {
long msgsize; int n; FILE *fp; getINI(); fp = fopen(MsgBufFile, "ab"); msgsize = *lngth; n = fwrite(&msgsize, sizeof(msgsize), 1, fp); /* put record length */ n = fwrite(bf, msgsize, 1, fp); /* and message buffer */ fclose(fp); }
/*--------------------------------------------------------- getINI() - read information from INI file -----------------------------------------------------------*/ void getINI( void ) { FILE *ini_fp; char *ptr, *name_ptr; char field_val??(129??), Ini_s??(129??), fn??(129??); short len;
memset( Ini_s, 0, sizeof( Ini_s ) ); memset( field_val, 0, sizeof( field_val ) ); memset( MsgBufFile, 0, sizeof ( MsgBufFile ) ); memset( Channel_Logfile, 0, sizeof( Channel_Logfile ) );
if ( strlen( ini_filename ) ) /* if Channel Exit defined INI filename */ strcpy( fn, ini_filename ); /* use it */ else /* otherwise use default INI filename */ strcpy( fn, INI_FILENAME ); if ( (ini_fp = fopen( fn, "r" )) == NULL ) { fprintf(stderr, "\nUnable to open %s. Error #: %d", fn, errno ); return; }
/*-----------------------------*/ /* read the INI file until eof */ /*-----------------------------*/ while ( (ptr = fgets( Ini_s, sizeof(Ini_s) - 1, ini_fp )) != NULL ) { len = strlen( Ini_s ); if ( Ini_s??(len -1??) == LINEFEED ) Ini_s??(len-1??) = 0; /* null out '\n' */ if ( (name_ptr = strchr( Ini_s, EQUAL )) == NULL ) /* no '=' found */ continue; ptr = name_ptr + 1; *name_ptr = 0; /* set null for s[] */ strcpy( field_val, ptr ); fclose( ini_fp );
return; }
Листинг 11.1. Листинг программы mqexit.c
В качестве комментария к программе надо отметить, что обязательным является соблюдение трех рекомендаций для Windows из документа [8]
- Intercommunication.
Начало программы следует определить точками входа MQStart и ChannelExit программы mqexit.c:
#include <cmqc.h> #include <cmqxc.h> void MQStart() {;} /* dummy entry point - for consistency only */ void MQENTRY ChannelExit ( PMQCXP pChannelExitParms, PMQCD pChannelDefinition, PMQLONG pDataLength, PMQLONG pAgentBufferLength, PMQVOID pAgentBuffer, PMQLONG pExitBufferLength, PMQPTR pExitBufferAddr) { ... Insert code here } При компиляции добавить в проект как исходный файл MQMVX.LIB. В настройках проекта для генерации C/C++ кода изменить выпадающее меню с "Use Run-Time Library" на "Multithreaded" для "Multithreaded using DLL".Добавить в проект свой .DEF файл (mqexit.def):
LIBRARY mqexit DESCRIPTION 'Provides Retry and Channel exits' HEAPSIZE 4096 STACKSIZE 8192 EXPORTS ChannelExit
И после этого проходящие по каналу сообщения начинают записываться в файл, определенный в Message Exit Data, включая заголовок и данные сообщения. Скорость прохождения сообщений с использованием Channel exits программ уменьшится за счет команд записи на диск. Читателю предлагается исследовать самостоятельно во сколько раз измениться скорость работы программы и найти пути её повышения.
В заключение лекции следует отметить, что остались механизмы, которые не нашли отражение в этой книге и могли бы представлять интерес для читателя-программиста, а именно:
Разработка программ на основе интерфейсов JMS и AMI [21];PCF - команды (Programmable Command Format) и работа с ними;встроенные средства мониторинга событий WebSphere MQ.
С этими вопросами читателю предлагается ознакомиться самостоятельно по документации.
Ключевым моментом интеграции приложений является использование WebSphere Business Integration Message Broker®, сокращенно WBI Message Broker или просто WBI Broker (ранее - IBM MQSeries Integrator®) для управления и преобразования потоков сообщений. Этот продукт компании IBM будет рассмотрен далее.
© 2003-2007 INTUIT.ru. Все права защищены. |
a sample message exit function
/* Листинг программы mqexit.dll */ /************************************************************************/ /* Program name: mqexit */ /* Description: This is a sample message exit function record all messages */ /* This function will work on any channels, e.g. send, receive, requester, etc. */ /* This function logs all channel activities into the Channel Log file. */ /* Please see Intercommunication documentation to specify the Channel Message Exit */ /* Authors: Vladimir Makushkin, Moscow, Russia */ /* email: vmakushkin@mail.ru */ /* Disclaimer: this program has been tested under HP_UX and Windows to the */ /* authors' satisfaction. You may use this program at your own risk. Author are not */ /* responsible for any unexpected results generated by this program. */ /************************************************************************/ /* standard headers */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <locale.h> #include <sys/types.h> #include <sys/timeb.h> #include <time.h> /* MQSeries headers */ #include <cmqc.h> #include <cmqxc.h> char timebuf??(130??); char Channel_Logfile??(129??); char MsgBufFile??(129??); char ini_filename??(129??); FILE * OST = 0; #define DBGPRINTF(x) { \ OST=fopen(Channel_Logfile,"a+"); \ if(OST) fprintf x; \ fclose(OST); \ } void writer (long *, char *); /* prototype for file writer function */ void getINI( void ); void MQENTRY MQStart(void) {;} void MQENTRY MsgExit( PMQCXP pChannelExitParams, PMQCD pChannelDefinition, PMQLONG pDataLength, PMQLONG pAgentBufferLength, PMQBYTE AgentBuffer, PMQLONG pExitBufferLength, PMQPTR pExitBufferAddr) { short i; struct timeb t; char millisec??(25??); time_t clock = time( (time_t*) NULL); struct tm *tmptr = localtime(&clock); strcpy(timebuf, asctime(tmptr)); ftime( &t ); memset( millisec, 0, sizeof( millisec ) ); sprintf( millisec, " Time:%ld.%d", t.time, t.millitm ); strcat( timebuf, millisec ); if ( strlen(pChannelExitParams->ExitData) ) { memset( ini_filename, 0, sizeof( ini_filename ) ); strncpy( ini_filename, pChannelExitParams->ExitData, MQ_EXIT_DATA_LENGTH ); /*------------------------*/ /* remove space from data */ /*------------------------*/ for ( i = MQ_EXIT_DATA_LENGTH - 1; i > 0; i-- ) { if ( ini_filename??(i??) != ' ' && ini_filename??(i??) ) break; ini_filename??(i??) = 0; } } getINI(); void writer(long *lngth, char *bf) { long msgsize; int n; FILE *fp; getINI(); fp = fopen(MsgBufFile, "ab"); msgsize = *lngth; n = fwrite(&msgsize, sizeof(msgsize), 1, fp); /* put record length */ n = fwrite(bf, msgsize, 1, fp); /* and message buffer */ fclose(fp); } /*--------------------------------------------------------- getINI() - read information from INI file -----------------------------------------------------------*/ void getINI( void ) { FILE *ini_fp; char *ptr, *name_ptr; char field_val??(129??), Ini_s??(129??), fn??(129??); short len; memset( Ini_s, 0, sizeof( Ini_s ) ); memset( field_val, 0, sizeof( field_val ) ); memset( MsgBufFile, 0, sizeof ( MsgBufFile ) ); memset( Channel_Logfile, 0, sizeof( Channel_Logfile ) ); if ( strlen( ini_filename ) ) /* if Channel Exit defined INI filename */ strcpy( fn, ini_filename ); /* use it */ else /* otherwise use default INI filename */ strcpy( fn, INI_FILENAME ); if ( (ini_fp = fopen( fn, "r" )) == NULL ) { fprintf(stderr, "\nUnable to open %s. Error #: %d", fn, errno ); return; } /*-----------------------------*/ /* read the INI file until eof */ /*-----------------------------*/ while ( (ptr = fgets( Ini_s, sizeof(Ini_s) - 1, ini_fp )) != NULL ) { len = strlen( Ini_s ); if ( Ini_s??(len -1??) == LINEFEED ) Ini_s??(len-1??) = 0; /* null out '\n' */ if ( (name_ptr = strchr( Ini_s, EQUAL )) == NULL ) /* no '=' found */ continue; ptr = name_ptr + 1; *name_ptr = 0; /* set null for s[] */ strcpy( field_val, ptr ); fclose( ini_fp ); return; } |
Листинг 11.1. Листинг программы mqexit.c |
Закрыть окно |
strncpy( ini_filename, pChannelExitParams->ExitData, MQ_EXIT_DATA_LENGTH );
/*------------------------*/
/* remove space from data */
/*------------------------*/
for ( i = MQ_EXIT_DATA_LENGTH - 1; i > 0; i-- )
{
if ( ini_filename??(i??) != ' ' && ini_filename??(i??) )
break;
ini_filename??(i??) = 0;
}
}
getINI();
void writer(long *lngth, char *bf)
{
long msgsize;
int n;
FILE *fp;
getINI();
fp = fopen(MsgBufFile, "ab");
msgsize = *lngth;
n = fwrite(&msgsize, sizeof(msgsize), 1, fp); /* put record length */
n = fwrite(bf, msgsize, 1, fp); /* and message buffer */
fclose(fp);
}
/*---------------------------------------------------------
getINI() - read information from INI file
-----------------------------------------------------------*/
void getINI( void )
{
FILE *ini_fp;
char *ptr, *name_ptr;
char field_val??(129??), Ini_s??(129??), fn??(129??);
short len;
memset( Ini_s, 0, sizeof( Ini_s ) );
memset( field_val, 0, sizeof( field_val ) );
memset( MsgBufFile, 0, sizeof ( MsgBufFile ) );
memset( Channel_Logfile, 0, sizeof( Channel_Logfile ) );
if ( strlen( ini_filename ) ) /* if Channel Exit defined INI filename */
strcpy( fn, ini_filename ); /* use it */
else /* otherwise use default INI filename */
strcpy( fn, INI_FILENAME );
if ( (ini_fp = fopen( fn, "r" )) == NULL )
{
fprintf(stderr, "\nUnable to open %s. Error #: %d", fn, errno );
return;
}
/*-----------------------------*/
/* read the INI file until eof */
/*-----------------------------*/
while ( (ptr = fgets( Ini_s, sizeof(Ini_s) - 1, ini_fp )) != NULL )
{
len = strlen( Ini_s );
if ( Ini_s??(len -1??) == LINEFEED )
Ini_s??(len-1??) = 0; /* null out '\n' */
if ( (name_ptr = strchr( Ini_s, EQUAL )) == NULL ) /* no '=' found */
continue;
ptr = name_ptr + 1;
*name_ptr = 0; /* set null for s[] */
strcpy( field_val, ptr );
fclose( ini_fp );
return;
}
Группировка и сегментация сообщений
До настоящего момента говорилось о физических сообщениях WebSphere MQ, и этого было вполне достаточно. Обычные сообщения длиной до 4Мб и сообщения максимальной длины в 100Мбт покрывают потребности любых приложений на 95%. Необходимость использования сверхбольших сообщений (> 100Мбт) или ограничения в приложениях на длину сообщений приводят к необходимости использовать понятия физических и логических сообщений, механизмы группирования и сегментации сообщений.
Физическое сообщение - наименьший блок информации, который может быть размещен в очереди. Одно физическое сообщение принято называть сегментом.
Логическое сообщение это либо отдельный блок информации приложения, либо один или несколько сегментов.
Группа сообщений это одно или несколько логических сообщений. Сегментация сообщений обычно используется, когда требуется работа с сообщениями, которые слишком большие для менеджера, очереди, либо приложения. Группы сообщений,как правило, применяются в следующих случаях:
требуется гарантировать упорядочение при поиске и при этом избежать использования MsgId и CorrelId;требуется обеспечить объединение в группу сообщений, которые должны быть обработаны специфическим программным модулем и без использования MsgId и CorrelId.
Пусть необходимо объединить в группу три сообщения. Для этого в дескрипторе сообщений полю MQMD_GroupId (24 байта) нужно присвоить уникальный идентификатор и записать его в каждом сообщении. Следующему полю MQMD_MsgSeqNumber присвоить значения 1, 2 и 3 для первого, второго и третьего сообщений, соответственно, как показано это на рис.11.1. Если сообщение не включено в группу, то для него GroupId будет NULL (MQGI_NONE), а в MsgSeqNumber будет значение 1. WebSphere MQ может автоматизировать присвоение MsgSeqNumber во время выполнения функции MQPUT, если в PMO задана опция MQPMO_LOGICAL_ORDER. В этом случае также понадобится установить значение в новое поле дескриптора сообщений: MQMD.MsgFlags, которое может иметь значения MQMF_MSG_IN_GROUP и MQMF_LAST_MSG_IN_GROUP. Если в GroupId было установлено значение MQGI_NONE, то для GroupId будет сгенерирован новый уникальный идентификатор при использовании MQPMO_LOGICAL_ORDER. И еще одно правило для вывода группы сообщений: свойства сообщений PERSISTENCE и NOT_PERSISTENCE не должны перемешиваться в одной группе.
Для пояснения изложенного следует привести фрагмент программы, использующей вывод сообщений в виде группы.
if ((CompCode == MQCC_OK) || (CompCode == MQCC_WARNING)) { pmo.Options = MQPMO_LOGICAL_ORDER + MQPMO_SYNCPOINT ; memcpy(md.GroupId, "1", sizeof(md.GroupId)); memcpy(md. MsgFlags, "MQMF_MSG_IN_GROUP", sizeof(md. MsgFlags));
//memcpy(md.MsgSeqNumber, 1, // sizeof(md.MsgSeqNumber)); //memcpy(md.MsgFlags, MQMF_SEGMENT, // sizeof(md.MsgFlags));
/* блок формирования buffer */ MQPUT1(Hcon, &odR, &md, &pmo, messlen, buffer, &CompCode, &Reason); if (Reason != MQRC_NONE) printf("MQPUT1 ended with reason code %ld\n", Reason); } /* конец цикла чтения поступающих сообщений */
memcpy(md. MsgFlags, "MQMF_ LAST_MSG_IN_GROUP", sizeof(md. MsgFlags));
/* блок формирования buffer */ MQPUT1(Hcon, &odR, &md, &pmo, messlen, buffer, &CompCode, &Reason); if (Reason != MQRC_NONE) printf("MQPUT1 ended with reason code %ld\n", Reason);
Рис. 11.1. Группа сообщений и сегментация сообщения
Процесс извлечения группы логических сообщений из очереди командой MQGET во многом будет зависеть от значений, заданных в опции GMO. Если задать значение MQGMO_ALL_MSGS_AVAILABLE, то сообщения будут доступны для чтения только, когда все сообщения группы поступят в очередь. Если задана опция MQGMO_LOGICAL_ORDER, то сообщения можно извлекать в порядке, определяемом MsgSeqNumber и только по одному сообщению из группы. Если опция MQGMO_LOGICAL_ORDER не задана, то приложение должно выбирать сообщения двумя способами:
на основе корректных GroupId и MsgSeqNumber, либо переключаться на MQGMO_LOGICAL_ORDER, либо переслать сообщения дальше.на основе опций соответствия MatchOption, если они заданы: MQMO_MATCH_MSG_ID, MQMO_MATCH_CORREL_ID, MQMO_MATCH_GROUP_ID, MQMO_MATCH_MSG_SEQ_NUMBER.
Например, опция CORREL_ID означает, что должно выбираться сообщение, корреляционный идентификатор которого, соответствует значению поля CorrelId параметра MsgDesc функции MQGET. Это согласование является дополнением к любому другому соответствию, заданному в MatchOption. Если корреляционный идентификатор есть MQCI_NONE, то это равносильно тому, что опция MQMO_MATCH_CORREL_ID не определена.
Теперь следует дать расширенное определение сегмента сообщения. Сегментом сообщения называется физическое сообщение, идентифицируемое тройкой: GroupId, MsgSeqNumber и Offset. Из лекции 8 известно, что Offset это поле формата MQLONG в структурах MQOD и MQMD. Таким образом, одно логическое сообщение может состоять из нескольких сегментов, как показано на рис.11.1.
Разбиение на сегменты (сегментация) и обратная процедура, называемая реассемблированием (reassembly) может осуществляться менеджером очередей или приложением. Для начала следует рассмотреть, как это осуществляет менеджер очередей. Если дескриптор сообщения имеет в поле MsgFlag значение MQMF_SEGMENTATION_ALLOWED, и менеджер очередей определил, что сообщение является слишком большим для его MaxMsgLength или для значения параметра MaxMsgLength для очереди, то тогда менеджер осуществляет сегментацию. Никаких предположений о том, как выполняется разбиение данных, сделано быть не может.
Реассемблирование с помощью менеджера осуществляется, если при чтении сообщений командой MQGET была задана GMO-опция: MQGMO_COMPLETE_MESSAGE. В этом случае извлекаются все сегменты, и только полное логическое сообщение помещается в программный буфер. Следует быть внимательным при задании преобразования данных в sender-канале. Поскольку не все сегменты приходят одновременно, преобразование данных в канале в связи с разными кодовыми страницами может привести к ошибкам и рекомендуется задавать предобработку (UOW - unit of work) с помощью менеджера (опция MQRC_UOW_AVAILABLE) или приложения (опция MQRC_UOW_NOT_AVAILABLE).
Сегментация с помощью приложения возможна в двух случаях:
Сегментация с помощью менеджера не решает проблему из-за того, что буфер приложения недостаточно большой для обработки полного сообщения;Сегментация с помощью менеджера не может быть успешной из-за специфических границ сообщения и необходимости преобразование данных, которое должно осуществляться в канале.
Для сегментации с помощью приложения необходимо включить для функции MQPUT опцию MQPMO_LOGICAL_ORDER и проследить за правильным включением в поле MsgFlags флажков MQMF_SEGMENT и MQMF_LAST_SEGMENT. Менеджер очередей позаботиться о назначении и поддержании GroupId, MsgSeqNumber и Offset.
Реассемблирование с помощью приложения осуществляется, если при чтении сообщений командой MQGET была задана GMO-опция: MQGMO_COMPLETE_MESSAGE и флажок MsgFlags имеет значение MQMF_SEGMENTATION_ALLOWED. В этом случае приложение извлекает сегменты самостоятельно в зависимости от ограничений на свой размер буфера. Опции MQGMO_ALL_MSGS_AVAILABLE и MQGMO_ALL_SEGMENTS_AVAILABLE позволят начать работу, если все сообщения или все сегменты поступят в очередь. Как только первое сообщение поступит в очередь, можно использовать MQGMO_LOGICAL_ORDER опцию, чтобы быть уверенным, что все последующие сегменты логического сообщения будут обработаны. Если опция MQGMO_LOGICAL_ORDER не задана, то приложение может выбирать сообщения на основе опций соответствия MatchOption, как это делалось для групп:
MQMO_MATCH_MSG_ID,MQMO_MATCH_CORREL_ID, MQMO_MATCH_GROUP_ID, MQMO_MATCH_MSG_SEQ_NUMBER,MQMO_MATCH_OFFSET.
Приложение может определить окончание работы с сегментом с помощью возвращаемого MQGET значения поля MQGMO.SegmentStatus, которое может принимать следующие значения: MQSS_SEGMENT, MQSS_LAST_SEGMENT, MQSS_NOT_A_SEGMENT.
В завершение раздела можно привести примеры использования функций MQPUT и MQGET для вывода и чтения логических сообщений в соответствии с их структурой, указанной на рис.11.1.
/* пример использования функции MQPUT */ PMO.Options = MQGMO_LOGICAL_ORDER + MQGMO_SYNCPOINT
/* помещаем в очередь первое логическое сообщение*/ MQPUT (MQMF_MSG_IN GROUP + MQMF_SEGMENT) MQPUT (MQMF_MSG_IN GROUP + MQMF_SEGMENT) MQPUT (MQMF_MSG_IN GROUP + MQMF_LAST_SEGMENT)
/* помещаем в очередь второе логическое сообщение */ MQPUT (MQMF_MSG_IN GROUP + MQMFSEGMENT)
/* помещаем в очередь третье логическое сообщение */ MQPUT (MQMF_LAST_MSG_IN GROUP + MQMF_SEGMENT) MQPUT (MQMF_ LAST_MSG_IN GROUP + MQMF_LAST_SEGMENT) MQCMIT
/* пример использования функции MQGET */ GMO.Options = MQGMO_LOGICAL_ORDER + MQGMO_SYNCPOINT + MQGMO_ALL_MSGS_AVAILABLE + MQGMO_WAIT Do while (Not MQGS_LAST_MSG_IN GROUP and Not MQSS_LAST_SEGMENT) MQGET (........) /* обработка сегмента или законченного логического сообщения */ End MQCMIT
Модификация объектов
Характеристики объектов WebSphere MQ определяются в момент создания, но иногда их необходимо модифицировать, например, изменив приоритет сообщений при помещении их в очередь (Default Priority) или максимально допустимое количество сообщений в очереди (Maximum Queue Depth). Модификация объектов WebSphere MQ требуется, в частности, при восстановлении опций очередей Put Messages и Get Messages в состояние Allowed, а также параметров триггеринга, извлечении статистических данных (Message Count и т.п.). Необходимость работы с функциями MQINQ и MQSET, предназначенными для этих целей, возникает не так часто, но без этого иногда трудно обойтись.
Функция MQINQ позволяет извлечь атрибуты любой очереди, процесса, менеджера или списка кластеров namelist. MQSET дает возможность изменить эти параметры, но только для очереди. Обе функции используют массивы идентификаторов (selectors), в которых указывается, какие характеристики объектов должны быть извлечены или изменены. Имена идентификаторов имеют префиксы: MQCA_ для символьных данных (например, имя очереди) и MQIA_ для числовых данных (например, Maximum Queue Depth).
Допустим, для некоторой очереди необходимо определить характеристики: количество сообщений в очереди, максимальное количество сообщений, имя очереди и ее описание.
Формат команды:
MQINQ (Hconn, Hobj, SelectorCount, Selectors, IntAttrCount, IntAttrs, CharAttrLength, CharAttrs, CompCode, Reason)
Очередь должна быть открыта и Hconn, Hobj известны.
Пусть общее количество идентификаторов MQLONG SelectorCount = 4; они должны быть перечислены в массиве MQLONG Selectors[4];
Selectors [0] = MQIA_CURRENT_Q_DEPTH; Selectors [1] = MQIA_MAX_Q_DEPTH; Selectors [2] = MQCA_Q_NAME; Selectors [3] = MQCA_Q_DESC;
Число цифровых атрибутов задается в MQLONG IntAttrCount = 2; они должны вернуться в массив MQLONG IntAttrs [2]; Длина затребованных символьных данных MQLONG CharAttrLength = 112; (48 для MQCA_Q_NAME и 64 для MQCA_Q_DESC) и они должны вернуться в массив CHAR CharAttrs[112] = ""; Теперь можно выполнять MQINQ. Не стоит забывать одно важное правило: цифровые и символьные характеристики следуют в массивах IntAttrs и CharAttrs в том порядке, как они перечислены в массиве Selectors.
В случае, когда программа rewriter (см. лекцию 8), запускается как триггер, при поступлении сообщения в очередь INPUT.Q, использовать ini-файл неудобно и гораздо проще задать необходимые параметры в свойствах очереди. Для этого в свойствах очереди INPUT.Q в закладке Triggering указывается:
Trigger Control = On Initiation Queue Name = INPUT_INIT.Q Process Name = rewriter
а для процесса rewriter определяется
PROCESS_USER_DATA = OUTPUT.Q /* queue output */ PROCESS_APPL_ID = "C:\rewriter\rewriter.exe" PROCESS_ENV_DATA = rewriter.log /* имя лог-файла не более 24 символов */
Теперь в программе легко можно извлечь все необходимые параметры с помощью MQINQ. После завершения работы программы очередь инициализации следует очистить. Это делает следующий фрагмент кода:
MQLONG Select[1]; /* attribute selectors */ MQLONG SelectValue[1]; /* value attribute selectors */ MQLONG char_count; char queueinitname[48] = ""; int queuenamelen;
Select[0] = MQCA_INITIATION_Q_NAME; /* attribute selectors */ queuenamelen = 0; char_count = 48; MQINQ(Hcon, Hobj, 1, Select, 0, NULL, char_count, queueinitname, &CompCode, &Reason); queuenamelen = strlen(queueinitname) - 1; queueinitname[queuenamelen] = ' '; strncpy(odI.ObjectName, queueinitname, 24); O_options = MQOO_INPUT_SHARED + MQOO_FAIL_IF_QUIESCING; MQOPEN(Hcon, &odI, O_options, &Hinq, &CompCode, &Reason); if (Reason != MQRC_NONE) { printf("MQOPEN (input) ended with reason code %ld\n", Reason); } if (CompCode == MQCC_FAILED) { exit(Reason); } memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId)); memcpy(md.CorrelId, MQMI_NONE, sizeof(md.CorrelId)); while (CompCode == MQCC_OK) { gmo.Options = MQGMO_ACCEPT_TRUNCATED_MSG + MQGMO_WAIT; gmo.WaitInterval = 1; memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId)); memcpy(md.CorrelId, MQMI_NONE, sizeof(md.CorrelId)); buflen = sizeof(buffer) - 1; MQGET(Hcon, Hinq, &md, &gmo, buflen, buffer, &messlen, &CompCode, &Reason); // printf("Inituation queue clear with // reason code %ld and CompCode %ld\n", // Reason, CompCode); } CompCode = MQCC_OK ;
В этом фрагменте MQINQ используется для извлечения только одного параметра: имени очереди инициализации. Очистка этой очереди делается командой MQGET до тех пор, пока очередь не будет пуста.
Функция MQSET по формату полностью аналогична MQINQ, разница между ними заключается только в направлении потоков данных. В качестве примера можно рассмотреть работу триггера с условиями Trigger Type = First и Trigger Depth = 1. При срабатывании триггера флажок из состояния Trigger Control = On переходит в состояние Trigger Control = Off и его надо восстанавливать. Это делает следующий фрагмент кода:
Select[0] = MQIA_TRIGGER_CONTROL; /* attribute selectors */ SelectValue[0] = 1 ; MQSET(Hcon, Hobj, 1, Select, 1, SelectValue, 0, NULL, &CompCode, &Reason);
Заключительный вывод по разделу: функция MQINQ позволяет извлечь атрибуты любой очереди, процесса, менеджера или списка кластеров namelist, а функция MQSET - изменить эти параметры, но только для очереди.
Работа с MsgId и CorrelId
Теперь полезно изучить способы контроля доставки сообщений, тем более что для этого существуют специальные поля в дескрипторе сообщений: MsgId и CorrelId. Например, некоторое приложение отправляет запросы на обработку в очередь Queue1 и ожидает уведомления о доставке в очереди ReplyToQ = Queue2. Совершенно очевидно, что уведомления о доставке не обязаны появиться в очереди Queue2 в той же последовательности, в какой отправлялись запросы в очередь Queue1. Как же в таком случае отслеживать доставку сообщений?
Другой пример, приложение отправляет запросы в разные системы, в частности, по банковским счетам клиента и по проверке его кредитной истории и ожидает получить ответ от запрашиваемых систем в одну и ту же очередь для всех клиентов. Как же идентифицировать поступающие ответы? Самое простое - использовать идентификаторы MsgId и CorrelId, которые являются уникальными для каждого сообщения и создаются на основе времени выполнения команды MQPUT. Идентификаторы MsgId и CorrelId создаются менеджером, если они установлены в NULL, либо заданы опции MQPMO_NEW_MSG_ID и/или MQPMO_NEW_CORREL_ID; в противном случае, MsgId и CorrelId создаются приложением.
Таким образом, задавая значения MsgId и CorrelId при выполнении MQPUT, появляется возможность автоматически получать значения MsgId и CorrelId при чтении командой MQGET. При этом не надо забывать, что в цикле считывания перед командой MQGET следует ставить обнуление этих переменных (MsgId=MQMI_NONE и CorrelId=MQCI_NONE), иначе можно получить ошибку считывания сообщений из очереди MQRC_NO_MSG_AVAILABLE, несмотря на то, что сообщения в очереди имеются.
Рассмотрим на примере, как работает MQGET с MsgId и CorrelId на практике. Пусть в некоторую очередь поступили сообщения в определенном порядке, как указано в таблице ниже.
1 | 1 | 1 |
2 | 1 | 2 |
3 | 1 | 3 |
4 | 2 | 1 |
5 | 2 | 2 |
6 | 3 | 1 |
Теперь варьируя MsgId и CorrelId, можно читать самые разные сообщения.
MQGET (MSGID= MQMI_NONE, CORRELID= MQCI_NONE) читает первое доступное сообщение независимо от MsgId и CorrelId, то есть сообщение 1.
MQGET (MSGID= MQMI_NONE, CORRELID= 3) может прочитать только одно сообщение, имеющее значение CorrelId = 3, это сообщение 3.
MQGET (MSGID= 2, CORRELID= MQCI_NONE) читает первое сообщение со значением MsgId = 2, это сообщение 4.
MQGET (MSGID= 2, CORRELID= 2) может прочитать только одно сообщение, имеющее уникальное сочетание MsgId = 4 и CorrelId = 2, это сообщение 5.
Таким образом, если никакие другие сообщения не поступят в очередь, то в очереди после четырех MQGET останутся 2 сообщения. Здесь уместен вопрос о производительности работы менеджера очередей при таком поиске сообщений. Если число сообщений в очереди не превышает 100, то время поиска будет незначительным, оно практически не зависит от числа сообщений. Но если число сообщений в очереди превысит 1000, то менеджер будет сканировать очередь и время поиска будет заметным. На OS/390 MQSeries администратор может определить MsgId или CorrelId (но не одновременно) как индекс и это существенно ускорит поиск нужного сообщения в очереди. Следует также отметить, что на других платформах (AS/400, HP_UX, AIX, Sun Solaris, Windows) в версии 5.3 WebSphere MQ можно использовать опции соответствия MatchOption: MQMO_MATCH_MSG_ID и MQMO_MATCH_CORREL_ID. Если эти опции соответствия не будут определены, то MsgId и CorrelId будут игнорироваться, как если бы использовались опции MQMI_NONE и MQCI_NONE и будет извлекаться очередное сообщение.
Сформулируем одно общее правило при разработке программ с контролем доставки сообщений с помощью MsgId и CorrelId: если приложение перемещает сообщение из входной очереди в выходную, то MsgId входного сообщения перемещается в CorrelId выходного сообщения и создается новый уникальный идентификатор MsgId выходного сообщения. На уровне псевдокода это правило можно записать следующим образом:
INPUT_MSG_DESC.MsgId = MQMI_NONE; INPUT_MSG_DESC.CorrelId = MQCI_NONE; MQGET (..........); /* Обработка входного сообщения */ OUT_MSG_DESC.MsgId = MQMI_NONE; OUT_MSG_DESC.CorrelId = INPUT_MSG_DESC.MsgId; MQPUT (..........);
Разрабатывая логику в цепи программ- обработчиков сообщений на разных платформах следует стремиться к тому, чтобы CorrelId содержал MsgId исходного запроса, а MsgId выходного сообщения содержал уникальный идентификатор, полученный наиболее важной в функциональном смысле выходной программой. На основе такой логики легко получить подтверждение на выходе, что исходный запрос был отработан, и промежуточные программы проставили свой MsgId в выходном сообщении. Это позволяет осуществить контроль времени прохождения запроса и получения сообщения-отчета.
В завершение раздела, уж если речь пошла о контроле доставки сообщений, необходимо отметить возможность использования полей context (контекст) в дескрипторе сообщения для контроля авторизации пользователя и обеспечения безопасности. Группа полей context состоит из 8 полей, их значения по умолчанию приводятся в таблице ниже.
UserIdentifier | CHAR12 | Имя пользователя | PutAppName | CHAR28 | Имя приложения |
AccountingToken | BYTE32 | Учетный номер приложения-отправителя | PutDate | CHAR8 | YYYYMMDD |
AppIdentityData | CHAR32 | Пробелы | PutTime | CHAR8 | HHMMSSTH |
PutApplType | LONG | UNIX и т.п. | AppOriginData | CHAR4 | пробелы |