Как наблюдать за сокетами до отмены подписки в Rxjava

#java #rx-java #serversocket

#java #rx-java #serversocket

Вопрос:

Я написал небольшой фрагмент кода, который принимает параметр ObjectInputStream и просматривает данные в сокете. Проблема возникает, когда функция readObject() возвращает «null», и поскольку функция observeSocket(ObjectInputStream в) принимает только объект, подписчик выполняет функцию onError() и завершает программу.

Но что мне нужно, так это продолжать наблюдать за сокетами для объектов и возвращать только в том случае, если объект наблюдается над сокетом, и только когда наблюдатель отписывается, функция должна прекратить свою работу. Как я мог изменить код для достижения требуемой функциональности.

 public Observable<Object> observeSocket(ObjectInputStream in){
    return Observable.create(subscriber -> {
        while(!subscriber.isUnsubscribed()) {
            subscriber.onNext(getData(in));
        }

        subscriber.onCompleted();

    });
}

public Object getData(ObjectInputStream in){

    Object streamData = null;

    try{

        streamData = in.readObject();
    }

    catch(IOException e){
        //e.printStackTrace();
    }

    catch(ClassNotFoundException e){
        e.printStackTrace();
    }

    return streamData;

}
  

Ответ №1:

Избегайте использования Observable.create(OnSubscribe) , потому что создание наблюдаемых с учетом противодавления и совместимых с контрактом объектов является сложным делом. Это хороший кандидат для использования Observable.create(SyncOnSubscribe) :

 ObjectInputStream ois = ...;

Observable<Object> objects = 
  Observable.create(
    SyncOnSubscribe.createStateless(observer -> {
      try {
          Object value = ois.readObject();
          // you decide how end of file is indicated
          // a common strategy is to write a null object
          // to the end of the Object stream.
          if (value == END_OF_FILE) {
              observer.onCompleted();
          } else {
              observer.onNext(value);
          }
      } catch (Exception e) {
          observer.onError(e);
      }
    }));          
  

Комментарии:

1. Вы проверяли эту стоимость на своем компьютере. Общая подпись SyncOnSubscribe отличается от возвращаемого значения действия, что приводит к ошибке компиляции и синтаксической ошибке.

2. createStateless принимает только Action1(), а observer.onNext или любые другие функции не могут быть реализованы.

3. забыл вставить Observable.create. Для меня отлично компилируется с использованием rxjava 1.2.1

4. Как насчет использования Flowables в RxJava 2.x. Вы немного поэкспериментировали и можем ли мы использовать Flowables вместо Observables.

5. Та же функциональность существует Flowable в RxJava 2.x, но API будет немного отличаться.