#rx-java2
Вопрос:
Изменение Observable.create
на Observable.fromPublisher
в приведенном ниже источнике не работает. (Если образец отсутствует, все подписаны, но если образец присутствует, ничего не подписано.)
В чем разница между Observable.create и fromPublisher?
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class SampleMain {
public static void main(String[] args) {
Observable<String> o = Observable.create(s -> {
new Thread(() -> {
for (int i=0; i<100; i ) {
s.onNext("Hello Observable.fromPublisher() A" i);
s.onNext("Hello Observable.fromPublisher() B" i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
s.onComplete();
}).start();
});
o
.sample(1, TimeUnit.MILLISECONDS)
.subscribe(System.out::println);
}
}
Ответ №1:
fromPublisher
требуется правильно реализованный org.reactivestreams.Publisher
, соблюдающий правила. Эти Publisher
файлы обычно поступают из сторонних библиотек или API.
create
имеет встроенную инфраструктуру для преобразования более простого API в стиле эмиттера Observable
в an, чтобы разработчику не приходилось так сильно беспокоиться о базовом протоколе.
Также могу я обратить ваше внимание на javadoc из fromPublisher
:
Издатель должен следовать спецификации Реактивных потоков. Нарушение спецификации может привести к неопределенному поведению.
Если возможно, используйте create(ObservableOnSubscribe), чтобы вместо этого создать источник, подобный Observable.
Обратите внимание, что, хотя Publisher, по-видимому, является функциональным интерфейсом, не рекомендуется реализовывать его с помощью лямбда-кода, поскольку спецификация требует управления состоянием, что невозможно с помощью лямбда-кода без состояния.