Bybit Websockets with Python Examples

by John | May 01, 2023

 


 

To begin, we must define two terms that are crucial to understanding this article. 

 

What is a Websocket?

 

A WebSocket is a communication protocol that enables real-time data exchange between a client, such as a web browser, and a server. Unlike a traditional HTTP exchange, in which the server can only respond to a request the client has made, a WebSocket connection is full-duplex: both the client and the server can send data at any time. This makes it well suited to applications that need real-time updates, such as chat applications, online gaming, and stock trading. Essentially, a WebSocket is a long-lived connection between a client and a server that allows continuous communication.

 

Bybit offers two types of websockets to its users. The private websocket is for users who have authenticated their accounts and have access to their private account information such as orders, positions, and account balances. The public websocket is for users who want to receive real-time market data such as price quotes, order book depth, and trading volumes. The public websocket does not require authentication and can be accessed by anyone. Bybit's websockets are a useful tool for traders who want to stay up-to-date on market conditions and execute trades quickly.
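
To make the public/private split a little more concrete, here is a minimal sketch of the subscription message a client sends once a connection is open. The endpoint URLs and the message shape reflect our understanding of Bybit's v5 websocket interface and should be treated as assumptions to verify against the official documentation; build_subscribe_message is a hypothetical helper of our own, not part of any library. In practice pybit handles all of this for us.

```python
import json

# Assumed v5 mainnet endpoints; verify against Bybit's documentation.
PUBLIC_LINEAR_URL = "wss://stream.bybit.com/v5/public/linear"
PRIVATE_URL = "wss://stream.bybit.com/v5/private"


def build_subscribe_message(topics):
    """Return the JSON text a client would send to subscribe to topics."""
    return json.dumps({"op": "subscribe", "args": list(topics)})


# Topic names follow the "<stream>.<depth>.<symbol>" / "<stream>.<symbol>" pattern.
message = build_subscribe_message(["orderbook.1.BTCUSDT", "publicTrade.BTCUSDT"])
print(message)
```

The private endpoint additionally expects an authentication message before any subscription, which is why an API key and secret are needed for it.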

 

What is a Callback?

 

 

In the context of websockets, a callback is a function that is automatically executed by the program when a certain event happens, such as receiving new data from the server through the websocket connection. The callback function is registered by the client program and is called by the websocket library when the specified event occurs. In other words, the client program provides a callback function as a parameter to the websocket library when setting up the connection, and the library calls this function whenever it receives new data or other events from the server. This allows the client program to respond to incoming data in real-time without constantly polling the server for updates.
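
The callback pattern can be illustrated without any network connection at all. The sketch below uses a hypothetical ToyStream class (our own stand-in, not part of pybit) that stores the functions registered with it and calls each one whenever a message is delivered.

```python
class ToyStream:
    """Hypothetical stand-in for a websocket client, for illustration only."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        # The client program registers the function it wants called.
        self._callbacks.append(callback)

    def deliver(self, message):
        # A real library would do this whenever data arrives from the server.
        for callback in self._callbacks:
            callback(message)


received = []


def on_message(message):
    # Our callback: react to each message as it arrives.
    received.append(message["data"])


stream = ToyStream()
stream.subscribe(on_message)
stream.deliver({"data": "first update"})
stream.deliver({"data": "second update"})
print(received)
```

Note that our code never calls on_message directly; it only hands the function over and lets the stream decide when to run it.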

 

 

Hands-on Examples

 

First, let's import the relevant modules and instantiate two websocket objects, one private and one public. If you haven't set up an API key for Bybit yet, please see this article.

 

from pybit.unified_trading import WebSocket
import json 

with open('authcreds.json') as j:
    creds = json.load(j)
        
key = creds['KEY_NAME']['key']
secret = creds['KEY_NAME']['secret']


public = WebSocket(channel_type='linear', testnet=False)
private = WebSocket(channel_type='private',
                     api_key=key,
                     api_secret=secret, 
                     testnet=False) 
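
The snippet above expects a file called authcreds.json whose layout can be inferred from the way it is indexed. As a minimal sketch, assuming placeholder credentials and a temporary directory (both ours, for illustration), the file could be written and read back like this:

```python
import json
import os
import tempfile

# Layout inferred from creds['KEY_NAME']['key'] / ['secret'];
# the values here are placeholders, not real credentials.
creds_example = {
    "KEY_NAME": {
        "key": "your-api-key",
        "secret": "your-api-secret",
    }
}

# Write to a temporary directory so the example leaves no stray files behind.
path = os.path.join(tempfile.mkdtemp(), "authcreds.json")
with open(path, "w") as f:
    json.dump(creds_example, f, indent=4)

with open(path) as j:
    creds = json.load(j)

key = creds["KEY_NAME"]["key"]
secret = creds["KEY_NAME"]["secret"]
print(key, secret)
```

Keeping credentials in a separate file like this makes it easier to keep them out of version control.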

 

 

Stream Live Orderbook Data

Let's say we want to stream level-one orderbook data, i.e. the best bid and best ask with the volume available at each. Note that the callback here is handle_orderbook_message, which is called each time a new message arrives from Bybit. This function extracts the relevant data and appends it to a list we have defined called orderbook. 

We have also created a loop for illustrative purposes to show a) how fast this list grows and b) how a user can extract the latest data from it. A trader may want functionality like this to place orders optimally at the touch. 

 

import time

orderbook = []


def handle_orderbook_message(message):
    '''
    Custom callback for the orderbook stream: extract the best bid/ask and
    their volumes into a dictionary and append it to the orderbook list.
    '''

    data = message.get('data', None)

    current = {}
    if data:
        try:
            current['asset'] = data.get('s')
            current['best_bid'] = float(data.get('b')[0][0])
            current['bid_volume'] = float(data.get('b')[0][1])
            current['best_ask'] = float(data.get('a')[0][0])
            current['ask_volume'] = float(data.get('a')[0][1])
            orderbook.append(current)
        except (TypeError, ValueError, IndexError, AttributeError):
            # skip messages that lack a usable bid or ask level
            pass


public.orderbook_stream(depth=1, symbol='BTCUSDT', callback=handle_orderbook_message)

for _ in range(10):
    print(len(orderbook))
    time.sleep(1)
    if orderbook:  # the list may still be empty just after subscribing
        print(orderbook[-1])
    

'''


2516
{'asset': 'BTCUSDT', 'best_bid': 27905.4, 'bid_volume': 9.077, 'best_ask': 27905.5, 'ask_volume': 1.9}
2554
{'asset': 'BTCUSDT', 'best_bid': 27905.4, 'bid_volume': 1.81, 'best_ask': 27905.5, 'ask_volume': 5.279}
2588
{'asset': 'BTCUSDT', 'best_bid': 27890.1, 'bid_volume': 6.564, 'best_ask': 27890.2, 'ask_volume': 6.493}
2654
{'asset': 'BTCUSDT', 'best_bid': 27890.1, 'bid_volume': 7.214, 'best_ask': 27890.2, 'ask_volume': 0.592}
2677
{'asset': 'BTCUSDT', 'best_bid': 27890.0, 'bid_volume': 18.451, 'best_ask': 27890.1, 'ask_volume': 3.441}
2715
{'asset': 'BTCUSDT', 'best_bid': 27890.0, 'bid_volume': 17.964, 'best_ask': 27890.1, 'ask_volume': 4.489}
2752
{'asset': 'BTCUSDT', 'best_bid': 27890.0, 'bid_volume': 13.091, 'best_ask': 27890.1, 'ask_volume': 6.142}
2782
{'asset': 'BTCUSDT', 'best_bid': 27890.0, 'bid_volume': 10.077, 'best_ask': 27890.1, 'ask_volume': 9.456}
2805
{'asset': 'BTCUSDT', 'best_bid': 27890.0, 'bid_volume': 11.236, 'best_ask': 27890.1, 'ask_volume': 7.933}
2847
{'asset': 'BTCUSDT', 'best_bid': 27890.0, 'bid_volume': 9.355, 'best_ask': 27890.1, 'ask_volume': 9.385}


'''   

 

 

In addition, the trader may want to store this orderbook data in a database once it reaches a certain length. A useful trick for keeping the in-memory buffer bounded is to use a deque object as shown below. 

 

from collections import deque

orderbook = deque(maxlen=1000)

 

The logic above ensures that at most 1000 datapoints are held in the deque; once it is full, each new entry pushes out the oldest one. 
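
The short sketch below shows both ideas on toy data: how a bounded deque silently discards its oldest entries, and one possible way to hand full batches to a database. The save_batch function and the saved_batches list are hypothetical stand-ins for a real persistence layer.

```python
from collections import deque

# 1) A bounded deque keeps only the most recent items.
buffer = deque(maxlen=3)
for tick in range(1, 6):
    buffer.append(tick)
print(list(buffer))  # → [3, 4, 5]

# 2) Flushing in batches: collect rows, then persist once a batch is full.
BATCH_SIZE = 3
pending = []
saved_batches = []  # hypothetical stand-in for a database table


def save_batch(rows):
    """Hypothetical persistence step; a real version would write to a database."""
    saved_batches.append(list(rows))


def record(row):
    pending.append(row)
    if len(pending) >= BATCH_SIZE:
        save_batch(pending)
        pending.clear()


for tick in range(1, 8):
    record(tick)

print(saved_batches, pending)
```

The deque on its own never saves anything, so if the data must not be lost it should be combined with a flush step along these lines.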

 

 

Stream Live Trades 

 

Let's say we want to stream live trades for the BTCUSDT contract and flag large trades. For the purposes of this article, we will write a callback function that prints to the console whenever a trade worth over $100,000 is executed. Traders may be interested in large trades as these can indicate which direction whales are trading in. 

 

THRESHOLD = 100_000

def handle_trade_message(message):
    '''
    message example
    {
    "topic": "publicTrade.BTCUSDT",
    "type": "snapshot",
    "ts": 1672304486868,
    "data": [
        {
            "T": 1672304486865,
            "s": "BTCUSDT",
            "S": "Buy",
            "v": "0.001",
            "p": "16578.50",
            "L": "PlusTick",
            "i": "20f43950-d8dd-5b31-9112-a178eb6023af",
            "BT": false
                }
            ]
        }
    

    custom callback to detect trades over a certain size

    '''

    try:
        data = message.get('data')
        for trade in data:
            # notional value = size * price
            value = float(trade.get('v')) * float(trade.get('p'))

            if value > THRESHOLD:
                print(f"A trade to {trade.get('S')} {value} was just executed")
    except (TypeError, ValueError, AttributeError):
        # ignore malformed or empty messages
        pass
    
    
    

public.trade_stream(symbol='BTCUSDT', callback=handle_trade_message)


'''
A trade to Sell 129486.16549999999 was just executed
A trade to Sell 300585.60000000003 was just executed
A trade to Sell 113302.03649999999 was just executed
A trade to Sell 178719.72749999998 was just executed
A trade to Sell 118065.336 was just executed

'''
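
Because the callback above only prints, it is hard to check without a live connection. As a sketch, the same filter can be written as a pure function and exercised on a hand-made message. The find_large_trades name and the second, larger trade in the sample are our own additions for illustration; the field names follow the message example in the docstring above.

```python
THRESHOLD = 100_000


def find_large_trades(message, threshold=THRESHOLD):
    """Return (side, value) pairs for trades whose notional exceeds threshold."""
    large = []
    for trade in message.get("data") or []:
        try:
            value = float(trade["v"]) * float(trade["p"])
        except (KeyError, TypeError, ValueError):
            continue  # skip malformed entries
        if value > threshold:
            large.append((trade.get("S"), value))
    return large


sample_message = {
    "topic": "publicTrade.BTCUSDT",
    "type": "snapshot",
    "data": [
        {"s": "BTCUSDT", "S": "Buy", "v": "0.001", "p": "16578.50"},
        # hypothetical large trade added for the example
        {"s": "BTCUSDT", "S": "Sell", "v": "10", "p": "16578.50"},
    ],
}

large_trades = find_large_trades(sample_message)
print(large_trades)
```

A function like this can be called from inside the websocket callback, which keeps the callback itself very short.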

 

There are countless use cases for streaming data. For example, we may be interested in detecting TWAP orders or other forms of systematic order placement. 

 

 

Replicate Coinglass Liquidation Data

 

For those unfamiliar with the concept, we invite the reader to take a look at the Coinglass liquidation dashboard to see what we are referring to here. It is likely that Coinglass uses something similar to what we are about to create to populate the data on that dashboard. 

 

A crypto liquidation occurs when a trader's leveraged position is forcibly closed by the exchange on which the trade was made. This happens when the market moves against the position far enough to reach a threshold known as the liquidation price, at which point the trader's margin can no longer support the position. The exchange then automatically closes the position, selling out a long or buying back a short, in order to protect itself and other traders from further losses. Liquidations tend to occur during sudden market moves, periods of high volatility, or other conditions that cause significant price changes in a short period of time.

 

Why might this be of interest to an algorithmic trader? Since these positions are being forcibly closed, they may be executed below or above the current market rate, and we may want to step in and send an order to take the other side of the trade, as the price could be more favorable than it otherwise would be. Below we write a script that calculates the hourly long/short liquidations for BTCUSDT. We also print the running total every 30 seconds. 

 

long_liqs = [] 
short_liqs = [] 

    
def handle_liquidation_message(message):
    '''
    {
    "data": {
        "price": "0.03803",
        "side": "Buy",
        "size": "1637",
        "symbol": "GALAUSDT",
        "updatedTime": 1673251091822
    },
    "topic": "liquidation.GALAUSDT",
    "ts": 1673251091822,
    "type": "snapshot"
    }
    
    '''
    global long_liqs, short_liqs 
    
    try:
        data = message.get('data')
        # notional value of the liquidated position
        usd_val = float(data.get('size')) * float(data.get('price'))
        if data.get('side') == 'Buy':
            print(f'A long degen just got liquidated for {usd_val}')
            long_liqs.append(usd_val)
        elif data.get('side') == 'Sell':
            print(f'A short degen just got liquidated for {usd_val}')
            short_liqs.append(usd_val)
    except (TypeError, ValueError, IndexError, AttributeError):
        # ignore malformed or empty messages
        pass
    
            
    
    
public.liquidation_stream(symbol='BTCUSDT', callback=handle_liquidation_message) 

import datetime as dt
import time


while True:

    timenow = dt.datetime.now()

    if timenow.minute == 0 and timenow.second == 0:
        print(f'A total of {sum(long_liqs)} longs have been rekt in last hour')
        print(f'A total of {sum(short_liqs)} shorts have been rekt in last hour')
        # reset the lists for the next hour of slaughter
        long_liqs = []
        short_liqs = []
        time.sleep(1)

    elif timenow.second % 30 == 0:
        print(f'A total of {sum(long_liqs)} longs have been rekt so far in this hour')
        print(f'A total of {sum(short_liqs)} shorts have been rekt so far in this hour')
        time.sleep(1)

    else:
        # avoid spinning the CPU between checks
        time.sleep(0.1)
    
    





'''

A total of 0 longs have been rekt so far in this hour
A total of 44839.4481 shorts have been rekt so far in this hour
A total of 0 longs have been rekt so far in this hour
A total of 44839.4481 shorts have been rekt so far in this hour
A short degen just got liquidated for 1261.3905
A short degen just got liquidated for 51481.256400000006
A short degen just got liquidated for 280.413
A short degen just got liquidated for 617.0713999999999
A short degen just got liquidated for 13999.0458
A short degen just got liquidated for 10075.6222
A total of 0 longs have been rekt so far in this hour
A total of 122554.24740000001 shorts have been rekt so far in this hour
A short degen just got liquidated for 1150.9725
A short degen just got liquidated for 1038.8711999999998
A short degen just got liquidated for 2640.7513999999996
A short degen just got liquidated for 140.496
A short degen just got liquidated for 618.5718
A total of 0 longs have been rekt so far in this hour
A total of 128143.9103 shorts have been rekt so far in this hour
A short degen just got liquidated for 365.6185

'''
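
Polling the wall clock works for a demonstration, but the hourly totals then depend on the loop waking up at the right second. An alternative, sketched below under our own assumptions, is to bucket each liquidation by the hour of its updatedTime field so that the totals follow the exchange's timestamps. The names hour_bucket, record_liquidation and hourly_liqs are hypothetical, and the sample messages are made up for illustration.

```python
import datetime as dt
from collections import defaultdict

# hour (UTC) -> running USD totals for liquidated longs and shorts
hourly_liqs = defaultdict(lambda: {"long": 0.0, "short": 0.0})


def hour_bucket(ts_ms):
    """Truncate a millisecond timestamp to the start of its UTC hour."""
    t = dt.datetime.fromtimestamp(ts_ms / 1000, tz=dt.timezone.utc)
    return t.replace(minute=0, second=0, microsecond=0)


def record_liquidation(message):
    data = message.get("data") or {}
    try:
        usd_val = float(data["size"]) * float(data["price"])
        bucket = hour_bucket(int(data["updatedTime"]))
    except (KeyError, TypeError, ValueError):
        return  # ignore malformed messages
    # As in the script above, side 'Buy' is counted as a liquidated long.
    if data.get("side") == "Buy":
        hourly_liqs[bucket]["long"] += usd_val
    elif data.get("side") == "Sell":
        hourly_liqs[bucket]["short"] += usd_val


# Made-up messages two minutes apart, straddling an hour boundary.
record_liquidation({"data": {"price": "2.5", "side": "Buy", "size": "100",
                             "updatedTime": 1673251091822}})
record_liquidation({"data": {"price": "2.0", "side": "Sell", "size": "50",
                             "updatedTime": 1673251211822}})

for bucket, totals in sorted(hourly_liqs.items()):
    print(bucket.isoformat(), totals)
```

With this layout nothing needs to be reset on the hour; old buckets can simply be archived or dropped when they are no longer needed.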

 

 

Streaming Position

It is important to keep track of the size of our current position in an asset, for both risk management and order reconciliation purposes. Below we create a position stream to keep track of how much DOGE we currently hold. We also do our bit to help the DOGE community. 

 

doge_position = 0 

def handle_position_message(message):
    '''
    Custom callback to retrieve the signed position size of a given asset
    (positive for long, negative for short).
    '''
    global doge_position
    try:
        data = message.get('data', [])
        for pos in data:
            if pos['symbol'] == 'DOGEUSDT':
                if pos['side'] == 'Sell':
                    doge_position = - float(pos['size'])
                elif pos['side'] == 'Buy':
                    doge_position = float(pos['size'])
                else:
                    doge_position =  0
            else:
                pass
    except (IndexError, KeyError, AttributeError, TypeError, ValueError):
        pass




private.position_stream(callback=handle_position_message)

import time

while True:

    if doge_position > 0:
        print(f'Excellent you just bought {doge_position} DOGE!!!!!!!')
        break
    elif doge_position < 0:
        print('Not breaking until you buy a DOGE bagholders must resort to extraordinary measures while underwater')
    else:
        print('Waiting for you to buy a DOGE ')
    # poll once a second rather than flooding the console
    time.sleep(1)
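
Instead of polling a global variable, the main program can block until the callback signals that something has happened. The sketch below uses threading.Event for this, on the assumption that the websocket library invokes callbacks from a background thread; here we simulate that with our own thread and a made-up message, and the on_position name is hypothetical.

```python
import threading

position_opened = threading.Event()
doge_position = 0.0


def on_position(message):
    """Update the signed DOGE position and signal when we are long."""
    global doge_position
    for pos in message.get("data", []):
        if pos.get("symbol") != "DOGEUSDT":
            continue
        size = float(pos.get("size", 0))
        doge_position = -size if pos.get("side") == "Sell" else size
        if doge_position > 0:
            position_opened.set()


# Simulate the library delivering a message from a background thread.
sample = {"data": [{"symbol": "DOGEUSDT", "side": "Buy", "size": "10"}]}
worker = threading.Thread(target=on_position, args=(sample,))
worker.start()

# Block for up to five seconds instead of spinning in a loop.
if position_opened.wait(timeout=5):
    print(f"Long {doge_position} DOGE")
else:
    print("No long DOGE position seen yet")
worker.join()
```

The wait call uses no CPU while idle, and the timeout gives the program a chance to do something else if no position update arrives.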

 

 

 

Stream Orders

 

Let's say we are creating an order management system to monitor fills on an algorithm. To do so, we need to keep track of order states. Below are the different states an order can take. 

 

  • Created: order has been accepted by the system but not yet put through the matching engine
  • New: order has been placed successfully
  • Rejected
  • PartiallyFilled
  • PartiallyFilledCanceled: only spot has this order status
  • Filled
  • Cancelled: in derivatives, orders with this status may have an executed qty
  • Untriggered
  • Triggered
  • Deactivated
  • Active: order has been triggered and the new active order has been successfully placed; this is the final state of a successful conditional order

 

Since creating an order management system is quite complex, we simply print the order messages received from the API below:
 

def handle_orders_message(message):
    print(message)


private.order_stream(callback=handle_orders_message)

'''
{'topic': 'order', 'id': '2a97094f9f6323e97ca96e862dfb9d96:29938d51d74bc01b:0:01',
 'creationTime': 1682980546234, 'data':
     
     [{'avgPrice': '0.07860', 'blockTradeId': '',
       'cancelType': 'UNKNOWN', 'category': 'linear',
       'closeOnTrigger': False, 'createdTime': '1682980546232', 
       'cumExecFee': '0.0004716', 'cumExecQty': '10', 'cumExecValue': '0.786',
       'leavesQty': '0', 'leavesValue': '0', 
       'orderId': '89e64947-208c-4fac-a730-b0e2c2f1fa72', 
       'orderIv': '', 'isLeverage': '', 
       'lastPriceOnCreated': '0.07859', 
       'orderStatus': 'Filled', 'orderLinkId': '', 
       'orderType': 'Market', 'positionIdx': 0, 
       'price': '0.08251', 'qty': '10', 'reduceOnly': False,
       'rejectReason': 'EC_NoError', 'side': 'Buy',
       'slTriggerBy': 'UNKNOWN', 'stopLoss': '0.00000',
       'stopOrderType': 'UNKNOWN', 'symbol': 'DOGEUSDT',
       'takeProfit': '0.00000', 'timeInForce': 'IOC', 
       'tpTriggerBy': 'UNKNOWN', 'triggerBy': 'UNKNOWN',
       'triggerDirection': 0, 'triggerPrice': '0.00000',
       'updatedTime': '1682980546233', 'placeType': '', 
       'smpType': 'None', 'smpGroup': 0, 'smpOrderId': ''}]}

'''
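
As a small step beyond printing, the sketch below keeps two dictionaries, one for working orders and one for finished ones, keyed by orderId. Which statuses count as terminal is our own assumption based on the list above, and track_orders is a hypothetical helper; the field names match the printed message. The sample messages are trimmed to the fields we use.

```python
# Assumption: these statuses mean the order will receive no further fills.
TERMINAL_STATUSES = {
    "Filled", "Cancelled", "Rejected", "Deactivated", "PartiallyFilledCanceled",
}

open_orders = {}
closed_orders = {}


def track_orders(message):
    """File each order under open or closed according to its latest status."""
    for order in message.get("data", []):
        order_id = order.get("orderId")
        status = order.get("orderStatus")
        if order_id is None or status is None:
            continue
        if status in TERMINAL_STATUSES:
            open_orders.pop(order_id, None)
            closed_orders[order_id] = order
        else:
            open_orders[order_id] = order


# Made-up lifecycle for a single order: New, then Filled.
order_id = "89e64947-208c-4fac-a730-b0e2c2f1fa72"
track_orders({"topic": "order", "data": [
    {"orderId": order_id, "orderStatus": "New", "symbol": "DOGEUSDT", "cumExecQty": "0"}]})
print(len(open_orders), len(closed_orders))

track_orders({"topic": "order", "data": [
    {"orderId": order_id, "orderStatus": "Filled", "symbol": "DOGEUSDT", "cumExecQty": "10"}]})
print(len(open_orders), len(closed_orders))
```

Passing track_orders as the callback to the order stream would keep both dictionaries up to date as messages arrive.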

 

 

