I have an rtmp stream of a video call and I want to transcribe it. I have created 2 services in Go and I'm getting results but it's not very accurate and a lot of data seems to get lost.
Let me explain.
I have a transcode
service, I use ffmpeg to transcode the video to Linear16 audio and place the output bytes onto a PubSub queue for a transcribe
service to handle. Obviously there is a limit to the size of the PubSub message, and I want to start transcribing before the end of the video call. So, I chunk the transcoded data into 3 second clips (not fixed length, just seems about right) and put them onto the queue.
The data is transcoded quite simply:
var stdout Buffer
cmd := exec.Command("ffmpeg", "-i", url, "-f", "s16le", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-")
cmd.Stdout = &stdout
if err := cmd.Start(); err != nil {
log.Fatal(err)
}
ticker := time.NewTicker(3 * time.Second)
for {
select {
case <-ticker.C:
bytesConverted := stdout.Len()
log.Infof("Converted %d bytes", bytesConverted)
// Send the data we converted, even if there are no bytes.
topic.Publish(ctx, &pubsub.Message{
Data: stdout.Bytes(),
})
stdout.Reset()
}
}
The transcribe
service pulls messages from the queue at a rate of 1 every 3 seconds, helping to process the audio data at about the same rate as it's being created. There are limits on the Speech API stream, it can't be longer than 60 seconds so I stop the old stream and start a new one every 30 seconds so we never hit the limit, no matter how long the video call lasts for.
This is how I'm transcribing it:
stream := prepareNewStream()
clipLengthTicker := time.NewTicker(30 * time.Second)
chunkLengthTicker := time.NewTicker(3 * time.Second)
cctx, cancel := context.WithCancel(context.TODO())
err := subscription.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
select {
case <-clipLengthTicker.C:
log.Infof("Clip length reached.")
log.Infof("Closing stream and starting over")
err := stream.CloseSend()
if err != nil {
log.Fatalf("Could not close stream: %v", err)
}
go getResult(stream)
stream = prepareNewStream()
case <-chunkLengthTicker.C:
log.Infof("Chunk length reached.")
bytesConverted := len(msg.Data)
log.Infof("Received %d bytes\n", bytesConverted)
if bytesConverted > 0 {
if err := stream.Send(&speechpb.StreamingRecognizeRequest{
StreamingRequest: &speechpb.StreamingRecognizeRequest_AudioContent{
AudioContent: transcodedChunk.Data,
},
}); err != nil {
resp, _ := stream.Recv()
log.Errorf("Could not send audio: %v", resp.GetError())
}
}
msg.Ack()
}
})
I think the problem is that my 3 second chunks don't necessarily line up with starts and end of phrases or sentences so I suspect that the Speech API is a recurrent neural network which has been trained on full sentences rather than individual words. So starting a clip in the middle of a sentence loses some data because it can't figure out the first few words up to the natural end of a phrase. Also, I lose some data in changing from an old stream to a new stream. There's some context lost. I guess overlapping clips might help with this.
I have a couple of questions:
1) Does this architecture seem appropriate for my constraints (unknown length of audio stream, etc.)?
2) What can I do to improve accuracy and minimise lost data?
(Note I've simplified the examples for readability. Point out if anything doesn't make sense because I've been heavy handed in cutting the examples down.)
I think you are right that splitting the text into chunks causes many words to be chopped off.
I see another problem in the publishing. Between the calls topic.Publish
and stdout.Reset()
some time will pass and ffmpeg will probably have written some unpublished bytes to stdout, which will get cleared by the reset.
I am afraid the architecture is not fitted for your problem. The constraint of the message size causes many problems. The idea of a PubSub system is that a publisher notifies subscribers of events, but not necessarily to hold a large payload.
Do you really need two services? You could use two go routines to communicate via a channel. That would eliminate the pub sub system.
A strategy would be to make the chunks as large as possible. A possible solution:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With