Почему переключение потоков в этом коде RxJava в моем репозитории Android

#java #android #retrofit2 #rx-java

Вопрос:

Я ни за что на свете не могу понять, почему потоки переключаются в моем коде, использующем Rx-Java. Это в Android, и я использую RxJava3. У меня есть репозиторий, который вызывает api с помощью RetroFit2 с адаптером вызовов RxJava, чтобы возвращать объекты, завернутые в классы RxJava. Я заранее извинюсь за ужасное переименование моего кода, я не был слишком изобретателен в этом отношении, поэтому я приношу извинения, если это трудно понять из-за переименования переменных, методов и классов.

Вот моя модернизация 2 с адаптером вызова Rxjava, а также Schedulers.io установите в качестве пула потоков для одиночных файлов, которые будут выполняться на:

 retrofit = new Retrofit.Builder()
            .baseUrl(LOCAL_URL)
            .addConverterFactory(GsonConverterFactory.create(gson))
            .addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))
            .build();
 

И мой модифицированный интерфейс имеет этот метод:

 @GET("url/{data-id}")
Single<FirstObject> getFirstObject(@Path("data-id") Long dataID);
 

Я возвращаю один из моего атрибута apiService (используя приведенную выше модификацию) моего репозитория в методе fetchData. Затем я использую класс SingleSubject, чтобы с нетерпением получить вызов api, подписавшись на метод fetchData. Я также охотно выполняю фильтрацию данных, которые мне понадобятся в моем приложении, снова подписываясь на большее количество тем для этой начальной темы. Я использую темы для того, чтобы выполнить этот код на уровне репозитория и подготовить его для моих различных моделей представлений, которым необходимо использовать эти данные, вместо того, чтобы выполнять его несколько раз для каждой модели представления. Мой слой репозитория является одноэлементным и существует с начала программы до конца(с использованием аннотации Hilt).

Вот конструктор и атрибуты моего слоя репозитория:

 ApiService apiService;
SingleSubject<FirstObject> firstSubject;
SingleSubject<List<DerivedObject1>> derivedSubject1;
SingleSubject<DerivedObject1> firstFilteredDerivedSubject1;
SingleSubject<DerivedObject1> secondFilteredDerivedSubject1;
SingleSubject<List<DerivedObject2>> derivedSubject2;
SingleSubject<List<DerivedObject2>> filteredDerivedSubject2;


@Inject
public Repository(ApiService apiService){
    this.apiService = apiService;
    firstSubject = SingleSubject.create();
    derivedSubject1 = SingleSubject.create();
    derivedSubject2 = SingleSubject.create();
    firstFilteredDerivedSubject1 = SingleSubject.create();
    secondFilteredDerivedSubject1 = SingleSubject.create();
    filteredDerivedSubject2 = SingleSubject.create();

}
 

Here is the fetchData method where I eagerly execute the API call and necessary filtering with RxJava:

 public void fetchData(Long dataId){
    System.out.println(""   "Main thread ID: "   Thread.currentThread().getId());
    apiService.getSingle(dataId).retry(1).subscribe(firstSubject);
    firstSubject
            .map(FirstObject::getDerivedObject1).subscribe(derivedSubject1);
    firstSubject.map(FirstObject::getDerivedObject2).subscribe(derivedSubject2);
    Observable<DerivedObject1> tempObservable = derivedSubject1.toObservable()
            .concatMapIterable(list -> list).publish().autoConnect(2);
    Single.fromObservable(tempObservable
            .filter(firstObject -> firstObject.getBoolean) amp;amp; firstObject.getOtherBoolean))
            .subscribe(secondFilteredDerivedSubject1);
    Single.fromObservable(tempObservable
            .filter(firstObject -> firstObject.getBoolean amp;amp; firstObject.getAnotherBoolean))
            .subscribe(firstFilteredDerivedSubject1);
    derivedSubject2.toObservable().concatMapIterable(list -> list)
            .filter(DerivedObject2::getBoolean).toList()
            .subscribe(filteredDerivedSubject2);
            
    // FOR DEBUG PURPOSES
    //testing firstSubject subscription
    firstSubject.subscribe(i -> System.out.println("firstSubject subscription
            IN FETCHDATA, CURRENT THREAD: "   Thread.currentThread()
            .getId()));
            
    //testing filteredDerivedSubject2 subscription
    filteredDerivedSubject2
            .subscribe(i -> System.out.println("filtedDerivedSubject2 subscription
            IN FETCHDATA, CURRENT THREAD: "   Thread.currentThread()
            .getId()));

}
 

If you notice, I have a print statement that prints out the ID as 2 for the «Main Thread». I call the fetchData method from a ViewModel which is executed on the Main Thread, so it prints out 2 which is the Main Thread in Android. At the bottom I subscribe to a couple of these subjects in order to test the threading. As I would expect, the threads are some high number in the 500s signifying that it is not subscribing on the main thread and using some Thread in Schedulers.io().

