В чем разница между Observable.create и fromPublisher?

#rx-java2

Вопрос:

Изменение Observable.create на Observable.fromPublisher в приведенном ниже источнике не работает. (Если образец отсутствует, все подписаны, но если образец присутствует, ничего не подписано.)

В чем разница между Observable.create и fromPublisher?

 import io.reactivex.Observable;

import java.util.concurrent.TimeUnit;

public class SampleMain {
    public static void main(String[] args) {
        Observable<String> o = Observable.create(s -> {
            new Thread(() -> {
                for (int i=0; i<100; i  ) {
                    s.onNext("Hello Observable.fromPublisher() A"   i);
                    s.onNext("Hello Observable.fromPublisher() B"   i);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                s.onComplete();
            }).start();
        });
        o
                .sample(1, TimeUnit.MILLISECONDS)
                .subscribe(System.out::println);
    }
}
 

Ответ №1:

fromPublisher требуется правильно реализованный org.reactivestreams.Publisher , соблюдающий правила. Эти Publisher файлы обычно поступают из сторонних библиотек или API.

create имеет встроенную инфраструктуру для преобразования более простого API в стиле эмиттера Observable в an, чтобы разработчику не приходилось так сильно беспокоиться о базовом протоколе.

Также могу я обратить ваше внимание на javadoc из fromPublisher :

Издатель должен следовать спецификации Реактивных потоков. Нарушение спецификации может привести к неопределенному поведению.

Если возможно, используйте create(ObservableOnSubscribe), чтобы вместо этого создать источник, подобный Observable.

Обратите внимание, что, хотя Publisher, по-видимому, является функциональным интерфейсом, не рекомендуется реализовывать его с помощью лямбда-кода, поскольку спецификация требует управления состоянием, что невозможно с помощью лямбда-кода без состояния.