Java: шаблон Publisher-Observer: высокая загрузка ЦП

#java

#java

Вопрос:

У меня есть поток ServerSocket, который принимает соединение и запускает обработчик сокета. Эта часть, похоже, работает нормально, без утечек памяти или высокой загрузки процессора. Я добавил поток Publisher и поток observer, и теперь моя Java-программа сообщает о высокой загрузке процессора.

Subject.java:

 public interface Subject {
public void attach(Observer o);
public void detach(Observer o);
public void notifyUpdate(Message m);
 

}

MessagePublisher.java:

 public class MessagePublisher extends Thread implements Subject{
private List<Observer> observers = new ArrayList<>();
private boolean readyToPublish = false;
private ConcurrentLinkedQueue<Message> msgHolder;
public MessagePublisher(ConcurrentLinkedQueue<Message> _queue){
    this.msgHolder = _queue;
}


@Override
public void attach(Observer o) {
    observers.add(o);
}

@Override
public void detach(Observer o) {
    observers.remove(o);
}

@Override
public void notifyUpdate(Message m) {
    for(Observer o: observers) {
        o.update(m);
    }
}

public void run(){
    this.readyToPublish = true;
    while (readyToPublish)
        {
            try
            {                    
                Message _m = (Message)this.msgHolder.poll();
                if(!_m.equals(null)){
                    System.out.println("Polled message: "   _m.getMessage());
                    System.out.println("Number of subscribers: "   observers.size());
                    notifyUpdate(_m);
                }                    
                
            }
            catch(Exception j) { }
            try { sleep(9); }
            catch(Exception e) { }
        }
        EndWork();
}
public void EndWork(){
    this.readyToPublish = false;
    this.observers.clear();
    
}
 

}

Main.java:

 public static void main(String[] args) {
    // TODO code application logic here
    ConcurrentLinkedQueue<Message> msgHolder = new ConcurrentLinkedQueue<Message>();
    ServerSocketThread _socketThread = new ServerSocketThread(msgHolder);
    _socketThread.start();
    
    MessagePublisher _publisher = new MessagePublisher(msgHolder);
    _publisher.start();
    
    UIServerSocketThread _uiSocketThread = new UIServerSocketThread(_publisher);
    _uiSocketThread.start();        
    
}
 

UIServerSocketThread.java:

 public class UIServerSocketThread extends Thread{
private ServerSocket        objServerSocket; 
private Socket              objSocket;
private int                 iPort = 21001;
private FileHandler         obj_LogFileHandler;
private Logger              obj_Logger;

private int file_size =   8000000;
private int numLogFiles  = 20;

private UIClientSocketThread  objCltSocket;
private MessagePublisher    objPublisher;
private boolean             running = false;
public UIServerSocketThread(MessagePublisher _publisher){
    
    this.running = true;
    try {            
        this.obj_LogFileHandler = new FileHandler("uiserver.log.%g", file_size, numLogFiles);
        this.obj_LogFileHandler.setFormatter(new SimpleFormatter());
    }
    
    catch ( IOException obj_Exception ) {
        this.obj_LogFileHandler = null;
    }
    catch ( SecurityException obj_Exception ) {
        this.obj_LogFileHandler = null;
    }

    this.obj_Logger = null;

    if ( this.obj_LogFileHandler != null ) {
        this.obj_Logger = Logger.getLogger("eti.logger.uiserver");
        this.obj_Logger.addHandler(this.obj_LogFileHandler);
    }
    try {
    this.objServerSocket = new ServerSocket(this.iPort);
    } catch(IOException i){
        //i.printStackTrace();
        this.obj_Logger.info(i.getMessage());
    }
    this.objPublisher = _publisher;
}

public void run() {        
    StringBuffer        sMsg;
    
    sMsg = new StringBuffer();        
    
    while ( this.running ) {
        try {
            this.objSocket = this.objServerSocket.accept();
        } catch(Exception e){
            sMsg.append("Error accepting ui socket connectionn");
            sMsg.append(e.getMessage());                
            this.obj_Logger.info(sMsg.toString());
        }
        
        try {
                
                this.objCltSocket = new UIClientSocketThread(this.objSocket, this.obj_Logger);
                if(!this.objPublisher.equals(null)){
                    this.obj_Logger.info("Attacing clientObserver");
                    this.objPublisher.attach(this.objCltSocket);
                }
                
                this.objCltSocket.start();
            
        } catch(Exception r) {
            sMsg.append("Error n");
            sMsg.append(r.getMessage());                
            this.obj_Logger.info(sMsg.toString());
        }   
        
    }
    this.objPublisher.EndWork();
    stopServerSocketThread();
        
} // end run
public void stopServerSocketThread() {
     try {
         this.running = false;
        this.objServerSocket.close();
        this.objServerSocket = null;
        this.obj_Logger.info("Server NOT ACCEPTING Connections!!");
    } catch(Exception e ) {            
        this.obj_Logger.info(e.getMessage());
    }
}
 

}

Я не уверен, в чем проблема. Любая помощь приветствуется.

Комментарии:

1. Потоки, как правило, кэшируют локальные переменные. В вашем случае наблюдатели и readyToPublish должны быть объявлены изменчивыми, чтобы любое изменение было немедленно воспринято. И это может быть в каком-то случае цикл все еще выполняется, пока readyToPublish имеет значение false …

Ответ №1:

В вашем приложении есть два бесконечных цикла. Один находится в классе MessagePublisher с функцией run() . И еще один находится в классе UIServerSocketThread . Это приведет к высокой загрузке процессора.

Комментарии:

1. Я предполагал, что опрос в моем MessagePublisher блокировался. Я ошибаюсь в своем предположении. Это неблокирующий вызов.