Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to compress the output when writing to a file?

I have a computation that along with other things generates some data (a lot of it) and I want to write into a file.

The way the code is structured now is (simplified):

writeRecord :: Handle -> Record -> IO ()
writeRecord h r = hPutStrLn h (toByteString r)

This function is then called periodically during a bigger computation. It is almost like a log, and in fact, multiple files are being written simultaneously.

Now I want the output file to be compressed with Gzip. In languages like Java I would do something like:

outStream = new GzipOutputStream(new FileOutputStream(path)) 

and then would just write into that wrapped output stream.

What is the way of doing it in Haskell? I think writing something like

writeRecord h r = hPut h ((compressed . toByteString) r)

is not correct because compressing each small bit individually isn't efficient (I even tried it and the size of the compressed file is bigger than uncompressed this way).

I also don't think that I can just produce a lazy ByteString (or even a list of chunks) and then write it with compressed . fromChunks because this will require my "generator" building the full thing in memory. And the fact that more than one file is produced at the same time makes it even more complicated.

So what would be a way to solve this in Haskell? Writing to file(s) and have them gzipped?

like image 925
Alexey Raga Avatar asked Oct 20 '16 11:10

Alexey Raga


3 Answers

All the streaming libraries support compression. If I understand the particular problem and the way you are thinking about it, io-streams might be the simplest for your purposes. Here I alternate between writing to trump and clinton output streams, which are written as compressed files. I follow by showing the pipes equivalent of Michael's conduit program

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package io-streams
{-# LANGUAGE OverloadedStrings #-}

import qualified System.IO.Streams as IOS
import qualified System.IO as IO
import Data.ByteString (ByteString)

analyzer :: IOS.OutputStream ByteString -> IOS.OutputStream ByteString -> IO ()
analyzer clinton trump = do 
  IOS.write (Just "This is a string\n") clinton
  IOS.write (Just "This is a string\n") trump
  IOS.write (Just "Clinton string\n") clinton
  IOS.write (Just "Trump string\n") trump   
  IOS.write (Just "Another Clinton string\n") clinton
  IOS.write (Just "Another Trump string\n") trump   
  IOS.write Nothing clinton
  IOS.write Nothing trump

main:: IO ()
main = 
  IOS.withFileAsOutput "some-file-clinton.txt.gz" $ \clinton_compressed ->
  IOS.withFileAsOutput "some-file-trump.txt.gz" $ \trump_compressed -> do
     clinton <- IOS.gzip IOS.defaultCompressionLevel clinton_compressed
     trump <- IOS.gzip IOS.defaultCompressionLevel trump_compressed
     analyzer clinton trump

Obviously you can mix all kinds of IO in analyzer between acts of writing to the two output streams - I'm just show in the writes, so to speak. In particular, if analyzer is understood as depending on an input stream, the writes can depend on reads from the input stream. Here's a (slightly!) more complicated program that does that. If I run the program above I see

$ stack gzip_so.hs  
$ gunzip some-file-clinton.txt.gz 
$ gunzip some-file-trump.txt.gz 
$ cat some-file-clinton.txt 
This is a string
Clinton string
Another Clinton string
$ cat some-file-trump.txt 
This is a string
Trump string
Another Trump string

With pipes and conduit there are various ways of achieving the above effect, with a higher level of decomposition of parts. Writing to separate files will however be a little more subtle. Here in any case is the pipes equivalent of Michael S's conduit program:

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc  --package pipes-zlib 
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr)
import System.IO  (IOMode(..), withFile, Handle)
import Pipes  
import qualified Pipes.ByteString as PB
import qualified Pipes.GZip as P

-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"

-- Original version
producerHandle :: Handle -> IO ()
producerHandle h = do
    str <- someAction
    hPutStr h str

producerPipe :: MonadIO m => Producer ByteString m ()
producerPipe = do
    str <- liftIO someAction
    yield str

main :: IO ()
main =  withFile "some-file-pipes.txt.gz"  WriteMode $ \h -> 
     runEffect $ P.compress P.defaultCompression producerPipe  >-> PB.toHandle h 

-- Edit

