Прерывания, ограниченные областью, они существуют?

#java #networking #concurrency #locking #css-selectors

#java — язык #сеть #параллелизм #блокировка #css-селекторы #java

Вопрос:

Мне требуется, чтобы какая-то система предоставляла мне прерывания с ограниченной областью действия, такие, что прерывание потока отключается, когда Thread.sleep (..) не выполняется. Обычно я бы использовал защищенные блоки, которые работали бы нормально, но есть несколько причин, по которым я не могу в этом случае, главным образом потому, что конфликт мешает всей работе должным образом. Симуляция просто зависает через несколько секунд. Прерывания с ограничением заменят любой механизм блокировки и предотвратят «утечку» прерывания за пределы метода, на котором тестируется прерывание. В этом случае это было бы Thread.sleep(...) . Однако мне интересно, запрашивает ли это что-то, что принципиально невозможно в Java. Возможно, защищенный механизм блокировки — лучшее, что можно сделать.

Я создал свою собственную реализацию селектора для использования в сетевом симуляторе (селектор взаимодействует с обратными вызовами симулятора и тому подобным для оптимизации), и в основном я использую CountDownLatch в данный момент. Это хорошо в некоторых ситуациях, в то время как в других это создает еще худшую блокировку, при этом не замораживая всю симуляцию как таковую (однако, это настолько медленно, что невозможно выполнить). Зависит от используемой сетевой структуры относительно того, какие части кода попадают под удар больше, чем другие. Я также думаю, что в «хороших» симуляциях проблема может возникать редко, вызывая ожидания в 13 мс и так далее, пока выполняется внутренний обратный отсчет или что-то еще, что происходит внутри. магия.

