#java #rx-java #serversocket
#java #rx-java #serversocket
Вопрос:
Я написал небольшой фрагмент кода, который принимает параметр ObjectInputStream и просматривает данные в сокете. Проблема возникает, когда функция readObject() возвращает «null», и поскольку функция observeSocket(ObjectInputStream в) принимает только объект, подписчик выполняет функцию onError() и завершает программу.
Но что мне нужно, так это продолжать наблюдать за сокетами для объектов и возвращать только в том случае, если объект наблюдается над сокетом, и только когда наблюдатель отписывается, функция должна прекратить свою работу. Как я мог изменить код для достижения требуемой функциональности.
public Observable<Object> observeSocket(ObjectInputStream in){
return Observable.create(subscriber -> {
while(!subscriber.isUnsubscribed()) {
subscriber.onNext(getData(in));
}
subscriber.onCompleted();
});
}
public Object getData(ObjectInputStream in){
Object streamData = null;
try{
streamData = in.readObject();
}
catch(IOException e){
//e.printStackTrace();
}
catch(ClassNotFoundException e){
e.printStackTrace();
}
return streamData;
}
Ответ №1:
Избегайте использования Observable.create(OnSubscribe)
, потому что создание наблюдаемых с учетом противодавления и совместимых с контрактом объектов является сложным делом. Это хороший кандидат для использования Observable.create(SyncOnSubscribe)
:
ObjectInputStream ois = ...;
Observable<Object> objects =
Observable.create(
SyncOnSubscribe.createStateless(observer -> {
try {
Object value = ois.readObject();
// you decide how end of file is indicated
// a common strategy is to write a null object
// to the end of the Object stream.
if (value == END_OF_FILE) {
observer.onCompleted();
} else {
observer.onNext(value);
}
} catch (Exception e) {
observer.onError(e);
}
}));
Комментарии:
1. Вы проверяли эту стоимость на своем компьютере. Общая подпись SyncOnSubscribe отличается от возвращаемого значения действия, что приводит к ошибке компиляции и синтаксической ошибке.
2. createStateless принимает только Action1(), а observer.onNext или любые другие функции не могут быть реализованы.
3. забыл вставить Observable.create. Для меня отлично компилируется с использованием rxjava 1.2.1
4. Как насчет использования Flowables в RxJava 2.x. Вы немного поэкспериментировали и можем ли мы использовать Flowables вместо Observables.
5. Та же функциональность существует
Flowable
в RxJava 2.x, но API будет немного отличаться.