Как создавать и обрабатывать сообщения из рабочих потоков

#java #multithreading #message

#java #многопоточность #Сообщение

Вопрос:

Как создавать асинхронные сообщения из рабочих потоков в управляющий поток. Структура приведена ниже. В фрагменте Worker реализует Runnable, а ThreadManager — ExecutorService . Рабочие потоки выполняются долго и должны периодически передавать сообщения о ходе выполнения менеджеру. Одним из вариантов является использование очереди блокировки, но я этого раньше не делал.

     RunnableTester_rev2.threadManager = Executors.newFixedThreadPool( nThreadsAllowed );

    final List<Worker> my_tasks = new ArrayList<Worker>();
    for ( int i = 0; i < nThreadsToRun; i   )
    {
        // The Worker thread is created here.
        // Could pass a blocking queue to handle messages.
        my_tasks.add( new Worker( ) );
    }

    final List<Future<?>> fList = new ArrayList<Future<?>>();

    for ( final Worker task : my_tasks )
    {
        fList.add( RunnableTester_rev2.threadManager.submit( task ) );
    }

    RunnableTester_rev2.threadManager.shutdown();


    /**
     * Are we all done
     */
    boolean isDone = false;

    /**
     * Number of threads finished
     */
    int nThreadsFinished = 0;

    /**
     * Keep processing until all done
     */
    while ( !isDone )
    {
        /*
         * Are all threads completed?
         */
        isDone = true;
        int ii = 0;
        nThreadsFinished = 0;
        for ( final Future<?> k : fList )
        {
            if ( !isThreadDoneList.get( ii ) )
            {
                final boolean isThreadDone = k.isDone();
                if ( !isThreadDone )
                {
                    // Reduce printout by removing the following line
                    // System.out.printf( "%s is not donen", mywks.get( ii ).getName() );
                    isDone = false;
                }
            }
            else
            {
                nThreadsFinished  ;
            }
            ii  ;
        }

        /*
        *  WOULD LIKE TO PROCESS Worker THREAD MESSAGES HERE
        */

    }
 

Ответ №1:

Это действительно сложно. Я бы сделал что-то вроде этого.

 // no need to use a field.
ExecutorService threadManager = Executors.newFixedThreadPool( nThreadsAllowed );
List<Future<Result>> futures = new ArrayList<Future<Result>>();
for ( int i = 0; i < nThreadsToRun; i   )
    // Worker implements Callable<Result>
    futures.add(threadManager.submit(new Worker( )));
threadManager.shutdown();
threadManager.awaitTermination(1, TimeUnit.MINUTE);
for(Future<Result> future: futures) {
    Result result = future.get();
    // do something with the result
}
 

Комментарии:

1. Спасибо за информацию, но я искал промежуточные сообщения от потоков.

2. Если есть что-то, что вы хотите сделать сразу после завершения задачи, добавьте это в саму задачу. Таким образом, это может выполняться одновременно или, по крайней мере, в любом порядке. Вы должны выполнять минимум работы в одном потоке, который запускает все это. Не имеет смысла прилагать усилия для параллельного выполнения кода и одновременно выполнять только половину работы.

3. Существует необходимость в немедленной обработке результатов вскоре после завершения рабочим потоком определенного промежуточного прогресса / результатов, пока поток запущен, не дожидаясь завершения потока. Рабочие потоки выполняются в течение длительного периода времени, порядка часов или дней, а время выполнения для одного полного набора рабочих потоков порядка недель. Теперь вы можете видеть необходимость выполнения «половины работы» или обработки сообщений вне рабочих потоков, поскольку существует дополнительное управление набором промежуточных результатов потока.

Ответ №2:

Я бы использовал BlockingQueue . Вы могли бы попробовать что-то вроде этого:

 final BlockingQueue<Message> q = new LinkedBlockingQueue<Message>();
// start all the worker threads,
// making them report progress by adding Messages into the queue
for (...) {
  threadManager.submit(new Runnable() {
    // initialize the worker
    q.offer(new Message("10% completed"));
    // do half of the work
    q.offer(new Message("50% completed"));
    // do the rest of the work
    q.offer(new Message("100% completed"));
  });
}
// after starting all the workers, call a blocking retrieval method
// on the queue, waiting for the messages
while (!(everything is done)) {
  Message m = q.take();
  // process the message
}
 

Идея заключается в том, что все рабочие потоки и основной поток имеют общую очередь блокировки. Рабочие потоки являются производителями, а основной поток — потребителем.

Здесь я использую .offer(...) , но вы могли бы использовать .add(...) , и в этом случае вы получите то же самое. Разница будет заключаться в том, что первый вернет false, если очередь заполнена, в то время как последний выдаст исключение. Этого не может произойти здесь, поскольку я создал экземпляр неограниченной очереди (очевидно, вы могли бы получить OutOfMemoryError). Однако, если вы используете .put(...) , вам придется иметь дело с непроверенным InterruptedException (что в данном случае бесполезно: оно не может быть выброшено, потому что очередь никогда не заполняется — она неограниченна -, поэтому метод никогда не блокируется).

Очевидно, что вы можете улучшить его, преобразовав вызовы q.offer(...) в reportStatus(...) метод или что-то в этом роде.


Может быть, вы хотите чего-то другого, чем то, что я предложил выше. Может быть, вы хотите, чтобы основной поток получал асинхронные сообщения только тогда, когда каждый рабочий поток заканчивает свою работу.

Мое решение аналогично решению, предложенному Питером Лоури, с той разницей, что с помощью этого решения управляющий поток получит уведомление, как только завершится любой из рабочих потоков (в то время как с решением Питера результат из рабочих потоков будет получен по порядку — так, если первый рабочий поток завершит работу).запуск потока занимает целую вечность, даже если все остальные рабочие потоки завершат свою работу, вы об этом не узнаете). В зависимости от того, что вы делаете, вы можете повысить производительность.

Обратите внимание, что с помощью этого решения вы не можете определить, какой поток завершил свою работу (для этого ваш Worker класс должен иметь метод like .getJobId() или что-то в этом роде), в то время как с решением Питера вы будете точно знать, какой поток завершил свою работу (потому что вы выполняете итерацию в futures списке в том же порядке, в котором вы выполняете итерацию).создал экземпляр и запустил рабочие потоки).

Если это так, вы можете использовать ExecutorCompletionService . Он инкапсулирует ExecutorService и предоставляет .take() метод (блокирующий поиск), который вернет Future s из службы исполнителя сразу после их завершения:

 ExecutorCompletionService<Worker> ecs = new ExecutorCompletionService(executorService);
// Worker has to be a callable
// Now you submit your tasks to the ecs:
for (...) {
  ecs.submit(new Worker(...));
}
// Now you block on .take():
while (!(everything is done)) {
  Future<Worker> f = ecs.take();
  // f.get(), for example, to check the results
}
 

CompletionService Гарантирует, что, как только вы возьмете a Future , вызов его .get() метода завершится немедленно: либо путем возврата результата, либо путем создания ExecutionException, инкапсулируя исключение, вызванное Callable .