#java #multithreading #threadpool #executorservice #worker
#java #многопоточность #пул потоков #executorservice #рабочий
Вопрос:
В настоящее время я реализую контейнер данных / структуру данных, которая имеет addRecord (окончательную запись Redcord) и метод close().
public class Container{
Container() {
}
public void addRecord(final Record record){
// do something with the record
// add the modified it to the container
}
public void close(){
}
}
Поскольку для сохранения записи в контейнере необходимо выполнить несколько шагов, я хочу передать эти шаги потокам, прежде чем окончательно добавлять их в контейнер.
Моя проблема в том, что я не хочу, чтобы какой-либо экземпляр контейнера использовал больше, чем, скажем, 4 потока, и что, если у меня одновременно открыто несколько экземпляров контейнера, в общей сложности должно использоваться не более, например, 12 потоков. В дополнение к этому я бы хотел, чтобы после создания 4-го контейнера каждый из остальных 3 открытых контейнеров потерял один из своих потоков, так что все 4 контейнера могут использовать по 3 потока каждый.
Я изучал ThreadPools и ExecutorServices. Хотя настройка пула потоков размером 12 проста, трудно ограничить количество используемых потоков до 4 при использовании ExecutorServices. Напомним, мне нужно не более 12 потоков, поэтому у меня есть статический экземпляр размером 12, который я разделяю между экземплярами контейнера.
Это то, с чего я начал
public class Container{
public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);
final List<Future<Entry>> _fRecords = new LinkedList<>();
Container() {
}
public void addRecord(final Record record){
_fRecords.add(SERVICE.submit(new Callable<Entry>{
@Override
public Entry call() throws Exception {
return record.createEntry();
}
}));
}
public void close() throws Exception {
for(final Future<Entry> f) {
f.get(); // and add the entry to the container
}
}
}
Очевидно, что это не гарантирует, что в одном экземпляре используется не более 4 потоков. Кроме того, я не понимаю, как эти Executorservices можно принудительно отключить рабочих или переназначить их, если для другого контейнера требуется пара потоков.
Вопросы:
-
Поскольку сам Redord является интерфейсом, а время выполнения createEntry() зависит от фактической реализации, я хочу убедиться, что разные контейнеры не используют одни и те же потоки / рабочие, т. Е. Я не хочу, чтобы какой-либо поток выполнял работу для двух разных контейнеров. Идея, лежащая в основе этого, заключается в том, что я не хочу, чтобы экземпляр контейнера, который обрабатывает только «длительные» записи, замедлялся контейнерами, которые обрабатывают только «короткие текущие записи». Если это концептуально не имеет смысла (вопрос к вам), не было бы необходимости иметь изолированные потоки / рабочие для разных контейнеров.
-
Является ли реализация моего собственного ExecutorService правильным способом? Может кто-нибудь указать мне на проект, который имеет аналогичную проблему / решение? В противном случае, я думаю, мне пришлось бы реализовать свой собственный ExecutorService и поделиться им с моими контейнерами, или есть лучшее решение?
-
В настоящее время я собираю все фьючерсы в методе close, поскольку контейнер должен хранить записи / записи в том порядке, в котором они поступили. Очевидно, что для этого требуется много времени, и поэтому я хотел бы сделать это, как только будущее будет завершено. Имеет ли смысл иметь дополнительного работника, обрабатывающего только фьючерсы, или лучше, чтобы мой работник выполнял эту работу, то есть взаимодействовал с потоком.
Ответ №1:
Я не думаю, что среда выполнения Java предоставляет что-то, что отвечает вашим потребностям из коробки.
Я написал свой собственный класс для удовлетворения таких потребностей (общий пул ограничение для данной очереди).
public static class ThreadLimitedQueue {
private final ExecutorService executorService;
private final int limit;
private final Object lock = new Object();
private final LinkedList<Runnable> pending = new LinkedList<>();
private int inProgress = 0;
public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
this.executorService = executorService;
this.limit = limit;
}
public void submit(Runnable runnable) {
final Runnable wrapped = () -> {
try {
runnable.run();
} finally {
onComplete();
}
};
synchronized (lock) {
if (inProgress < limit) {
inProgress ;
executorService.submit(wrapped);
} else {
pending.add(wrapped);
}
}
}
private void onComplete() {
synchronized (lock) {
final Runnable pending = this.pending.poll();
if (pending == null || inProgress > limit) {
inProgress--;
} else {
executorService.submit(pending);
}
}
}
}
Единственное различие в моем случае limit
является постоянным, но вы можете его изменить. Например, вы можете заменить int limit
на Supplier<Integer> limitFunction
и эта функция должна обеспечивать динамическое ограничение, например
Supplier<Integer> limitFunction = 12 / containersList.size();
Просто сделайте его более надежным (например, что делать, если список контейнеров пуст или превышает 12)
Комментарии:
1. Хотя, только что понял, что у вас есть
Callable
, поэтому приведенная выше реализация должна быть принята в дальнейшем.2. Это имеет большой смысл, однако это не гарантирует, что рабочие ExecutorService будут выделены для одного контейнера. Тем не менее, я все еще не уверен, имеет ли это требование смысл вообще.
3. Это гарантирует, что контейнер использует не более
limit
количества потоков. Кроме того, во время выполнения задачи поток не может использоваться никаким другим контейнером. Хотя, когда задача завершена, поток может быть назначен для задачи из другого контейнера. Если требование не использовать общий поток является строгим, тогда единственный вариант, который у вас есть, — это написать свой собственный пул потоков, который будет динамически добавлять потоки в зависимости отlimit
4. Точно, в любом случае я думаю, что это решение является правильной отправной точкой. Спасибо за вашу помощь