Каков реальный вариант использования обменника?

#java #concurrency

Вопрос:

Каков был бы реалистичный вариант использования, когда java.util.Обменник был бы лучшим выбором синхронизатора?

Я видел фрагменты с сайтов GitHub и учебных пособий, но они всегда кажутся надуманными и лучше решаются с помощью очереди передачи

Ответ №1:

Exchanger и TransferQueue очень разные, в основном TransferQueue могут заполнить всю вашу память, если потребитель работает очень медленно, при Exchanger этом использует постоянную память, производя синхронизацию, когда парные потоки готовы.

Очевидно, что использование того или другого будет иметь серьезные последствия для эффективности синхронизации и рабочего процесса (но также и для детерминизма в использовании ресурсов).

Примечание.причины, по которым производитель и потребитель запрашивают синхронизацию, могут быть разными, не только из-за того, что у них полный буфер, например, это может быть связано с тем, что один из них остался ждать другого. Кроме того, с Exchange производителем/потребителем путают (потребитель может доставлять результаты производителю), в то время как с TransferQueue вами следует создавать дополнительные структуры.

В качестве примера представьте свой параллельный процесс как инфраструктуру трубопроводов, а узлы перемещают воду вверх и вниз (узлы содержат водяные насосы).

Кроме того, эффект накачки распространяется по всей инфраструктуре (как волна), думайте, что ваш параллельный процесс использует один результат потока для использования в другом потоке и так далее.

(Комментарии в коде)

 package com.computermind.sandbox.concurrent;

import lombok.AllArgsConstructor;
import lombok.SneakyThrows;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;

public class ExchangerTransferQueue {
    
    // helper for wait
    @SneakyThrows
    public static void _wait() {
        Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 5_000));
    }

    // the pumping capacity is the work a node must to do, this work
    // is moving across threads then, the `PumpingCapacity` "P1" will
    // be used by the producer but later (when water move) "P1" will be
    // used by consumer and so (the `exchange` sync move PumpingCapacity
    // instances)
    @AllArgsConstructor
    static
    class PumpingCapacity {
        String name;
        int from;
        int to;

        // do "up" work (if any)
        boolean up(String node) {
            if(from < 1) return false;
            System.out.printf("~ node %s pump up %s%n", node, name);
            from -= 1; to  = 1;
            return true;
        }
        // do "down" work (if any)
        boolean down(String node) {
            if(to < 1) return false;
            System.out.printf("~ node %s pump down %s%n", node, name);
            from  = 1; to -= 1;
            return true;
        }
    }

    // the producer have a initial PumpingCapacity and
    // create the exchange to the next node
    static class WaterPumpProducer implements Runnable {
        PumpingCapacity p;
        final Exchanger<PumpingCapacity> b = new Exchanger<>();

        WaterPumpProducer(PumpingCapacity p) {
            this.p = p;
        }

        @SneakyThrows
        @Override
        public void run() {
            // for ever
            while(true) {
                // do work
                while (p.up("Producer")) _wait();
                // and exchange
                System.out.println("Producer need change");
                p = b.exchange(p);
            }
        }
    }

    // an interemediate node have two PumpingCapacity one working
    // with the predecessor and other with the successor
    static class WaterPumpNode implements Runnable {
        PumpingCapacity p, q;
        final Exchanger<PumpingCapacity> a;
        final Exchanger<PumpingCapacity> b = new Exchanger<>();

        WaterPumpNode(PumpingCapacity p, PumpingCapacity q, Exchanger<PumpingCapacity> a) {
            this.p = p;
            this.q = q;
            this.a = a;
        }

        @SneakyThrows
        @Override
        public void run() {
            while(true) {
                while (p.down("Node")) _wait();
                while (q.up("Node")) _wait();
                System.out.println("Node need change");
                p = a.exchange(p);
                q = b.exchange(q);
            }
        }
    }

    static class WaterPumpConsumer implements Runnable {
        PumpingCapacity p;
        final Exchanger<PumpingCapacity> a;

        WaterPumpConsumer(PumpingCapacity initialCapacity, Exchanger<PumpingCapacity> a) {
            p = initialCapacity;
            this.a = a;
        }

        @SneakyThrows
        @Override
        public void run() {
            while(true) {
                while (p.down("Consumer")) _wait();
                System.out.println("Consumer need change");
                p = a.exchange(p);
            }
        }
    }

    @SneakyThrows
    public static void main(String... args) {

        WaterPumpProducer producer = new WaterPumpProducer(new PumpingCapacity("P1", 5, 0));
        WaterPumpNode node = new WaterPumpNode(new PumpingCapacity("P2", 0, 3), new PumpingCapacity("P3", 3, 0), producer.b);
        WaterPumpConsumer consumer = new WaterPumpConsumer(new PumpingCapacity("P4", 0, 2), node.b);

        // consumer run first, the consumer do job!
        new Thread(consumer).start();

        // wait to see consumer wait
        Thread.sleep(15_000);

        new Thread(node).start();

        // wait to see node wait
        Thread.sleep(15_000);

        new Thread(producer).start();

        // see how PumpingCapacities up and down across all pipe infrastructure
    }

}
 

