#kotlin #spring-cloud #spring-webflux #reactive #spring-cloud-gateway
#kotlin #spring-cloud #spring-webflux #реактивный #spring-cloud-gateway
Вопрос:
У меня есть фильтр POST Gateway, который я хочу изменить в теле ответа с помощью ответа на отдельный запрос webclient в фильтре gateway. Я могу дойти до отправки WebClient().create().post().exchange()
внутри моего пользовательского фильтра. Я вижу это в своих журналах
onStateChange(POST{uri=/test, connection=PooledConnection{channel=[id: 0x38eb2b4f, L:/127.0.0.1:51643 - R:localhost/127.0.0.1:9000]}}, [request_prepared])
и
onStateChange(POST{uri=/test, connection=PooledConnection{channel=[id: 0x38eb2b4f, L:/127.0.0.1:51643 - R:localhost/127.0.0.1:9000]}}, [request_sent])
здесь соединение зависает и не завершается.
вот мой пользовательский код фильтра
class PostProxyGatewayFilterFactory : AbstractGatewayFilterFactory<PostProxyGatewayFilterFactory.Params>(Params::class.java) {
override fun apply(params: Params): GatewayFilter {
val cachedBody = StringBuilder()
return GatewayFilter { exchange: ServerWebExchange, chain: GatewayFilterChain ->
chain.filter(exchange).then(
executeRequest(cachedBody,exchange, params)
.map {
val mr = ResponseHandler(exchange)
mr.mutateResponse(it.body.toString())
}.flatMap{
it
}
) }
}
data class Params(
val urlPath: String = "",
)
private fun cache(cachedBody: StringBuilder, buffer: DataBuffer) {
cachedBody.append(Charsets.UTF_8.decode(buffer.asByteBuffer())
.toString())
}
private fun executeRequest(cachedBody: StringBuilder, exchange: ServerWebExchange, params: PostProxyGatewayFilterFactory.Params): Mono<ResponseEntity<JsonNode>>{
val request = when(exchange.request.method){
HttpMethod.PUT -> WebClient.create().put().uri(params.urlPath).body(BodyInserters.fromDataBuffers(exchange.request.body.doOnNext{ cache(cachedBody, it)}))
HttpMethod.POST -> WebClient.create().post().uri(params.urlPath).body(BodyInserters.fromDataBuffers(exchange.request.body.doOnNext{ cache(cachedBody, it)}))
HttpMethod.GET -> WebClient.create().get().uri(params.urlPath)
HttpMethod.DELETE -> WebClient.create().delete().uri(params.urlPath)
else -> throw Exception("Invalid request method passed in to the proxy filter")
}
return request.headers { it.addAll(exchange.request.headers) }
.exchange()
.flatMap{
it.toEntity(JsonNode::class.java)
}
}
}
вот мой класс ResponseHandler
class ResponseHandler(val delegate: ServerWebExchange) {
fun mutateResponse(body: String): Mono<Void> {
val bytes: ByteArray = body.toByteArray(StandardCharsets.UTF_8)
val buffer: DataBuffer = delegate.response.bufferFactory().wrap(bytes)
return delegate.response.writeWith(Flux.just(buffer))
}
}
вот application.yml
- id: proxy
uri: http://${HOST:localhost}:${PORT:9000}
predicates:
- Path=/proxy/**
filters:
- RewritePath=/test(?<segment>/?.*), ${segment}
- name: PostProxy
args:
proxyBasePath: http://localhost:9000/thisSecond
Итак, идея состоит в том, чтобы отправить запрос на localhost:9001/test/thisFirst
(прокси localhost:9000/thisFirst
, который выполняется успешно), получить этот ответ обратно и ничего не делать с этим ответом, отправить WebClient
запрос на localhost:9000/thisSecond
via executeRequest()
, вернуть этот ответ, а затем использовать этот ответ в качестве нового exchange.response
тела. Я также не уверен ResponseHandler
, правильно ли это, поскольку executeRequest()
никогда не заканчивается. Это будет частью 2 вопроса, как только я смогу решить, почему executeRequest()
никогда не заканчивается.