Here for what it's worth is yet another way of superimposing several producers on a single thread with pipes or conduit, to add to the different approaches Michael S and danidiaz mentioned:

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package pipes-zlib
{-# LANGUAGE OverloadedStrings #-}
import Pipes
import Pipes.GZip
import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as Bytes
import System.IO
import Control.Monad (replicateM_)

producer = replicateM_ 50000 $ do
    marie  "This is going to Marie\n"  -- arbitary IO can be interspersed here
    arthur "This is going to Arthur\n" -- with liftIO
    sylvia "This is going to Sylvia\n" 
  where 
    marie = yield; arthur = lift . yield; sylvia = lift . lift . yield

sinkHelper h p = runEffect (compress bestSpeed p >-> Bytes.toHandle h)

main :: IO ()
main =  
   withFile "marie.txt.gz" WriteMode $ \marie ->
   withFile "arthur.txt.gz"  WriteMode $ \arthur -> 
   withFile "sylvia.txt.gz"  WriteMode $ \sylvia ->
      sinkHelper sylvia
      $ sinkHelper arthur
      $ sinkHelper marie
      $ producer

It is quite simple and fast, and can be written in conduit with the obvious alterations - but finding it natural involves a higher level of buy-in with the 'monad transformer stack' point of view. It would be the most natural way of writing such a program from the point of view of something like the streaming library.

like image 137
Michael Avatar answered Oct 11 '22 03:10

Michael


Doing this with conduit is fairly straightforward, though you'd need to adjust your code a bit. I've put together an example of before and after code to demonstrate it. The basic idea is:

  • Replace hPutStr h with yield
  • Add some liftIO wrappers
  • Instead of using withBinaryFile or the like, use runConduitRes, gzip, and sinkFile

Here's the example:

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString, hPutStr)
import Data.Conduit (ConduitM, (.|), yield, runConduitRes)
import Data.Conduit.Binary (sinkFile)
import Data.Conduit.Zlib (gzip)
import System.IO (Handle)

-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"

-- Original version
producerHandle :: Handle -> IO ()
producerHandle h = do
    str <- someAction
    hPutStr h str

-- Conduit version
producerConduit :: MonadIO m => ConduitM i ByteString m ()
producerConduit = do
    str <- liftIO someAction
    yield str

main :: IO ()
main = runConduitRes $ producerConduit
                    .| gzip
                    .| sinkFile "some-file.txt.gz"

You can learn more about conduit in the conduit tutorial.

Your Java idea is interesting, give me a few more minutes, I'll add an answer that looks more like that.

EDIT

Here's a version that's closer to your Java style approach. It relies on a SinkFunc.hs module which is available as a Gist at: https://gist.github.com/snoyberg/283154123d30ff9e201ea4436a5dd22d

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wall -Werror #-}
import Data.ByteString (ByteString)
import Data.Conduit ((.|))
import Data.Conduit.Binary (sinkHandle)
import Data.Conduit.Zlib (gzip)
import System.IO (withBinaryFile, IOMode (WriteMode))
import SinkFunc (withSinkFunc)

-- Some helper function you may have
someAction :: IO ByteString
someAction = return "This is a string\n"

producerFunc :: (ByteString -> IO ()) -> IO ()
producerFunc write = do
    str <- someAction
    write str

main :: IO ()
main = withBinaryFile "some-file.txt.gz" WriteMode $ \h -> do
    let sink = gzip .| sinkHandle h
    withSinkFunc sink $ \write -> producerFunc write

EDIT 2 One more for good measure, actually using ZipSink to stream the data to multiple different files. There are lots of different ways of slicing this, but this is one way that works:

#!/usr/bin/env stack
-- stack --resolver lts-6.21 --install-ghc runghc --package conduit-extra
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import Data.Conduit (ConduitM, (.|), yield, runConduitRes, ZipSink (..))
import Data.Conduit.Binary (sinkFile)
import qualified Data.Conduit.List as CL
import Data.Conduit.Zlib (gzip)

data Output = Foo ByteString | Bar ByteString

fromFoo :: Output -> Maybe ByteString
fromFoo (Foo bs) = Just bs
fromFoo _ = Nothing

fromBar :: Output -> Maybe ByteString
fromBar (Bar bs) = Just bs
fromBar _ = Nothing

producer :: Monad m => ConduitM i Output m ()
producer = do
    yield $ Foo "This is going to Foo"
    yield $ Bar "This is going to Bar"

sinkHelper :: MonadResource m
           => FilePath
           -> (Output -> Maybe ByteString)
           -> ConduitM Output o m ()
sinkHelper fp f
    = CL.mapMaybe f
   .| gzip
   .| sinkFile fp

main :: IO ()
main = runConduitRes
     $ producer
    .| getZipSink
            (ZipSink (sinkHelper "foo.txt.gz" fromFoo) *>
             ZipSink (sinkHelper "bar.txt.gz" fromBar))
like image 34
Michael Snoyman Avatar answered Oct 11 '22 02:10

Michael Snoyman


For incremental compression, I think you could make use of compressIO/foldCompressStream in Codec.Compression.Zlib.Internal.

If you're able to represent your producer action as an IO (Maybe a) (such as an MVar take or InputStream/Chan read) where Nothing signifies end of input, something like this should work:

import System.IO (Handle)
import qualified Data.ByteString as BS
import qualified Codec.Compression.Zlib.Internal as ZLib

compressedWriter :: Handle -> (IO (Maybe BS.ByteString)) -> IO ()
compressedWriter handle source =
  ZLib.foldCompressStream
    (\next -> source >>= maybe (next BS.empty) next)
    (\chunk next -> BS.hPut handle chunk >> next)
    (return ())
    (ZLib.compressIO ZLib.rawFormat ZLib.defaultCompressParams)
like image 45
ryachza Avatar answered Oct 11 '22 02:10

ryachza