I'm processing some audio using PortAudio. The Haskell FFI bindings call a user-defined callback whenever there's audio data to be processed. This callback should return very quickly and ideally do no I/O. I want to save the audio input and return quickly, since my application doesn't need to react to the audio in real time (right now I'm just saving the audio data to a file; later I'll construct a simple speech recognition system).
I like the idea of pipes and thought I could use that library. The problem is that I don't know how to create a Producer that returns data that came in through a callback.
How do I handle my use case?
Here's what I'm working with right now, in case that helps (the datum MVar isn't working right now, and I don't like storing all the data in a Seq; I'd rather process it as it comes in instead of just at the end):
{-# LANGUAGE FlexibleInstances, MultiParamTypeClasses #-}
module Main where

import Codec.Wav
import Sound.PortAudio
import Sound.PortAudio.Base
import Sound.PortAudio.Buffer
import Foreign.Ptr
import Foreign.ForeignPtr
import Foreign.C.Types
import Foreign.Storable
import qualified Data.StorableVector as SV
import qualified Data.StorableVector.Base as SVB
import Control.Exception.Base (evaluate)
import Data.Int
import Data.Sequence as Seq
import Control.Concurrent

instance Buffer SV.Vector a where
    fromForeignPtr fp = return . SVB.fromForeignPtr fp
    toForeignPtr = return . (\(a, b, c) -> (a, c)) . SVB.toForeignPtr

-- | Wrap a buffer callback into the generic stream callback type.
buffCBtoRawCB' :: (StreamFormat input, StreamFormat output, Buffer a input, Buffer b output) =>
    BuffStreamCallback input output a b -> StreamCallback input output
buffCBtoRawCB' func = \a b c d e -> do
    fpA <- newForeignPtr_ d -- We will not free, as callback system will do that for us
    fpB <- newForeignPtr_ e -- We will not free, as callback system will do that for us
    storeInp <- fromForeignPtr fpA (fromIntegral $ 1 * c)
    storeOut <- fromForeignPtr fpB (fromIntegral $ 0 * c)
    func a b c storeInp storeOut

callback :: MVar (Seq.Seq [Int32]) -> PaStreamCallbackTimeInfo -> [StreamCallbackFlag] -> CULong
    -> SV.Vector Int32 -> SV.Vector Int32 -> IO StreamResult
callback seqmvar = \timeinfo flags numsamples input output -> do
    putStrLn $ "timeinfo: " ++ show timeinfo ++ "; flags are " ++ show flags ++ " in callback with " ++ show numsamples ++ " samples."
    print input
    -- write data to output
    --mapM_ (uncurry $ pokeElemOff output) $ zip (map fromIntegral [0..(numsamples-1)]) datum
    --print "wrote data"
    input' <- evaluate $ SV.unpack input
    modifyMVar_ seqmvar (\s -> return $ s Seq.|> input')
    case flags of
        [] -> return $ if unPaTime (outputBufferDacTime timeinfo) > 0.2 then Complete else Continue
        _ -> return Complete

done doneMVar = do
    putStrLn "total done dood!"
    putMVar doneMVar True
    return ()

main = do
    let samplerate = 16000
    Nothing <- initialize
    print "initialized"
    m <- newEmptyMVar
    datum <- newMVar Seq.empty
    Right s <- openDefaultStream 1 0 samplerate Nothing (Just $ buffCBtoRawCB' (callback datum)) (Just $ done m)
    startStream s
    _ <- takeMVar m -- wait until our callbacks decide they are done!
    Nothing <- terminate
    print "let's see what we've recorded..."
    stuff <- takeMVar datum
    print stuff
    -- write out wav file
    -- let datum =
    --     audio = Audio { sampleRate = samplerate
    --                   , channelNumber = 1
    --                   , sampleData = datum
    --                   }
    -- exportFile "foo.wav" audio
    print "main done"
The simplest solution is to use MVars to communicate between the callback and the Producer. Here's how:
import Control.Proxy
import Control.Concurrent.MVar

fromMVar :: (Proxy p) => MVar (Maybe a) -> () -> Producer p a IO ()
fromMVar mvar () = runIdentityP loop where
    loop = do
        ma <- lift $ takeMVar mvar
        case ma of
            Nothing -> return ()
            Just a -> do
                respond a
                loop
Your stream callback will write Just input to the MVar, and your finalization callback will write Nothing to terminate the Producer.
Here's a ghci example demonstrating how it works:
>>> mvar <- newEmptyMVar :: IO (MVar (Maybe Int))
>>> forkIO $ runProxy $ fromMVar mvar >-> printD
>>> putMVar mvar (Just 1)
1
>>> putMVar mvar (Just 2)
2
>>> putMVar mvar Nothing
>>> putMVar mvar (Just 3)
>>>
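The callback side of this handoff can be sketched with nothing but base. This is a minimal illustration, not the PortAudio wiring itself: onChunk, onFinished, drain, and demo are hypothetical names, and the list [1, 2, 3] stands in for audio buffers. The drain loop has the same shape as fromMVar, except that it calls a handler where the Producer would call respond.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Data.IORef

-- Hypothetical stand-in for the stream callback: hand one chunk to the consumer.
onChunk :: MVar (Maybe a) -> a -> IO ()
onChunk mvar chunk = putMVar mvar (Just chunk)

-- Hypothetical stand-in for the finalization callback: signal end of stream.
onFinished :: MVar (Maybe a) -> IO ()
onFinished mvar = putMVar mvar Nothing

-- Consumer loop: take values until Nothing arrives, passing each to a handler.
drain :: MVar (Maybe a) -> (a -> IO ()) -> IO ()
drain mvar handler = loop
  where
    loop = do
      ma <- takeMVar mvar
      case ma of
        Nothing -> return ()
        Just a  -> handler a >> loop

-- Feed three fake chunks through the MVar and collect them in arrival order.
demo :: IO [Int]
demo = do
  mvar <- newEmptyMVar
  acc  <- newIORef []
  done <- newEmptyMVar
  _ <- forkIO $ drain mvar (\x -> modifyIORef acc (x :)) >> putMVar done ()
  mapM_ (onChunk mvar) [1, 2, 3]
  onFinished mvar
  takeMVar done                      -- wait for the consumer to see Nothing
  fmap reverse (readIORef acc)

main :: IO ()
main = demo >>= print
```

One design point to keep in mind: an MVar holds a single value, so putMVar blocks until the consumer has taken the previous one. If the consumer is slow, the callback stalls, which matters for a callback that is supposed to return quickly.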
Edit: The pipes-concurrency library now provides this feature, and it even has a section in the tutorial explaining specifically how to use it to get data out of callbacks.
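As a rough sketch of what that might look like, assuming pipes 4.x and pipes-concurrency 2.x (whose API differs from the Control.Proxy code above): the callback sends each chunk into a mailbox, and the consuming side reads it back as a Producer via fromInput. The names onChunk and demo are hypothetical, and the fake chunks are sent from the same thread only to keep the example small; in a real program the PortAudio callback would do the sending while another thread runs the pipeline.

```haskell
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for the stream callback: push one chunk into the mailbox.
-- With an unbounded buffer, send does not wait for the consumer.
onChunk :: Output a -> a -> IO ()
onChunk output chunk = do
  _ <- atomically (send output chunk)
  return ()

-- Send three fake chunks, seal the mailbox, then read everything back as a Producer.
demo :: IO [Int]
demo = do
  (output, input, seal) <- spawn' unbounded
  mapM_ (onChunk output) [1, 2, 3]
  atomically seal                    -- plays the role of writing Nothing above
  P.toListM (fromInput input)

main :: IO ()
main = demo >>= print
```

Sealing the mailbox takes the place of the Nothing sentinel: once the buffered values are drained, fromInput terminates. The unbounded buffer keeps the callback from blocking, at the cost of unbounded memory growth if the consumer falls behind.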