Java EE несколько сеансовых интерфейсов с TimerService

#java #jakarta-ee #weblogic12c

#java #jakarta-ee #weblogic12c

Вопрос:

У меня есть сеансовый компонент без состояния с TimerService. По истечении времени ожидания он начинает использовать очередь JMS. При обработке сообщения требуется доступ к внешнему ресурсу, который может быть временно недоступен. Метод тайм-аута вызывается MessageConsumer.receiveNoWait() в цикле до тех пор, пока:

  • Больше нет сообщений для обработки: он регистрирует новый таймер = сейчас 10 минут. И заканчивается.
  • во время обработки произошла ошибка: сообщение откатывается и регистрируется новый таймер: теперь 30 минут. И заканчивается.

Таким образом, я контролирую время перезапуска, и у меня нет спящих потоков благодаря обратному вызову TimerService.

Я хотел бы иметь несколько случаев этого сессионного компонента, чтобы предвидеть узкие места в очереди:

                      -----<ejb>------- 
                    | timerService    |
                    |                 |               --------------------- 
                ----| onTimeout() {}  | -----------> | external dependency |
               /    |                 |         /     ---------------------  
              /      -----------------         /
             /                                /
 ---------  /                                /
|||queue|||K                                /
 ---------                                /
                    -----<ejb>-------    /
                   | timerService    |  /
                   |                 | /
                ----| onTimeout() {}  | 
                    |                 |
                     ----------------- 
  

Мой сеансовый компонент выглядит следующим образом (упрощенный, конечно):

 @Stateless
public class MyJob {
    
    @Resource
    private TimerService timerService;

    @PostConstruct
    public void init() {
        registerNewTimer(1000L); // -> problem: timerService not accessible
        System.out.println("Initial Timer created");
    }

    private void registerNewTimer(long duration) {
        TimerConfig config = new TimerConfig();
        config.setPersistent(false);
        timerService.createSingleActionTimer(duration, config);
    }

    @Timeout
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public void execute() {
        try {
            // instantiate JMS session and consumer
            while ((message = consumer.receiveNoWait()) != null) {
                // business logic with message
                message.acknowledge();
            }
            // queue empty, let's stop for now and register a new timer   10min
            registerNewTimer(10*60*1000);
        } catch (ResourceException re) {
            // external resource unavailable, let's wait 30min
            registerNewTimer(30*60*1000);
            // last message not acknowledged, so rolled back
        }
    }
}
  

Я не хочу использовать компоненты, управляемые сообщениями, поскольку я хотел бы контролировать, когда использовать сообщения (см. Логику задержки в случае ошибок).

Проблема: ошибка в @PostConstruct аннотированном init() методе: на данный момент не разрешено использовать службу времени. Это разрешено, когда я создаю sessionbean @Singleton но тогда я теряю возможность обрабатывать очередь параллельно. У кого-нибудь есть идея, как это решить? Если TimerService не является правильным механизмом, что может быть альтернативой. Существует ли альтернатива PostConstruct, которая разрешает доступ к ссылочным ресурсам и вызывается только один раз после создания экземпляра?

Заранее спасибо за любую конструктивную информацию.

Ответ №1:

Я сам нашел решение, поэтому размещаю его здесь, чтобы оно могло помочь другим.

Я добавил новый одноэлементный компонент Startup bean, который содержит список всех компонентов, реализующих интерфейс MyJobExecutor, благодаря CDI. В среде JEE CDI хорошо работает с EJBS и поэтому внедряет сеансовые компоненты! Предупреждение: CDI будет вводить только классы, которые непосредственно реализуют MyJobExecutor, поэтому это не сработает, если у вас есть абстрактный класс, реализующий MyJobExecutor, и конкретный компонент, расширяющийся из этого абстрактного класса. Это должно быть явно implements MyJobExecutor . В postconstruct класса startup я вызываю каждый компонент MyJobExecutor, чтобы зарегистрировать новый таймер в его TimerService. Таким образом, я могу создать первый singleActionTimer (ы) для каждого сессионного компонента.

 public interface MyJobExecutor {
    // how many timers to register for this bean meaning how many parallel processes.
    public int nrOfParallellInstances();
}

import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

@Singleton
@Startup
public class MyJobStartup {

    @Inject
    @Any
    private Instance<MyJobExecutor> myJobs;
    
    @PostConstruct
    public void startIt() {
        for (MyJobExecutor ajob : myJobs) {
            for(int i=0; i<ajob.nrOfParallellInstances(); i  )
                // register every instance with a 1 second delay between each timeout
                ajob.registerNewTimer(1000L*(i 1));
        }
    }
}
  

Внутренний цикл зарегистрирует несколько таймеров для одного и того же компонента на основе ajob.nrOfParallellInstances() значения.
Однако служба времени не будет запускать тайм-аут, когда предыдущий тайм-аут все еще выполняется. Это блокирует параллельную обработку, как и ожидалось:(

Чтобы решить эту проблему, я адаптировал метод тайм-аута не для выполнения самой бизнес-логики, а для запуска управляемого потока, который выполняет бизнес-логику. Таким образом, onTimeout метод быстро завершается, и служба таймера запускает следующий тайм-аут, что приводит к нескольким параллельным исполнениям (каждое в управляемом потоке):

 import javax.enterprise.concurrent.ManagedThreadFactory;
import javax.naming.InitialContext;

@Resource(name = "concurrent/AsyncJobThreadFactory")
private ManagedThreadFactory threadFactory;

@Timeout
private void onTimeout(Timer timer) {
    LOG.info("Timeout triggered for "   timer.getInfo());
    Thread thread = threadFactory.newThread(new AsyncExecRunnable((String) timer.getInfo()));
    if (thread != null) {
        thread.start();
    } else {
        LOG.warn("No thread available for this job. Retry in 5 minutes.");
        registerNewTimer(1000L * 60 * 5);
    }

}
private static class AsyncExecRunnable implements Runnable {

    private String extra info;
    
    public AsyncExecRunnable(String info) {
        this.info = info;
    }

    @Override
    public void run() {
        try {
            LOG.info("Job executor thread started for "   info);
            InitialContext ic=new InitialContext();
            // business logic. With ic you can lookup other EJBs
        } catch (Exception e) {
            LOG.error("Problem executing AsyncJobExecutor:", e);
        }
    }
}
  

С этой настройкой у меня есть:

  • автоматическая регистрация первого таймера при развертывании для всех сеансовых компонентов, которые реализуют интерфейс MyJobExecutor.
  • возможность зарегистрировать несколько таймеров с одинаковым временем, и они будут работать параллельно.
  • Используя ManagedThreadFactory, потоки также имеют доступ к контексту JNDI и могут искать EJB.