Получение ошибки при реализации DestinationAvailabilityListener в java

#java #jms #apache-kafka-connect #weblogic12c

#java #jms #apache-kafka-connect #weblogic12c

Вопрос:

У меня есть вариант использования, в котором мне нужно прослушивать доступные элементы единой распределенной очереди, размещенной в weblogic 12c. Я прочитал и обнаружил, что интерфейс DestinationAvailabilityListener имеет методы, которые могут удовлетворить потребности. Ниже приведен мой код:

 import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import javax.naming.Context;

import org.apache.kafka.connect.connector.ConnectorContext;

import weblogic.jms.extensions.DestinationAvailabilityListener;
import weblogic.jms.extensions.DestinationDetail;
import weblogic.jms.extensions.JMSDestinationAvailabilityHelper;
import weblogic.jms.extensions.RegistrationHandle;

public class QueueMonitor extends Thread implements DestinationAvailabilityListener,WebLogicJmsTask {
        private Hashtable<String, String> wlsEnvParamHashTbl = null;
        private final Object containerLock = new Object();
        private final  CountDownLatch startLatch ;
        private RegistrationHandle registrationHandle;
        private ArrayList<String> containerMap;
        boolean shutdown =false,changeflg=false;
        private final ConnectorContext context;
        
    public QueueMonitor(Map<String, String> props,ConnectorContext context) {
            super();
            wlsEnvParamHashTbl = new Hashtable<String, String>();
            wlsEnvParamHashTbl.put(Context.PROVIDER_URL, props.get(WEBLOGIC_T3_URL_DESTINATION_CONFIG)); // set Weblogic JMS URL
            wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); // set Weblogic JNDI
            wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, props.get(WEBLOGIC_USERNAME_CONFIG)); // set Weblogic UserName
            wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, props.get(WEBLOGIC_PASSWORD_CONFIG)); // set Weblogic PassWord
            for (Map.Entry<String,String> entry : wlsEnvParamHashTbl.entrySet())  
                System.out.println("Key = "   entry.getKey()   
                                 ", Value = "   entry.getValue()); 
        System.out.println(props.get(WEBLOGIC_JMS_DESTINATION_CONFIG));
            this.context=context;
            this.startLatch = new CountDownLatch(1);
            JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
            try {this.registrationHandle= dah.register(wlsEnvParamHashTbl, props.get(WEBLOGIC_JMS_DESTINATION_CONFIG), this);
            
                startLatch.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch(Exception e)
            {
                System.out.println(e);
            }
        }

    public void run()
    {
        while(!shutdown)
        {
            if(changeflg)
            {
                context.requestTaskReconfiguration();
                changeflg=false;
            }
            
        }
        if(shutdown)
        {
            System.out.println("Called shutdown, returning");
            return;
        }
        
    }
    
    public synchronized String getContainers()
    {
        String list=null;
        synchronized (containerLock) {
            list=String.join(",", containerMap);
        }
        return list;
    }

      @Override
         public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
            synchronized (containerLock) {
                System.out.println("destJNDIName is :" destJNDIName);
              // For all Physical destinations, start a container
              for (DestinationDetail detail : physicalAvailableMembers) {
                  System.out.println("member is :" detail.getJNDIName());
                  containerMap.add(detail.getJNDIName());
                //containerMap.put(detail.getJNDIName(), detail.getJNDIName());
              }
              if(startLatch.getCount()==0)
              {
                  changeflg=true;
              }
            }
            startLatch.countDown();       
          }

        @Override
        public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
            // TODO Auto-generated method stub
            synchronized (containerLock) {
                  // Shutdown all containers whose physical members are no longer available
                  for (DestinationDetail detail : physicalUnavailableMembers) {
                    containerMap.remove(detail.getJNDIName());
                    // maybe i will need to do somethinh here
                  }
                  changeflg=true;
                }
            
        }

        @Override
          public void onFailure(String destJndiName, Exception exception) {
            // Looks like a cluster wide failure
             System.out.println("inside on failure");
            shutdown();
            System.out.println(exception);
            
          }
        public void shutdown() {
            // Unregister for events about destination availability
              registrationHandle.unregister();

            // Shut down containers
            synchronized (containerLock) {
                containerMap.removeAll(containerMap);
            }
            
            shutdown=true;
          }
}
 

При запуске происходит сбой со следующей ошибкой:

 <Error> <Kernel> <WL-000802> <ExecuteRequest failed
 java.lang.NullPointerException.
        at com.bt.connect.QueueMonitor.shutdown(QueueMonitor.java:128)
        at com.bt.connect.QueueMonitor.onFailure(QueueMonitor.java:118)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper$3.run(JMSDestinationAvailabilityHelper.java:490)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.callOutListener(JMSDestinationAvailabilityHelper.java:451)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.onFailure(JMSDestinationAvailabilityHelper.java:487)
        at weblogic.jms.common.CDS$DD2Listener.reportException(CDS.java:1145)
        at weblogic.jms.common.CDS.lookupDDAndCalloutListenerSingle(CDS.java:459)
        at weblogic.jms.common.CDS.lookupDDAndCalloutListener(CDS.java:412)
        at weblogic.jms.common.CDS.access$400(CDS.java:52)
        at weblogic.jms.common.CDS$DDListenerRegistrationTimerListener.timerExpired(CDS.java:255)
        at weblogic.timers.internal.TimerImpl.run(TimerImpl.java:301)
        at weblogic.work.ExecuteRequestAdapter.execute(ExecuteRequestAdapter.java:21)
        at weblogic.kernel.ExecuteThread.execute(ExecuteThread.java:147)
        at weblogic.kernel.ExecuteThread.run(ExecuteThread.java:119)
 

я использую java 8. Кто-нибудь может мне помочь? Заранее спасибо.

Ответ №1:

я решил проблемы в коде ранее, поделившись тем же для других, которым может понадобиться помощь, основной проблемой было wlthint3client.jar был необходим. Окончательный код git приведен ниже.

MyJMSKafkaConnect

Код представляет собой API kafkaconnect, который считывает данные из единой распределенной очереди Weblogic и записывает данные в очередь kafka.