KuCoin Futures WebSocket Python: Real-Time Trading Data
A practical guide to streaming KuCoin Futures market data with Python WebSocket — covering auth, subscriptions, order book feeds, and robust error handling for live trading.
If you're building a trading bot or an analytics dashboard for crypto futures, polling REST endpoints is a dead end. You'll hit rate limits, miss fills, and always be half a second behind the market. WebSocket connections solve this — and KuCoin Futures has one of the cleaner WebSocket APIs in the space. This guide walks you through connecting to it with Python, authenticating, subscribing to live feeds, and keeping that connection alive when things get bumpy.
Unlike Binance or Bybit, where you can connect to public streams without any token, KuCoin requires a connection token even for public market data. The flow is simple: hit a REST endpoint to get a token, then pass that token when opening the WebSocket connection. Private streams (account updates, position changes) need API key authentication in the REST call.
KuCoin Futures runs on a separate domain from the spot exchange — make sure you're hitting api-futures.kucoin.com, not the regular api.kucoin.com. This trips up a lot of developers coming from the spot side.
```python
import base64
import hashlib
import hmac
import time

import requests

API_KEY = 'your_api_key'
API_SECRET = 'your_api_secret'
API_PASSPHRASE = 'your_passphrase'
BASE_URL = 'https://api-futures.kucoin.com'


def get_ws_token(private: bool = False) -> dict:
    """Fetch a WebSocket connection token from KuCoin Futures."""
    endpoint = '/api/v1/bullet-private' if private else '/api/v1/bullet-public'
    url = BASE_URL + endpoint
    if private:
        timestamp = str(int(time.time() * 1000))
        # Sign timestamp + method + path (+ body, empty here) with the API secret
        str_to_sign = timestamp + 'POST' + endpoint
        signature = base64.b64encode(
            hmac.new(API_SECRET.encode(), str_to_sign.encode(), hashlib.sha256).digest()
        ).decode()
        # API key version 2 expects the passphrase itself HMAC-signed with the secret
        passphrase = base64.b64encode(
            hmac.new(API_SECRET.encode(), API_PASSPHRASE.encode(), hashlib.sha256).digest()
        ).decode()
        headers = {
            'KC-API-KEY': API_KEY,
            'KC-API-SIGN': signature,
            'KC-API-TIMESTAMP': timestamp,
            'KC-API-PASSPHRASE': passphrase,
            'KC-API-KEY-VERSION': '2',
            'Content-Type': 'application/json',
        }
        response = requests.post(url, headers=headers, timeout=10)
    else:
        response = requests.post(url, timeout=10)
    response.raise_for_status()
    body = response.json()
    # KuCoin reports API errors in the JSON body, so check its code as well
    if body.get('code') != '200000':
        raise RuntimeError(f"KuCoin error {body.get('code')}: {body.get('msg')}")
    return body['data']


# Get public token (no API keys needed for market data)
token_data = get_ws_token(private=False)
token = token_data['token']
endpoint = token_data['instanceServers'][0]['endpoint']
ping_interval = token_data['instanceServers'][0]['pingInterval']  # in ms
print(f'Endpoint: {endpoint}')
print(f'Token acquired. Ping every {ping_interval}ms')
```
Tokens expire after roughly 24 hours. Build token refresh logic into any long-running bot — a silently expired token will drop the connection with no obvious error message.
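One way to act on that advice, sketched under assumptions: give every connection a lifetime budget below the ~24-hour mark, then reconnect with a freshly fetched token. `rotate_connection`, `open_stream`, and the 23-hour default are hypothetical names and values chosen for illustration, not part of any KuCoin or library API.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def rotate_connection(
    open_stream: Callable[[], Awaitable[None]],
    max_lifetime_s: float = 23 * 3600,  # assumption: rotate well before ~24h
    max_connections: Optional[int] = None,  # None = run forever; set for demos/tests
) -> int:
    """Hypothetical helper: cap each connection's lifetime, then reconnect.

    open_stream is any coroutine function that fetches a NEW token
    (e.g. via get_ws_token), connects, subscribes and streams until it ends.
    Returns the number of connections opened (only reachable with max_connections).
    """
    connections = 0
    while max_connections is None or connections < max_connections:
        connections += 1
        try:
            # wait_for cancels open_stream() once the lifetime budget is spent
            await asyncio.wait_for(open_stream(), timeout=max_lifetime_s)
        except asyncio.TimeoutError:
            pass  # planned rotation; the next iteration fetches a fresh token
    return connections
```

In a real bot, `open_stream` would call `get_ws_token()` and run the subscribe loop. Pair it with the reconnect backoff discussed later, since this sketch reconnects immediately when a stream ends on its own.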
Once you have a token, connecting is straightforward. KuCoin uses a standard JSON subscription protocol — you send a subscribe message with a topic string, and the server starts pushing updates. For futures, the most useful public channels are the ticker feed, level-2 order book deltas, and execution (trade) feed.
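The subscription protocol described above boils down to one JSON frame per topic. The helper below is a hypothetical sketch for building those frames; the ticker and level-2 topic strings match the ones used in this guide's examples, while the execution topic string is an assumption worth confirming in KuCoin's Futures WebSocket documentation.

```python
import itertools
import json
import time

# Topic templates for the public channels named above. 'ticker' and 'level2' match
# the topics used elsewhere in this guide; the 'execution' string is an assumption
# to confirm against KuCoin's Futures WebSocket docs.
PUBLIC_TOPICS = {
    'ticker': '/contractMarket/ticker:{symbol}',
    'level2': '/contractMarket/level2:{symbol}',
    'execution': '/contractMarket/execution:{symbol}',
}

# Client-chosen message ids; a counter avoids collisions within one millisecond
_ids = itertools.count(int(time.time() * 1000))


def topic_message(action: str, channel: str, symbol: str) -> str:
    """Build the JSON text frame for a 'subscribe' or 'unsubscribe' request."""
    if action not in ('subscribe', 'unsubscribe'):
        raise ValueError(f'unsupported action: {action}')
    return json.dumps({
        'id': str(next(_ids)),
        'type': action,
        'topic': PUBLIC_TOPICS[channel].format(symbol=symbol),
        'privateChannel': False,  # all topics in PUBLIC_TOPICS are public
        'response': True,  # ask the server to confirm with an 'ack'
    })
```

For example, `await ws.send(topic_message('subscribe', 'ticker', 'XBTUSDTM'))` inside an open connection. With `'response': True`, the server should reply with an `ack` whose `id` matches the request, which lets you confirm each subscription individually.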
The symbol format on KuCoin Futures has no separator and ends in an M for perpetuals: XBTUSDTM is the Bitcoin USDT-margined perpetual (note XBT, not BTC) and ETHUSDTM is Ethereum. This differs from Binance (BTCUSDT) and OKX (BTC-USDT-SWAP), so double-check your symbol strings when porting logic between exchanges.
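If you port symbol lists from other exchanges, a small mapping function keeps the conversion in one place. This is a hypothetical sketch that assumes USDT-margined perpetuals only and treats BTC as the only base asset KuCoin renames (to XBT); check anything else against KuCoin's contract list.

```python
def to_kucoin_perp(binance_symbol: str) -> str:
    """Map a Binance USDT perpetual symbol such as 'BTCUSDT' to KuCoin Futures form.

    Hypothetical helper. Assumes USDT-margined perpetuals only and that BTC is
    the only base asset KuCoin renames (to XBT).
    """
    if not binance_symbol.endswith('USDT'):
        raise ValueError(f'expected a USDT-margined symbol, got {binance_symbol!r}')
    base = binance_symbol[:-4]  # strip the 'USDT' quote currency
    if base == 'BTC':
        base = 'XBT'
    return f'{base}USDTM'


def to_okx_swap(binance_symbol: str) -> str:
    """Same idea for OKX perpetual swaps: 'BTCUSDT' -> 'BTC-USDT-SWAP'."""
    if not binance_symbol.endswith('USDT'):
        raise ValueError(f'expected a USDT-margined symbol, got {binance_symbol!r}')
    return f'{binance_symbol[:-4]}-USDT-SWAP'


print(to_kucoin_perp('BTCUSDT'), to_kucoin_perp('ETHUSDT'), to_okx_swap('BTCUSDT'))
```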
```python
import asyncio
import json
import time

import websockets
from websockets.exceptions import ConnectionClosed

# get_ws_token() comes from the previous snippet.


async def keepalive(ws, interval: float):
    """Send KuCoin's JSON ping on a fixed timer, even when the feed is quiet."""
    try:
        while True:
            await asyncio.sleep(interval)
            await ws.send(json.dumps({'id': str(int(time.time() * 1000)), 'type': 'ping'}))
    except ConnectionClosed:
        pass  # the reader loop sees the closed socket and exits


async def stream_futures_ticker(symbol: str = 'XBTUSDTM'):
    token_data = get_ws_token(private=False)
    server = token_data['instanceServers'][0]
    ws_url = f"{server['endpoint']}?token={token_data['token']}"
    ping_interval = server['pingInterval'] / 1000  # convert ms to seconds

    async with websockets.connect(ws_url) as ws:
        # Wait for welcome message
        welcome = json.loads(await ws.recv())
        print(f"Server: {welcome['type']}")

        # Subscribe to ticker
        sub_msg = {
            'id': str(int(time.time() * 1000)),
            'type': 'subscribe',
            'topic': f'/contractMarket/ticker:{symbol}',
            'privateChannel': False,
            'response': True,
        }
        await ws.send(json.dumps(sub_msg))
        ack = json.loads(await ws.recv())
        print(f"Subscription ack: {ack['type']}")

        # Heartbeat runs as its own task, so it fires even if no messages arrive
        pinger = asyncio.create_task(keepalive(ws, ping_interval))
        try:
            async for message in ws:
                data = json.loads(message)
                if data.get('type') != 'message':
                    continue  # skip pong and other control messages
                ticker = data['data']
                print(
                    f"[{symbol}] "
                    f"Price: {ticker.get('price')} | "
                    f"Best Bid: {ticker.get('bestBidPrice')} | "
                    f"Best Ask: {ticker.get('bestAskPrice')} | "
                    f"Size: {ticker.get('size')}"
                )
        finally:
            pinger.cancel()


asyncio.run(stream_futures_ticker('XBTUSDTM'))
```
Ticker data tells you the last price and best bid/ask. But if you're building a market-making bot, an arbitrage scanner across KuCoin and Bybit, or a signals feed, you want depth — the full order book or at least the top levels. KuCoin Futures provides level-2 order book deltas, which means you get incremental updates rather than a full snapshot every tick.
The correct approach: fetch a REST snapshot first to initialize your local book, then apply the delta stream. Each delta message has a sequence number — if you see a gap in sequences, your local state is inconsistent and you need to re-snapshot. Platforms like VoiceOfChain use exactly this approach to maintain clean order book state for real-time signal generation.
```python
import asyncio
import json
import time

import requests
import websockets
from websockets.exceptions import ConnectionClosed


class FuturesOrderBook:
    def __init__(self, symbol: str):
        self.symbol = symbol
        self.bids = {}  # price (float) -> size (float)
        self.asks = {}  # price (float) -> size (float)
        self.sequence = 0

    def load_snapshot(self):
        """Fetch a full order book snapshot via REST and replace local state."""
        url = f'https://api-futures.kucoin.com/api/v1/level2/snapshot?symbol={self.symbol}'
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        data = resp.json()['data']
        self.sequence = int(data['sequence'])
        # Normalize prices to float so snapshot levels and WebSocket deltas
        # (which carry prices as strings) map to the same dict keys
        self.bids = {float(level[0]): float(level[1]) for level in data['bids']}
        self.asks = {float(level[0]): float(level[1]) for level in data['asks']}
        print(f'Snapshot loaded. Sequence: {self.sequence}')

    def apply_delta(self, delta: dict) -> bool:
        """Apply one delta. Returns False if a sequence gap is detected."""
        seq = int(delta['sequence'])
        if seq <= self.sequence:
            return True  # already reflected in the snapshot; skip it
        if seq != self.sequence + 1:
            print(f'Sequence gap! Expected {self.sequence + 1}, got {seq}')
            return False  # Trigger re-snapshot
        self.sequence = seq
        # 'change' is "price,side,size"; split on ';' defensively in case of batching
        for change in delta.get('change', '').split(';'):
            if not change:
                continue
            price, side, size = change.split(',')[:3]
            book = self.bids if side == 'buy' else self.asks
            if float(size) == 0:
                book.pop(float(price), None)  # size 0 removes the level
            else:
                book[float(price)] = float(size)
        return True

    def best_bid(self):
        return max(self.bids) if self.bids else None

    def best_ask(self):
        return min(self.asks) if self.asks else None


async def stream_order_book(symbol: str = 'ETHUSDTM'):
    book = FuturesOrderBook(symbol)
    token_data = get_ws_token()  # defined in the first snippet
    server = token_data['instanceServers'][0]
    ws_url = f"{server['endpoint']}?token={token_data['token']}"
    ping_interval = server['pingInterval'] / 1000  # ms -> seconds

    async with websockets.connect(ws_url) as ws:
        await ws.recv()  # welcome
        await ws.send(json.dumps({
            'id': '1',
            'type': 'subscribe',
            'topic': f'/contractMarket/level2:{symbol}',
            'response': True,
        }))
        await ws.recv()  # ack

        # Snapshot AFTER subscribing so no delta falls between snapshot and stream;
        # to_thread (Python 3.9+) keeps the event loop free while requests blocks
        await asyncio.to_thread(book.load_snapshot)

        async def keepalive():
            try:
                while True:
                    await asyncio.sleep(ping_interval)
                    await ws.send(json.dumps({'id': str(int(time.time() * 1000)), 'type': 'ping'}))
            except ConnectionClosed:
                pass  # the reader loop below handles the disconnect

        pinger = asyncio.create_task(keepalive())
        try:
            async for raw in ws:
                msg = json.loads(raw)
                if msg.get('type') != 'message':
                    continue  # pong and other control messages
                if not book.apply_delta(msg['data']):
                    print('Re-syncing order book...')
                    await asyncio.to_thread(book.load_snapshot)  # Re-initialize on gap
                    continue
                bid, ask = book.best_bid(), book.best_ask()
                if bid is not None and ask is not None:
                    print(f'Bid: {bid} | Ask: {ask} | Spread: {ask - bid:.2f}')
        finally:
            pinger.cancel()


asyncio.run(stream_order_book('ETHUSDTM'))
```
Always handle sequence gaps. On volatile days — like a major liquidation cascade — KuCoin's servers can briefly drop messages. Without gap detection, your local order book silently drifts and your bot trades on stale prices.
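Gap detection only helps if the local book started from a consistent point. KuCoin's level-2 documentation describes a cache-then-replay procedure along these lines: buffer deltas from the moment you subscribe, fetch the REST snapshot, drop the deltas the snapshot already covers, and replay the rest in sequence order. The replay step can be sketched as a pure function; `deltas_to_replay` and `SequenceGap` are hypothetical names.

```python
from typing import Iterable, List


class SequenceGap(Exception):
    """Buffered deltas cannot be stitched onto the snapshot; fetch a new one."""


def deltas_to_replay(snapshot_seq: int, buffered: Iterable[dict]) -> List[dict]:
    """Return the buffered level-2 deltas to apply after a REST snapshot, in order.

    Deltas at or below snapshot_seq are already in the snapshot and are dropped;
    the remainder must run contiguously from snapshot_seq + 1.
    """
    pending = sorted(
        (d for d in buffered if int(d['sequence']) > snapshot_seq),
        key=lambda d: int(d['sequence']),
    )
    expected = snapshot_seq + 1
    for d in pending:
        if int(d['sequence']) != expected:
            raise SequenceGap(f'expected {expected}, got {d["sequence"]}')
        expected += 1
    return pending
```

Feed the returned deltas to `FuturesOrderBook.apply_delta` in order; on `SequenceGap`, fetch a new snapshot and start over.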
A WebSocket connection that runs fine in testing will drop in production. Network hiccups, server restarts, token expiry, exchange maintenance windows — they all happen. The difference between a bot that makes money and one that sits silently broken at 3 AM is robust reconnection logic.
Compared to Binance (where a user-data listenKey expires after 60 minutes without a keepalive request, and every connection is closed at the 24-hour mark) or OKX (which expects you to send pings when the connection is otherwise idle), KuCoin is reasonably well-behaved. But you still need to handle disconnects, re-fetch tokens, and re-subscribe from scratch. The pattern below wraps a subscription in an exponential backoff reconnect loop.
```python
import asyncio
import json
import time

import websockets
from websockets.exceptions import ConnectionClosed


async def keepalive(ws, interval: float):
    """Send KuCoin's JSON ping on a timer, independent of incoming traffic."""
    try:
        while True:
            await asyncio.sleep(interval)
            await ws.send(json.dumps({'id': str(int(time.time() * 1000)), 'type': 'ping'}))
    except ConnectionClosed:
        pass  # the reader loop handles the disconnect


async def managed_stream(symbol: str, handler):
    """Reconnecting WebSocket wrapper with exponential backoff."""
    backoff = 1
    max_backoff = 60
    while True:
        try:
            # Fresh token on every attempt; the previous one may have expired
            token_data = get_ws_token()  # defined in the first snippet
            server = token_data['instanceServers'][0]
            ws_url = f"{server['endpoint']}?token={token_data['token']}"
            ping_interval = server['pingInterval'] / 1000
            # Library-level pings disabled; KuCoin's JSON ping keeps the session alive
            async with websockets.connect(ws_url, ping_interval=None) as ws:
                await ws.recv()  # welcome
                await ws.send(json.dumps({
                    'id': str(int(time.time() * 1000)),
                    'type': 'subscribe',
                    'topic': f'/contractMarket/ticker:{symbol}',
                    'response': True,
                }))
                await ws.recv()  # ack
                print(f'Connected to {symbol} stream')
                backoff = 1  # Reset on successful connect
                pinger = asyncio.create_task(keepalive(ws, ping_interval))
                try:
                    async for raw in ws:
                        msg = json.loads(raw)
                        if msg.get('type') == 'message':
                            await handler(msg['data'])
                finally:
                    pinger.cancel()
            print(f'Stream ended. Reconnecting in {backoff}s...')
        except ConnectionClosed as e:
            print(f'Connection closed: {e}. Reconnecting in {backoff}s...')
        except Exception as e:
            print(f'Stream error: {e}. Reconnecting in {backoff}s...')
        # Sleep outside try/finally so cancelling the task is not delayed by the backoff
        await asyncio.sleep(backoff)
        backoff = min(backoff * 2, max_backoff)


# Example usage
async def print_price(data):
    print(f"Price update: {data.get('price')}")


asyncio.run(managed_stream('XBTUSDTM', print_price))
```
Raw price data from the WebSocket is just numbers. Turning those numbers into actionable decisions is the actual challenge. A common pattern is to buffer incoming ticks, build OHLCV candles in-memory, run indicators over those candles, and emit a signal when conditions are met.
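The first step of that pipeline, turning ticks into candles, is a small fold over incoming data. This sketch assumes ticks arrive roughly in timestamp order with timestamps already converted to milliseconds (check the unit KuCoin uses in a live message before converting); `Candle` and `CandleBuilder` are illustrative names.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Candle:
    start_ms: int  # bucket start, epoch milliseconds
    open: float
    high: float
    low: float
    close: float
    volume: float


class CandleBuilder:
    """Fold (timestamp_ms, price, size) ticks into fixed-width OHLCV candles."""

    def __init__(self, interval_ms: int = 60_000):
        self.interval_ms = interval_ms
        self.current: Optional[Candle] = None
        self.closed: List[Candle] = []

    def add_tick(self, ts_ms: int, price: float, size: float) -> Optional[Candle]:
        """Add one tick; return the candle it closed, if any."""
        start = ts_ms - ts_ms % self.interval_ms
        if self.current is not None and start < self.current.start_ms:
            return None  # late tick for an already-closed bucket: dropped in this sketch
        finished = None
        if self.current is not None and start > self.current.start_ms:
            finished = self.current
            self.closed.append(finished)
            self.current = None
        if self.current is None:
            self.current = Candle(start, price, price, price, price, size)
        else:
            c = self.current
            c.high = max(c.high, price)
            c.low = min(c.low, price)
            c.close = price
            c.volume += size
        return finished
```

A candle is only emitted when the first tick of the next bucket arrives, and buckets with no trades produce no candle, so fill gaps yourself if your indicators need evenly spaced bars.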
VoiceOfChain takes a similar approach — aggregating real-time data across multiple exchanges and applying signal logic to surface high-probability setups for traders. If you're building your own version, feeding your WebSocket data into a pandas or polars rolling window lets you run RSI, VWAP, or funding rate divergence calculations on every new tick without hitting REST rate limits.
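The rolling-window idea also works without pandas or polars. This dependency-free sketch swaps in a `collections.deque` window: a trade-weighted VWAP over the last N trades and a simple-average RSI over the last N closes. This RSI is the simple-average (Cutler) variant, not the Wilder-smoothed RSI most charting tools display, and returning 50 for a perfectly flat window is an arbitrary convention of the sketch.

```python
from collections import deque
from typing import Optional, Sequence


class RollingVWAP:
    """VWAP over the most recent `window` trades, updated in O(1) per trade."""

    def __init__(self, window: int = 500):
        self.window = window
        self.trades = deque()
        self.pv = 0.0   # running sum of price * size
        self.vol = 0.0  # running sum of size

    def update(self, price: float, size: float) -> Optional[float]:
        self.trades.append((price, size))
        self.pv += price * size
        self.vol += size
        if len(self.trades) > self.window:
            old_price, old_size = self.trades.popleft()
            self.pv -= old_price * old_size
            self.vol -= old_size
        return self.pv / self.vol if self.vol > 0 else None


def simple_rsi(closes: Sequence[float], period: int = 14) -> Optional[float]:
    """RSI from simple averages of the last `period` close-to-close changes."""
    if len(closes) < period + 1:
        return None
    recent = list(closes)[-(period + 1):]
    gains = losses = 0.0
    for prev, cur in zip(recent, recent[1:]):
        change = cur - prev
        if change > 0:
            gains += change
        else:
            losses -= change
    if losses == 0:
        return 100.0 if gains > 0 else 50.0  # flat window -> 50 by convention here
    rs = gains / losses  # ratio of averages equals ratio of sums over equal windows
    return 100.0 - 100.0 / (1.0 + rs)
```

Feed `RollingVWAP.update` from the execution or ticker stream, and `simple_rsi` from the closes of finished candles.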
One important consideration when running this across exchanges: Binance Futures and KuCoin Futures do not necessarily share funding schedules (8 hours is common, but the interval is set per contract and some contracts settle more often), and their liquidation mechanics differ. A signal that backtests well on Binance data may behave differently on KuCoin because of these structural differences. Always validate your logic against exchange-specific historical data, not just aggregate crypto data.
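Funding comparisons are a concrete case of that structural difference: the same headline rate costs more if it is charged more often. A minimal normalization, assuming simple (non-compounded) scaling; the function names and the example rate are illustrative, not exchange data.

```python
HOURS_PER_YEAR = 24 * 365


def funding_per_8h(rate_per_interval: float, interval_hours: float) -> float:
    """Rescale a per-interval funding rate to an 8-hour equivalent (simple scaling)."""
    return rate_per_interval * 8 / interval_hours


def annualized_funding(rate_per_interval: float, interval_hours: float) -> float:
    """Simple (non-compounded) annual rate: rate times the number of fundings per year."""
    return rate_per_interval * HOURS_PER_YEAR / interval_hours


# Illustrative rate of 0.01% per interval on a 4-hour vs an 8-hour contract
print(funding_per_8h(0.0001, 4), annualized_funding(0.0001, 4), annualized_funding(0.0001, 8))
```

With these, a 0.01% rate charged every 4 hours is 0.02% per 8 hours, double what the same number means on an 8-hour contract, and roughly 21.9% a year before compounding.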
The KuCoin Futures WebSocket API is well-documented and reliable enough for production trading systems. The token-based authentication is an extra step compared to exchanges like Binance, but it's a minor inconvenience once you have it automated. The bigger investment is in the surrounding infrastructure: reconnection logic, sequence gap handling, and the data pipeline that turns raw ticks into trading decisions.
Start with the public ticker stream to validate your setup, then graduate to order book deltas once you need depth. If you're combining this with signals from a platform like VoiceOfChain, the WebSocket feed gives you the execution-quality data to act on those signals quickly, rather than finding out a move already happened when your next REST poll fires.