Как загрузить файл с текущими обновлениями с использованием реактивных расширений на Android

#java #android #rx-java #lifecycle #okhttp

#java #Android #rx-java #жизненный цикл #okhttp

Вопрос:

В настоящее время я программирую приложение, которое должно выполнять множество комбинированных вызовов api, и поэтому впервые использую rxjava, поскольку оно кажется более удобным для обработки асинхронных событий и жизненных циклов Android.

Однако приложению также иногда требуется загружать статические наборы данных, упакованные в zip-архив. Я также пытаюсь использовать rx для этой операции для обеспечения согласованности, и это работает хорошо, но я не совсем понимаю, как подписаться на события прогресса, чтобы обновлять пользовательский интерфейс с помощью прогресса загрузки файла.

Это код, который я использую сейчас для загрузки файла, который использует okhttp-библиотеку:

 downloadService.downloadFile(filename)
    .flatMap(new Func1<Response<ResponseBody>, Observable<File>>()
    {
        @Override
        public Observable<File> call(final Response<ResponseBody> responseBodyResponse)
        {
            return Observable.create(new Observable.OnSubscribe<File>()
            {
                @Override
                public void call(Subscriber<? super File> subscriber)
                {
                    try
                    {
                        File file = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);
                        BufferedSink sink = Okio.buffer(Okio.sink(file));
                        sink.writeAll(responseBodyResponse.body().source());
                        sink.close();
                        subscriber.onNext(file);
                        subscriber.onCompleted();
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                        subscriber.onError(e);
                    }
                }
            });
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<File>()
    {
        @Override
        public void onCompleted()
        {
            Log.d("downloadZipFile", "onCompleted");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            Log.d("downloadZipFile", "Error "   e.getMessage());
        }

        @Override
        public void onNext(File file) {
            Log.d("downloadZipFile", "File downloaded to "   file.getAbsolutePath());
        }
    });
  

Каков хороший способ реализации подписки на события прогресса?

Ответ №1:

Прежде всего: никогда не используйте Observable.create() , если вы не знаете, что делаете, и не готовы справиться с обратным давлением и всем, что должен обрабатывать Observable. (ОБНОВЛЕНИЕ: если вы используете RxJava2, вы можете безопасно использовать этот create() метод)

Тем не менее, начните с создания класса для хранения информации о вашем прогрессе

 public class DownloadProgress<DATA> {
    private final float progress;
    private final DATA data;

    public DownloadProgress(float progress) {
        this.progress = progress;
        this.data = null;
    }

    public DownloadProgress(@NonNull DATA data) {
        this.progress = 1f;
        this.data = data;
    }

    public float getProgress() {
        return progress;
    }

    public boolean isDone() {
        return data != null;
    }

    public DATA getData() {
        return data;
    }
}
  

Тогда вы можете сделать что-то вроде этого:

 public Observable<DownloadProgress<File>> downloadFile(@NonNull final String filename) {
    return downloadService.downloadFile(filename)
            .switchMap(response -> Observable.fromEmitter(emitter -> {
                ResponseBody body = response.body();
                final long contentLength = body.contentLength();
                ForwardingSource forwardingSource = new ForwardingSource(body.source()) {
                    private long totalBytesRead = 0L;

                    @Override
                    public long read(Buffer sink, long byteCount) throws IOException {
                        long bytesRead = super.read(sink, byteCount);
                        // read() returns the number of bytes read, or -1 if this source is exhausted.
                        totalBytesRead  = bytesRead != -1 ? bytesRead : 0;
                        boolean done = bytesRead == -1;
                        float progress = done ? 1f : (float) bytesRead / contentLength;
                        emitter.onNext(new DownloadProgress<>(progress));
                        return bytesRead;
                    }
                };
                emitter.setCancellation(body::close);
                try {
                    File saveLocation = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);
                    saveLocation.getParentFile().mkdirs();
                    BufferedSink sink = Okio.buffer(Okio.sink(saveLocation));
                    sink.writeAll(forwardingSource);
                    sink.close();
                    emitter.onNext(new DownloadProgress<>(saveLocation));
                    emitter.onCompleted();
                } catch (IOException e) {
                    // RxJava1: emitter.onError(e);
                    emitter.tryOnError(e);
                }
            }, Emitter.BackpressureMode.LATEST));
}
  

fromEmitter() это недавнее дополнение к RxJava, в настоящее время экспериментальное, но работает очень хорошо (недавно оно было переименовано, вызывалось fromAsync() раньше).

