forked from xDaya/Co-Learing_Scenario
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustom_actions.py
487 lines (345 loc) · 19.3 KB
/
custom_actions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
import numpy as np
from matrx.actions.action import Action, ActionResult
from matrx.actions.object_actions import _is_drop_poss, _act_drop, _possible_drop, _find_drop_loc, GrabObject, GrabObjectResult
import random
rock_imgs = ['/images/rock1.png', '/images/rock2.png', '/images/rock3.png']
class BreakObject(Action):
def __init__(self, duration_in_ticks=1):
super().__init__(duration_in_ticks)
# The is_possible function here is currently stolen from the grab_large, should be adjusted later
def is_possible(self, grid_world, agent_id, **kwargs):
# Set default values check
#object_id = None if 'object_id' not in kwargs else kwargs['object_id']
#grab_range = np.inf if 'grab_range' not in kwargs else kwargs['grab_range']
#max_objects = np.inf if 'max_objects' not in kwargs else kwargs['max_objects']
return BreakObjectResult(BreakObjectResult.RESULT_SUCCESS, True)
def mutate(self, grid_world, agent_id, **kwargs):
# Additional check
assert 'object_id' in kwargs.keys()
assert 'grab_range' in kwargs.keys()
succeeded = None
object_ids = kwargs['object_id'] # So this is the list with large object and parts that is in range
# Remove first item from the list and environment, which is the large object itself
if len(object_ids) > 0:
succeeded = grid_world.remove_from_grid(object_ids[0])
object_ids.pop(0)
# For loop that loops through list of parts to change
for object_id in object_ids:
env_obj = grid_world.environment_objects[object_id] # Environment object
# Change property 'bound_to' to None
env_obj.change_property('bound_to', None)
# Get a random rock image and change img_name to that image
rock_img_property = random.choice(rock_imgs)
env_obj.change_property('img_name', rock_img_property)
if not succeeded:
return BreakObjectResult(
BreakObjectResult.FAILED_TO_REMOVE_OBJECT_FROM_WORLD.replace("{OBJECT_ID}",
env_obj.obj_id), False)
return BreakObjectResult(BreakObjectResult.RESULT_SUCCESS, True)
class BreakObjectResult(ActionResult):
""" Result when the object can be successfully grabbed. """
RESULT_SUCCESS = 'Grab action success'
""" Result when the specified object is not within range. """
NOT_IN_RANGE = 'Object not in range'
""" Result when the specified object is an agent. """
RESULT_AGENT = 'This is an agent, cannot be picked up'
""" Result when no object was specified. """
RESULT_NO_OBJECT = 'No Object specified'
""" Result when the specified object does not exist in the :class:`matrxs.grid_world.GridWorld` """
RESULT_UNKNOWN_OBJECT_TYPE = 'obj_id is no Agent and no Object, unknown what to do'
""" Result when the specified object is not movable. """
RESULT_OBJECT_UNMOVABLE = 'Object is not movable'
FAILED_TO_REMOVE_OBJECT_FROM_WORLD = 'Failed to remove object'
def __init__(self, result, succeeded):
super().__init__(result, succeeded)
class GrabLargeObject(Action):
def __init__(self, duration_in_ticks=1):
super().__init__(duration_in_ticks)
def is_possible(self, grid_world, agent_id, **kwargs):
# Set default values check
object_id = None if 'object_id' not in kwargs else kwargs['object_id']
grab_range = np.inf if 'grab_range' not in kwargs else kwargs['grab_range']
max_objects = np.inf if 'max_objects' not in kwargs else kwargs['max_objects']
return _is_possible_grab_large(grid_world, agent_id=agent_id, object_id=object_id, grab_range=grab_range,
max_objects=max_objects)
def mutate(self, grid_world, agent_id, **kwargs):
# Additional check
assert 'object_id' in kwargs.keys()
assert 'grab_range' in kwargs.keys()
assert 'max_objects' in kwargs.keys()
# if possible:
object_ids = kwargs['object_id'] # assign
# Loading properties
reg_ag = grid_world.registered_agents[agent_id] # Registered Agent
# For loop that loops through list of large object and its parts
for object_id in object_ids:
env_obj = grid_world.environment_objects[object_id] # Environment object
# Updating properties
env_obj.carried_by.append(agent_id)
reg_ag.is_carrying.append(env_obj) # we add the entire object!
# Remove it from the grid world (it is now stored in the is_carrying list of the AgentAvatar
succeeded = grid_world.remove_from_grid(object_id=env_obj.obj_id, remove_from_carrier=False)
if not succeeded:
return GrabLargeObjectResult(GrabLargeObjectResult.FAILED_TO_REMOVE_OBJECT_FROM_WORLD.replace("{OBJECT_ID}",
env_obj.obj_id), False)
# Updating Location (done after removing from grid, or the grid will search the object on the wrong location)
env_obj.location = reg_ag.location
reg_ag.change_property('img_name', '/images/selector_holding2.png') # Code to change agent image
return GrabLargeObjectResult(GrabLargeObjectResult.RESULT_SUCCESS, True)
class GrabLargeObjectResult(ActionResult):
""" Result when the object can be successfully grabbed. """
RESULT_SUCCESS = 'Grab action success'
""" Result when the grabbed object cannot be removed from the :class:`matrxs.grid_world.GridWorld`. """
FAILED_TO_REMOVE_OBJECT_FROM_WORLD = 'Grab action failed; could not remove object with id {OBJECT_ID} from grid.'
""" Result when the specified object is not within range. """
NOT_IN_RANGE = 'Object not in range'
""" Result when the specified object is an agent. """
RESULT_AGENT = 'This is an agent, cannot be picked up'
""" Result when no object was specified. """
RESULT_NO_OBJECT = 'No Object specified'
""" Result when the agent is at its maximum carrying capacity. """
RESULT_CARRIES_OBJECT = 'Agent already carries the maximum amount of objects'
""" Result when the specified object is already carried by another agent. """
RESULT_OBJECT_CARRIED = 'Object is already carried by {AGENT_ID}'
""" Result when the specified object does not exist in the :class:`matrxs.grid_world.GridWorld` """
RESULT_UNKNOWN_OBJECT_TYPE = 'obj_id is no Agent and no Object, unknown what to do'
""" Result when the specified object is not movable. """
RESULT_OBJECT_UNMOVABLE = 'Object is not movable'
def __init__(self, result, succeeded):
super().__init__(result, succeeded)
class DropLargeObject(Action):
def __init__(self, duration_in_ticks=1):
super().__init__(duration_in_ticks)
def is_possible(self, grid_world, agent_id, **kwargs):
reg_ag = grid_world.registered_agents[agent_id]
drop_range = 1 if 'drop_range' not in kwargs else kwargs['drop_range']
# If no object id is given, the last item is dropped
if 'object_id' in kwargs:
obj_id = kwargs['object_id']
elif len(reg_ag.is_carrying) > 0:
obj_id = reg_ag.is_carrying[-1] # Not edited for now; eventually we will need to make sure there is space for all blocks contained by the large block, now checking for 1 is enough
else:
return DropLargeObjectResult(DropLargeObjectResult.RESULT_NO_OBJECT, False)
return _possible_drop(grid_world, agent_id=agent_id, obj_id=obj_id, drop_range=drop_range)
def mutate(self, grid_world, agent_id, **kwargs):
reg_ag = grid_world.registered_agents[agent_id]
# fetch range from kwargs
drop_range = 1 if 'drop_range' not in kwargs else kwargs['drop_range']
obj_type = None if 'obj_type' not in kwargs else kwargs['obj_type']
parts_obj = []
# If no object id is given, the last item is dropped
if 'object_id' in kwargs:
env_obj = kwargs['object_id']
elif len(reg_ag.is_carrying) > 0:
env_obj = reg_ag.is_carrying[-1]
parts_obj = reg_ag.is_carrying[::-1] # Assuming here that agent only carries 1 object which is large. Reversed so the parts come first, then the large object itself
else:
return DropLargeObjectResult(DropLargeObjectResult.RESULT_NO_OBJECT_CARRIED, False)
# check that it is even possible to drop this object somewhere
if not env_obj.is_traversable and not reg_ag.is_traversable and drop_range == 0:
raise Exception(
f"Intraversable agent {reg_ag.obj_id} can only drop the intraversable object {env_obj.obj_id} at its "
f"own location (drop_range = 0), but this is impossible. Enlarge the drop_range for the DropAction to "
f"atleast 1")
# check if we can drop it at our current location
curr_loc_drop_poss = _is_drop_poss(grid_world, env_obj, reg_ag.location, agent_id)
# drop it on the agent location if possible
if curr_loc_drop_poss:
reg_ag.change_property('img_name', '/images/selector2.png') # Code to change agent image
return _act_drop_large(grid_world, agent=reg_ag, parts_obj=parts_obj, drop_loc=reg_ag.location, obj_type=obj_type) # We need to make this loop over the different objects
# if the agent location was the only within range, return a negative action result
elif not curr_loc_drop_poss and drop_range == 0:
return DropLargeObjectResult(DropLargeObjectResult.RESULT_OBJECT, False)
# Try finding other drop locations from close to further away around the agent
drop_loc = _find_drop_loc(grid_world, reg_ag, env_obj, drop_range, reg_ag.location)
# If we didn't find a valid drop location within range, return a negative action result
if not drop_loc:
return DropLargeObjectResult(DropLargeObjectResult.RESULT_OBJECT, False)
return _act_drop(grid_world, agent=reg_ag, env_obj=env_obj, drop_loc=drop_loc)
class DropLargeObjectResult(ActionResult):
""" Result when dropping the object succeeded. """
RESULT_SUCCESS = 'Drop action success'
""" Result when there is not object in the agent's inventory. """
RESULT_NO_OBJECT = 'The item is not carried'
""" Result when the specified object is not in the agent's inventory. """
RESULT_NONE_GIVEN = "'None' used as input id"
""" Result when the specified object should be dropped on an agent. """
RESULT_AGENT = 'Cannot drop item on an agent'
""" Result when the specified object should be dropped on an intraversable object."""
RESULT_OBJECT = 'Cannot drop item on another intraversable object'
""" Result when the specified object does not exist (anymore). """
RESULT_UNKNOWN_OBJECT_TYPE = 'Cannot drop item on an unknown object'
""" Result when the agent is not carrying anything. """
RESULT_NO_OBJECT_CARRIED = 'Cannot drop object when none carried'
def __init__(self, result, succeeded, obj_id=None):
super().__init__(result, succeeded)
self.obj_id = obj_id
class Fall(Action):
def __init__(self, duration_in_ticks=1):
super().__init__(duration_in_ticks)
def is_possible(self, grid_world, agent_id, **kwargs):
# Maybe do a check to see if the empty location is really and still empty?
return FallResult(FallResult.RESULT_SUCCESS, True)
def mutate(self, grid_world, agent_id, **kwargs):
# Make sure this can deal with a list of objects and lets them fall down (in an order that makes sense?)
# Additional check
assert 'object_list' in kwargs.keys()
# if possible:
falling_objs = kwargs['object_list'] # assign
for object_id in falling_objs:
if isinstance(object_id, list):
for object_part in object_id:
env_obj = grid_world.environment_objects[object_part] # Environment object
object_loc = env_obj.location
object_loc_x = object_loc[0]
object_loc_y = object_loc[1]
# Update y value
new_y = object_loc_y + 1
new_loc = (object_loc_x, new_y)
# Actually update location
env_obj.location = new_loc
else:
env_obj = grid_world.environment_objects[object_id] # Environment object
object_loc = env_obj.location
object_loc_x = object_loc[0]
object_loc_y = object_loc[1]
# Update y value
new_y = object_loc_y + 1
new_loc = (object_loc_x, new_y)
# Actually update location
env_obj.location = new_loc
return FallResult(FallResult.RESULT_SUCCESS, True)
class FallResult(ActionResult):
""" Result when falling succeeded. """
RESULT_SUCCESS = 'Falling action successful'
""" Result when the emptied space was not actually empty. """
RESULT_NOT_EMPTY = 'There was no empty space for the objects to fall in'
""" Result when the emptied space was not actually empty. """
RESULT_FAILED = 'Failed to let object fall'
def __init__(self, result, succeeded):
super().__init__(result, succeeded)
def _is_possible_grab_large(grid_world, agent_id, object_id, grab_range, max_objects):
reg_ag = grid_world.registered_agents[agent_id] # Registered Agent
loc_agent = reg_ag.location # Agent location
# There is no large object specified
if not object_id:
return GrabLargeObjectResult(GrabLargeObjectResult.RESULT_NO_OBJECT, False)
# Already carries an/too many object(s)
if len(reg_ag.is_carrying) + len(object_id) > max_objects:
return GrabLargeObjectResult(GrabLargeObjectResult.RESULT_CARRIES_OBJECT, False)
# Go through all objects at the desired location
# Set random object in range
# Check if object is in range
# Check if object_id is the id of an agent
# Check if it is an object
else:
return GrabLargeObjectResult(GrabLargeObjectResult.RESULT_SUCCESS, True)
class Idle(Action):
def __init__(self, duration_in_ticks=1):
super().__init__(duration_in_ticks)
def is_possible(self, grid_world, agent_id, **kwargs):
# Maybe do a check to see if the empty location is really and still empty?
return IdleResult(IdleResult.RESULT_SUCCESS, True)
class IdleResult(ActionResult):
""" Result when falling succeeded. """
RESULT_SUCCESS = 'Idling action successful'
""" Result when the emptied space was not actually empty. """
RESULT_FAILED = 'Failed to idle'
def __init__(self, result, succeeded):
super().__init__(result, succeeded)
class SendReward(Action):
def __init__(self, duration_in_ticks=1):
super().__init__(duration_in_ticks)
def is_possible(self, grid_world, agent_id, **kwargs):
# What kind of check is necessary here?
return IdleResult(IdleResult.RESULT_SUCCESS, True)
def mutate(self, grid_world, agent_id, **kwargs):
return
class SendRewardResult(ActionResult):
""" Result when falling succeeded. """
RESULT_SUCCESS = 'Reward sent successfully'
""" Result when the emptied space was not actually empty. """
RESULT_FAILED = 'Failed to send reward'
def __init__(self, result, succeeded):
super().__init__(result, succeeded)
def _act_drop_large(grid_world, agent, parts_obj, drop_loc, obj_type):
x_drop_loc = drop_loc[0]
y_drop_loc = drop_loc[1]
locations = []
if obj_type == 'vert':
locations = [drop_loc, (x_drop_loc, y_drop_loc + 1), (x_drop_loc, y_drop_loc + 2), (x_drop_loc, y_drop_loc + 3),
drop_loc]
elif obj_type == 'long':
if x_drop_loc > 16:
drop_loc = (16, y_drop_loc)
x_drop_loc = drop_loc[0]
y_drop_loc = drop_loc[1]
locations = [drop_loc, (x_drop_loc + 1, y_drop_loc), (x_drop_loc + 2, y_drop_loc), (x_drop_loc + 3, y_drop_loc),
drop_loc]
else:
if x_drop_loc > 18:
drop_loc = (18, y_drop_loc)
x_drop_loc = drop_loc[0]
y_drop_loc = drop_loc[1]
locations = [drop_loc, (x_drop_loc+1, y_drop_loc), (x_drop_loc, y_drop_loc+1), (x_drop_loc+1, y_drop_loc+1),
drop_loc]
for env_obj in parts_obj:
# Updating properties
agent.is_carrying.remove(env_obj)
env_obj.carried_by.remove(agent.obj_id)
# We return the object to the grid location we are standing at
env_obj.location = locations[parts_obj.index(env_obj)]
grid_world._register_env_object(env_obj)
class ManageImg(Action):
def __init__(self, duration_in_ticks=1):
super().__init__(duration_in_ticks)
def is_possible(self, grid_world, agent_id, **kwargs):
# What kind of check is necessary here?
return ManageImgResult(ManageImgResult.RESULT_SUCCESS, True)
def mutate(self, grid_world, agent_id, **kwargs):
reg_ag = grid_world.registered_agents[agent_id]
if kwargs['health_score'] >= 600:
if kwargs['animation'] == True:
reg_ag.change_property('img_name', '/images/victim_scream_anim.gif')
else:
reg_ag.change_property('img_name', '/images/victim_square.png')
elif kwargs['health_score'] >= 300:
if kwargs['animation'] == True:
reg_ag.change_property('img_name', '/images/victim2_scream_anim.gif')
else:
reg_ag.change_property('img_name', '/images/victim2.png')
else:
if kwargs['animation'] == True:
reg_ag.change_property('img_name', '/images/victim3_scream_anim.gif')
else:
reg_ag.change_property('img_name', '/images/victim3.png')
return ManageImgResult(ManageImgResult.RESULT_SUCCESS, True)
class ManageImgResult(ActionResult):
""" Result when falling succeeded. """
RESULT_SUCCESS = 'Idling action successful'
""" Result when the emptied space was not actually empty. """
RESULT_FAILED = 'Failed to idle'
def __init__(self, result, succeeded):
super().__init__(result, succeeded)
class GoalReachedImg(Action):
def __init__(self, duration_in_ticks=1):
super().__init__(duration_in_ticks)
def is_possible(self, grid_world, agent_id, **kwargs):
# What kind of check is necessary here?
return GoalReachedImgResult(GoalReachedImgResult.RESULT_SUCCESS, True)
def mutate(self, grid_world, agent_id, **kwargs):
object_id = kwargs['object_id'] # The goal object
result = kwargs['result'] # Whether it was a success or not
env_obj = grid_world.environment_objects[object_id]
if result is True:
env_obj.change_property("img_name", "/images/goalreached_img.png")
else:
env_obj.change_property("img_name", "/images/goalnotreached_img.png")
return GoalReachedImgResult(GoalReachedImgResult.RESULT_SUCCESS, True)
class GoalReachedImgResult(ActionResult):
""" Result when falling succeeded. """
RESULT_SUCCESS = 'Idling action successful'
""" Result when the emptied space was not actually empty. """
RESULT_FAILED = 'Failed to idle'
def __init__(self, result, succeeded):
super().__init__(result, succeeded)