Skip to content

Commit 1716ceb

Browse files
authored
fix: ws api not allowing to send over 100 requests (#1560)
* add timeout to jobs * fix: sending ws api request over queue size * fix tests * fix tests * skip test for python 3.7
1 parent be87758 commit 1716ceb

File tree

2 files changed

+82
-13
lines changed

2 files changed

+82
-13
lines changed

binance/ws/websocket_api.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,26 @@ def __init__(self, url: str, tld: str = "com", testnet: bool = False):
2121
def _handle_message(self, msg):
2222
"""Override message handling to support request-response"""
2323
parsed_msg = super()._handle_message(msg)
24+
self._log.debug(f"Received message: {parsed_msg}")
2425
if parsed_msg is None:
2526
return None
26-
req_id, exception, throwError = None, None, False
27+
req_id, exception = None, None
2728
if "id" in parsed_msg:
2829
req_id = parsed_msg["id"]
2930
if "status" in parsed_msg:
3031
if parsed_msg["status"] != 200:
31-
throwError = True
3232
exception = BinanceAPIException(
3333
parsed_msg, parsed_msg["status"], self.json_dumps(parsed_msg["error"])
3434
)
3535
if req_id is not None and req_id in self._responses:
36-
if throwError and exception is not None:
36+
if exception is not None:
3737
self._responses[req_id].set_exception(exception)
3838
else:
3939
self._responses[req_id].set_result(parsed_msg)
40-
elif throwError and exception is not None:
40+
elif exception is not None:
4141
raise exception
42-
return parsed_msg
42+
else:
43+
self._log.warning(f"WS api receieved unknown message: {parsed_msg}")
4344

4445
async def _ensure_ws_connection(self) -> None:
4546
"""Ensure WebSocket connection is established and ready

tests/test_ws_api.py

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import sys
23
import re
34
import pytest
45
import asyncio
@@ -114,16 +115,35 @@ async def test_testnet_url():
114115
async def test_message_handling(clientAsync):
115116
"""Test message handling with various message types"""
116117
# Test valid message
117-
valid_msg = {"id": "123", "result": {"test": "data"}}
118-
result = clientAsync.ws_api._handle_message(json.dumps(valid_msg))
118+
future = asyncio.Future()
119+
clientAsync.ws_api._responses["123"] = future
120+
valid_msg = {"id": "123", "status": 200, "result": {"test": "data"}}
121+
clientAsync.ws_api._handle_message(json.dumps(valid_msg))
122+
result = await clientAsync.ws_api._responses["123"]
119123
assert result == valid_msg
124+
125+
@pytest.mark.asyncio
126+
async def test_message_handling_raise_exception(clientAsync):
127+
with pytest.raises(BinanceAPIException):
128+
future = asyncio.Future()
129+
clientAsync.ws_api._responses["123"] = future
130+
valid_msg = {"id": "123", "status": 400, "error": {"code": "0", "msg": "error message"}}
131+
clientAsync.ws_api._handle_message(json.dumps(valid_msg))
132+
await future
133+
@pytest.mark.asyncio
134+
async def test_message_handling_raise_exception_without_id(clientAsync):
135+
with pytest.raises(BinanceAPIException):
136+
future = asyncio.Future()
137+
clientAsync.ws_api._responses["123"] = future
138+
valid_msg = {"id": "123", "status": 400, "error": {"code": "0", "msg": "error message"}}
139+
clientAsync.ws_api._handle_message(json.dumps(valid_msg))
140+
await future
141+
142+
@pytest.mark.asyncio
143+
async def test_message_handling_invalid_json(clientAsync):
144+
with pytest.raises(json.JSONDecodeError):
145+
clientAsync.ws_api._handle_message("invalid json")
120146

121-
# Test message without ID
122-
no_id_msg = {"data": "test"}
123-
result = clientAsync.ws_api._handle_message(json.dumps(no_id_msg))
124-
assert result == no_id_msg
125-
126-
# Test invalid JSON
127147
with pytest.raises(json.JSONDecodeError):
128148
clientAsync.ws_api._handle_message("invalid json")
129149

@@ -151,3 +171,51 @@ async def test_cleanup_on_exit(clientAsync):
151171
# Check cleanup
152172
assert "test" not in clientAsync.ws_api._responses
153173
assert future.exception() is not None
174+
175+
176+
@pytest.mark.asyncio
177+
async def test_ws_queue_overflow(clientAsync):
178+
"""WebSocket API should not overflow queue"""
179+
#
180+
original_size = clientAsync.ws_api.MAX_QUEUE_SIZE
181+
clientAsync.ws_api.MAX_QUEUE_SIZE = 1
182+
183+
try:
184+
# Request multiple order books concurrently
185+
symbols = ["BTCUSDT", "ETHUSDT", "BNBUSDT"]
186+
tasks = [clientAsync.ws_get_order_book(symbol=symbol) for symbol in symbols]
187+
188+
# Execute all requests concurrently and wait for results
189+
results = await asyncio.gather(*tasks, return_exceptions=True)
190+
191+
# Check that we got valid responses or expected overflow errors
192+
valid_responses = [r for r in results if not isinstance(r, Exception)]
193+
assert len(valid_responses) == len(symbols), "Should get at least one valid response"
194+
195+
for result in valid_responses:
196+
assert_ob(result)
197+
198+
finally:
199+
# Restore original queue size
200+
clientAsync.ws_api.MAX_QUEUE_SIZE = original_size
201+
202+
@pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+")
203+
@pytest.mark.asyncio
204+
async def test_ws_api_with_stream(clientAsync):
205+
"""Test combining WebSocket API requests with stream listening"""
206+
from binance import BinanceSocketManager
207+
208+
# Create socket manager and trade socket
209+
bm = BinanceSocketManager(clientAsync)
210+
ts = bm.trade_socket("BTCUSDT")
211+
212+
async with ts:
213+
# Make WS API request while stream is active
214+
order_book = await clientAsync.ws_get_order_book(symbol="BTCUSDT")
215+
assert_ob(order_book)
216+
217+
# Verify we can still receive stream data
218+
trade = await ts.recv()
219+
assert "s" in trade # Symbol
220+
assert "p" in trade # Price
221+
assert "q" in trade # Quantity

0 commit comments

Comments
 (0)