#haskell #compression #gzip
#haskell #сжатие #gzip
Вопрос:
У меня есть вычисление, которое наряду с другими вещами генерирует некоторые данные (их много), и я хочу записать в файл.
Способ структурирования кода сейчас (упрощенный):
writeRecord :: Handle -> Record -> IO ()
writeRecord h r = hPutStrLn h (toByteString r)
Затем эта функция периодически вызывается во время больших вычислений. Это почти как журнал, и фактически одновременно записывается несколько файлов.
Теперь я хочу, чтобы выходной файл был сжат с помощью Gzip
. На языках, подобных Java, я бы сделал что-то вроде:
outStream = new GzipOutputStream(new FileOutputStream(path))
а затем просто записал бы в этот обернутый выходной поток.
Как это делается в Haskell? Я думаю, что написание чего-то вроде
writeRecord h r = hPut h ((compressed . toByteString) r)
неверно, потому что сжатие каждого маленького бита по отдельности неэффективно (я даже пробовал это, и размер сжатого файла больше, чем несжатого таким образом).
Я также не думаю, что я могу просто создать lazy ByteString
(или даже список фрагментов), а затем записать его с помощью compressed . fromChunks
, потому что для этого потребуется, чтобы мой «генератор» полностью собрал все в памяти. И тот факт, что одновременно создается более одного файла, делает это еще более сложным.
Итак, каким был бы способ решить эту проблему в Haskell? Запись в файлы и их архивирование?
Комментарии:
1. Пакет pipes может быть тем, что вам нужно. Вы можете создавать потоки данных от производителей к потребителю и преобразовывать данные между ними (с помощью каналов). Однако я недостаточно знаю пакет, чтобы написать ответ об этом.
2. Каналы / Conduit могли быть решением. Однако это может иметь дополнительные сложности, связанные с «разветвлением»: предположим, что один «генератор» хочет выполнить запись в 5 разных файлов (разные данные, разная скорость), следовательно, существует 1 производитель и 5 потребителей… Не уверен, что это будет хорошо моделировать. Также выглядит немного излишним для такого простого (как кажется) требования?
3. Что открывает дескриптор, в который вы записываете? Это то место, куда вы хотите вставить «канал» через ваш компрессор, точно так же, как в вашем примере Java.
4. @chepner Я контролирую открытие дескриптора. И это было бы идеально, если бы я мог «пропустить» это через компрессор, но не могли бы вы указать мне, как это можно сделать?
5. Все библиотеки потоковой передачи поддерживают знакомые формы сжатия, см., например hackage.haskell.org/package/io-streams-1.3.5.0/docs / … поскольку он в некоторой степени вдохновлен Java. Но я думаю, вы это понимаете. Невозможно дать совет, потому что вы недостаточно говорите о том, как «многие файлы» попадают в картину.
Ответ №1:
Все библиотеки потоковой передачи поддерживают сжатие. Если я понимаю конкретную проблему и то, как вы об этом думаете, io-streams
это может быть самым простым для ваших целей. Здесь я чередую запись в trump
и clinton
выходные потоки, которые записываются в виде сжатых файлов. Далее я показываю pipes
эквивалент программы Майкла conduit
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package io-streams
{-# LANGUAGE OverloadedStrings #-}
import qualified System.IO.Streams as IOS
import qualified System.IO as IO
import Data.ByteString (ByteString)
analyzer :: IOS.OutputStream ByteString -> IOS.OutputStream ByteString -> IO ()
analyzer clinton trump = do
IOS.write (Just "This is a stringn") clinton
IOS.write (Just "This is a stringn") trump
IOS.write (Just "Clinton stringn") clinton
IOS.write (Just "Trump stringn") trump
IOS.write (Just "Another Clinton stringn") clinton
IOS.write (Just "Another Trump stringn") trump
IOS.write Nothing clinton
IOS.write Nothing trump
main:: IO ()
main =
IOS.withFileAsOutput "some-file-clinton.txt.gz" $ clinton_compressed ->
IOS.withFileAsOutput "some-file-trump.txt.gz" $ trump_compressed -> do
clinton <- IOS.gzip IOS.defaultCompressionLevel clinton_compressed
trump <- IOS.gzip IOS.defaultCompressionLevel trump_compressed
analyzer clinton trump
Очевидно, что вы можете смешивать все виды IO
in analyzer
между актами записи в два выходных потока — я просто показываю в write
s, так сказать. В частности, если analyzer
понимается как зависящий от входного потока, write
s могут зависеть от read
s из входного потока. Вот (немного!) более сложная программа, которая делает это. Если я запускаю приведенную выше программу, я вижу
$ stack gzip_so.hs
$ gunzip some-file-clinton.txt.gz
$ gunzip some-file-trump.txt.gz
$ cat some-file-clinton.txt
This is a string
Clinton string
Another Clinton string
$ cat some-file-trump.txt
This is a string
Trump string
Another Trump string
С помощью pipes и conduit существуют различные способы достижения вышеуказанного эффекта, с более высоким уровнем разложения частей. Запись в отдельные файлы, однако, будет немного сложнее. Здесь в любом случае используется pipes-эквивалент программы conduit Майкла С.:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package pipes-zlib
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr)
import System.IO (IOMode(..), withFile, Handle)
import Pipes
import qualified Pipes.ByteString as PB
import qualified Pipes.GZip as P
-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a stringn"
-- Original version
producerHandle :: Handle -> IO ()
producerHandle h = do
str <- someAction
hPutStr h str
producerPipe :: MonadIO m => Producer ByteString m ()
producerPipe = do
str <- liftIO someAction
yield str
main :: IO ()
main = withFile "some-file-pipes.txt.gz" WriteMode $ h ->
runEffect $ P.compress P.defaultCompression producerPipe >-> PB.toHandle h
— Редактировать
Вот, для чего это стоит, еще один способ наложения нескольких производителей на один поток с помощью pipes или conduit, в дополнение к различным подходам, упомянутым Майклом С и danidiaz
:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package pipes-zlib
{-# LANGUAGE OverloadedStrings #-}
import Pipes
import Pipes.GZip
import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as Bytes
import System.IO
import Control.Monad (replicateM_)
producer = replicateM_ 50000 $ do
marie "This is going to Marien" -- arbitary IO can be interspersed here
arthur "This is going to Arthurn" -- with liftIO
sylvia "This is going to Sylvian"
where
marie = yield; arthur = lift . yield; sylvia = lift . lift . yield
sinkHelper h p = runEffect (compress bestSpeed p >-> Bytes.toHandle h)
main :: IO ()
main =
withFile "marie.txt.gz" WriteMode $ marie ->
withFile "arthur.txt.gz" WriteMode $ arthur ->
withFile "sylvia.txt.gz" WriteMode $ sylvia ->
sinkHelper sylvia
$ sinkHelper arthur
$ sinkHelper marie
$ producer
Это довольно просто и быстро, и может быть записано в conduit с очевидными изменениями — но для того, чтобы найти это естественным, требуется более высокий уровень поддержки с точки зрения «стека монадных трансформаторов». Это был бы самый естественный способ написания такой программы с точки зрения чего-то вроде streaming
библиотеки.
Ответ №2:
Сделать это с помощью conduit довольно просто, хотя вам потребуется немного скорректировать свой код. Я собрал пример кода before и after, чтобы продемонстрировать это. Основная идея заключается в:
- Заменить
hPutStr h
наyield
- Добавьте несколько
liftIO
оберток - Вместо использования
withBinaryFile
или подобного, используйтеrunConduitRes
,gzip
иsinkFile
Вот пример:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr)
import Data.Conduit (ConduitM, (.|), yield, runConduitRes)
import Data.Conduit.Binary (sinkFile)
import Data.Conduit.Zlib (gzip)
import System.IO (Handle)
-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a stringn"
-- Original version
producerHandle :: Handle -> IO ()
producerHandle h = do
str <- someAction
hPutStr h str
-- Conduit version
producerConduit :: MonadIO m => ConduitM i ByteString m ()
producerConduit = do
str <- liftIO someAction
yield str
main :: IO ()
main = runConduitRes $ producerConduit
.| gzip
.| sinkFile "some-file.txt.gz"
Вы можете узнать больше о conduit в руководстве по conduit.
Ваша идея Java интересна, дайте мне еще несколько минут, я добавлю ответ, который больше похож на этот.
Редактировать
Вот версия, которая ближе к вашему подходу в стиле Java. Он основан на SinkFunc.hs
модуле, суть которого доступна по адресу: https://gist.github.com/snoyberg/283154123d30ff9e201ea4436a5dd22d
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wall -Werror #-}
import Data.ByteString (ByteString)
import Data.Conduit ((.|))
import Data.Conduit.Binary (sinkHandle)
import Data.Conduit.Zlib (gzip)
import System.IO (withBinaryFile, IOMode (WriteMode))
import SinkFunc (withSinkFunc)
-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a stringn"
producerFunc :: (ByteString -> IO ()) -> IO ()
producerFunc write = do
str <- someAction
write str
main :: IO ()
main = withBinaryFile "some-file.txt.gz" WriteMode $ h -> do
let sink = gzip .| sinkHandle h
withSinkFunc sink $ write -> producerFunc write
ОТРЕДАКТИРУЙТЕ 2, еще один для пущей убедительности, фактически используемый ZipSink
для потоковой передачи данных в несколько разных файлов. Существует множество различных способов нарезки, но это один из способов, который работает:
#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import Data.Conduit (ConduitM, (.|), yield, runConduitRes, ZipSink (..))
import Data.Conduit.Binary (sinkFile)
import qualified Data.Conduit.List as CL
import Data.Conduit.Zlib (gzip)
data Output = Foo ByteString | Bar ByteString
fromFoo :: Output -> Maybe ByteString
fromFoo (Foo bs) = Just bs
fromFoo _ = Nothing
fromBar :: Output -> Maybe ByteString
fromBar (Bar bs) = Just bs
fromBar _ = Nothing
producer :: Monad m => ConduitM i Output m ()
producer = do
yield $ Foo "This is going to Foo"
yield $ Bar "This is going to Bar"
sinkHelper :: MonadResource m
=> FilePath
-> (Output -> Maybe ByteString)
-> ConduitM Output o m ()
sinkHelper fp f
= CL.mapMaybe f
.| gzip
.| sinkFile fp
main :: IO ()
main = runConduitRes
$ producer
.| getZipSink
(ZipSink (sinkHelper "foo.txt.gz" fromFoo) *>
ZipSink (sinkHelper "bar.txt.gz" fromBar))
Комментарии:
1. Я не видел комментариев раньше, я просто отвечал на первоначальный вопрос. Если это действительно то, что ищет спрашивающий, это должно включать что-то вроде
ZipSink
в conduit, или этот другой подход, который я описываю прямо сейчас в качестве доказательства концепции.2. Кроме того, вообще говоря, даже если что-то «простое», новичку в библиотеке обычно намного проще создать рабочий пример.
Ответ №3:
Для инкрементного сжатия, я думаю, вы могли бы использовать compressIO
/ foldCompressStream
в Codec.Compression.Zlib.Internal
.
Если вы можете представить свое действие producer как IO (Maybe a)
(например, MVar
take или InputStream
/ Chan
read), где Nothing
означает конец ввода, что-то вроде этого должно сработать:
import System.IO (Handle)
import qualified Data.ByteString as BS
import qualified Codec.Compression.Zlib.Internal as ZLib
compressedWriter :: Handle -> (IO (Maybe BS.ByteString)) -> IO ()
compressedWriter handle source =
ZLib.foldCompressStream
(next -> source >>= maybe (next BS.empty) next)
(chunk next -> BS.hPut handle chunk >> next)
(return ())
(ZLib.compressIO ZLib.rawFormat ZLib.defaultCompressParams)
Ответ №4:
Это решение похоже на EDIT 2 Майкла Сноймана, но использует пакеты foldl, pipes, pipes-zlib и streaming-eversion.
{-# language OverloadedStrings #-}
module Main where
-- cabal install bytestring foldl pipes pipes-zlib streaming-eversion
import Data.Foldable
import Data.ByteString
import qualified Control.Foldl as L
import Pipes
import qualified Pipes.Prelude
import Pipes.Zlib (compress,defaultCompression,defaultWindowBits)
import Streaming.Eversion.Pipes (transvertMIO)
import System.IO
type Tag = String
producer :: Monad m => Producer (Tag,ByteString) m ()
producer = do
yield $ ("foo","This is going to Foo")
yield $ ("bar","This is going to Bar")
foldForTag :: Handle -> Tag -> L.FoldM IO (Tag,ByteString) ()
foldForTag handle tag =
L.premapM ((tag',bytes) -> if tag' == tag then Just bytes else Nothing)
. L.handlesM L.folded
. transvertMIO (compress defaultCompression defaultWindowBits)
$ L.mapM_ (Data.ByteString.hPut handle)
main :: IO ()
main = do
withFile "foo.txt" WriteMode $ h1 ->
withFile "bar.txt" WriteMode $ h2 ->
let multifold = traverse_ (uncurry foldForTag) [(h1,"foo"),(h2,"bar")]
in L.impurely Pipes.Prelude.foldM multifold producer
Комментарии:
1.Я добавил
Streaming.Zip
сgzip
github.com/michaelt/streaming-utils/blob/master/Streaming /…2. @Michael Это здорово, спасибо. Я не знал о существовании «streaming-utils».
Ответ №5:
Это решение похоже на EDIT 2 Майкла Сноймана, но использует пакеты streaming, streaming-bytestring, pipes и pipes-zlib.
{-# language OverloadedStrings #-}
module Main where
-- cabal install bytestring streaming streaming-bytestring pipes pipes-zlib
import Data.ByteString
import qualified Data.ByteString.Streaming as B
import Streaming
import qualified Streaming.Prelude as S
import Pipes (next)
import qualified Pipes.Prelude
import Pipes.Zlib (compress,defaultCompression,defaultWindowBits)
import System.IO
type Tag = String
producer :: Monad m => Stream (Of (Tag,ByteString)) m ()
producer = do
S.yield ("foo","This is going to Foo")
S.yield ("bar","This is going to Bar")
-- I couldn't find a streaming-zlib on Hackage, took a pipes detour
compress' :: MonadIO m
=> Stream (Of ByteString) m r -> Stream (Of ByteString) m r
compress' = S.unfoldr Pipes.next
. compress defaultCompression defaultWindowBits
. Pipes.Prelude.unfoldr S.next
keepTag :: Monad m
=> Tag -> Stream (Of (Tag,ByteString)) m r -> Stream (Of ByteString) m r
keepTag tag = S.map snd . S.filter ((tag==) . fst)
main :: IO ()
main = runResourceT
. B.writeFile "foo.txt" . B.fromChunks . compress' . keepTag "foo"
. B.writeFile "bar.txt" . B.fromChunks . compress' . keepTag "bar"
$ S.copy producer
Я использую функцию копирования из потоковой передачи.Прелюдия, которая позволяет вам
Продублируйте содержимое stream, чтобы с ним можно было работать дважды разными способами, но без прерывания потоковой передачи.