Конкуренция
Т.к. ASAP и Delayed агенты обрабатывают одно сообщение за один раз, их исполнение занимает поток на относительно короткое время. Они запускаются не на долго, но часто. Агенты по-расписанию же обрабатывают большое количество сообщений за один запуск, накопившихся с момента предыдущего запуска агента. Время исполнение агентов по расписанию значительно больше (Long Running). Для агентов по-расписанию целесообразно выделять отдельный пул потоков, что позволит не тормозить исполнение ASAP и Delayed агентов.
Агенты по расписанию запускаются редко, исполняются долго, подписка на очередь данных длится до окончания работы агента.
Агенты ASAP запускаются часто, исполняются быстро, подписка на очередь данных длится постоянно.
Для более-менее равномерного распределения исполнения агентов по кластеру, необходимо, чтобы JMS-сигнал на запуск агента извлекался из очереди только при освобождении потока в пуле. Тогда агент по сигналу может начать исполняться незамедлительно, без ожидания в очереди пула и, в тоже время, давая возможность другому, менее загруженному, серверу получить этот JMS-сигнал. Чтобы добиться такого поведения, достаточно оформить функцию получения JMS-сигнала в виде самостоятельной задачи, исполняемой в том же самом пуле. Задача активируется только при освобождении потока в пуле.
Группа агентов по-расписанию
При активации КМА должна быть создана одна задача для получения сигнала на запуск произвольной группы агентов и распределена на исполнение в LongRunning пул потоков. Задача слушает сигналы из 2х JMS-очередей. Одна для режима ANY, другая - для MANY. При получении сигнала, задача должна сформировать новую задачу получения сигнала и поставить её в очередь пула потоков (ExecutorService,submit()), а сама продолжить работу по запуску агентов группы в своем потоке. При формировании новой задачи лучше не создавать новый объект, а использовать текущий ( submit(this) ) - поможем GC. Активация агента в потоке, получившем сигнал, гарантирует наличие свободного потока и активацию без ожидания в очереди. В противном случае, если поток, получивший сигнал - последний в пуле, то выставление отдельной задачи на запуск агента поставит её в очередь, а сигнал уже не будет доставлен на другой сервер, в котором есть свободный поток.
Если сигнал не получен, необходимо форсировать переключение ядра ЦПУ на исполнение другого потока вызовом Thread.yield() и вернуться в цикл опроса сигналов.
Чтобы извлекать JMS-сообщение при освобождении потока, подходит только вариант JMS API с периодическим не блокирующим опросом (JMSConsumer.receive(long timeout)).
Описанный подход обеспечит достаточную степень равномерности распределения исполнений агентов по серверам и в каждом процессе ОС, но имеет недостатки:
- При отсутствии сигналов на запуск агентов, поток находится в холостом цикле опроса, отъедая ресурс CPU.
ASAP и Delayed агентов
В JMS есть 2 механизма, которые потенциально подходят для реализации агенов в режиме ASAP и Delayed - MDB и JMSConsumer.setMessageLister. Эти механизмы реализуют асинхронное API, т.е. при появлении нового сообщения контейнер вызовет callback-функцию КМА. Метод setMessageLister запрещен JMS-спецификацией для применения в EJB-бинах, но допустим в объектах, управляемых приложениями. MDB конфигурируется только на этапе инициализации приложения, соответственно нельзя его переконфигурировать в рантайм в соответствии с настройками, выполненными в МА. Т.о. будем использовать setMessageLister для получения сообщений с данными для ASAP агентов. Получение служебных сообщений на запуск ASAP-агентов тоже будем делать через setMessageLister. MDB не подходит из-за необходимости фильтрации по имени приложения и хоста, которые известны только при инициализации приложения, а параметры MDB конфигурируется только на этапе разработки (хардкодятся). MDB ещё не удобно тем, что контейнер активирует его ещё до окончания инициализации приложения. В результате могут приходить сообщения в момент, когда приложение ещё не готово их обрабатывать.
КМА, при инициализации, отправляет МА сообщение о своей активации. МА в ответ шлет сигналы для создания JMS-подписок для ASAP-агентов. Если агент в режиме MANY, то сигнал отправляется по всем НЗ, привязанным к пулу серверов, в который входит данный КМА. Для агентов в режиме ASAP-ANY МА должен выполнить перераспределение исполнителей с учетом появления нового КМА (см. ниже).
Сигнал от МА на запуск ASAP или Delayed агента получает MessageListener для служебных сообщений. При получении сигнала, необходимо создать JMS-подписку и задать ей индивидуальный объект MessageListener для обработки сообщений с данными. MessageListener, обрабатывающий получение данных, должен вызвать соответствующий агент, передав ему коллекцию, состоящую из одного JMS-сообщения. Если несколько НЗ указывают на одну и туже очередь, подписка всё равно создается, т.к. доставку сообщения надо подтверждать независимо для каждого агента в контексте отдельной транзакции.
При поступлении JMS-сообщения с данными, JMS-провайдер будет вызывать соответствующий MessageListener в одном из потоков пула, управляемого провайдером. В WildFly размер пула определяется параметром thread-pool-max-size.
<subsystem xmlns="urn:jboss:domain:messaging-activemq:3.0"> <server name="default" thread-pool-max-size="32">
Служебные JMS-подписки
Подписка на получение сигналов от МА должна осуществляться только после полного завершения инициализации приложения. Для этого приложение должно явным образом включить КМА после инициализации, вызовом метода start(). Только после этого КМА активируется и создаёт подписки на служебные сигналы от МА.
Конкретное имя JMS-очереди должно быть настраиваемым для возможности развертывания независимых стендов МА и управляемых им приложений на базе общей инфраструктуры МОМ. Должно быть и значение по-умолчанию.
В Java EE окружении КМА должен получать очередь через JNDI-lookup, в Java SE - через property.=
Подписка для запусков агентов по-расписанию
Исполняется в LongRunning пуле. Должна быть осуществлена методом JMSContext.createConsumer
Имя очереди по умолчанию | Тип очереди | Message Selector | Назначение |
---|---|---|---|
am-to-any | Queue не durable | application='<app>' AND JMSType='run-scheduled-agent' AND target-hosts LIKE %|<host>|% | Сигналы на запуск агентов по-расписанию в режиме ANY |
am-to-many | Topic не durable | application='<app>' AND JMSType='run-scheduled-agent' AND target-hosts LIKE %|<host>|% | Сигналы на запуск агентов по-расписанию в режиме MANY |
<app> - имя приложения.
<host> - имя хоста, где развернуто приложение.
Значение <app> по-умолчанию - это имена ear+war файл приложения. Их можно получить через JNDI (@Resource(lookup="java:app/AppName") и @Resource(lookup="java:module/ModuleName")). Если AppName и ModuleName - совпадают, значит это war без ear и надо использовать только ModuleName.
JMS-очереди, через которые направляются сигналы, должна быть не durable, чтобы сообщения не доставлялись отложено временно выключенным приложениям. Если приложение не активно, то JMS-сообщение пропадет. Брокер определит это по отсутствию сессий от консьюмеров, удовлетворяющих условию messsage selector.
Важно, чтобы фабрика соединений с брокером имела одинаковый параметр "Client ID". В противном случае брокер будет рассылать копии сообщений из Queue каждому подписчику с уникальным CliendID, нарушая необходимую семантику доставки только одному. Достаточно оставить значение по-умолчанию - пустоту. ( спецификации JMS, если интересно. п. 6.11.1 Durable TopicSubscriber)
Подписка для прочих адресных сообщений от МК
Имя очереди по умолчанию | Тип очереди | Message Selector | Назначение |
---|---|---|---|
am-to-any | Queue не durable | application='<app>' AND JMSType<>'run-scheduled-agent' AND target-hosts LIKE %|<host>|% | Служебные адресные сообщения приложению на произвольном сервере в пуле |
am-to-many | Topic не durable | application='<app>' AND JMSType<>'run-scheduled-agent' AND target-hosts LIKE %|<host>|% | Служебные адресные сообщения приложению на указанных серверах пула |