However, then when I get into a new Activity, and a new ViewModel, I call the getFilteredDerivedSubject2 method that also resides in the same Repository class as the fetchData and aforementioned constructor and attributes. This other ViewModel again calls this getFilteredDerivedSubject2 method on the Main Thread. It actually returns a Single instead of a SingleSubject as I want only the functionality of a Single in my ViewModel. Its purpose is to just return the Single.

Here is the getFilteredDerivedSubject2 method:

 public Single<List<DerivedObject2>> getfilteredDerivedSubject2(){

    // FOR DEBUG PURPOSES
    System.out.println("In CURRENT THREAD: "   Thread.currentThread().getId());
    //testing firstSubject subscription
    firstSubject.subscribe(i -> System.out.println("firstSubject subscription
            IN GETFILTEREDDERIVEDSUBJECT2 CURRENT THREAD: "   Thread.currentThread().getId()));
            
    //testing filteredDerivedSubject2 subscription
    filteredDerivedSubject2
            .subscribe(i -> System.out.println("filtedDerivedSubject2 subscription 
            IN GETFILTEREDDERIVEDSUBJECT2 CURRENT THREAD: "   Thread.currentThread()
            .getId()));
            
            
    // REAL PURPOSE OF METHOD
    return filteredDerivedSubject2;
}
 

Как вы можете видеть, у меня снова есть некоторые инструкции по отладке печати и подписки для тестирования потоков. Здесь, как и ожидалось, первый оператор печати выводит, что он находится в Основном потоке с идентификатором «2». Это было ожидаемо, так как я снова вызываю этот метод из Основного потока. Однако затем подписки, которые распечатывают поток, также распечатывают Основной поток с идентификатором 2. Я понятия не имею, почему. Я никогда не меняю какие-либо потоки с помощью subscribe или observeOn. Я бы полностью ожидал, что он будет продолжать оставаться в той же теме в 500-х годах или что-то в этом роде для каждой темы, на которую я подписываюсь. Это не так. Он выводит, что его идентификатор потока равен 2.

Кто-нибудь знает, почему это так? Я понимаю, что могу исправить это с помощью оператора subscribe или observeOn, но мне бы очень хотелось понять, почему потоковая передача ведет себя таким образом. Я новичок в RxJava. Спасибо за любые ответы!

ИЗМЕНИТЬ: Вот выходные данные из запрошенных инструкций печати:

 I/System.out: Main thread ID: 2
I/System.out: firstSubject subscription IN FETCHDATA, CURRENT THREAD: 224
filtedDerivedSubject2 subscription IN FETCHDATA, CURRENT THREAD: 224
I/System.out: in CURRENT THREAD: 2
I/System.out: firstSubject subscription IN GETFILTEREDDERIVEDSUBJECT2 
CURRENT THREAD: 2
I/System.out: filtedDerivedSubject2 subscription IN 
GETFILTEREDDERIVEDSUBJECT2 CURRENT THREAD: 2
 

Это все соответствующие инструкции печати, которые являются результатом инструкций печати в коде, который я опубликовал. Для уточнения «в ТЕКУЩЕМ ПОТОКЕ: 2» является первым оператором печати в методе getfilteredDerivedSubject2.

Комментарии:

1. Чтобы достичь того, чего вы хотите здесь, вам никогда не требовался даже 1 предмет, и в вашем коде, вероятно, было бы меньше странностей, если бы у вас не было нескольких неуправляемых подписок.

2. @EpicPandaForce Можете ли вы объяснить, как это произошло? Как я уже сказал, у меня нет опыта. И просто для уточнения, для каждого из моих отдельных объектов есть получатели с одним типом возвращаемого значения, а не только один получатель, который у меня есть в коде.

3. @programmingmanwithaplan одна вещь, которую я не понимаю из вашего вопроса. Ожидаете ли вы, что подписка firstSubject будет в io? потому что нигде в коде вы не указываете ему это делать, поэтому подписка произойдет в потоке, из которого она была вызвана. То, что вы сделали, — это изменили подписки на звонки о модернизации.

4. @Fred Итак, когда я выполняю этот код, сингл, возвращенный из Retrofit, выполняется в потоке в Schedulers.io() пул потоков. При тестировании с этими субъектами выясняется, что субъект, подписавшийся на этот модифицированный сингл, такой как firstSubject, затем имеет свою собственную подписку в том же потоке. filteredDerivedSubject2 отображает точно такое же поведение после подписки на firstSubject. Они выводят один и тот же идентификатор, который не является идентификатором основного потока 2. Однако это происходит только в методе fetchData (fetchData вызывается из основного потока (идентификатор: 2).

5. @Fred Однако в методе getfilteredDerivedSubject2 (), который также вызывается из Основного потока (ИДЕНТИФИКАТОР: 2), подписки происходят в Основном потоке, о чем свидетельствует распечатка подписок с идентификатором: 2 при тестировании. Я не понимаю, почему это так. Я просто пытаюсь понять поведение потоков и почему оно отличается в каждом из вышеперечисленных методов. Я, вероятно, мог бы добавить подписку где-нибудь, чтобы исправить это, но я просто пытаюсь понять общие правила того, как потоковое взаимодействие работает с этими предметами.