-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathClient.py
297 lines (260 loc) · 11.7 KB
/
Client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
from typing import TYPE_CHECKING, Optional, Set, List, Dict
import struct
from NetUtils import ClientStatus
from .Locations import roomCount, nonBlock, beanstones, roomException, shop, badge, pants, eReward
from .Items import items_by_id
import asyncio
import worlds._bizhawk as bizhawk
from worlds._bizhawk.client import BizHawkClient
if TYPE_CHECKING:
from worlds._bizhawk.context import BizHawkClientContext
ROOM_ARRAY_POINTER = 0x51FA00
class MLSSClient(BizHawkClient):
game = "Mario & Luigi Superstar Saga"
system = "GBA"
patch_suffix = ".apmlss"
local_checked_locations: Set[int]
goal_flag: int
rom_slot_name: Optional[str]
eUsed: List[int]
room: int
local_events: List[int]
player_name: Optional[str]
checked_flags: Dict[int, list] = {}
def __init__(self) -> None:
super().__init__()
self.local_checked_locations = set()
self.local_set_events = {}
self.local_found_key_items = {}
self.rom_slot_name = None
self.seed_verify = False
self.eUsed = []
self.room = 0
self.local_events = []
async def validate_rom(self, ctx: "BizHawkClientContext") -> bool:
from CommonClient import logger
try:
# Check ROM name/patch version
rom_name_bytes = await bizhawk.read(ctx.bizhawk_ctx, [(0xA0, 14, "ROM")])
rom_name = bytes([byte for byte in rom_name_bytes[0] if byte != 0]).decode("UTF-8")
if not rom_name.startswith("MARIO&LUIGIUA8"):
return False
except UnicodeDecodeError:
return False
except bizhawk.RequestFailedError:
return False # Should verify on the next pass
ctx.game = self.game
ctx.items_handling = 0b101
ctx.want_slot_data = True
ctx.watcher_timeout = 0.125
self.rom_slot_name = rom_name
self.seed_verify = False
name_bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(0xDF0000, 16, "ROM")]))[0]
name = bytes([byte for byte in name_bytes if byte != 0]).decode("UTF-8")
self.player_name = name
for i in range(59):
self.checked_flags[i] = []
return True
async def set_auth(self, ctx: "BizHawkClientContext") -> None:
ctx.auth = self.player_name
def on_package(self, ctx, cmd, args) -> None:
if cmd == "RoomInfo":
ctx.seed_name = args["seed_name"]
async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
from CommonClient import logger
try:
if ctx.seed_name is None:
return
if not self.seed_verify:
seed = await bizhawk.read(ctx.bizhawk_ctx, [(0xDF00A0, len(ctx.seed_name), "ROM")])
seed = seed[0].decode("UTF-8")
if seed not in ctx.seed_name:
logger.info(
"ERROR: The ROM you loaded is for a different game of AP. "
"Please make sure the host has sent you the correct patch file,"
"and that you have opened the correct ROM."
)
raise bizhawk.ConnectorError("Loaded ROM is for Incorrect lobby.")
self.seed_verify = True
read_state = await bizhawk.read(
ctx.bizhawk_ctx,
[
(0x4564, 59, "EWRAM"),
(0x2330, 2, "IWRAM"),
(0x3FE0, 1, "IWRAM"),
(0x304A, 1, "EWRAM"),
(0x304B, 1, "EWRAM"),
(0x304C, 4, "EWRAM"),
(0x3060, 6, "EWRAM"),
(0x4808, 2, "EWRAM"),
(0x4407, 1, "EWRAM"),
(0x2339, 1, "IWRAM"),
]
)
flags = read_state[0]
current_room = int.from_bytes(read_state[1], "little")
shop_init = read_state[2][0]
shop_scroll = read_state[3][0] & 0x1F
is_buy = read_state[4][0] != 0
shop_address = (struct.unpack("<I", read_state[5])[0]) & 0xFFFFFF
logo = bytes([byte for byte in read_state[6] if byte < 0x70]).decode("UTF-8")
received_index = (read_state[7][0] << 8) + read_state[7][1]
cackletta = read_state[8][0] & 0x40
shopping = read_state[9][0] & 0xF
if logo != "MLSSAP":
return
locs_to_send = set()
# Checking shop purchases
if is_buy:
await bizhawk.write(ctx.bizhawk_ctx, [(0x304A, [0x0, 0x0], "EWRAM")])
if shop_address != 0x3C0618 and shop_address != 0x3C0684:
location = shop[shop_address][shop_scroll]
else:
if shop_init & 0x1 != 0:
location = badge[shop_address][shop_scroll]
else:
location = pants[shop_address][shop_scroll]
if location in ctx.server_locations:
locs_to_send.add(location)
# Loop for receiving items. Item is written as an ID into 0x3057.
# ASM reads the ID in a loop and give the player the item before resetting the RAM address to 0x0.
# If RAM address isn't 0x0 yet break out and try again later to give the rest of the items
for i in range(len(ctx.items_received) - received_index):
item_data = items_by_id[ctx.items_received[received_index + i].item]
result = False
total = 0
while not result:
await asyncio.sleep(0.05)
total += 0.05
result = await bizhawk.guarded_write(
ctx.bizhawk_ctx,
[
(0x3057, [id_to_RAM(item_data.itemID)], "EWRAM")
],
[(0x3057, [0x0], "EWRAM")]
)
if result:
total = 0
if total >= 1:
break
if not result:
break
await bizhawk.write(
ctx.bizhawk_ctx,
[
(0x4808, [(received_index + i + 1) // 0x100, (received_index + i + 1) % 0x100], "EWRAM"),
]
)
# Early return and location send if you are currently in a shop,
# since other flags aren't going to change
if shopping & 0x3 == 0x3:
if locs_to_send != self.local_checked_locations:
self.local_checked_locations = locs_to_send
if locs_to_send is not None:
await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locs_to_send)}])
return
# Checking flags that aren't digspots or blocks
for item in nonBlock:
address, mask, location = item
if location in self.local_checked_locations:
continue
flag_bytes = await bizhawk.read(ctx.bizhawk_ctx, [(address, 1, "EWRAM"), (0x3060, 6, "EWRAM")])
flag_byte = flag_bytes[0][0]
backup_logo = bytes([byte for byte in flag_bytes[1] if byte < 0x70]).decode("UTF-8")
if backup_logo != "MLSSAP":
return
if flag_byte & mask != 0:
if location >= 0xDA0000 and location not in self.local_events:
self.local_events += [location]
await ctx.send_msgs(
[
{
"cmd": "Set",
"key": f"mlss_flag_{ctx.team}_{ctx.slot}",
"default": 0,
"want_reply": False,
"operations": [{"operation": "or", "value": 1 << (location - 0xDA0000)}],
}
]
)
continue
if location in roomException:
if current_room not in roomException[location]:
exception = True
else:
exception = False
else:
exception = True
if location in eReward:
if location not in self.eUsed:
self.eUsed += [location]
location = eReward[len(self.eUsed) - 1]
else:
continue
if (location in ctx.server_locations) and exception:
locs_to_send.add(location)
# Check for set location flags.
for byte_i, byte in enumerate(bytearray(flags)):
for j in range(8):
if j in self.checked_flags[byte_i]:
continue
and_value = 1 << j
if byte & and_value != 0:
flag_id = byte_i * 8 + (j + 1)
room, item = find_key(roomCount, flag_id)
pointer_arr = await bizhawk.read(
ctx.bizhawk_ctx, [(ROOM_ARRAY_POINTER + ((room - 1) * 4), 4, "ROM")]
)
pointer = struct.unpack("<I", pointer_arr[0])[0]
pointer = pointer & 0xFFFFFF
offset = await bizhawk.read(ctx.bizhawk_ctx, [(pointer, 1, "ROM")])
offset = offset[0][0]
if offset != 0:
offset = 2
pointer += (item * 8) + 1 + offset
for key, value in beanstones.items():
if pointer == value:
pointer = key
break
if pointer in ctx.server_locations:
self.checked_flags[byte_i] += [j]
locs_to_send.add(pointer)
if not ctx.finished_game and cackletta != 0 and current_room == 0x1C7:
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
if self.room != current_room:
self.room = current_room
await ctx.send_msgs(
[
{
"cmd": "Set",
"key": f"mlss_room_{ctx.team}_{ctx.slot}",
"default": 0,
"want_reply": False,
"operations": [{"operation": "replace", "value": current_room}],
}
]
)
# Send locations if there are any to send.
if locs_to_send != self.local_checked_locations:
self.local_checked_locations = locs_to_send
if locs_to_send is not None:
await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locs_to_send)}])
except bizhawk.RequestFailedError:
# Exit handler and return to main loop to reconnect.
pass
except bizhawk.ConnectorError:
pass
def find_key(dictionary, target):
leftover = target
for key, value in dictionary.items():
if leftover > value:
leftover -= value
else:
return key, leftover
def id_to_RAM(id_: int):
code = id_
if 0x1C <= code <= 0x1F:
code += 0xE
if 0x20 <= code <= 0x26:
code -= 0x4
return code