#java #jakarta-ee #weblogic12c
#java #jakarta-ee #weblogic12c
Вопрос:
У меня есть сеансовый компонент без состояния с TimerService. По истечении времени ожидания он начинает использовать очередь JMS. При обработке сообщения требуется доступ к внешнему ресурсу, который может быть временно недоступен. Метод тайм-аута вызывается MessageConsumer.receiveNoWait()
в цикле до тех пор, пока:
- Больше нет сообщений для обработки: он регистрирует новый таймер = сейчас 10 минут. И заканчивается.
- во время обработки произошла ошибка: сообщение откатывается и регистрируется новый таймер: теперь 30 минут. И заканчивается.
Таким образом, я контролирую время перезапуска, и у меня нет спящих потоков благодаря обратному вызову TimerService.
Я хотел бы иметь несколько случаев этого сессионного компонента, чтобы предвидеть узкие места в очереди:
-----<ejb>-------
| timerService |
| | ---------------------
----| onTimeout() {} | -----------> | external dependency |
/ | | / ---------------------
/ ----------------- /
/ /
--------- / /
|||queue|||K /
--------- /
-----<ejb>------- /
| timerService | /
| | /
----| onTimeout() {} |
| |
-----------------
Мой сеансовый компонент выглядит следующим образом (упрощенный, конечно):
@Stateless
public class MyJob {
@Resource
private TimerService timerService;
@PostConstruct
public void init() {
registerNewTimer(1000L); // -> problem: timerService not accessible
System.out.println("Initial Timer created");
}
private void registerNewTimer(long duration) {
TimerConfig config = new TimerConfig();
config.setPersistent(false);
timerService.createSingleActionTimer(duration, config);
}
@Timeout
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public void execute() {
try {
// instantiate JMS session and consumer
while ((message = consumer.receiveNoWait()) != null) {
// business logic with message
message.acknowledge();
}
// queue empty, let's stop for now and register a new timer 10min
registerNewTimer(10*60*1000);
} catch (ResourceException re) {
// external resource unavailable, let's wait 30min
registerNewTimer(30*60*1000);
// last message not acknowledged, so rolled back
}
}
}
Я не хочу использовать компоненты, управляемые сообщениями, поскольку я хотел бы контролировать, когда использовать сообщения (см. Логику задержки в случае ошибок).
Проблема: ошибка в @PostConstruct
аннотированном init()
методе: на данный момент не разрешено использовать службу времени. Это разрешено, когда я создаю sessionbean @Singleton
но тогда я теряю возможность обрабатывать очередь параллельно. У кого-нибудь есть идея, как это решить? Если TimerService не является правильным механизмом, что может быть альтернативой. Существует ли альтернатива PostConstruct, которая разрешает доступ к ссылочным ресурсам и вызывается только один раз после создания экземпляра?
Заранее спасибо за любую конструктивную информацию.
Ответ №1:
Я сам нашел решение, поэтому размещаю его здесь, чтобы оно могло помочь другим.
Я добавил новый одноэлементный компонент Startup bean, который содержит список всех компонентов, реализующих интерфейс MyJobExecutor, благодаря CDI. В среде JEE CDI хорошо работает с EJBS и поэтому внедряет сеансовые компоненты! Предупреждение: CDI будет вводить только классы, которые непосредственно реализуют MyJobExecutor, поэтому это не сработает, если у вас есть абстрактный класс, реализующий MyJobExecutor, и конкретный компонент, расширяющийся из этого абстрактного класса. Это должно быть явно implements MyJobExecutor
. В postconstruct класса startup я вызываю каждый компонент MyJobExecutor, чтобы зарегистрировать новый таймер в его TimerService. Таким образом, я могу создать первый singleActionTimer (ы) для каждого сессионного компонента.
public interface MyJobExecutor {
// how many timers to register for this bean meaning how many parallel processes.
public int nrOfParallellInstances();
}
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
@Singleton
@Startup
public class MyJobStartup {
@Inject
@Any
private Instance<MyJobExecutor> myJobs;
@PostConstruct
public void startIt() {
for (MyJobExecutor ajob : myJobs) {
for(int i=0; i<ajob.nrOfParallellInstances(); i )
// register every instance with a 1 second delay between each timeout
ajob.registerNewTimer(1000L*(i 1));
}
}
}
Внутренний цикл зарегистрирует несколько таймеров для одного и того же компонента на основе ajob.nrOfParallellInstances()
значения.
Однако служба времени не будет запускать тайм-аут, когда предыдущий тайм-аут все еще выполняется. Это блокирует параллельную обработку, как и ожидалось:(
Чтобы решить эту проблему, я адаптировал метод тайм-аута не для выполнения самой бизнес-логики, а для запуска управляемого потока, который выполняет бизнес-логику. Таким образом, onTimeout
метод быстро завершается, и служба таймера запускает следующий тайм-аут, что приводит к нескольким параллельным исполнениям (каждое в управляемом потоке):
import javax.enterprise.concurrent.ManagedThreadFactory;
import javax.naming.InitialContext;
@Resource(name = "concurrent/AsyncJobThreadFactory")
private ManagedThreadFactory threadFactory;
@Timeout
private void onTimeout(Timer timer) {
LOG.info("Timeout triggered for " timer.getInfo());
Thread thread = threadFactory.newThread(new AsyncExecRunnable((String) timer.getInfo()));
if (thread != null) {
thread.start();
} else {
LOG.warn("No thread available for this job. Retry in 5 minutes.");
registerNewTimer(1000L * 60 * 5);
}
}
private static class AsyncExecRunnable implements Runnable {
private String extra info;
public AsyncExecRunnable(String info) {
this.info = info;
}
@Override
public void run() {
try {
LOG.info("Job executor thread started for " info);
InitialContext ic=new InitialContext();
// business logic. With ic you can lookup other EJBs
} catch (Exception e) {
LOG.error("Problem executing AsyncJobExecutor:", e);
}
}
}
С этой настройкой у меня есть:
- автоматическая регистрация первого таймера при развертывании для всех сеансовых компонентов, которые реализуют интерфейс MyJobExecutor.
- возможность зарегистрировать несколько таймеров с одинаковым временем, и они будут работать параллельно.
- Используя ManagedThreadFactory, потоки также имеют доступ к контексту JNDI и могут искать EJB.