Как отправить поток байтов с помощью RxJava?

#java #android #kotlin #rx-java #rx-java2

#java #Android #kotlin #rx-java #rx-java2

Вопрос:

Существует подобный код, который отлично передает строку на сервер и считывает строку ответа.

 val thread = Thread {
        try {
            val port = 8888
            println("Try to open connection:$port")
            val socket = Socket("192.168.0.104", port)
            println("Connection is created")
            val pw = PrintWriter(socket.getOutputStream(), true)
            val br = BufferedReader(InputStreamReader(socket.getInputStream()))
            pw.println("Hello from client 1")
            Log.e("Server answer", br.readLine())
            pw.close()
            br.close()
            socket.close()
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    thread.start()
 

Я пытаюсь сделать то же самое, используя RxJava

 fun connectToServer() {
    val TAG = "Server answer"
        connectToServerWrapper()
            .observeOn(Schedulers.io())
            .subscribeBy (onSuccess = {
                Log.e(TAG, it)
            }, onError = {
                Log.e(TAG, it.toString())
            })
            .addTo(compositeDisposable)
}

fun connectToServerWrapper(): Single<String> {
    return Single.create { emitter ->
        val port = 8888
        println("Try to open connection:$port")
        val socket = Socket(SERVER_IP, port)
        println("Connection is created")
        val pw = PrintWriter(socket.getOutputStream(), true)
        val br = BufferedReader(InputStreamReader(socket.getInputStream()))
        pw.println("Hello from client 1")
        emitter.onSuccess(br.readLine())
        pw.close()
        br.close()
        socket.close()
    }
}
 

Android.os.NetworkOnMainThreadException возникает в onError (). Что я делаю не так?

Ответ №1:

Вы не должны выполнять свои сетевые запросы в основном потоке

 fun connectToServer() {
    val TAG = "Server answer"
        connectToServerWrapper()
            .subscribeOn(Schedulers.io())
            .subscribeBy (onSuccess = {
                Log.e(TAG, it)
            }, onError = {
                Log.e(TAG, it.toString())
            })
            .addTo(compositeDisposable)
}