с выводом (с комментариями)

 ~ node Consumer pump down P4  <-- only consumer is working
~ node Consumer pump down P4
Consumer need change          <-- and stop since need change
~ node Node pump down P2      <-- when node start do job
~ node Node pump down P2
~ node Node pump down P2
~ node Node pump up P3
~ node Node pump up P3
~ node Node pump up P3
Node need change              <-- and need change and wait producer
~ node Producer pump up P1    <-- when producer start do job
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
Producer need change          <-- here all nodes work and
~ node Producer pump up P2        PumpingCapacities go up and down
~ node Consumer pump down P3      moving water
~ node Node pump down P1
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
Consumer need change
Producer need change
~ node Node pump down P1
~ node Node pump up P4
~ node Node pump up P4
Node need change
~ node Node pump down P2
~ node Consumer pump down P4
~ node Producer pump up P1
...
 

использование памяти постоянно (не было бы, если бы вы использовали TransferQueue ).

Затем

Каков был бы реалистичный вариант использования, когда java.util.Обменник был бы лучшим выбором синхронизатора?

любой, в котором два процесса взаимно синхронизированы для выполнения задачи, в которой работа одного влияет на другого и наоборот. Модель производитель/потребитель, в которой производитель должен ждать потребителя (т. Е. и, таким образом, не накапливать чрезмерную работу), может быть хорошим примером.

вместо того, чтобы переместить его в другую ветку для дальнейших действий?

Поскольку оба потока обмениваются информацией, и вы хотите, чтобы действия выполнялись одновременно, поэтому вы не можете просто передавать ее из одного потока в другой.

Предположим, у вас есть следующая функция повторения

 G{i 1} = g( H{i}, G{i} )
H{i 1} = h( H{i}, G{i} )
 

вы можете запускать для каждого шага два потока, которые будут выполняться параллельно g , и h или вы можете использовать Exchanger и запускать только один раз два потока.

Конечно, вы можете использовать множество других структур, но вы увидите, что в них вам нужно подумать о возможности тупика, Exchanger что делает этот обмен простым и безопасным.

Например, предположим, что мы хотим выполнить определенное биологическое моделирование (т. е. https://en.wikipedia.org/wiki/Nicholson–Bailey_model)

 static void NicholsonBailey(double H, double P, double k, double a, double c, AtomicBoolean stop) {
    // H and P exchange their values (H get p and P get H)
    final Exchanger<Double> e = new Exchanger<>();

    // H function
    new Thread(() -> { try {
        double h = H, p = P;
        while(!stop.get()) {
            h = k * h * Math.exp(-a * p); // expensive
            p = e.exchange(h);
        }
        e.exchange(0., 1, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();

    // P function
    new Thread(() -> { try {
        double h = H, p = P;
        while(!stop.get()) {
            System.out.printf("(H, P) := (%e, %e)%n", h, p);
            p = c * h * (1 - Math.exp(-a * p)); // expensive
            h = e.exchange(p);
        }
        e.exchange(0., 1, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
}

@SneakyThrows
public static void main(String... args) {
    AtomicBoolean stop = new AtomicBoolean(false);
    double k = 2, a = 0.02, c = 1;
    NicholsonBailey(Math.log(k) / a   0.3, (k * Math.log(k)) / ((k - 1) * a * c)   0.3, k, a, c, stop);

    // run simulation until stop
    Thread.sleep(100);
    stop.set(true);
}
 

просто используя Exchanger его, мы можем легко синхронизировать оба вычисления.

Комментарии:

1. Спасибо за подробный код. Я понимаю ваше различие в потреблении памяти, и спасибо, что указали на это. Однако мне все еще неясен реалистичный вариант использования, в котором обменник был бы подходящим синхронизатором. Каков был бы случай, когда потребитель захотел бы обменять ценность с производителем, а не перемещать ее в другой поток для дальнейших действий? Это нарушило бы семантику производителя, если бы он также потреблял. Это то, что я изо всех сил пытаюсь понять.

2. «вместо того, чтобы перемещать его в другой поток для дальнейших действий» , вы не можете сделать это, не используя что-то большее, чем простое exchange . Я обновил другой пример. Но помните, Exchanger что это всего лишь элемент синхронизации, который может быть более подходящим, чем другие, в определенных случаях (например, примеры, которые я привожу), не более и не менее.

3. Понял, отличный пример, спасибо