#haskell #stream #continuations #deriving #derivingvia
#haskell #поток #продолжения #вывод #derivingvia
Вопрос:
Я пытаюсь создать потоковую библиотеку, используя абстракции, описанные в статье «Более быстрые конвейеры сопрограмм». Я изменил код так, чтобы он правильно обрабатывал выход из конвейера (вместо того, чтобы выдавать ошибки, когда это происходит):
-- | r: return type of the continuation, i: input stream type, o: output stream type,
-- m: underlying monad, a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
deriving
( Functor,
Applicative,
Monad
)
via (Cont (Result r m i o))
type Result r m i o = InCont r m i -> OutCont r m o -> m r
newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}
newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}
suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont ok -> k ik ok
suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont v ik -> k v ik ok
emptyIk :: InCont r m a
emptyIk = MakeInCont ok -> resumeOut ok Nothing emptyIk
await :: ContPipe r i o m (Maybe i)
await = MakePipe k ik ok -> resumeIn ik (suspendOut k ok)
yield :: o -> ContPipe r i o m ()
yield v = MakePipe k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)
(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe k ik ok ->
runPipe
q
(a _ ok' -> k a emptyIk ok')
(suspendIn (runPipe p (() -> f)) ik)
ok
where
f :: Result r m i e
f _ ok = resumeOut ok Nothing emptyIk
runContPipe :: forall m a. Applicative m => ContPipe a () Void m a -> m a
runContPipe p = runPipe p (a _ _ -> pure a) ik ok
where
ik :: InCont a m ()
ik = MakeInCont ok' -> resumeOut ok' (Just ()) ik
ok :: OutCont a m Void
ok = MakeOutCont _ ik' -> resumeIn ik' ok
Я хотел бы реализовать функцию
fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
Которая объединяет два потока потребителей в один (аналогично conduit ZipSink
). Он должен иметь следующую семантику:
- Если оба потока не завершены и принимают входные данные, передайте одно и то же входное значение в оба потока
- Если один поток вышел, сохраните возвращаемое значение, а затем передайте входные данные в поток, который принимает значение
- Если оба потока завершились, завершите работу с возвращаемым значением обоих потоков, помещенным в кортеж.
Вот моя попытка:
Мы повторно loop
используем функцию в документе, которая соединяет an InCont r m i
с two OutCont r m i
и активно возобновляет продолжения.
loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
resumeIn ik $ MakeOutCont v ik' ->
resumeOut ok1 v $ MakeInCont ok1' ->
resumeOut ok2 v $ MakeInCont ok2' -> loop ik' ok1' ok2'
Если loop
мы можем подключить вход результирующего канала к двум каналам одновременно, выходные данные будут разделены между двумя каналами (это не имеет большого значения, поскольку вы не можете получить a Void
).
fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
MakePipe k ik ok ->
let f :: a -> Result r m i Void
f a ik' ok' = _
g :: b -> Result r m i Void
g b ik' ok' = _
in runPipe
p
f
(MakeInCont ok1 -> runPipe q g (MakeInCont ok2 -> loop ik ok1 ok2) ok)
ok
Now we just need to fill in the continuations f
and g
which will be called by p
and q
when they exit.
If g
has already been called when f
was called, which means q
has exited, then f
should call the continuation k
, if g
hasn’t been called yet, then f
should store the return value a
and resume the input continuation (by discarding all of the values passed)
It seems to me that it’s not possible to achieve this without some form of shared state. And we could try to store the state in m
using a state monad:
fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
MakePipe k ik ok ->
let f :: a -> Result r m i Void
f a ik' ok' = do
s <- get
case s of
Nothing -> do
put (Just (Left a))
resumeIn ik' sinkOk
Just (Right b) -> do
k (a, b) ik' ok'
_ -> error "unexpected state"
g :: b -> Result r m i Void
g b ik' ok' = do
s <- get
case s of
Nothing -> do
put (Just (Right b))
resumeIn ik' sinkOk
Just (Left a) -> do
k (a, b) ik' ok'
_ -> error "unexpected state"
in runPipe
p
f
(MakeInCont ok1 -> runPipe q g (MakeInCont ok2 -> loop ik ok1 ok2) ok)
ok
sinkOk
является продолжением вывода, которое отбрасывает все его входные данные:
sinkOk :: OutCont r m o
sinkOk = MakeOutCont _ ik -> resumeIn ik sinkOk
теперь мы могли бы добавить некоторые вспомогательные функции для тестирования:
print' :: MonadIO m => Show i => ContPipe r i o m ()
print' = do
m <- await
case m of
Nothing -> pure ()
Just i -> do
lift $ liftIO (print i)
print'
upfrom :: Int -> ContPipe r i Int m a
upfrom i = do
yield i
upfrom (i 1)
take' :: Int -> ContPipe r i i m ()
take' n
| n <= 0 = pure ()
| otherwise = do
m <- await
case m of
Nothing -> pure ()
Just i -> do
yield i
take' (n - 1)
Это работает в случае, когда p
завершается раньше, чем q
:
flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' print'
дает желаемый результат:
1
1
2
2
3
3
((),())
Но он переходит в бесконечные циклы при q
выходе раньше, чем p
:
flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' (take 2 print')
выводит:
1
1
2
2
<loops>
Комментарии:
1. Что должно произойти, если
fork
получает два «пустых» источника, которые никогда ничего не производят?2. Нет источников, которые никогда ничего не будут производить. Способ
.|
работает так: если восходящий поток вышел раньше, замените восходящий поток источником, который выдает толькоNothing
s (emptyIk
) .