Sometimes an indicator has strict timeliness requirements, as with a market price alert. In these cases we usually keep a socket connection open to the data source. In KingData, socket programs are not deployed through Scrapyd, so the workflow is slightly different: users only need to expose a startup method, and KingData automatically integrates the rest into its existing services. Note that the program itself must handle abnormal conditions such as connection timeouts, heartbeats, and reconnection.
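As a rough illustration of the reconnection handling mentioned above, the sketch below retries a connection with exponential backoff instead of a fixed pause. It is not part of KingData: `backoff_delay`, `connect_with_retry`, and their parameters are hypothetical names chosen for this example, and `connect` stands for any coroutine function that opens the socket. With aiohttp you would probably also want to catch `aiohttp.ClientError`.

```python
import asyncio
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based), before jitter."""
    return min(cap, base * (2 ** attempt))


async def connect_with_retry(connect, max_attempts: int = 5,
                             base: float = 1.0, cap: float = 30.0,
                             sleep=asyncio.sleep):
    """Await `connect()` until it succeeds or `max_attempts` is reached.

    `connect` is a hypothetical coroutine function that opens the socket.
    `sleep` is injectable so the loop can be exercised without real waiting.
    """
    last_error = None
    for attempt in range(max_attempts):
        try:
            return await connect()
        except (OSError, asyncio.TimeoutError) as exc:
            last_error = exc
            delay = backoff_delay(attempt, base, cap)
            # Add up to 10% random jitter so many clients do not retry in lockstep.
            await sleep(delay + random.uniform(0, delay * 0.1))
    raise ConnectionError(f"gave up after {max_attempts} attempts") from last_error
```

Capping the delay keeps a long outage from pushing the wait time up without bound, while the growing delay avoids hammering the data source during short outages.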
Indicator: BTC Price Alert
    import asyncio
    import time

    import aiohttp
    import typer
    import ujson

    # Assumption: PROXY is not defined in the original snippet.
    # Set it to a proxy URL if one is required, otherwise leave it as None.
    PROXY = None


    async def binance_conn():
        websocket = await _conn('wss://stream.binance.com:443/ws/!miniTicker@arr')
        return websocket


    async def _conn(uri):
        session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=100))
        websocket = await session.ws_connect(uri, proxy=PROXY, ssl=False)
        return websocket


    async def get_k_line(kline_q):
        # The original calls ws.binance_conn(); here the helper lives in the same file.
        print('binance web Socket Connecting ...')
        websocket = await binance_conn()
        ts = time.time()
        print('binance web Socket Connect Success!')
        while 1:
            try:
                # Treat 10 seconds without a message as a dead connection.
                msg = await websocket.receive(timeout=10)
            except asyncio.exceptions.TimeoutError as e:
                print('TimeoutError: Reconnecting')
                websocket = await binance_conn()
                await asyncio.sleep(1)
            else:
                match msg.type:
                    case aiohttp.WSMsgType.TEXT:
                        data = ujson.loads(msg.data)
                        for ticker in data:
                            # TODO Real-time data logic processing
                            pass
                    case aiohttp.WSMsgType.CLOSED:
                        print('Binance web Socket Connect Failed!')
                        websocket = await binance_conn()
                        await asyncio.sleep(1)


    # ===== starting program =====
    app = typer.Typer()


    async def run():
        kline_q = asyncio.Queue(maxsize=10000)
        await asyncio.gather(
            get_k_line(kline_q)
        )


    @app.command()
    def start():
        asyncio.run(run())


    @app.command()
    def goodbye(name: str, formal: bool = False):
        if formal:
            typer.echo(f"Goodbye Ms. {name}. Have a good day.")
        else:
            typer.echo(f"Bye {name}!")


    if __name__ == "__main__":
        app()
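The `# TODO Real-time data logic processing` step is left open in the example. One possible way to fill it in for a price alert is sketched below. It assumes each mini-ticker entry carries the symbol under `"s"` and the latest close price as a string under `"c"`. The `check_alerts` function, the `thresholds` mapping, and the `last_prices` cache are hypothetical names introduced for illustration, not KingData APIs.

```python
def check_alerts(tickers, thresholds, last_prices):
    """Return (symbol, price, level) for every level crossed since the previous tick.

    tickers     -- decoded list of mini-ticker dicts from one socket message
    thresholds  -- hypothetical mapping such as {"BTCUSDT": [30000.0]}
    last_prices -- dict updated in place with the latest price per symbol
    """
    alerts = []
    for ticker in tickers:
        symbol = ticker.get("s")
        if symbol not in thresholds:
            continue
        price = float(ticker["c"])  # close price arrives as a string
        prev = last_prices.get(symbol)
        last_prices[symbol] = price
        if prev is None:
            continue  # first tick for this symbol: nothing to compare against
        for level in thresholds[symbol]:
            crossed_up = prev < level <= price
            crossed_down = prev > level >= price
            if crossed_up or crossed_down:
                alerts.append((symbol, price, level))
    return alerts
```

Inside the receive loop this could be called as `check_alerts(data, thresholds, last_prices)` in place of the `for ticker in data` body. Comparing against the previous price means an alert fires once per crossing rather than on every tick above the level.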