-
Notifications
You must be signed in to change notification settings - Fork 177
/
Copy pathwebsockets.py
59 lines (47 loc) · 1.48 KB
/
websockets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
'''Example for connecting to private WebSockets with an existing account.
Usage: python -m examples.websockets
'''
import asyncio
import json
import websockets
from dydx3 import Client
from dydx3.helpers.request_helpers import generate_now_iso
from dydx3.constants import API_HOST_SEPOLIA
from dydx3.constants import NETWORK_ID_SEPOLIA
from dydx3.constants import WS_HOST_SEPOLIA
from web3 import Web3
# Ganache test address.
ETHEREUM_ADDRESS = '0x22d491Bde2303f2f43325b2108D26f1eAbA1e32b'
# Ganache node.
WEB_PROVIDER_URL = 'http://localhost:8545'
client = Client(
network_id=NETWORK_ID_SEPOLIA,
host=API_HOST_SEPOLIA,
default_ethereum_address=ETHEREUM_ADDRESS,
web3=Web3(Web3.HTTPProvider(WEB_PROVIDER_URL)),
)
now_iso_string = generate_now_iso()
signature = client.private.sign(
request_path='/ws/accounts',
method='GET',
iso_timestamp=now_iso_string,
data={},
)
req = {
'type': 'subscribe',
'channel': 'v3_accounts',
'accountNumber': '0',
'apiKey': client.api_key_credentials['key'],
'passphrase': client.api_key_credentials['passphrase'],
'timestamp': now_iso_string,
'signature': signature,
}
async def main():
# Note: This doesn't work with Python 3.9.
async with websockets.connect(WS_HOST_SEPOLIA) as websocket:
await websocket.send(json.dumps(req))
print(f'> {req}')
while True:
res = await websocket.recv()
print(f'< {res}')
asyncio.get_event_loop().run_until_complete(main())