From 31eb94de4652cf9c63f1a45007135936f8f70c15 Mon Sep 17 00:00:00 2001 From: nuwang <2070605+nuwang@users.noreply.github.com> Date: Mon, 3 Jul 2023 22:56:14 +0530 Subject: [PATCH] Fix evaluation to support resource clamping --- tests/fixtures/mapping-destinations.yml | 19 +++++++++++++++++ tests/test_mapper_destinations.py | 15 ++++++++++++++ tpv/core/entities.py | 27 ++++++++++++++++--------- tpv/core/mapper.py | 9 ++++++--- 4 files changed, 57 insertions(+), 13 deletions(-) diff --git a/tests/fixtures/mapping-destinations.yml b/tests/fixtures/mapping-destinations.yml index b1e269f..818a95c 100644 --- a/tests/fixtures/mapping-destinations.yml +++ b/tests/fixtures/mapping-destinations.yml @@ -126,6 +126,13 @@ tools: scheduling: require: - test_of_mem_accepted + tool_for_testing_resource_clamping: + cores: 16 + env: + MY_TOOL_ENV: "cores: {cores} mem: {mem} gpus: {gpus}" + scheduling: + require: + - test_of_resource_clamping users: pulsar_canberra_user@act.au: @@ -303,3 +310,15 @@ destinations: scheduling: accept: - pulsar-canberra + clamped_destination: + runner: k8s + max_accepted_cores: 32 + max_accepted_mem: 128 + max_accepted_gpus: 2 + cores: 8 + gpus: 1 + env: + MY_DEST_ENV: "cores: {cores} mem: {mem} gpus: {gpus}" + scheduling: + accept: + - test_of_resource_clamping diff --git a/tests/test_mapper_destinations.py b/tests/test_mapper_destinations.py index 69bef82..49d6dfe 100644 --- a/tests/test_mapper_destinations.py +++ b/tests/test_mapper_destinations.py @@ -207,3 +207,18 @@ def test_user_map_to_destination_accepting_offline(self): datasets = [mock_galaxy.DatasetAssociation("test", mock_galaxy.Dataset("test.txt", file_size=12 * 1024 ** 3))] destination = self._map_to_destination(tool, user, datasets, tpv_config_paths=[config]) self.assertEqual(destination.id, "pulsar-canberra") + + def test_destination_clamping(self): + """_summary_ Any variables defined in a tool should be evaluated as late as possible, so that destination level + clamping works. + """ + user = mock_galaxy.User('albo', 'pulsar_canberra_user@act.au') + + config = os.path.join(os.path.dirname(__file__), 'fixtures/mapping-destinations.yml') + + tool = mock_galaxy.Tool('tool_for_testing_resource_clamping') + datasets = [mock_galaxy.DatasetAssociation("test", mock_galaxy.Dataset("test.txt", file_size=12 * 1024 ** 3))] + destination = self._map_to_destination(tool, user, datasets, tpv_config_paths=[config]) + self.assertEqual(destination.id, "clamped_destination") + self.assertEqual([env['value'] for env in destination.env if env['name'] == 'MY_DEST_ENV'], ['cores: 8 mem: 24 gpus: 1']) + self.assertEqual([env['value'] for env in destination.env if env['name'] == 'MY_TOOL_ENV'], ['cores: 8 mem: 24 gpus: 1']) diff --git a/tpv/core/entities.py b/tpv/core/entities.py index 6e08f49..15c4fa1 100644 --- a/tpv/core/entities.py +++ b/tpv/core/entities.py @@ -351,14 +351,7 @@ def combine(self, entity): new_entity.tpv_tags = entity.tpv_tags.combine(self.tpv_tags) return new_entity - def evaluate(self, context): - """ - Evaluate expressions in entity properties that must be evaluated as late as possible, which is - to say, after combining entity requirements. This includes env, params and resubmit, that rely on - properties such as cores, mem and gpus after they are combined. - :param context: - :return: - """ + def evaluate_resources(self, context): new_entity = copy.deepcopy(self) context.update(self.context or {}) if self.min_gpus is not None: @@ -397,6 +390,17 @@ def evaluate(self, context): new_entity.mem = max(new_entity.min_mem or 0, new_entity.mem or 0) new_entity.mem = min(new_entity.max_mem, new_entity.mem or 0) if new_entity.max_mem else new_entity.mem context['mem'] = new_entity.mem + return new_entity + + def evaluate(self, context): + """ + Evaluate expressions in entity properties that must be evaluated as late as possible, which is + to say, after combining entity requirements. This includes env, params and resubmit, that rely on + properties such as cores, mem and gpus after they are combined. + :param context: + :return: + """ + new_entity = self.evaluate_resources(context) if self.env: new_entity.env = self.evaluate_complex_property(self.env, context, stringify=True) context['env'] = new_entity.env @@ -406,7 +410,6 @@ def evaluate(self, context): if self.resubmit: new_entity.resubmit = self.evaluate_complex_property(self.resubmit, context) context['resubmit'] = new_entity.resubmit - return new_entity def rank_destinations(self, destinations, context): @@ -478,7 +481,7 @@ def override(self, entity): new_entity.rules[rule.id] = rule.inherit(entity.rules[rule.id]) return new_entity - def evaluate(self, context): + def evaluate_rules(self, context): new_entity = copy.deepcopy(self) context.update(new_entity.context or {}) for rule in self.rules.values(): @@ -492,6 +495,10 @@ def evaluate(self, context): context.update({ 'entity': new_entity }) + return new_entity + + def evaluate(self, context): + new_entity = self.evaluate_rules(context) return super(EntityWithRules, new_entity).evaluate(context) def __repr__(self): diff --git a/tpv/core/mapper.py b/tpv/core/mapper.py index 3ca2d51..114ec5f 100644 --- a/tpv/core/mapper.py +++ b/tpv/core/mapper.py @@ -70,7 +70,9 @@ def rank(self, entity, destinations, context): return entity.rank_destinations(destinations, context) def match_and_rank_destinations(self, entity, destinations, context): - matches = [dest for dest in destinations.values() if dest.matches(entity, context)] + # At this point, the resource requirements (cores, mem, gpus) are unevaluated. + # So temporarily evaluate them so we can match up with a destination. + matches = [dest for dest in destinations.values() if dest.matches(entity.evaluate_resources(context), context)] return self.rank(entity, matches, context) def to_galaxy_destination(self, destination): @@ -116,8 +118,9 @@ def match_combine_evaluate_entities(self, context, tool, user): 'self': combined_entity }) - # 3. Evaluate expressions - evaluated_entity = combined_entity.evaluate(context) + # 3. Evaluate rules only, so that all expressions are collapsed into a flat entity. The final + # values for expressions should be evaluated only after combining with the destination. + evaluated_entity = combined_entity.evaluate_rules(context) context.update({ 'entity': evaluated_entity, 'self': evaluated_entity