My objective is to consume an audio stream received on a FastAPI endpoint (WebSocket A).
To solve this problem, I've had quite a few ideas, but ultimately I've been trying to bridge WebSocket A to WebSocket B. My attempt so far involves a ConnectionManager class, which contains a queue.Queue. The chunks of the audio stream are added to this queue so that we do not consume directly from WebSocket A.
The ConnectionManager also contains a generator method to yield all values from the queue.
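For illustration, a minimal sketch of such a manager could look like the following. The method name media_generator, the close() method and the None sentinel are assumptions made for this sketch (only add_to_buffer appears in the snippets here), and the connect/disconnect methods are left out.

```python
import queue
from typing import Iterator, Optional

# Hypothetical end-of-stream marker; not part of the original code.
_END_OF_STREAM = None


class ConnectionManager:
    """Buffers audio chunks so consumers never read from WebSocket A directly."""

    def __init__(self) -> None:
        self.buffer: "queue.Queue[Optional[bytes]]" = queue.Queue()

    def add_to_buffer(self, chunk: bytes) -> None:
        self.buffer.put(chunk)

    def close(self) -> None:
        # Unblocks the generator so that it can finish.
        self.buffer.put(_END_OF_STREAM)

    def media_generator(self) -> Iterator[bytes]:
        # Blocking generator: get() waits until a chunk is available.
        while True:
            chunk = self.buffer.get()
            if chunk is _END_OF_STREAM:
                return
            yield chunk
```

Because get() blocks, iterating this generator inside a coroutine would stall the event loop, so it would have to be consumed from a worker thread.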
My FastAPI implementation consumes from WebSocket A like this:
@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except KeyboardInterrupt:
        manager.disconnect()
Concurrently with this ingestion, I'd like to have a task that bridges our audio stream to WebSocket B and sends the returned values back through WebSocket A. The audio stream could be consumed through the aforementioned generator method.
The generator method is necessary because of how WebSocket B consumes messages, as shown in Rev.ai's examples:
streamclient = RevAiStreamingClient(access_token, config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return through websocket A this value
    print(response)
This is one of the biggest challenges, as we need to feed data into a generator and get the results back in real time.
I've been trying my luck with asyncio; from what I understand, one possibility would be to create a coroutine that runs in the background. I've been unsuccessful with this so far, but it sounded promising.
I've thought about triggering this through the FastAPI startup event, but I'm having trouble achieving concurrency. I tried to use event_loops, but that gave me an error related to nested event loops.
FastAPI is optional if you think there is a better approach, and in a way so is WebSocket A. At the end of the day, the ultimate objective is to receive an audio stream through our own API endpoint, run it through Rev.ai's WebSocket, do some extra processing, and send the results back.
Below is a simple example of a WebSocket proxy in which WebSocket A and WebSocket B are both endpoints of the same FastAPI app. WebSocket B can just as well live elsewhere; just change its address, ws_b_uri. The websockets library is used for the WebSocket client.
To forward data, the endpoint for A starts two tasks, forward and reverse, and waits for them to complete with asyncio.gather(). Data is transferred in both directions concurrently.
import asyncio

from fastapi import FastAPI
from fastapi import WebSocket
import websockets

app = FastAPI()

ws_b_uri = "ws://localhost:8001/ws_b"


# A -> B: relay everything received from the client of A to B
async def forward(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await ws_b.send(data)


# B -> A: relay every response from B back to the client of A
async def reverse(ws_a: WebSocket, ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_b.recv()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    await ws_a.accept()
    async with websockets.connect(ws_b_uri) as ws_b_client:
        fwd_task = asyncio.create_task(forward(ws_a, ws_b_client))
        rev_task = asyncio.create_task(reverse(ws_a, ws_b_client))
        await asyncio.gather(fwd_task, rev_task)


# Demo endpoint standing in for WebSocket B
@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
    await ws_b_server.accept()
    while True:
        data = await ws_b_server.receive_bytes()
        print("websocket B server received: ", data)
        await ws_b_server.send_text('{"response": "value from B server"}')
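One thing the proxy above leaves open is what happens when one direction stops, for example when the client of A disconnects: asyncio.gather() propagates the first exception but does not cancel the other task. A possible way to handle this, sketched here with asyncio.wait(), is to cancel whatever is still running as soon as one task ends. The helper name run_until_first_exit is made up for this sketch.

```python
import asyncio
from typing import Coroutine


async def run_until_first_exit(*coros: Coroutine) -> None:
    """Run coroutines concurrently; when one finishes or fails, cancel the rest."""
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait until the cancellations have been processed; with
    # return_exceptions=True the CancelledError is collected, not raised.
    await asyncio.gather(*pending, return_exceptions=True)
    for task in done:
        # Re-raise the exception, if any, of the task that ended first.
        task.result()
```

In websocket_a this could replace the gather() call, as in await run_until_first_exit(forward(ws_a, ws_b_client), reverse(ws_a, ws_b_client)), probably wrapped in a try/except for the disconnect exception that forward re-raises.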
Considering the latest update to the question, the issue is that WebSocket B is hidden behind synchronous generators (two of them, in fact: one for the input and one for the output), so the task becomes how to bridge a WebSocket and a synchronous generator. Since I have never worked with the rev-ai library, I wrote a stub function, stream_client_start, in place of streamclient.start; it takes a generator (MEDIA_GENERATOR in the original) and returns a generator (response_generator in the original).
In this case, I run the generator processing in a separate thread via run_in_executor. To avoid reinventing the wheel, communication goes through a queue from the janus library, which lets synchronous and asynchronous code share a queue. Accordingly, there are two queues: one for A -> B and the other for B -> A.
import asyncio
import queue
import time
from typing import Generator

from fastapi import FastAPI
from fastapi import WebSocket
import janus

app = FastAPI()


# Stub generator function (stands in for the client that uses WebSocket B internally)
def stream_client_start(input_gen: Generator) -> Generator:
    for chunk in input_gen:
        time.sleep(1)
        yield f"Get {chunk}"


# Auxiliary adapter: turns a synchronous queue into a generator
def queue_to_generator(sync_queue: queue.Queue) -> Generator:
    while True:
        yield sync_queue.get()


async def forward(ws_a: WebSocket, queue_b):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:", data)
        await queue_b.put(data)


async def reverse(ws_a: WebSocket, queue_b):
    while True:
        data = await queue_b.get()
        await ws_a.send_text(data)
        print("websocket A sent:", data)


# Runs in a worker thread: feeds the stub from fwd_queue, pushes results to rev_queue
def process_b_client(fwd_queue, rev_queue):
    response_generator = stream_client_start(queue_to_generator(fwd_queue))
    for r in response_generator:
        rev_queue.put(r)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    # We are inside a coroutine, so the running loop is the one to use
    loop = asyncio.get_running_loop()
    fwd_queue = janus.Queue()
    rev_queue = janus.Queue()
    await ws_a.accept()

    process_client_task = loop.run_in_executor(None, process_b_client, fwd_queue.sync_q, rev_queue.sync_q)
    fwd_task = asyncio.create_task(forward(ws_a, fwd_queue.async_q))
    rev_task = asyncio.create_task(reverse(ws_a, rev_queue.async_q))
    await asyncio.gather(process_client_task, fwd_task, rev_task)
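Note that queue_to_generator above loops forever, so the worker thread never exits, even after the client of A has gone away. One possible fix is to push a sentinel into the queue when the stream ends. The sketch below shows the idea using only the standard library instead of janus (a plain queue.Queue towards the thread and loop.call_soon_threadsafe back to the loop); the names _STOP, stub_client_start and bridge are made up for this sketch, and the stub has no sleep so that it runs instantly.

```python
import asyncio
import queue
from typing import Callable, Iterable, Iterator, List

_STOP = object()  # hypothetical end-of-stream marker


def queue_to_generator(sync_queue: queue.Queue) -> Iterator[bytes]:
    # Yield chunks until the sentinel arrives, then end the generator.
    while True:
        item = sync_queue.get()
        if item is _STOP:
            return
        yield item


def stub_client_start(input_gen: Iterable[bytes]) -> Iterator[str]:
    # Stand-in for streamclient.start, like the stub above.
    for chunk in input_gen:
        yield f"Get {chunk!r}"


async def bridge(chunks: Iterable[bytes], start: Callable = stub_client_start) -> List[str]:
    loop = asyncio.get_running_loop()
    to_worker: queue.Queue = queue.Queue()        # event loop -> thread
    from_worker: asyncio.Queue = asyncio.Queue()  # thread -> event loop

    def worker() -> None:
        for response in start(queue_to_generator(to_worker)):
            # asyncio.Queue is not thread-safe, so hand results over via the loop.
            loop.call_soon_threadsafe(from_worker.put_nowait, response)
        loop.call_soon_threadsafe(from_worker.put_nowait, _STOP)

    worker_future = loop.run_in_executor(None, worker)
    for chunk in chunks:
        to_worker.put(chunk)  # unbounded queue, so put() does not block
    to_worker.put(_STOP)      # e.g. once the client of A has disconnected

    results = []
    while (item := await from_worker.get()) is not _STOP:
        results.append(item)
    await worker_future  # the thread has left its loop by now
    return results
```

With janus the same idea would apply: put the sentinel into fwd_queue when forward stops, and let queue_to_generator return when it sees it.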