I am trying to pull data from Deribit's API, feed it into a Queue, then send it out in a while loop in a Python Tornado Websocket Server. The problem I'm facing is the while loop in the on_message method blocks the execution flow. I'm not sure how to create a way so that the websocket stays open and streams the data without blocking the the queue producing function. Is this possible to do? The code for each task is below.
Websocket code:
class web_socket_handler(ws.WebSocketHandler):
@classmethod
def route_urls(cls):
return [(r'/',cls, {}),]
def simple_init(self):
self.last = time.time()
self.stop = False
def open(self):
self.simple_init()
print("New client connected")
async def on_message(self, message):
if message == 'BTC':
while True:
try:
reply = wsQueue.get(block=False)
self.write_message(reply)
except:
pass
else:
self.write_message('bad request - closing connection!')
self.close()
self.last = time.time()
def on_close(self):
print("connection is closed")
try:
self.loop.stop()
except:
pass
def close(self):
self.ws_connection.close()
def check_origin(self, origin):
return True
API Code:
class Deribit:
def __init__(self, contract, currency, q):
self.baseUrl = "wss://www.deribit.com/ws/api/v2"
self.contract = contract
self.currency = currency
self.q = q
async def getIndex(self):
msg = {
"jsonrpc": "2.0",
"method": "public/get_index",
"params": {f"currency": "{self.currency}"},
}
async with websockets.connect(self.baseUrl) as websocket:
await websocket.send(json.dumps(msg))
while websocket.open:
if flag == 'stop':
await websocket.close()
return
elif not websocket.open:
await self.getIndex()
else:
response = await websocket.recv()
response = json.loads(response)
response['type'] = 'getIndex'
self.q.put(response)
async def flowSubscribe(self):
msg = {
"jsonrpc": "2.0",
"method": "public/subscribe",
"params": {"channels": [f"trades.option.{self.currency}.raw"]},
}
async with websockets.connect(self.baseUrl) as websocket:
await websocket.send(json.dumps(msg))
while websocket.open:
if flag == 'stop':
await websocket.close()
return
else:
response = await websocket.recv()
response = json.loads(response)
response['type'] = 'flowSubscribe'
self.q.put(response)
'''if not websocket.open:
print('flowSubscribe closed')
await self.flowSubscribe()'''
async def tradeSubscribe(self):
msg = {
"jsonrpc": "2.0",
"method": "public/subscribe",
"params": {"channels": [f"trades.{self.contract}.raw"]},
}
async with websockets.connect(self.baseUrl) as websocket:
await websocket.send(json.dumps(msg))
while websocket.open:
if flag == 'stop':
await websocket.close()
return
elif not websocket.open:
await self.tradeSubscribe()
else:
response = await websocket.recv()
response = json.loads(response)
response['type'] = 'tradeSubscribe'
self.q.put(response)
async def orderbookSubscribe(self):
msg = {
"jsonrpc": "2.0",
"method": "public/subscribe",
"params": {"channels": [f"book.{self.contract}.100ms"]},
}
async with websockets.connect(self.baseUrl) as websocket:
await websocket.send(json.dumps(msg))
while websocket.open:
if flag == 'stop':
await websocket.close()
return
elif not websocket.open:
await self.orderbookSubscribe()
else:
response = await websocket.recv()
response = json.loads(response)
response['type'] = 'orderbookSubscribe'
self.q.put(response)
async def orderbookUpdateSubscribe(self):
msg = {
"jsonrpc": "2.0",
"method": "public/subscribe",
"params": {"channels": [f"book.{self.contract}.1.raw"]},
}
async with websockets.connect(self.baseUrl) as websocket:
await websocket.send(json.dumps(msg))
while websocket.open:
if flag == 'stop':
await websocket.close()
return
else:
response = await websocket.recv()
response = json.loads(response)
response['type'] = 'orderbookUpdateSubscribe'
self.q.put(response)
if not websocket.open:
print('orderbookUpdateSubscribe Closed')
await self.orderbookUpdateSubscribe()
async def tickerSubscribe(self, i):
msg = {
"jsonrpc": "2.0",
"method": "public/subscribe",
"params": {"channels": i},
}
async with websockets.connect(self.baseUrl) as websocket:
await websocket.send(json.dumps(msg))
while websocket.open:
if flag == 'stop':
await websocket.close()
return
elif not websocket.open:
await self.tickerSubscribe(i)
else:
response = await websocket.recv()
response = json.loads(response)
response['type'] = 'tickerSubscribe'
self.q.put(response)
def getInstruments(self, expired):
msg = f"https://www.deribit.com/api/v2/public/get_instruments?currency={self.currency}&expired={expired}&kind=option"
response = requests.get(msg)
return response.json()
def getLastTradesByCurrency(self, count, expired):
msg = f"https://www.deribit.com/api/v2/public/get_last_trades_by_currency?count={count}¤cy={self.currency}"
response = requests.get(msg)
return response.json()
Queue Producer Function:
def threadLoop():
global flag
q = queue.Queue()
flag = 'go'
count = 0
loop = asyncio.get_event_loop()
time = datetime.datetime.now().time().strftime("%H:%M:%S")
time = datetime.datetime.strptime(time, '%H:%M:%S').time()
startTime = datetime.time(11, 58, 0)
endTime = datetime.time(11, 58, 2)
while True:
time = datetime.datetime.now().time().strftime("%H:%M:%S")
time = datetime.datetime.strptime(time, '%H:%M:%S').time()
if time <= startTime or time >= endTime:
t1 = threading.Thread(target=loop.run_until_complete, args=(main(q),), daemon=True)
t1.start()
break
while True:
try:
if flag == 'stop':
for i in range(len(tasks)):
tasks[i].cancel()
t1.join()
if t1.is_alive() is False:
print('threads stopped')
threadLoop()
else:
break
elif flag != 'stop':
message = q.get(block = False)
if message != None:
if message['type'] == 'tickerSubscribe':
instrument_name = message['params']['data']['instrument_name']
timestamp = message['params']['data']['timestamp']
bid_price = message['params']['data']['best_bid_price']
bid_iv = message['params']['data']['bid_iv']
ask_price = message['params']['data']['best_ask_price']
ask_iv = message['params']['data']['ask_iv']
open_interest = message['params']['data']['open_interest']
try:
old_open_interest = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 1]['open_interest']
orderbooks[instrument_name].append({'timestamp': timestamp, 'bid_price': bid_price, 'bid_iv': bid_iv, 'ask_price': ask_price, 'ask_iv': ask_iv, 'open_interest': open_interest, 'old_open_interest': old_open_interest})
except:
orderbooks[instrument_name].append({'timestamp': timestamp, 'bid_price': bid_price, 'bid_iv': bid_iv, 'ask_price': ask_price, 'ask_iv': ask_iv, 'open_interest': open_interest})
elif message['type'] == 'flowSubscribe':
instrument_name = message['params']['data'][0]['instrument_name']
trade_price = message['params']['data'][0]['price']
bid_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 2]['bid_price']
ask_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 2]['ask_price']
bid_trade_difference = str(bid_price - trade_price)
ask_trade_difference = str(ask_price - trade_price)
if bid_trade_difference[0] == '-':
bid_trade_difference = bid_trade_difference[1:]
if ask_trade_difference[0] == '-':
ask_trade_difference = ask_trade_difference[1:]
bid_trade_difference = float(bid_trade_difference)
ask_trade_difference = float(ask_trade_difference)
if bid_price or ask_price != message['params']['data'][0]['price']:
bid_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 1]['bid_price']
ask_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 1]['ask_price']
if bid_price or ask_price != message['params']['data'][0]['price']:
bid_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 3]['bid_price']
ask_price = orderbooks[instrument_name][len(orderbooks[instrument_name]) - 3]['ask_price']
if bid_price == message['params']['data'][0]['price']:
message['params']['data'][0]['direction'] == 'sell'