Scenario: I have a ~900 MB text file that is formatted as follows:
...
Id: 109101
ASIN: 0806978473
title: The Beginner's Guide to Tai Chi
group: Book
salesrank: 672264
similar: 0
categories: 3
|Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|General[16575]
|Books[283155]|Subjects[1000]|Sports[26]|Individual Sports[16533]|Martial Arts[16571]|Taichi[16583]
|Books[283155]|Subjects[1000]|Sports[26]|General[11086921]
reviews: total: 2 downloaded: 2 avg rating: 5
2000-4-4 cutomer: A191SV1V1MK490 rating: 5 votes: 0 helpful: 0
2004-7-10 cutomer: AVXBUEPNVLZVC rating: 5 votes: 0 helpful: 0
(----- empty line ------)
Id :
and want to parse the information from it.
Problem: As a first step (and because I need it in another context) I want to process the file line by line, collect the "chunks" belonging to one product together, and then process them separately with other logic.
So the plan is the following:
Now, I am trying to adapt the following example:
doStuff = do
  writeFile "input.txt" "This is a \n test."  -- FilePath -> String -> IO ()
  runConduitRes                                -- m r
    $ sourceFileBS "input.txt"                 -- ConduitT i ByteString m ()  -- by "chunk"
    .| sinkFile "output.txt"                   -- FilePath -> ConduitT ByteString o m ()
  readFile "output.txt"
    >>= putStrLn
So sourceFileBS "input.txt" is of type ConduitT i ByteString m (), that is, a conduit with input type i, output type ByteString, base monad m and result type (). sinkFile streams all incoming data into the given file, so sinkFile "output.txt" is a conduit with input type ByteString.
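Written out with explicit signatures, the two stages look roughly like the sketch below. The names inputSource, outputSink and copyChunks are illustrative and not part of the original example; the MonadResource constraint is what the conduit package requires for these file functions.

```haskell
import Conduit
import Data.ByteString (ByteString)

-- Illustrative name: produces the file contents as ByteString chunks.
inputSource :: MonadResource m => ConduitT i ByteString m ()
inputSource = sourceFileBS "input.txt"

-- Illustrative name: consumes ByteString chunks and writes them to a file.
outputSink :: MonadResource m => ConduitT ByteString o m ()
outputSink = sinkFile "output.txt"

-- Fusing the two stages gives a closed pipeline that runConduitRes can run.
copyChunks :: IO ()
copyChunks = runConduitRes $ inputSource .| outputSink
```

The output type of the source has to match the input type of the sink for the .| operator to type-check.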
What I want now is to process the input source line-by-line, that is, pass on only one line each downstream. In pseudocode:
sourceFile "input.txt"
splitIntoLines
yieldMany (?)
other stuff
How do I do that?
What I currently have is
copyFile = do
  writeFile "input.txt" "This is a \n test."  -- FilePath -> String -> IO ()
  runConduitRes                                -- m r
    (lineC $ sourceFileBS "input.txt")         -- ConduitT i ByteString m ()  -- by "chunk"
    .| sinkFile "output.txt"                   -- FilePath -> ConduitT ByteString o m ()
  readFile "output.txt"
    >>= putStrLn
but that gives the following type error:
    * Couldn't match type `bytestring-0.10.8.2:Data.ByteString.Internal.ByteString'
                     with `Void'
      Expected type: ConduitT
                       ()
                       Void
                       (ResourceT
                          (ConduitT
                             a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
                       ()
        Actual type: ConduitT
                       ()
                       bytestring-0.10.8.2:Data.ByteString.Internal.ByteString
                       (ResourceT
                          (ConduitT
                             a0 bytestring-0.10.8.2:Data.ByteString.Internal.ByteString m0))
                       ()
    * In the first argument of `runConduitRes', namely
        `(lineC $ sourceFileBS "input.txt")'
      In the first argument of `(.|)', namely
        `runConduitRes (lineC $ sourceFileBS "input.txt")'
      In a stmt of a 'do' block:
        runConduitRes (lineC $ sourceFileBS "input.txt")
          .| sinkFile "output.txt"
       |
    28 |         (lineC $ sourceFileBS "input.txt") -- ConduitT i ByteString m () -- by "chunk"
       |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This makes me believe that the problem now is that the first conduit in line does not have an input type compatible with runConduitRes.
I just can't make sense of it and really need a hint.
Thanks a lot in advance.
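One possible reading of the type error: function application binds tighter than .|, so runConduitRes is applied to the first stage alone, whose output type is ByteString rather than Void. In addition, lineC takes a conduit as its argument instead of acting as a stage in the pipeline. A minimal sketch of a line-by-line pipeline follows, assuming the linesUnboundedAsciiC and unlinesAsciiC combinators exported by the Conduit module are acceptable here; copyByLine is an illustrative name.

```haskell
import Conduit

-- Illustrative name: copy input.txt to output.txt one line at a time.
copyByLine :: IO ()
copyByLine = do
  writeFile "input.txt" "This is a \n test."
  runConduitRes
    $ sourceFileBS "input.txt"   -- ByteString chunks of arbitrary size
    .| linesUnboundedAsciiC      -- one ByteString per line, newline removed
    .| unlinesAsciiC             -- append a newline to every line again
    .| sinkFile "output.txt"
  readFile "output.txt" >>= putStrLn
```

Any per-line processing stage would go between linesUnboundedAsciiC and unlinesAsciiC. Note that linesUnboundedAsciiC buffers a whole line in memory, which is only safe when line lengths are bounded in practice.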
I was struggling with this today, and found this question while trying to figure out a similar problem. I was trying to break git logs into chunks for further parsing, like
commit 12345
Author: Me
Date: Thu Jan 25 13:45:16 2019 -0500
made some changes
1 file changed, 10 insertions(+), 0 deletions(-)
commit 54321
Author: Me
...and so on...
The function I needed is almost splitOnUnbounded from Data.Conduit.Combinators, but I couldn't quite figure out how to write the predicate function there.
I came up with the following conduit, which is a slight modification of splitOnUnbounded. It takes a stream of lines and emits a stream of lists, one list per group, with one line of text per list element. I find it a bit easier to think about that way, though this is surely not an optimal solution.
It will group the lines of text together using a function that takes the next line and returns a Bool indicating if the next line is the start of the next group of text.
import Data.Conduit (ConduitT, await, yield)
import Data.Text (Text)

-- | Group incoming lines into lists. A new group starts at every line for
-- which startNextLine returns True. The second argument is the initial
-- accumulator; pass [] to start with an empty group.
groupLines :: Monad m => (Text -> Bool) -> [Text] -> ConduitT Text [Text] m ()
groupLines startNextLine ls = start
  where
    -- If the stream is already empty, stop. Otherwise start accumulating
    -- with the first line.
    start = await >>= maybe (return ()) (accumulateLines ls)

    -- acc holds the lines of the current group that came before nextLine.
    accumulateLines acc nextLine = do
      following <- await
      case following of
        -- End of input: emit the final group.
        Nothing -> yield acc'
        Just l
          -- l opens a new group: emit the current one and start afresh.
          -- Every incoming line is tested, so one-line groups are closed too.
          | startNextLine l -> yield acc' >> accumulateLines [] l
          -- Otherwise l belongs to the current group.
          | otherwise -> accumulateLines acc' l
      where
        acc' = acc ++ [nextLine]
groupLines can be used in a conduit like the following. Pass it a Text -> Bool function that tells the conduit when the next group of lines should start, along with an empty list as the starting accumulator.
-- These belong at the top of the module; the Text literals below need
-- {-# LANGUAGE OverloadedStrings #-}
import Conduit (runConduitRes, (.|))
import qualified Data.Conduit.Combinators as C
import qualified Data.Text as T

isCommitLine :: T.Text -> Bool
isCommitLine t = "commit" `T.isPrefixOf` t

logParser =
  C.sourceFile "logs.txt"
    .| C.decodeUtf8
    .| C.linesUnbounded
    .| groupLines isCommitLine []
    .| C.map (T.intercalate "\n")
    -- do something with each log entry here --
    .| C.print

main :: IO ()
main = runConduitRes logParser
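The grouping rule itself can be tried on plain lists, without any conduit machinery. The following is a minimal sketch using only base; groupByStart, sampleLog and groupedLog are hypothetical names for illustration, and String stands in for Text.

```haskell
import Data.List (isPrefixOf)

-- | Hypothetical helper: split a list into groups, opening a new group at
-- every element (after the first) for which the predicate holds.
groupByStart :: (a -> Bool) -> [a] -> [[a]]
groupByStart _ [] = []
groupByStart isStart (x : xs) = (x : rest) : groupByStart isStart remaining
  where
    -- Everything up to the next start element belongs to the current group.
    (rest, remaining) = break isStart xs

-- Made-up sample input in the shape of the git log above.
sampleLog :: [String]
sampleLog =
  [ "commit 12345"
  , "Author: Me"
  , "made some changes"
  , "commit 54321"
  , "Author: Me"
  ]

groupedLog :: [[String]]
groupedLog = groupByStart ("commit" `isPrefixOf`) sampleLog
```

Here groupedLog contains two groups, one per commit line. Unlike the conduit version, this builds the whole result from an in-memory list, so it is meant for checking the rule on small inputs rather than for streaming a large file.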
I'm new to Haskell, and strongly suspect this isn't the best way to accomplish this. So if others have better suggestions, I'll be happy to learn! Otherwise, maybe posting this solution here will help somebody down the line.