Приоритетная блокирующая очередь, обеспечивающая последовательную последовательность элементов

#java #algorithm #concurrency #priority-queue #java.util.concurrent

Вопрос:

Я получаю последовательность сообщений и хочу обработать их в последовательном порядке. Каждое сообщение имеет порядковый номер. Существует пул потоков, получающих их. Я хочу поместить их в очередь блокировки , как a PriorityBlockingQueue , и прочитать их в правильном порядке, блокируя до тех пор, пока не будет доступно следующее последовательное сообщение.

Например, учитывая этот код:

 ConsecutiveBlockingQueue<Integer> q = new ConsecutiveBlockingQueue<>();

new Thread (()->{ q.put(0); q.put(2); }).start();
new Thread (()->{ q.put(1); q.put(3); }).start();

ArrayList<Integer> ordered = new ArrayList<>(4);
for (int i = 0; i < 4; i  ) {
    ordered.add(q.take());
}
System.out.println(ordered);
 

Я хочу, чтобы он напечатал [0, 1, 2, 3]

Комментарии:

1. Да, именно это и происходит PriorityBlockingQueue . Ваш вопрос?

2. @user207421 Это не то, что делает PriorityBlockingQueue. Приоритетная блокирующая очередь блокируется только тогда, когда она пуста или заполнена. Мне нужна очередь, которая также блокируется после взятия 0 и 1, в ней есть 3, но 2 отсутствует. Если вы добавите Tread.sleep в t2 в моем примере, вы увидите, что результат не будет [0, 1, 2, 3]

3. Насколько я знаю, в JDK нет ничего, что делало бы то, что вы хотите. Хотя там может быть библиотека. Однако, когда дело доходит до этого, реализовать это самостоятельно не должно быть слишком сложно. По крайней мере, нет, если вам нужно только базовое поведение. Вы могли бы даже использовать PriorityQueue его внутренне.

Ответ №1:

Вот минимально протестированный класс, который, кажется, делает то, что я хочу. Комментарии приветствуются.

 package com.ciphercloud.sdp.common;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToIntFunction;

public class ConsecutiveBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final ToIntFunction <E> ixFunction;
    // blocking queue for consecutive items. Take operations will look into this queue
    LinkedBlockingQueue <E> bQueue = new LinkedBlockingQueue<>();

    // buffering/ordering queue for items that are out of sequence
    PriorityQueue <E> pQueue = new PriorityQueue<>();

    ReentrantLock lock = new ReentrantLock();

    private int nextIx;

    ConsecutiveBlockingQueue(ToIntFunction <E> ixFunction) {
        this(0, ixFunction);
    }

    ConsecutiveBlockingQueue(int startIx, ToIntFunction <E> ixFunction) {
        nextIx = startIx;
        this.ixFunction = ixFunction;
    }

    @Override
    public Iterator <E> iterator() {
        return bQueue.iterator();
    }

    @Override
    public int size() {
        return bQueue.size();
    }

    protected BlockingQueue <E> delegate() {
        return bQueue;
    }

    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    @Override
    public int drainTo(Collection <? super E> c) {
        return bQueue.drainTo(c);
    }

    @Override
    public int drainTo(Collection <? super E> c, int maxElements) {
        return bQueue.drainTo(c, maxElements);
    }

    @Override
    public void put(E e) {
        offer(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        offer(e);
        return true;
    }

    @Override
    public boolean offer(E e) {
        lock.lock();
        try {
            if (ixFunction.applyAsInt(e) == nextIx) {
                // offered item is the next consecutive expected one
                // put it directly into the blocking queue
                bQueue.offer(e);
                nextIx  ;

                // if there are any buffered items in the pQueue, move them
                // into the blocking queue while they follow consecutively
                while(true) {
                    E next = pQueue.peek();
                    if(next == null || ixFunction.applyAsInt(next) != nextIx) {
                        // no more items in pQueue, or next item is not consecutive
                        break;
                    }
                    pQueue.poll();
                    bQueue.offer(next);
                    nextIx  ;
                }
            } else {
                // offered item is not consecutively next. Buffer it in pQueue
                pQueue.offer(e);
            }
        } finally {
            lock.unlock();
        }

        return true;
    }


    @Override
    public E take() throws InterruptedException {
        return bQueue.take();
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return bQueue.poll(timeout, unit);
    }


    @Override
    public E poll() {
        return bQueue.poll();
    }

    @Override
    public E peek() {
        return bQueue.peek();
    }
}