-
Notifications
You must be signed in to change notification settings - Fork 0
/
loadtest.py
112 lines (89 loc) · 2.98 KB
/
loadtest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import websocket
import time
import requests
import json
import os
from coolname import generate_slug
from random import uniform, choice
PY_ENV = os.environ.get("PY_ENV")
ROOM_COUNT = int(os.environ.get("ROOM_COUNT"))
ROOM_SIZE = int(os.environ.get("ROOM_SIZE"))
ROOM_TICKS = int(os.environ.get("ROOM_TICKS"))
PY_DOMAIN = "127.0.0.1:8000"
if PY_ENV == "prod":
PY_DOMAIN = "ineedempathy.com"
class EmpathyRoom:
@classmethod
def build(cls, room_size, room_id):
klass = cls(room_id, generate_slug(3), room_size)
klass.fill_room()
return klass
def __init__(self, room_id, room_name, room_size):
self.room_id = room_id
self.room_name = room_name
self.room_size = room_size
self.users = []
# complete a random action
def tick(self):
print("TICK | Room: " + self.room_name)
choice(self.users).toggle(int(uniform(0, 80)))
self.receive()
def receive(self):
for user in self.users:
user.receive()
def close(self):
for user in self.users:
user.close()
self.receive()
def fill_room(self):
for x in range(0, self.room_size):
user = RoomUser.build(self.room_name)
if user.connect():
self.users.append(user)
self.receive()
else:
print(f"DROP | User: {user}")
print(f"CREATED | Room: {self.room_id} - {self.room_name} Users: {len(self.users)}")
class RoomUser:
@classmethod
def build(cls, room_name):
return cls(room_name, generate_slug(2))
def __init__(self, room_name, user_name):
self.room_name = room_name
self.user_name = user_name
# print('OPEN | Room: ' + self.room_name + '; User: ' + self.user_name)
self.url = "ws://" + PY_DOMAIN + "/api/rooms/" + self.room_name + ".ws"
def connect(self):
try:
#print(self.url)
self.ws = websocket.create_connection(self.url)
self.identify()
except Exception as err:
print(err)
return False
return True
def close(self):
print("CLOSE | Room: " + self.room_name + "; User: " + self.user_name)
return self.ws.close(1005)
def receive(self):
if self.ws.connected:
self.ws.recv()
#print("Received '%s'" % result)
def identify(self):
self.ws.send('{"setName": "' + self.user_name + '"}')
def toggle(self, card_id):
self.ws.send('{"toggleCard": ' + str(card_id) + "}")
def run(room_count, room_ticks, room_size):
rooms = []
for x in range(0, room_count):
time.sleep(0.1)
room = EmpathyRoom.build(room_size=room_size, room_id=x)
rooms.append(room)
time.sleep(3)
for x in range(0, room_ticks):
time.sleep(0.05)
for room in rooms:
room.tick()
for room in rooms:
room.close()
run(room_count=ROOM_COUNT, room_ticks=ROOM_TICKS, room_size=ROOM_SIZE)