#java #multithreading #observable #rx-java
#java #многопоточность #наблюдаемый #rx-java
Вопрос:
Я использую RX-Java 1.3.8
и для демонстрации проблемы я изменил планировщик ввода CacheThreadScheduler
-вывода, чтобы регистрировать оператор, например getting thread with $id
, когда рабочий поток берется из очереди и releasing thread with $id
когда рабочий поток возвращается в очередь.
Теперь рассмотрим сценарий, в котором мы хотим выполнить HTTP-вызов async-io, а затем, используя его результат, выполнить другой HTTP-вызов async-io.
Приведенный ниже код демонстрирует это
In the example have used executor service to simulate the async-io HTTP call
1. Observable.just("1")
2. .doOnNext(s -> print("just started"))
3. .flatMap(one -> {
4. return Observable.create(sub -> {
5. // simulating an async io http request which take 10 sec
6. Executors.newSingleThreadExecutor().submit(() -> {
7. sleep(10_000);
8. print("GOT RESPONSE-1");
9. sub.onNext("");sub.onCompleted();
10. });
11. });
12. })
13. .doOnNext((s) -> print("before moving to io thread"))
14. .observeOn(Schedulers.io())
15. .doOnNext((s) -> print("after moving to io thread"))
16. .flatMap(two -> {
17. return Observable.create(sub -> {
18. Executors.newSingleThreadExecutor().submit(() -> {
19. // simulating an async io http request which take 10 sec
20. sleep(10_000);
21. print("GOT RESPONSE-2");
22. sub.onNext("");sub.onCompleted();
23. });
24. });
25. })
26. .doOnNext((s) -> print("close to end of chain"))
27. .subscribe();
ПОЛУЧЕН РЕЗУЛЬТАТ:
1. getting thread worker with id ThreadWorker@1d251891
2. main:just started
3. pool-1-thread-1:GOT RESPONSE-1
4. pool-1-thread-1:before moving to io thread
5. RxCachedThreadScheduler-1:after moving to io thread
6. pool-2-thread-1:GOT RESPONSE-2
7. pool-2-thread-1:close to end of chain
8. releasing thread worker with id ThreadWorker@1d251891 in 2011
Теперь, если мы проанализируем результат, первое, что он сделает, это проверит рабочий поток из CacheThreadScheduler, что нежелательно, поскольку мы ожидаем, что работа с потоками должна быть проверена после GOT RESPONSE-1
печати.
Вопрос1: Может ли это быть принудительно, чтобы threadworker получал проверку только после GOT RESPONSE-1
печати?
Второе, на что нужно обратить line 8
внимание, это то, что threadworker выпускается после завершения конвейера, ожидается, что threadworker должен получить релиз, как только Executors.newSingleThreadExecutor().submit
получит вызов.
Вопрос2: Может ли это быть принудительно, чтобы threadworker получал релиз после Executors.newSingleThreadExecutor().submit
вызова.