Затем вы можете использовать его следующим образом:

     String filename = "yourFileName";
    downloadFile(filename)
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(fileDownloadProgress -> {
                float progress = fileDownloadProgress.getProgress();
                // TODO update UI
            })
            .filter(DownloadProgress::isDone)
            .map(DownloadProgress::getData)
            .subscribe(file -> {
                // file downloaded
            }, throwable -> {
                // error
            });
  

Это должно сработать (я не тестировал его), но не позволяет отменить текущий http-вызов, если вы отписываетесь.

Если вы можете изменить свою службу загрузки, я бы предпочел обработать это через OkHttp, вдохновленный примером рецепта здесь:

 public Observable<DownloadProgress<File>> downloadFile(@NonNull final String url, @NonNull final File saveLocation) {
    return Observable.fromEmitter(emitter -> {
        Request request = new Request.Builder()
                .url(url)
                .build();

        final ProgressListener progressListener = (bytesRead, contentLength, done) -> {
            // range [0,1]
            float progress = done ? 1f : (float) bytesRead / contentLength;
            emitter.onNext(new DownloadProgress<>(progress));
        };

        OkHttpClient client = new OkHttpClient.Builder()
                .addNetworkInterceptor(chain -> {
                    Response originalResponse = chain.proceed(chain.request());
                    return originalResponse.newBuilder()
                            .body(new ProgressResponseBody(originalResponse.body(), progressListener))
                            .build();
                })
                .build();

        final Call call = client.newCall(request);
        emitter.setCancellation(() -> call.cancel());

        try {
            Response response = call.execute();
            BufferedSink sink = Okio.buffer(Okio.sink(saveLocation));
            sink.writeAll(response.body().source());
            sink.close();
            emitter.onNext(new DownloadProgress<>(saveLocation));
            emitter.onCompleted();
        } catch (IOException e) {
            emitter.onError(e);
        }
    }, Emitter.BackpressureMode.LATEST);
}
  

Для этого вам также понадобится это:

 public static class ProgressResponseBody extends ResponseBody {

    private final ResponseBody responseBody;
    private final ProgressListener progressListener;
    private BufferedSource bufferedSource;

    public ProgressResponseBody(ResponseBody responseBody, ProgressListener progressListener) {
        this.responseBody = responseBody;
        this.progressListener = progressListener;
    }

    @Override
    public MediaType contentType() {
        return responseBody.contentType();
    }

    @Override
    public long contentLength() {
        return responseBody.contentLength();
    }

    @Override
    public BufferedSource source() {
        if (bufferedSource == null) {
            bufferedSource = Okio.buffer(source(responseBody.source()));
        }
        return bufferedSource;
    }

    private Source source(Source source) {
        return new ForwardingSource(source) {
            long totalBytesRead = 0L;

            @Override
            public long read(Buffer sink, long byteCount) throws IOException {
                long bytesRead = super.read(sink, byteCount);
                // read() returns the number of bytes read, or -1 if this source is exhausted.
                totalBytesRead  = bytesRead != -1 ? bytesRead : 0;
                progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
                return bytesRead;
            }
        };
    }
}

interface ProgressListener {
    void update(long bytesRead, long contentLength, boolean done);
}
  

Использование:

     String filename = "yourFileName";
    String url = "http://your.url.here";
    File saveLocation = new File(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOWNLOADS).getAbsoluteFile(), filename);
    downloadFile(url, saveLocation)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnNext(fileDownloadProgress -> {
                float progress = fileDownloadProgress.getProgress();
                // TODO update UI
            })
            .filter(DownloadProgress::isDone)
            .map(DownloadProgress::getData)
            .subscribe(file -> {
                // file downloaded
            }, throwable -> {
                // error
            });
  

Комментарии:

1. вау, моя глубочайшая благодарность за то, что нашли время, чтобы предоставить такой подробный пример. Спасибо.

2. @Markus не могли бы вы опубликовать рабочий пример для приведенного выше ответа. это было бы действительно полезно.

3. Кто-нибудь использует это решение? Выглядит действительно здорово. Спасибо @Markus!

4. Некоторые обновления: метод RxJava2 create работает так же, как fromEmitter() и раньше. Я сожалею о своем предложении использовать doOnNext() : вместо этого обрабатывайте все при подписке. Если вам нужно обрабатывать отдельно прогресс и выполненное, используйте share() и подписывайтесь дважды с подходящими фильтрами.

5. Кстати, я отредактировал сообщение: onError должно быть tryOnError() . В противном случае он может попытаться выдать ошибку во время удаления нисходящего потока, что приведет к невозможности доставки исключения и -> сбою.