Разработка ведется в кастомном репозитории на базе CMJ

Структура

Описание инструмента

МА - менеджер агентов
КМА - клиент менеджера агентов


Разработка агентов

Типы агентов

Под управлением МА могут работать агенты двух типов: самостоятельные (сейчас запускаемые только по расписанию) и по сигналам (агенты по JMS-сообщениям, обычно это ASAP-агенты). Первые работают аналогично агентам в домино, которые запускались AFServer'ом, вторые - новый для CompanyMedia тип, позволяющий реализовать событийно-ориентированную работу системы.

Для возможности запуска МА агенты должны быть оформлены специальны образом. МА может найти и зарегистрировать только классы агентов, являющиеся бинами с аннотацией ru.intertrust.am.client.api.annotations.ManagedAgent. В аннотации и имени бина указывается название агента, по которому МА будет его искать, оно же должно быть указано администратором в настройках запуска в интерфейсе МА.

На самом деле аннотация @ManagedAgent сейчас не используется, но в будущих версиях МА она потребуется для автопубликации агентов.

По соглашениям внутри CMJ все агенты должны начинаться с префикса "CMJ-".

Результат работы агенты возвращают в виде строки - это краткая информация, которая отображается для администратора в интерфейсе МА.

Все упомянутые интерфейсы хорошо документированы в коде (JavaDoc), и при написании собственных агентов можно смело руководствоваться их описанием.
@ManagedAgent(name = MyAgent.AGENT_NAME)
@Service(MyAgent.AGENT_NAME)
public class MyAgent ... {
    ...
}

Агенты по сигналам (по JMS-сообщениям)

В качестве входных данных получают сигналы из очереди сообщений. Могут быть запущены как ASAP-агенты или по расписанию.

Должны реализовывать интерфейс ru.intertrust.am.client.api.MessageAgent. Интерфейс поддерживает универсальные типы.

public class MyAgent implements MessageAgent<ChangeLogMessage, Map<String, String>>

Первый тип-параметр описывает сообщения, с которыми работает агент. Так как обмен сообщениями производится с помощью JMS, то класс сообщения должен реализовывать javax.jms.Message. Сейчас стандартом для МА является ru.intertrust.common.jms.ChangeLogMessage.

Второй тип-параметр описывает передаваемый агенту набор параметров, заданных для агента в настройках МА - сейчас все параметры передаются в виде Map<String, String>.

Интерфейс MessageAgent описывает всего один метод:

public interface MessageAgent<T extends Message, P> {
    @Nonnull
    String agentRun(@Nonnull MessageAgentContext<T> context, @Nonnull P parameters) throws Exception;
}

Хотя сообщения обычно передаются по одному, должна быть обеспечена пакетная работа: из первого параметра должна быть получена и обработана коллекция сообщений.

Пример реализации:

@ManagedAgent(name = MyAgent.AGENT_NAME)
@Service(MyAgent.AGENT_NAME)
public class MyAgent implements MessageAgent<ChangeLogMessage, Map<String, String>> {
    static final String AGENT_NAME = "CMJ-MyAgent";
    private static final Logger log = LoggerFactory.getLogger(MyAgent.class);

    @Nonnull
    @Override
    public String agentRun(final @Nonnull MessageAgentContext<ChangeLogMessage> context,
            final @Nonnull Map<String, String> parameters) {
        try {
            validateParameters(parameters);
            // Агент должен поддерживать получение множества ChLog в рамках одного запуска
            for (ChangeLogMessage msg : context.getMessages()) {
                processMessage(msg, parameters);
            }
        } catch (Throwable e) {
        	throw new RuntimeException("Error when processing message " + toString(context), e);
        } finally {
            // Освободить ресурсы, если нужно
        }
 
        // TODO form pretty message
        return "OK";
    }

    .....
}

Метод

context.getMessages()

возвращает итерируемый объект ru.intertrust.am.client.api.MessageCollection.