Если вы сможете решить проблему без каких-либо из приведенных выше предложений, это было бы потрясающе 🙂 В любом случае, мой код приведен ниже:

 /**
 *
 */
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    class SocketListener implements KKSSocketListener {
        protected volatile CountDownLatch latch = null;

        /**
         *
         */
        public SocketListener() {
            newLatch();
        }

        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);
        }

        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            synchronized (socketToChannel) {
                SelChImpl ch = socketToChannel.get(socket);
                if (ch == null) {
                    System.out.println("ks sendCB: channel not found for socket: "   socket);
                    return;
                }
                synchronized (channelToKey) {
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
                            latch.countDown();
                        }
                    }
                }
            }
        }
        @Override
        public void connectionSucceeded(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void connectionFailed(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void dataSent(KKSSocket socket, long bytesSent) {
            refreshReady(socket);
        }
        @Override
        public void sendCB(KKSSocket socket, long bytesAvailable) {
            refreshReady(socket);
        }
        @Override
        public void onRecv(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
            refreshReady(socket);
        }
        @Override
        public void normalClose(KKSSocket socket) {
            wakeup();
        }
        @Override
        public void errorClose(KKSSocket socket) {
            wakeup();
        }
    }

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();

    SelChImpl getChannelForSocket(KKSSocket s) {
        synchronized (socketToChannel) {
            return socketToChannel.get(s);
        }
    }

    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        synchronized (channelToKey) {
            return channelToKey.get(s);
        }
    }

    protected boolean markRead(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_READ);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markWrite(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_WRITE);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markAccept(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_ACCEPT);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markConnect(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_CONNECT);
            return selectedKeys.add(impl);
        }
    }

    /**
     * @param provider
     */
    protected KSelector(SelectorProvider provider) {
        super(provider);
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implClose()
     */
    @Override
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: "   closed);
        synchronized (closeLock) {
            if (closed) return;
            closed = true;
            for (SelectionKey sk : keys) {
                provider().getApp().printMessage("dereg1");
                deregister((AbstractSelectionKey)sk);
                provider().getApp().printMessage("dereg2");
                SelectableChannel selch = sk.channel();
                if (!selch.isOpen() amp;amp; !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
            implCloseInterrupt();
        }
    }

    protected void implCloseInterrupt() {
        wakeup();
    }

    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            boolean notify = false;

            if (!k.isValid()) {
                k.cancel();
                ((SelectionKeyImpl)k).channel.socket().removeListener(currListener);
                return false;
            }

            SelectionKeyImpl ski = (SelectionKeyImpl)k;

            if ((ski.interestOps() amp; SelectionKeyImpl.OP_READ) != 0) {
                if (ski.channel.socket().getRxAvailable() > 0) {
                    notify |= markRead(ski);
                }
            }

            if ((ski.interestOps() amp; SelectionKeyImpl.OP_WRITE) != 0) {
                if (ski.channel.socket().getTxAvailable() > 0) {
                    notify |= markWrite(ski);
                }
            }

            if ((ski.interestOps() amp; SelectionKeyImpl.OP_CONNECT) != 0) {
                if (!ski.channel.socket().isConnectionless()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (!ski.channel.socket().isAccepting() amp;amp; !cs.isConnecting() amp;amp; !cs.isConnected()) {
                        notify |= markConnect(ski);
                    }
                }
            }

            if ((ski.interestOps() amp; SelectionKeyImpl.OP_ACCEPT) != 0) {
                //provider().getApp().printMessage("accept check: ski: "   ski   ", connectionless: "   ski.channel.socket().isConnectionless()   ", listening: "   ski.channel.socket().isListening()   ", hasPendingConn: "   (ski.channel.socket().isConnectionless() ? "nope!" : ((IConnectionSocket)ski.channel.socket()).hasPendingConnections()));
                if (!ski.channel.socket().isConnectionless() amp;amp; ski.channel.socket().isListening()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (cs.hasPendingConnections()) {
                        notify |= markAccept(ski);
                    }
                }
            }
            return notify;
        }
    }

    private boolean handleSelect() {
        boolean notify = false;

        // get initial status
        for (SelectionKey k : keys) {
            notify |= handleSelect(k);
        }

        return notify;
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#doSelect(long)
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        processDeregisterQueue();

        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            wakeup = false;
            selecting = true;
            try {
                handleSelect();

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();
                }

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                    ((SelectionKeyImpl)key).channel.socket().addListener(currListener);
                }
                try {
                    while (!wakeup amp;amp; isOpen() amp;amp; selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                                break;
                            }
                            latch = currListener.newLatch();
                        }
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms   timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                                    break;
                                }
                            } else {
                                latch.await();
                            }
                        } catch (InterruptedException e) {
                            wakeup();
                        }
                    }
                    return selectedKeys.size();
                } finally {
                    for (SelectionKey key : keys) {
                        ((SelectionKeyImpl)key).channel.socket().removeListener(currListener);
                    }
                    processDeregisterQueue();
                }
            } finally {
                selecting = false;
                wakeup = false;
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    keys.add(ski);
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);
                }
            }
        }

    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {
                keys.remove(ski);
                socketToChannel.remove(ski.channel.socket());
                channelToKey.remove(ski.channel);

                SelectableChannel selch = ski.channel();

                if (!selch.isOpen() amp;amp; !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#wakeup()
     */
    @Override
    public Selector wakeup() {
        synchronized (selectedKeys) {
            wakeup = true;
            selectedKeys.notifyAll();
        }
        return this;
    }
}
  

Извините, что не разместил SCEE в этом случае, но в данном случае это немного сложно. Любые предложения были бы полезны.

Приветствия,
Крис

Ответ №1:

Прерывания с ограниченной областью не существуют.

Однако, когда sleep или wait или что-то еще завершается с помощью InterruptedException , флаг «прерванный» будет снят до возникновения исключения. Итак, если вы перехватываете исключение на границе какого-либо блока, вы как бы получаете область видимости. Аналогично, Thread.interrupted() проверяет и удаляет флаг «прерванный».


Прочитав ваш код более внимательно, я понимаю, что вы (на самом деле) вообще не используете прерывания. Скорее вы используете notify и notify только пробуждаете поток, который выполняет wait .

Комментарии:

1. Проблема заключается в детализации области, когда дело доходит до прерывания — оно продолжает просачиваться в родительский try / catch, потому что невозможно определить, нахожусь ли я внутри прерываемого кода или нет в пределах определенной области. Как я уже сказал, мне нужно избегать механизма защищенных блоков, потому что в этом случае он не работает. Это настоящий удар головой. Может сработать что-то, связанное с LockSupport.park (..), что является основой конструкций синхронизации.

2. У меня возникли проблемы с визуализацией того, что вы предлагаете, хотя это звучит неплохо 🙂 Мне интересно, действительно ли флаг interrupted() может помочь предотвратить утечку прерывания в родительский перехватчик исключений.

3. @Chris — Посмотрите на javadocs для методов, которые выдают InterruptedException

4. @Chris — Я не говорю, что это реальное ограничение: это не так. Скорее, я говорю, что это настолько близко, насколько вы, вероятно, сможете получить.