Spring webflux — AOP Как использовать аспект таким образом, чтобы он обрабатывался перед вызовом операции блокировки (передавался в фоновый поток)

#aop #spring-webflux #project-reactor

#aop #spring-webflux #проект-реактор

Вопрос:

У меня есть приложение Restful, написанное на spring webflux.

В моем случае использования я должен вызвать операцию блокировки, которая выполнит некоторую обработку. Блокирующий вызов выполняется в отдельном пуле потоков. ( выполняется путем вызова операции блокировки внутри вызываемого объекта)
Я добавил аспект к методу myServiceMethod , в котором a Flatmap выполняет некоторые преобразования. Теперь целью добавления aspect здесь было зафиксировать общее время, затраченное rest api (исключая время, затраченное blockMethod ). Но реализация каким-то образом обрабатывается таким образом, что она регистрирует данные только после обработки блокирующего кода. Что здесь можно исправить, чтобы убедиться, что аспект обрабатывается до blockMethod вызова.

 private Mono<Result> myServiceMethod(String any1, String any2) {
  return myWebClient.get()
    .uri("http://I-just-wsnt-log-the-round-trip-time-fro-api-call.com/").bodyToMono(Result)
    .flatMap(someThing -> myCallable(someThing));
}

private Mono<Result> myCallable(int value) {
  return Mono.fromCallable(() -> blockMethod(value))
    .subscribeOn(this.scheduler);
}
  

* Примечание: приведенный выше код может быть немного ошибочным, но надеюсь, что все в порядке, если он передает идею.
И мой код аспекта с использованием pointcut выглядит следующим образом :

 @Pointcut("execution(public * MyClass.myServiceMethod(..))")
public void custompoint() {}

@Around("custompoint()")
public Mono<String> calculateTimeAdStuff(ProceedingJoinPoint joinPoint) {
  return Mono.just(1)
    .flatMap(logTime(joinPoint));
}

// logging time taken by calling ProceedingJoinPoint.proceed
private Mono<String> logTime(ProceedingJoinPoint joinPoint) {
  Instant start = Instant.now();
  return ((Mono<Result>)joinPoint.proceed()).flatMap(() -> {
    String log = Instant.now().toMillis() - start);
    return Mono.just(log);
  });
}
  

Мой вопрос в том, как я могу это изменить, убедившись, что зарегистрированное время не включает время, затрачиваемое blockMethod .

Обновить

Я попробовал некоторые вещи с контекстом. Добавляю свои изменения здесь. Но, похоже, мои изменения, внесенные в контекст, не достигают совета, куда я звоню jointpoint.proceed .

 private Mono<Result> myServiceMethod(String any1, String any2) {
  return myWebClient.get()
    .uri("http://I-just-want-log-the-round-trip-time-for-api-call.com/").bodyToMono(Result)
    .flatMap(someThing -> myCallable(someThing))
    .subscriberContext(ctx -> ctx.put("timeTakenTest", 56000L ));
}

  

Вот код метода рекомендаций. Здесь значение key "timeTakenTest" всегда равно 0 (по умолчанию) при печати. очевидно, что запись, которую я добавил в subscriberContext в выполняемом методе, здесь недоступна.

.replacing with getOrDefault get` выдает ошибку, как показано ниже. очевидно, что он либо никогда не добавляется в контекст, либо где-то посередине контекст воссоздается, вызывая эту проблему.

 java.util.NoSuchElementException: Context does not contain key: timeTakenTest
  
 // logging time taken by calling ProceedingJoinPoint.proceed
private Mono<String> logTime(ProceedingJoinPoint joinPoint) {
  Instant start = Instant.now();
  return ((Mono<Result>)joinPoint.proceed())
  .zipWith(Mono.subscriberContext() 
                .map(c -> (Long)c.getOrDefault("timeTakenTest", 0L))).flatMap(responseData -> {
             System.out.println("Subscription time registered is : "   responseData.getT2());
     //something done here.
  });
}
  

Я думаю, что я хорошо следил за документацией.
В документе даже говорится :

subscriberContext (Context) объединяет контекст, который вы предоставляете, и контекст из нисходящего потока (помните, контекст распространяется от нижней части цепочки к вершине). Это делается с помощью вызова putAll, что приводит к созданию нового контекста для восходящего потока.

Пожалуйста, предложите.

Ссылка : https://projectreactor.io/docs/core/release/reference/#context.read

Комментарии:

1. размещение чего-либо в своем планировщике on не означает, что только этот единственный метод обрабатывается асинхронно в его собственном пуле потоков. На самом деле вы сообщаете серверу, что когда кто-то подписывается, он должен использовать потоки из назначенного планировщика, а не из стандартного цикла событий. Таким образом, весь запрос выполняется в собственном пуле потоков. onSubscribe весь планировщик используется для всей подписки.

2. итак, вы в основном говорите, что хотите регистрировать время запроса, исключая один метод. Ну, тогда вам нужно рассчитать время всего запроса, а затем рассчитать время только для этого конкретного метода, а затем вычесть время метода из общего времени.

3. @ThomasAndolf Я подумал ((Mono<Result>)joinPoint.proceed()) joinPoint.proceed() , может ли здесь помочь переключение оператора на just ? в этом случае следующий код flatmap также будет удален, и возвращаемый тип тоже может быть изменен на void, возможно.

4. @ThomasAndolf В вашем последнем комментарии, как я могу рассчитать время, затраченное на весь запрос, и время, затраченное определенным методом, чтобы оно было доступно в том же методе. Если я использую отдельный аспект для вычисления времени для конкретного метода, то как я смогу сопоставить, принадлежат ли 2 вычисленных времени одному и тому же запросу?

5. Поместив их в контекст