Используйте RxJava Flowable — как разделить излучение в зависимости от типа события

# #java #firebase #rx-java

Вопрос:

Допустим, мы используем Firebase ChildEventListener, который можно использовать в качестве нескольких источников данных (его функций), и я оборачиваю его потоковым или наблюдаемым. Я хочу, чтобы в каждом источнике излучатель передавал данные в другой конвейер, потому что в каждом случае данные могут быть изменены, я хочу обрабатывать их по — разному, то есть разделять излучение на несколько разных потоков в зависимости от типа события.

Как это можно сделать на Java?

   public void newUsers() {
    DatabaseReference ref = database.getReference().child("Users");
    Flowable.create(emitter -> {
        ref.addChildEventListener(new ChildEventListener() {
            @Override
            public void onChildAdded(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
                User userData = dataSnapshot.getValue(User.class);
                emitter.onNext(userData);
            }

            @Override
            public void onChildChanged(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
             User userData = dataSnapshot.getValue(User.class);
                emitter.onNext(userData);
            }

            @Override
            public void onChildRemoved(@NonNull DataSnapshot dataSnapshot) {
             User userData = dataSnapshot.getValue(User.class);
                emitter.onNext(userData);
            }

            @Override
            public void onChildMoved(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
                emitter.onNext(userData);
            }

            @Override
            public void onCancelled(@NonNull DatabaseError databaseError) {
             User userData = dataSnapshot.getValue(User.class);
                emitter.onNext(userData);
            }
        });
    }
}
 

Комментарии:

1. разве вы не использовали бы разные излучатели для получения нескольких потоков?

2. Я бы с удовольствием сделал это, но я не знаю, возможно ли разделить функции ChildEventListener, чтобы Текучая оболочка объединяла все функции вместе. У тебя есть какие-нибудь идеи, как это можно сделать? @Shark

3. Да, конечно. позволь мне переделать эту штуку… не знаю, правильный ли это подход, но это то, что я попробовал бы сначала вместо одного текучего.

4. опубликовал что-то, подумай об этом @nirkov

Ответ №1:

Таким образом, чтобы иметь несколько потоков, у нас не может быть одного Flowable . Я не знаю, является ли текучесть на самом деле требованием, поэтому я просто изложу свою идею, вы можете исправить ее и адаптировать к своим потребностям.

    BehaviourSubject childAddedStream;
   BehaviourSubject childChangedStream;
   BehaviourSubject childRemovedStream;
   BehaviourSubject childMovedStream;
   BehaviourSubject cancelledStream;

  public void newUsers() {
    DatabaseReference ref = database.getReference().child("Users");
    
    ref.addChildEventListener(new ChildEventListener() {
            @Override
            public void onChildAdded(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
                User userData = dataSnapshot.getValue(User.class);
                childAddedStream.onNext(userData);
            }

            @Override
            public void onChildChanged(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
             User userData = dataSnapshot.getValue(User.class);
             childChangedStream.onNext(userData);
            }

            @Override
            public void onChildRemoved(@NonNull DataSnapshot dataSnapshot) {
             User userData = dataSnapshot.getValue(User.class);
             childRemovedStream.onNext(userData);
            }

            @Override
            public void onChildMoved(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
            childMovedStream.onNext(userData);
            }

            @Override
            public void onCancelled(@NonNull DatabaseError databaseError) {
             User userData = dataSnapshot.getValue(User.class);
             cancelledStream.onNext(userData);
            }
        });
    }
}
 

что-то подобное было бы моей отправной точкой.

Идея заключается в том, чтобы этот класс инициализировал разные субъекты для каждого обратного вызова, регистрировал обратный вызов в базе данных FirebaseDatabase, когда он был создан, и перенаправлял отдельные обратные вызовы в разные потоки (субъекты, не стесняйтесь использовать более релевантную Тему, которая, если хотите, подчиняется объекту) — тогда всем, кому нужно слушать, просто прослушивает соответствующий поток вместо одного потока, который просто действует как мост между обратным вызовом и реактивным миром.

Комментарии:

1. В качестве альтернативы вы можете пойти с чем — то похожим на вашу идею, но вам нужно 5 потоков, и пусть каждый из них прослушивает (и выдает) только один обратный вызов.

2. Или вы идете с излучением a Pair<UserData, EventType> и используете только один поток — и, фильтруя по перечислению типов событий , например REMOVED, CHANGED, MOVED, ADDED , вы получаете несколько потоков. Я все еще думаю, что в этом случае вам нужен субъект, но один субъект может обслуживать все пять этих потоков с помощью фильтрации.

3. Вам не нужно несколько потоков, если один из них выдает пару; фильтрация одного потока на основе EventType этого создаст несколько потоков, которые будут использовать одни и те же резервные данные. Вы либо используете одну тему/Поток, испускающую пару (и получаете несколько потоков путем фильтрации), либо у вас есть одна тема/поток на обратный вызов. Все так просто.

4. Если вы используете перечисление типа события (и создаете пару) — ваша вещь уже в порядке, просто нужно, чтобы соответствующее перечисление соответствовало данным пользователя, чтобы потребитель мог различать события и фильтровать по соответствующему типу события.

5. cc @nirkov (пингует, потому что вы не увидите комментариев, сделанных на моем посту)