#java #jms #apache-kafka-connect #weblogic12c
#java #jms #apache-kafka-connect #weblogic12c
Вопрос:
У меня есть вариант использования, в котором мне нужно прослушивать доступные элементы единой распределенной очереди, размещенной в weblogic 12c. Я прочитал и обнаружил, что интерфейс DestinationAvailabilityListener имеет методы, которые могут удовлетворить потребности. Ниже приведен мой код:
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.naming.Context;
import org.apache.kafka.connect.connector.ConnectorContext;
import weblogic.jms.extensions.DestinationAvailabilityListener;
import weblogic.jms.extensions.DestinationDetail;
import weblogic.jms.extensions.JMSDestinationAvailabilityHelper;
import weblogic.jms.extensions.RegistrationHandle;
public class QueueMonitor extends Thread implements DestinationAvailabilityListener,WebLogicJmsTask {
private Hashtable<String, String> wlsEnvParamHashTbl = null;
private final Object containerLock = new Object();
private final CountDownLatch startLatch ;
private RegistrationHandle registrationHandle;
private ArrayList<String> containerMap;
boolean shutdown =false,changeflg=false;
private final ConnectorContext context;
public QueueMonitor(Map<String, String> props,ConnectorContext context) {
super();
wlsEnvParamHashTbl = new Hashtable<String, String>();
wlsEnvParamHashTbl.put(Context.PROVIDER_URL, props.get(WEBLOGIC_T3_URL_DESTINATION_CONFIG)); // set Weblogic JMS URL
wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); // set Weblogic JNDI
wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, props.get(WEBLOGIC_USERNAME_CONFIG)); // set Weblogic UserName
wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, props.get(WEBLOGIC_PASSWORD_CONFIG)); // set Weblogic PassWord
for (Map.Entry<String,String> entry : wlsEnvParamHashTbl.entrySet())
System.out.println("Key = " entry.getKey()
", Value = " entry.getValue());
System.out.println(props.get(WEBLOGIC_JMS_DESTINATION_CONFIG));
this.context=context;
this.startLatch = new CountDownLatch(1);
JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
try {this.registrationHandle= dah.register(wlsEnvParamHashTbl, props.get(WEBLOGIC_JMS_DESTINATION_CONFIG), this);
startLatch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch(Exception e)
{
System.out.println(e);
}
}
public void run()
{
while(!shutdown)
{
if(changeflg)
{
context.requestTaskReconfiguration();
changeflg=false;
}
}
if(shutdown)
{
System.out.println("Called shutdown, returning");
return;
}
}
public synchronized String getContainers()
{
String list=null;
synchronized (containerLock) {
list=String.join(",", containerMap);
}
return list;
}
@Override
public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
synchronized (containerLock) {
System.out.println("destJNDIName is :" destJNDIName);
// For all Physical destinations, start a container
for (DestinationDetail detail : physicalAvailableMembers) {
System.out.println("member is :" detail.getJNDIName());
containerMap.add(detail.getJNDIName());
//containerMap.put(detail.getJNDIName(), detail.getJNDIName());
}
if(startLatch.getCount()==0)
{
changeflg=true;
}
}
startLatch.countDown();
}
@Override
public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
// TODO Auto-generated method stub
synchronized (containerLock) {
// Shutdown all containers whose physical members are no longer available
for (DestinationDetail detail : physicalUnavailableMembers) {
containerMap.remove(detail.getJNDIName());
// maybe i will need to do somethinh here
}
changeflg=true;
}
}
@Override
public void onFailure(String destJndiName, Exception exception) {
// Looks like a cluster wide failure
System.out.println("inside on failure");
shutdown();
System.out.println(exception);
}
public void shutdown() {
// Unregister for events about destination availability
registrationHandle.unregister();
// Shut down containers
synchronized (containerLock) {
containerMap.removeAll(containerMap);
}
shutdown=true;
}
}
При запуске происходит сбой со следующей ошибкой:
<Error> <Kernel> <WL-000802> <ExecuteRequest failed
java.lang.NullPointerException.
at com.bt.connect.QueueMonitor.shutdown(QueueMonitor.java:128)
at com.bt.connect.QueueMonitor.onFailure(QueueMonitor.java:118)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper$3.run(JMSDestinationAvailabilityHelper.java:490)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.callOutListener(JMSDestinationAvailabilityHelper.java:451)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.onFailure(JMSDestinationAvailabilityHelper.java:487)
at weblogic.jms.common.CDS$DD2Listener.reportException(CDS.java:1145)
at weblogic.jms.common.CDS.lookupDDAndCalloutListenerSingle(CDS.java:459)
at weblogic.jms.common.CDS.lookupDDAndCalloutListener(CDS.java:412)
at weblogic.jms.common.CDS.access$400(CDS.java:52)
at weblogic.jms.common.CDS$DDListenerRegistrationTimerListener.timerExpired(CDS.java:255)
at weblogic.timers.internal.TimerImpl.run(TimerImpl.java:301)
at weblogic.work.ExecuteRequestAdapter.execute(ExecuteRequestAdapter.java:21)
at weblogic.kernel.ExecuteThread.execute(ExecuteThread.java:147)
at weblogic.kernel.ExecuteThread.run(ExecuteThread.java:119)
я использую java 8. Кто-нибудь может мне помочь? Заранее спасибо.
Ответ №1:
я решил проблемы в коде ранее, поделившись тем же для других, которым может понадобиться помощь, основной проблемой было wlthint3client.jar был необходим. Окончательный код git приведен ниже.
Код представляет собой API kafkaconnect, который считывает данные из единой распределенной очереди Weblogic и записывает данные в очередь kafka.