Особое внимание тут нужно уделить работе с итератором: MessageCollection - не фиксированная коллекция сообщений, она больше похожа на поток. При вызове Iterator.next() сообщение будет прочитано, объект MessageCollection это запомнит и дальше его итераторы вернут уже следующее сообщение.

Именно это позволяет при работе с коллекцией сообщений выкинуть исключение, при этом КМА поймёт, на каком сообщении оно произошло, и запустит агент, который продолжит обработку со следующего сообщения.

В примере есть цикл

for (ChangeLogMessage msg : context.getMessages()) {

Для долго выполняющихся агентов в него может быть добавлена дополнительная проверка context.isCanceled():

Iterator<ChangeLogMessage> iterator = context.getMessages().iterator();
while (iterator.hasNext() && !context.isCancelled()) {

MessageCollection внутри себя уже проверяет context.isCanceled(), и, если true, то не передаёт следующее сообщение агенту (hasNext()==false). Поэтому обычно цикл перебора будет выглядеть именно как в примере - без неё.


Агент имеет право выбросить исключение:

  • ru.intertrust.am.client.api.TemporaryUnavailableException - предназначено, чтобы сообщить МА, что данные на сервере неактуальны, в таком случае сообщение будет передоставлено позднее;
  • другое исключение - ошибка отправится и будет сохранена в МА; перед отправкой ошибки само JMS-сообщение протоколируется, поэтому ошибка есть и там.

Проблема неактуальных данных

Заключается в том, что сигнал для агента в режиме ASAP прилетел на сервер-обработчик быстрее, чем отреплицировались новые данные.

Решение проблемы особенно необходимо для работы с хранилищем на базе Domino, т.к. в нём репликация работает медленнее, чем передача сигналов.

Временное решение - отложенная доставка сигналов (сигнал копируется в очередь сообщений с отложенной доставкой, на которую настраивается агент) + проверка актуальности на сервере. 

Процедура проверки актуальности находится в разработке. После её выпуска, возможно, потребуется вызывать / переопределять её в агентах.

Самостоятельные агенты

Должны самостоятельно отбирать и обрабатывать нужные данные на сервере. Могут быть запущены только по расписанию.

Реализуют другой интерфейс, не требующий сообщение в качестве входного параметра - ru.intertrust.am.client.api.Agent:

public interface Agent<P> {
    @Nonnull
    String agentRun(@Nonnull AgentContext context, @Nonnull P parameters) throws Exception;
} 

Первый параметр - специальный объект, необходимый для работы агента. Второй параметр, как и для агентов по сигналам, - набор параметров из настроек запуска.

Пример агента:

@ManagedAgent(name = MyAgent.AGENT_NAME)
@Service(MyAgent.AGENT_NAME)
public class MyAgent implements Agent<Map<String, String>> {
    static final String AGENT_NAME = "CMJ-MyAgent";
    private static final Logger log = LoggerFactory.getLogger(MyAgent.class);
 
    @Nonnull
    @Override
    public String agentRun(@Nonnull AgentContext context, @Nonnull Map<String, String> parameters)
            throws Exception {
        log.debug("Start agent {} with Agent Manager", getActionInfo());
        try {
            applyParamsAndRun(parameters);
        } finally {
            log.debug("Finish {} with Agent Manager", getActionInfo());
        }
 
        // TODO form pretty message
        return "OK";
    }
 
    .....
}

Агент имеет право выбрасывать ошибки, как и в случае с ASAP-агентами. В таком случае он будет перезапущен позже (т.е. при следующем срабатывании расписания - ещё раз напомню, что самостоятельные агенты сейчас работают только в таком режиме), а ошибка запротоколирована.

Настройка запуска агентов

Самостоятельные агенты и агенты по сигналам могут быть настроены для работы в разных режимах. Для каждого типа агентов есть свои особенности, которые рассмотрены далее.

Агенты по сигналам

Фильтрация сигналов (селекторы сигналов)

Обычно агент обрабатывает только некоторые сообщения, которые можно выделить по их типу и содержимому. Чтобы сообщение не было передано агенту зря, в КМА существует механизм фильтрации. При разработке агентов по сигналам обязательно его использование, если фильтрация вообще возможна.

Фильтр указывается в дескрипторе агента и работает в зависимости от режима запуска - ASAP или по расписанию (см. ниже).

Фильтры применяет брокер: если сообщение отфильтровано, то КМА его не получит и не запустит агент.

Параллельный запуск

Агент может работать параллельно на нескольких серверах и в нескольких потоках на каждом отдельном сервере. Поступившее сообщение всегда обрабатывается только одним из обработчиков - экземпляров агента.

Режим работы задаётся несколькими полями в настройках.

Режим конкуренции (в дескрипторе агента) указывает, должен ли агент быть запущен на одном сервере или нескольких. Здесь под запуском понимается его активация для последующей передачи ему сообщений, а не старт обработки конкретного сообщения.

  • ANY - агент запускается на одном сервере пула, который выбирается МА; все сообщения обрабатываются на нём;
  • MANY - агент запускается на всех серверах пула; каждое сообщение передаётся на один из них, выбранный МА.

Количество обработчиков (подписчиков) на сервере при работе в режиме ASAP настраивается полем Количество потоков (Настройка запуска -> Конфигурация агента по JMS сообщениям).

Каждое сообщение обрабатывается только один раз: сначала доставляется на один из серверов, затем передаётся агенту в одном из потоков.

Количество допустимых ошибок

Применяется, если агент выдаёт ошибки при последовательном запуске несколько раз подряд. Для разнородных ошибок допускается указанное в настройке количество идущих подряд ошибочных запусков, для одинаковых оно делится на 3. После успешного запуска счётчик ошибок сбрасывается.
На одном сервере для одной настройки запуска агента создаётся один счётчик на все потоки, в которых этот агент работает. Это означает, что счётчик учитывает результаты запуска агента из всех потоков.

При срабатывании счётчика агент останавливается и выключается на текущем сервере (и только на нём).

Однородность ошибок проверяется сравнением их полного stack trace'а в виде строки.

Самостоятельные агенты

Параллельный запуск

Агент по расписанию может быть запущен параллельно на нескольких серверах - режим MANY. В режиме ANY он работает только на одном выбранном МА сервере.

Также существует настройка для разрешения / запрета параллельного запуска для случая, когда наступает очередной момент времени для запуска нового экземпляра и старый ещё не завершил работу - политика наложения расписания.

Количество допустимых ошибок

Используется так же, как и в агентах по сигналам.

Запуск агентов по сигналам по расписанию

Работа агента по сигналам может быть настроена так, что он будет запущен по расписанию и получит сразу все необработанные сообщения, предназначенные для него. Именно поэтому при разработке в параметре MessageAgentContext содержится коллекция сообщений, которую должен обработать агент.

Для такого режима нужно создать настройку запуска по расписанию и добавить в неё конфигурацию агента по JMS-сообщениям.

Если сообщения не проходят фильтрацию, то метод agentRun агента всё-равно запускается, но в параметрах у него не будет сообщений.

Создание настроек для запуска агента

Вся настройка выполняется в интерфейсе управления МА. Предполагается, что приложения, пулы серверов, группы параметров и JMS-очереди уже настроены.

  1. Создание дескриптора агента

    В представлении "Агенты" создать дескриптор самостоятельного агента или агента по JMS-сообщениям. Указываются приложение, название (произвольная строка) и идентификатор (указанное в аннотации ManagedAgent значение) агента, параметры агента (набор имён параметров, которые передаются агенту при вызове, и их значений по-умолчанию).
    Для самостоятельного агента должны быть указаны режим конкуренции и политика наложения расписания (см. выше описание параллельного запуска самостоятельных агентов).
    Для агента по сигналам - очередь данных по-умолчанию, фильтр JMS-сообщений (см. описание фильтрации сигналов), режим конкуренции (см. описание параллельного запуска агентов по сигналам).

  2. Создание настройки запуска

    В представлении "Конфигурации агентов" и "Настройки запуска" создать настройку запуска ASAP или по расписанию на одном / нескольких серверах.
    В одной настройке могут быть указаны агенты только с одинаковым режимом конкуренции: для запуска на одном / нескольких серверах соответственно будут доступны агенты в режиме конкуренции ANY / MANY.
    Настройка запуска ASAP позволяет выбрать только агенты по сигналам, по расписанию могут работать агенты обоих типов.
    Во всех настройках запуска указываются название, описание, приложение, пул серверов, группа параметров, максимальное количество ошибок (см. описание выше) и список агентов.
    Агенты в одной настройке запуска выполняются строго последовательно. Выбор агентов ограничивается приложением и типом настройки запуска (как указано выше). Для всех агентов должны быть заданы сам агент (выбором из списка), его порядок запуска, параметры запуска (если какого-то нет в списке, то используется параметр по-умолчанию из дескриптора агента). Для агентов по сигналам дополнительно указываются очередь данных (если нет - используется указанная по-умолчанию в дескрипторе) и доп. фильтр (обычно настраивается системным администраторам для фильтрации по хосту). В настройках ASAP для агентов доступна настройка количества потоков (см. описание параллельного запуска агентов по сигналам).
    Для настроек запуска по расписанию задаётся расписание запуска.

Подключение КМА в приложение


Для подключения библиотеки КМА (agent-manager-client) в конечное приложение необходимо подключить несколько зависимостей, состав которых определяется спецификой приложения. 
КМА предоставляет следующий набор зависимостей:

  1. Модуль с основным API для клиентских приложений, содержащий API для агентов, а также для настройки основного компонента КМА - ru.intertrust.am.client.api.executor.AgentExecutor, используемого для управления агентами. Модуль обеспечивает возможность подключения различных конвертеров JMS-сообщений и плагинов обработки JMS-сообщений агентами, событий жизненного цикла агентов (запуск/остановка/окончание работы/исключение в работе) и КМА (запуск/остановка).
    Maven-зависимость:

    		<dependency>
    			<groupId>ru.intertrust.am</groupId>
    			<artifactId>agent-manager-client-api</artifactId>
    			<version>${amc.version}</version>
    		</dependency>

    Для настройки AgentExecutor используется класс ru.intertrust.am.client.api.executor.AgentExecutorOptions, предоставляющий возможности построения объекта с настройками для КМА в fluent-стиле. Через данный класс конечное приложение должно подключать свои плагины/конвертеры JMS-сообщений.

  2. Модуль SPI, обеспечивающий возможность переопределения следующих фабрик, используемых КМА для работы:
    1. Фабрика потоков - ru.intertrust.am.client.spi.factory.ExecutorServiceFactory. Дефолтная реализация - ru.intertrust.am.client.activator.services.std_impl.DefaultExecutorServiceFactory, основана на java.util.concurrent.Executors.newFixedThreadPool(int).
    2. Фабрика агентов - ru.intertrust.am.client.spi.factory.AgentFactory. Дефолтная реализация - ru.intertrust.am.client.activator.services.std_impl.DefaultAgentFactory, основана на получении агентов по qualified-имени класса.
    3. Фабрика свойств, используемая для получения различных свойств, требуемых для настройки КМА - ru.intertrust.am.client.spi.factory.PropertiesSource. Дефолтная реализация - ru.intertrust.am.client.activator.services.std_impl.SystemPropertiesSource, основана на системных свойствах приложения.

    Механизм переопределения основан на Java Service Discovery. (предупреждение) Зависимость необходимо подключать только в случае, если приложению необходимо переопределение фабрик своими кастомными реализациями.

    		<dependency>
    			<groupId>ru.intertrust.am</groupId>
    			<artifactId>agent-manager-client-spi</artifactId>
    			<version>${amc.version}</version>
    		</dependency>
  3. Модуль c реализацией SPI для Spring-приложений. Содержит реализации фабрик из модуля SPI (перечислены выше) на основе Spring, а также реализацию ru.intertrust.am.client.api.executor.AgentExecutor ru.intertrust.am.client.spring.SpringAgentExecutorBean, который можно настроить на автозапуск (бин на основе org.springframework.context.SmartLifecycle). Приложение, построенное на основе Spring, может подключить данную зависимость для упрощения конфигурирования КМА, Бин может быть сконфигурирован как через Java-конфигурацию, так и через XML-конфигурацию. Пример конфигурирования через Java-конфиг с указанием кастомных конвертеров JMS-сообщений и плагина обработки JMS-ChLog агентами: 

    	@Bean
        public AgentExecutor agentExecutorBean() {
    
            final AgentExecutorOptions options = new AgentExecutorOptions().withConverter(ChangeLogMessage.class,
                    new ChangeLogMessageConverterAdapter()).withMessageProcessingListener(new ChangeLogMessageProcessingListener(..));
     
    		// autoStartEnabled - признак, должен ли КМА инициализироваться и запускаться автоматически при инициализации контекста приложения
    		// если false - КМА запустится только при явном вызове start(), а остановится - при явном вызове stop()
            final SpringAgentExecutorBean executor = new SpringAgentExecutorBean(this.autoStartEnabled, options); 
     
    		return executor;
        }

    Для конфигурирования КМА для Spring-приложений необходимо подключить следующие зависимости:

    		<dependency>
    			<groupId>ru.intertrust.am</groupId>
    			<artifactId>agent-manager-client-spring</artifactId>
    			<version>${amc.version}</version>
    		</dependency>
     
    		<dependency>
    			<groupId>ru.intertrust.am</groupId>
    			<artifactId>agent-manager-client-activator</artifactId>
    			<version>${amc.version}</version>
    			<scope>runtime</scope>
    		</dependency>

    Подключения этих зависимостей и определения бина ru.intertrust.am.client.spring.SpringAgentExecutorBean достаточно для конфигурирования и активации КМА. Spring не входит в транзитивные зависимости agent-manager-client-spring.

  4. Если приложение не использует Spring, то в нем управление КМА может быть реализовано явным инстанцированием дефолтной реализации AgentExecutor ru.intertrust.am.client.activator.executor.DefaultAgentExecutor. Конфигурируется она также через AgentExecutorOptions, для запуска необходимо явно вызывать start(), а для остановки - stop(). В таком случае, необходимо подключение следующей зависимости:

    		<dependency>
    			<groupId>ru.intertrust.am</groupId>
    			<artifactId>agent-manager-client-activator</artifactId>
    			<version>${amc.version}</version>
    		</dependency>
  5. Если приложение работает на основе CDI (JavaEE-приложения), то приложение может подключить зависимость на модуль с реализацией SPI для Java EE:

    		<dependency>
    			<groupId>ru.intertrust.am</groupId>
    			<artifactId>agent-manager-client-jee</artifactId>
    			<version>${amc.version}</version>
    		</dependency>

    В реализации SPI отсутствует реализация фабрики для свойств, т.к. в Java EE отсутствует как-таковая абстракция над источником свойств, т.е. будет использоваться дефолтная реализация на основе системных свойств. Для конфигурирования AgentExecutor для такого приложения необходимо подключить зависимость: 

    		<dependency>
    			<groupId>ru.intertrust.am</groupId>
    			<artifactId>agent-manager-client-activator</artifactId>
    			<version>${amc.version}</version>
    		</dependency>

    и далее сконфигурировать дефолтную реализацию DefaultAgentExecutor через AgentExecutorOptions. В дальнейшем будет добавлена реализация AgentExecutor для JEE, облегчающая конфигурирование КМА в JEE-приложениях.

Для получения более подробной информации о конфигурировании КМА необходимо обращаться к Java-doc к основным интерфейсам и классам, описанным в данном разделе.