 

Concurrency considerations between pipes and non-pipes code

I'm in the process of wrapping a C library for some encoding in a pipes interface, but I've hit upon some design decisions that need to be made.

After the C library is set up, we hold on to an encoder context. With this, we can either encode, or change some parameters (let's call the Haskell interface to this last function tune :: Context -> Int -> IO ()). There are two parts to my question:

  1. The encoding part is easily wrapped up in a Pipe Foo Bar IO (), but I would also like to expose tune. Since simultaneous use of the encoding context must be lock protected, I would need to take a lock at every iteration in the pipe, and protect tune by taking the same lock. But now I feel I'm forcing hidden locks on the user. Am I barking up the wrong tree here? How is this kind of situation normally resolved in the pipes ecosystem? In my case I expect the pipe that my specific code is part of to always run in its own thread, with tuning happening concurrently, but I don't want to force this point of view upon any users. Other packages in the pipes ecosystem do not seem to force their users like this either.
  2. An encoding context that is no longer used needs to be properly de-initialized. How does one, in the pipes ecosystem, ensure that such things (in this case performing some IO actions) are taken care of when the pipe is destroyed?

A concrete example would be wrapping a compression library, in which case the above can be:

  1. The compression strength is tunable. We set up the pipe and it runs along merrily. How should one best go about allowing the compression strength setting to be changed while the pipe keeps running, assuming that concurrent access to the compression codec context must be serialized?
  2. The compression library allocated a bunch of memory off the Haskell heap when set up, and we'll need to call some library function to clean this up when the pipe is torn down.

Thanks… this might all be obvious, but I'm quite new to the pipes ecosystem.

Edit: Reading this after posting, I'm quite sure it's the vaguest question I've ever asked here. Ugh! Sorry ;-)

Asked by gspr, Sep 30 '22 23:09


1 Answer

Regarding (1), the general solution is to change your Pipe's type to:

Pipe (Either (Context, Int) Foo) Bar IO ()

In other words, it accepts both Foo inputs and tune requests, which it processes internally.
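As a rough illustration, such a pipe might be written as below. This is only a sketch: Context, Foo, Bar, newContext, tune and encode here are hypothetical stand-ins (an IORef holding a "strength" and a multiplication) for the real C bindings, and encoder and demo are illustrative names.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-ins for the real C library bindings.
newtype Context = Context (IORef Int)
type Foo = Int
type Bar = Int

newContext :: Int -> IO Context
newContext strength = Context <$> newIORef strength

tune :: Context -> Int -> IO ()
tune (Context ref) = writeIORef ref

encode :: Context -> Foo -> IO Bar
encode (Context ref) foo = do
    strength <- readIORef ref
    return (strength * foo)

-- One pipe handles both kinds of input, so tuning and encoding are
-- serialized by construction and no explicit lock is needed.
encoder :: Context -> Pipe (Either (Context, Int) Foo) Bar IO ()
encoder ctx = for cat $ \msg -> case msg of
    -- A tune request carries its context, as in the type above.
    Left (c, n) -> lift (tune c n)
    Right foo   -> lift (encode ctx foo) >>= yield

-- Feed a fixed list of inputs through the pipe and collect the outputs.
demo :: IO [Bar]
demo = do
    ctx <- newContext 1
    P.toListM (each [Right 2, Left (ctx, 10), Right 3] >-> encoder ctx)
```

With these stand-ins, demo yields [2,30]: the first input is encoded at strength 1 and the second after the tune request raised it to 10.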

So let's then assume that you have two concurrent Producers corresponding to inputs and tune requests:

producer1 :: Producer Foo IO ()

producer2 :: Producer (Context, Int) IO ()

You can use pipes-concurrency to create a buffer that they both feed into, like this:

example = do
    (output, input) <- spawn Unbounded
    -- input  :: Input  (Either (Context, Int) Foo)
    -- output :: Output (Either (Context, Int) Foo)

    let io1 = runEffect $ producer1 >-> Pipes.Prelude.map Right >-> toOutput output
        io2 = runEffect $ producer2 >-> Pipes.Prelude.map Left  >-> toOutput output
    as <- mapM async [io1, io2]
    runEffect (fromInput input >-> yourPipe >-> someConsumer)
    mapM_ wait as

You can learn more about the pipes-concurrency library by reading this tutorial.

By forcing all tune requests to go through the same single-threaded Pipe you can ensure that you don't accidentally have two concurrent invocations of the tune function.
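A self-contained variant of the example above could look like the following sketch. The Context, tune and encode definitions are hypothetical stand-ins for the real bindings. It also assumes spawn' and unbounded from pipes-concurrency, so that the mailbox can be sealed explicitly once both producers are done rather than waiting for garbage collection.

```haskell
import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM (atomically)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Pipes
import Pipes.Concurrent (fromInput, spawn', toOutput, unbounded)
import qualified Pipes.Prelude as P

-- Hypothetical stand-ins for the real C library bindings.
newtype Context = Context (IORef Int)
type Foo = Int
type Bar = Int

tune :: Context -> Int -> IO ()
tune (Context ref) = writeIORef ref

encode :: Context -> Foo -> IO Bar
encode (Context ref) foo = (* foo) <$> readIORef ref

encoder :: Context -> Pipe (Either (Context, Int) Foo) Bar IO ()
encoder ctx = for cat $ \msg -> case msg of
    Left (c, n) -> lift (tune c n)
    Right foo   -> lift (encode ctx foo) >>= yield

-- Returns the encoded outputs and the final tuning value.
demo :: IO ([Bar], Int)
demo = do
    ref <- newIORef 1
    let ctx = Context ref
    (output, input, seal) <- spawn' unbounded
    -- Two concurrent producers feed the same mailbox.
    a1 <- async $ runEffect $
        each [1, 2, 3] >-> P.map Right >-> toOutput output
    a2 <- async $ runEffect $
        each [(ctx, 5), (ctx, 9)] >-> P.map Left >-> toOutput output
    -- Seal the mailbox once both producers have finished, so that
    -- fromInput terminates after draining the buffer.
    sealer <- async $ wait a1 >> wait a2 >> atomically seal
    -- Only this thread ever touches the context.
    bars <- P.toListM (fromInput input >-> encoder ctx)
    wait sealer
    level <- readIORef ref
    return (bars, level)
```

The interleaving of encode and tune requests is not deterministic here, so the individual output values can vary between runs. What is fixed is that three values come out and that the last tune request (9) wins, since requests from a single producer stay in order.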

Regarding (2), there are two ways you can acquire a resource using pipes. The more sophisticated approach is to use the pipes-safe library, which provides a bracket function that you can use within a Pipe. That is probably overkill for your purpose, since it mainly exists for acquiring and releasing multiple resources over the lifetime of a pipe. A simpler solution is to use the following "with" idiom to acquire the pipe:

withEncoder :: (Pipe Foo Bar IO () -> IO r) -> IO r
withEncoder k = bracket acquire release $ \resource -> do
    k (createPipeFromResource resource)

Then a user would just write:

withEncoder $ \yourPipe -> do
    runEffect (someProducer >-> yourPipe >-> someConsumer)
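For comparison, the pipes-safe route mentioned earlier might look roughly like the sketch below. It assumes the bracket and runSafeT functions from Pipes.Safe. The acquire, release and encode definitions are hypothetical stand-ins that only record events in a log, so that the release step is observable.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Safe (SafeT, bracket, runSafeT)

-- Hypothetical stand-ins for the real C library bindings.
newtype Context = Context (IORef Int)
type Foo = Int
type Bar = Int

logEvent :: IORef [String] -> String -> IO ()
logEvent ref s = modifyIORef ref (++ [s])

acquire :: IORef [String] -> IO Context
acquire logRef = do
    logEvent logRef "acquire"
    Context <$> newIORef 2

release :: IORef [String] -> Context -> IO ()
release logRef _ = logEvent logRef "release"

encode :: Context -> Foo -> IO Bar
encode (Context ref) foo = (* foo) <$> readIORef ref

-- The pipe acquires its own context and registers the finalizer,
-- at the cost of running over SafeT IO instead of plain IO.
safeEncoder :: IORef [String] -> Pipe Foo Bar (SafeT IO) ()
safeEncoder logRef = bracket (acquire logRef) (release logRef) $ \ctx ->
    for cat $ \foo -> liftIO (encode ctx foo) >>= yield

demo :: IO ([Bar], [String])
demo = do
    logRef <- newIORef []
    -- runSafeT runs any outstanding finalizers when the pipeline ends.
    bars <- runSafeT $ P.toListM (each [1, 2, 3] >-> safeEncoder logRef)
    events <- readIORef logRef
    return (bars, events)
```

With these stand-ins, demo returns ([2,4,6],["acquire","release"]). The trade-off is visible in the type: users now have to run the pipeline inside runSafeT.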

You can optionally use the managed package, which simplifies the types a bit and makes it easier to acquire multiple resources. You can learn more about it from reading this blog post of mine.
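A minimal sketch of the managed variant, assuming the managed and with functions from Control.Monad.Managed. The acquire, release and createPipeFromResource definitions below are hypothetical stand-ins for the real bindings, and encoderM is an illustrative name.

```haskell
import Control.Exception (bracket)
import Control.Monad.Managed (Managed, managed, with)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-ins for the real C library bindings.
newtype Context = Context (IORef Int)
type Foo = Int
type Bar = Int

acquire :: IO Context
acquire = Context <$> newIORef 2

release :: Context -> IO ()
release (Context ref) = writeIORef ref 0  -- placeholder for the real cleanup call

createPipeFromResource :: Context -> Pipe Foo Bar IO ()
createPipeFromResource (Context ref) = for cat $ \foo -> do
    strength <- lift (readIORef ref)
    yield (strength * foo)

withEncoder :: (Pipe Foo Bar IO () -> IO r) -> IO r
withEncoder k = bracket acquire release $ \resource ->
    k (createPipeFromResource resource)

-- The same resource wrapped as a Managed value. Several of these can
-- be bound one after another in do-notation without nested callbacks.
encoderM :: Managed (Pipe Foo Bar IO ())
encoderM = managed withEncoder

demo :: IO [Bar]
demo = with encoderM $ \yourPipe ->
    P.toListM (each [1, 2, 3] >-> yourPipe)
```

With these stand-ins, demo returns [2,4,6], and release runs as soon as the callback passed to with finishes.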

Answered by Gabriella Gonzalez, Oct 07 '22 20:10