Пользовательский GatewayFilter Spring Cloud — изменение POST-фильтра ответа с результатами другого запроса клиента внутри фильтра

#kotlin #spring-cloud #spring-webflux #reactive #spring-cloud-gateway

#kotlin #spring-cloud #spring-webflux #реактивный #spring-cloud-gateway

Вопрос:

У меня есть фильтр POST Gateway, который я хочу изменить в теле ответа с помощью ответа на отдельный запрос webclient в фильтре gateway. Я могу дойти до отправки WebClient().create().post().exchange() внутри моего пользовательского фильтра. Я вижу это в своих журналах

 onStateChange(POST{uri=/test, connection=PooledConnection{channel=[id: 0x38eb2b4f, L:/127.0.0.1:51643 - R:localhost/127.0.0.1:9000]}}, [request_prepared])
 

и

 onStateChange(POST{uri=/test, connection=PooledConnection{channel=[id: 0x38eb2b4f, L:/127.0.0.1:51643 - R:localhost/127.0.0.1:9000]}}, [request_sent])
 

здесь соединение зависает и не завершается.

вот мой пользовательский код фильтра

 class PostProxyGatewayFilterFactory : AbstractGatewayFilterFactory<PostProxyGatewayFilterFactory.Params>(Params::class.java) {
    override fun apply(params: Params): GatewayFilter {
        val cachedBody = StringBuilder()
        return GatewayFilter { exchange: ServerWebExchange, chain: GatewayFilterChain ->
            chain.filter(exchange).then(
                    executeRequest(cachedBody,exchange, params)
                            .map {
                                val mr = ResponseHandler(exchange)
                                mr.mutateResponse(it.body.toString())
                            }.flatMap{
                                it
                            }
            ) }
    }

    data class Params(
            val urlPath: String = "",
    )

    private fun cache(cachedBody: StringBuilder, buffer: DataBuffer) {
        cachedBody.append(Charsets.UTF_8.decode(buffer.asByteBuffer())
                .toString())
    }
    private fun executeRequest(cachedBody: StringBuilder, exchange: ServerWebExchange, params: PostProxyGatewayFilterFactory.Params): Mono<ResponseEntity<JsonNode>>{
        val request = when(exchange.request.method){
            HttpMethod.PUT -> WebClient.create().put().uri(params.urlPath).body(BodyInserters.fromDataBuffers(exchange.request.body.doOnNext{ cache(cachedBody, it)}))
            HttpMethod.POST -> WebClient.create().post().uri(params.urlPath).body(BodyInserters.fromDataBuffers(exchange.request.body.doOnNext{ cache(cachedBody, it)}))
            HttpMethod.GET -> WebClient.create().get().uri(params.urlPath)
            HttpMethod.DELETE -> WebClient.create().delete().uri(params.urlPath)
            else -> throw Exception("Invalid request method passed in to the proxy filter")
        }
        return request.headers { it.addAll(exchange.request.headers) }
                .exchange()
                .flatMap{
                    it.toEntity(JsonNode::class.java)
                }
    }
}
 

вот мой класс ResponseHandler

 class ResponseHandler(val delegate: ServerWebExchange) {
     fun mutateResponse(body: String): Mono<Void> {
        val bytes: ByteArray = body.toByteArray(StandardCharsets.UTF_8)
        val buffer: DataBuffer = delegate.response.bufferFactory().wrap(bytes)
        return delegate.response.writeWith(Flux.just(buffer))
    }
}
 

вот application.yml

 - id: proxy
  uri: http://${HOST:localhost}:${PORT:9000}
  predicates:
  - Path=/proxy/**
  filters:
  - RewritePath=/test(?<segment>/?.*), ${segment}
  - name: PostProxy
  args:
    proxyBasePath: http://localhost:9000/thisSecond
 

Итак, идея состоит в том, чтобы отправить запрос на localhost:9001/test/thisFirst (прокси localhost:9000/thisFirst , который выполняется успешно), получить этот ответ обратно и ничего не делать с этим ответом, отправить WebClient запрос на localhost:9000/thisSecond via executeRequest() , вернуть этот ответ, а затем использовать этот ответ в качестве нового exchange.response тела. Я также не уверен ResponseHandler , правильно ли это, поскольку executeRequest() никогда не заканчивается. Это будет частью 2 вопроса, как только я смогу решить, почему executeRequest() никогда не заканчивается.