Zip список наблюдаемых объектов в другой Zip наблюдаемый RxJava2

#android #rx-java2 #rxandroidble #rxbluetooth

#Android #rx-java2 #rxandroidble #rxbluetooth

Вопрос:

Я пытаюсь получить zip список zip Observables , но проблема в том, что я каждый раз получаю одни и те же значения из архивированных наблюдаемых объектов. Причина, по которой я это делаю, заключается в выполнении двух операций 1st reading index и 2nd reading data из ble определенное количество раз — в следующем примере это 6 раз.

Не уверен, как справиться с этим с RxJava2

вот фрагмент кода

  private Observable<Pair<byte[],byte[]>> getValueFromIndication(RxBleConnection rxBleConnection){

         final PublishSubject<Boolean> unsubscribeSubject = PublishSubject.create();


        return Observable.zip(

                rxBleConnection.setupIndication(Data.INDEX,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeSubject),
                rxBleConnection.setupIndication(Data.DATA,NotificationSetupMode.QUICK_SETUP).flatMap(it->it).takeUntil(unsubscribeSubject),

                (bytes, bytes2) -> {

                    unsubscribeSubject.onNext(true);

                    return Pair.create(bytes,bytes2);
                }
        );
}
 

из моего основного потока я сначала создаю список Observables , заархивирую его и передаю

  .flatMap(rxBleConnection -> {


        List<Observable<Pair<byte[],byte[]>>> observableList = new ArrayList<>();

        for(int i=0;i<6;i  ){

            //Creating list of observables so that 6 times this function gets fire
            observableList.add(getValueFromIndication(rxBleConnection));

        }

        // Zipping Zipped list of observables 
        return Observable.zip(observableList,Data::OperationReadings);
    }).subscribe(bytes->{


    })
 

Здесь я всегда получаю одни и те же значения Data::OperationReadings . В настоящее время я получаю следующие данные, которые мне не нужны.

каждый раз один и тот же индекс и значение

 INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
INDEX [1] DATA [10,30,20,30,33,0]
 

Ожидаемые данные следующие

каждый раз разные индекс и значение

 INDEX [1] DATA [10,30,20,30,33,0]
INDEX [2] DATA [11,11,2,0,3,0]
INDEX [3] DATA [0,0,0,0,33,0]
INDEX [4] DATA [10,30,0,30,3,0]
INDEX [5] DATA [10,0,0,30,3,0]
INDEX [6] DATA [10,0,20,30,3,9]
 

Комментарии:

1. Требуется ли отдельное включение индикации для каждого генерируемого индекса? Или указание может сохраняться до тех пор, пока не будут получены все 6 указаний?

2. эмиссия требует отдельного включения и индикации. Вкл -> чтение-> выкл, Вкл-> чтение-> Выкл и так далее. Каждый раз, когда новый индекс и данные передаются с устройства ble

Ответ №1:

Причина, по которой вы получаете одни и те же данные, повторяющиеся 6 раз, заключается в том, что вы getValueFromIndication() одновременно подписываетесь на individual. Фактически все наблюдаемые выполняются параллельно. Вы хотите запускать каждую подписку последовательно. Решением может быть замена этого:

         return Observable.zip(observableList,Data::OperationReadings);
 

с:

         return Observable.concat(observableList) // we want to subscribe each Observable from list after the previous one will complete
            .toList() // we want to gather all results from individual Observables from the list — this returns a Single
            .toObservable() // get back to the Observable class so the types will match
            .map(Data::OperationReadings); // we map it into the OperationReadings class
 

Комментарии:

1. В случае, если какой-либо элемент списка в ObservableList вызывает какую-либо ошибку, распространяется ли это на onError конечного подписчика?

2. ДА. Он должен распространяться на onError подписчика. Чтобы предотвратить это, необходимо было бы явно обрабатывать ошибки, добавленные в цепочку.