Skip to content

Commit

Permalink
Merge pull request #107 from galaxyproject/fix_resource_clamping
Browse files Browse the repository at this point in the history
Fix evaluation to support resource clamping
  • Loading branch information
nuwang authored Jul 4, 2023
2 parents bc5431f + 31eb94d commit fb5cb96
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 13 deletions.
19 changes: 19 additions & 0 deletions tests/fixtures/mapping-destinations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ tools:
scheduling:
require:
- test_of_mem_accepted
tool_for_testing_resource_clamping:
cores: 16
env:
MY_TOOL_ENV: "cores: {cores} mem: {mem} gpus: {gpus}"
scheduling:
require:
- test_of_resource_clamping

users:
[email protected]:
Expand Down Expand Up @@ -303,3 +310,15 @@ destinations:
scheduling:
accept:
- pulsar-canberra
clamped_destination:
runner: k8s
max_accepted_cores: 32
max_accepted_mem: 128
max_accepted_gpus: 2
cores: 8
gpus: 1
env:
MY_DEST_ENV: "cores: {cores} mem: {mem} gpus: {gpus}"
scheduling:
accept:
- test_of_resource_clamping
15 changes: 15 additions & 0 deletions tests/test_mapper_destinations.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,18 @@ def test_user_map_to_destination_accepting_offline(self):
datasets = [mock_galaxy.DatasetAssociation("test", mock_galaxy.Dataset("test.txt", file_size=12 * 1024 ** 3))]
destination = self._map_to_destination(tool, user, datasets, tpv_config_paths=[config])
self.assertEqual(destination.id, "pulsar-canberra")

def test_destination_clamping(self):
"""_summary_ Any variables defined in a tool should be evaluated as late as possible, so that destination level
clamping works.
"""
user = mock_galaxy.User('albo', '[email protected]')

config = os.path.join(os.path.dirname(__file__), 'fixtures/mapping-destinations.yml')

tool = mock_galaxy.Tool('tool_for_testing_resource_clamping')
datasets = [mock_galaxy.DatasetAssociation("test", mock_galaxy.Dataset("test.txt", file_size=12 * 1024 ** 3))]
destination = self._map_to_destination(tool, user, datasets, tpv_config_paths=[config])
self.assertEqual(destination.id, "clamped_destination")
self.assertEqual([env['value'] for env in destination.env if env['name'] == 'MY_DEST_ENV'], ['cores: 8 mem: 24 gpus: 1'])
self.assertEqual([env['value'] for env in destination.env if env['name'] == 'MY_TOOL_ENV'], ['cores: 8 mem: 24 gpus: 1'])
27 changes: 17 additions & 10 deletions tpv/core/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,7 @@ def combine(self, entity):
new_entity.tpv_tags = entity.tpv_tags.combine(self.tpv_tags)
return new_entity

def evaluate(self, context):
"""
Evaluate expressions in entity properties that must be evaluated as late as possible, which is
to say, after combining entity requirements. This includes env, params and resubmit, that rely on
properties such as cores, mem and gpus after they are combined.
:param context:
:return:
"""
def evaluate_resources(self, context):
new_entity = copy.deepcopy(self)
context.update(self.context or {})
if self.min_gpus is not None:
Expand Down Expand Up @@ -397,6 +390,17 @@ def evaluate(self, context):
new_entity.mem = max(new_entity.min_mem or 0, new_entity.mem or 0)
new_entity.mem = min(new_entity.max_mem, new_entity.mem or 0) if new_entity.max_mem else new_entity.mem
context['mem'] = new_entity.mem
return new_entity

def evaluate(self, context):
"""
Evaluate expressions in entity properties that must be evaluated as late as possible, which is
to say, after combining entity requirements. This includes env, params and resubmit, that rely on
properties such as cores, mem and gpus after they are combined.
:param context:
:return:
"""
new_entity = self.evaluate_resources(context)
if self.env:
new_entity.env = self.evaluate_complex_property(self.env, context, stringify=True)
context['env'] = new_entity.env
Expand All @@ -406,7 +410,6 @@ def evaluate(self, context):
if self.resubmit:
new_entity.resubmit = self.evaluate_complex_property(self.resubmit, context)
context['resubmit'] = new_entity.resubmit

return new_entity

def rank_destinations(self, destinations, context):
Expand Down Expand Up @@ -478,7 +481,7 @@ def override(self, entity):
new_entity.rules[rule.id] = rule.inherit(entity.rules[rule.id])
return new_entity

def evaluate(self, context):
def evaluate_rules(self, context):
new_entity = copy.deepcopy(self)
context.update(new_entity.context or {})
for rule in self.rules.values():
Expand All @@ -492,6 +495,10 @@ def evaluate(self, context):
context.update({
'entity': new_entity
})
return new_entity

def evaluate(self, context):
new_entity = self.evaluate_rules(context)
return super(EntityWithRules, new_entity).evaluate(context)

def __repr__(self):
Expand Down
9 changes: 6 additions & 3 deletions tpv/core/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ def rank(self, entity, destinations, context):
return entity.rank_destinations(destinations, context)

def match_and_rank_destinations(self, entity, destinations, context):
matches = [dest for dest in destinations.values() if dest.matches(entity, context)]
# At this point, the resource requirements (cores, mem, gpus) are unevaluated.
# So temporarily evaluate them so we can match up with a destination.
matches = [dest for dest in destinations.values() if dest.matches(entity.evaluate_resources(context), context)]
return self.rank(entity, matches, context)

def to_galaxy_destination(self, destination):
Expand Down Expand Up @@ -116,8 +118,9 @@ def match_combine_evaluate_entities(self, context, tool, user):
'self': combined_entity
})

# 3. Evaluate expressions
evaluated_entity = combined_entity.evaluate(context)
# 3. Evaluate rules only, so that all expressions are collapsed into a flat entity. The final
# values for expressions should be evaluated only after combining with the destination.
evaluated_entity = combined_entity.evaluate_rules(context)
context.update({
'entity': evaluated_entity,
'self': evaluated_entity
Expand Down

0 comments on commit fb5cb96

Please sign in to comment.