I'm trying to get real time bitcoin price data from the Bitstamp Websocket v2.0 API. Where can I get the certificate if needed? If download of the certificate is automatic, how can I make sure that it is possible for python to verify the received certificate?
The documentation on the Bitstamp website is rather scarce on this matter. Here's a quote from Bitstamp api documentation:
"Once you open a connection via websocket handshake (using HTTP upgrade header), you can subscribe to desired channels."
Bitstamp api docs: https://www.bitstamp.net/websocket/v2/
Tried searching in websockets documentation: https://websockets.readthedocs.io/en/stable/
I have looked into websockets and ssl. Now I know a bit about the handshake but still after much trying and searching I can't figure out what to do.
import asyncio
import websockets
async def bitstamp_ticker():
async with websockets.connect(
'wss://ws.bitstamp.net', ssl=True) as websocket:
pass
asyncio.get_event_loop().run_until_complete(bitstamp_ticker())
From what I understand in the websocket documentation adding ssl=True should be enough to establish a secure connection. But it seems that maybe the bitstamp certificate is not recognized by a Certificate Authority built into Python 3.6. and this is why the error occurs?
ssl.SSLError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:852)
I didn't spend time looking into this ssl cert issue for Bitstamp. But I just disabled ssl cert verification to make the web-socket market data push work. Below is the sample code, and appreciated if you could share your future progress on this issue.
# -*- coding: utf-8 -*-
import websocket
import json
import ssl
bitstamp_endpoint = 'wss://ws.bitstamp.net'
def subscribe_marketdata(ws):
params = {
'event': 'bts:subscribe',
'data': {
'channel': 'order_book_btcusd'
}
}
market_depth_subscription = json.dumps(params)
ws.send(market_depth_subscription)
def on_open(ws):
print('web-socket connected.')
subscribe_marketdata(ws)
def on_message(ws, data):
data = json.loads(data)
print(data)
def on_error(ws, msg):
print(msg)
if __name__ == '__main__':
marketdata_ws = websocket.WebSocketApp(bitstamp_endpoint, on_open=on_open, on_message=on_message, on_error=on_error)
marketdata_ws.run_forever(sslopt={'cert_reqs': ssl.CERT_NONE})
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With