rxpy эффективно собирает наблюдаемые

#python #rx-py

#python #rx-py

Вопрос:

Вступление: Привет. Я изучаю библиотеку python rxpy для своего варианта использования, где я создаю конвейер выполнения, используя концепции реактивного программирования. Таким образом, я ожидаю, что мне не придется манипулировать слишком многими состояниями. Хотя мое решение кажется функциональным, но у меня возникают проблемы при попытке создать новую наблюдаемую из других наблюдаемых.

Проблема в том, что способ, которым я составляю свои наблюдаемые, приводит к повторению некоторых дорогостоящих вычислений дважды. Для повышения производительности я действительно хочу предотвратить запуск дорогостоящих вычислений.

Я очень новичок в реактивном программировании. Попытка почесать голову и просмотреть интернет-ресурсы и справочную документацию — кажется мне слишком кратким для понимания. Пожалуйста, посоветуйте.

Ниже приведен игрушечный пример, который иллюстрирует, что я делаю:

 import rx
from rx import operators as op
from rx.subject import Subject

root = Subject()

foo = root.pipe(
        op.map( lambda x : x   1 ),
        op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r)))
    )

bar_foo = foo.pipe(
        op.map( lambda x : x * 2 ),
        op.do_action(lambda r: print("bar(foo(x)) = %s" % str(r)))
    )

bar_foo.pipe(
        op.zip(foo),
        op.map(lambda i: i[0] i[1]),
        op.do_action(lambda r: print("foo(x)   bar(foo(x)) = %s" % str(r)))
    ).subscribe()


print("-------------")
root.on_next(10)
print("-------------")
  

Вывод:

 -------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) = 11 (expensive)
foo(x)   bar(foo(x)) = 33
-------------
  

Вы могли бы подумать о foo() и bar() быть дорогостоящими и сложными операциями. Сначала я создаю наблюдаемый foo объект. Затем создайте новую наблюдаемую bar_foo , которая включает foo . Позже оба архивируются вместе для вычисления конечного результата foo(x) bar(foo(x)) .

Вопрос:

  1. Что я могу сделать foo() , чтобы предотвратить запуск более одного раза для одного ввода? У меня есть действительно веские причины для сохранения foo() и bar() разделения. Кроме того, я также не хочу явно foo() запоминать.

  2. Любой, у кого есть опыт использования rxpy в производстве, может поделиться своим опытом. Приведет ли использование rxpy к повышению производительности или замедлению по сравнению с эквивалентным кодом, созданным вручную (но не поддерживаемым)?

Ответ №1:

op.share() Здесь может быть полезно добавление сразу после дорогостоящего вычисления в foo конвейере. Итак, меняем foo конвейер на:

 foo = root.pipe(
        op.map( lambda x : x   1 ),
        op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r))),
        op.share() # added to pipeline
    )
  

приведет к:

 -------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x)   bar(foo(x)) = 33
-------------
  

Я считаю, что это приводит к тому, что .share() передаваемые события дорогостоящей операции распределяются между нижестоящими подписчиками, так что результат одного дорогостоящего вычисления можно использовать несколько раз.

Что касается вашего второго вопроса; Я также новичок в RxPy, поэтому заинтересован в ответе более опытных пользователей. До сих пор я замечал, что, будучи новичком, вы можете легко создавать (плохие) конвейеры, в которых сообщения и вычисления повторяются в фоновом режиме. .share() кажется, это в некоторой степени уменьшает это, но не уверен в том, что происходит в фоновом режиме.