From eec6f144911d9cccf6abd2c30bdc257fde6b0c8c Mon Sep 17 00:00:00 2001 From: Jason Thomas Date: Mon, 11 Sep 2023 14:51:38 -0600 Subject: [PATCH 1/2] microservice, microservice_model, microservice_status_model --- .../lib/openc3/microservices/microservice.rb | 1 - .../python/openc3/microservices/__init__.py | 0 .../openc3/microservices/microservice.py | 205 +++++++++++++++ .../openc3/models/microservice_model.py | 213 +++++++++++++++ .../models/microservice_status_model.py | 72 +++++ openc3/python/openc3/models/model.py | 90 ++----- openc3/python/openc3/system/system.py | 6 +- openc3/python/openc3/top_level.py | 17 ++ openc3/python/ruby_to_python.py | 6 +- openc3/python/test/api/test_tlm_api.py | 5 - openc3/python/test/microservices/__init__.py | 0 .../test/microservices/test_microservice.py | 43 +++ openc3/python/test/models/__init__.py | 0 .../test/models/test_microservice_model.py | 247 ++++++++++++++++++ .../models/test_microservice_status_model.py | 42 +++ .../test/streams/test_tcpip_client_stream.py | 2 - openc3/python/test/test_helper.py | 6 +- openc3/spec/models/microservice_model_spec.rb | 2 +- 18 files changed, 871 insertions(+), 86 deletions(-) create mode 100644 openc3/python/openc3/microservices/__init__.py create mode 100644 openc3/python/openc3/microservices/microservice.py create mode 100644 openc3/python/openc3/models/microservice_model.py create mode 100644 openc3/python/openc3/models/microservice_status_model.py create mode 100644 openc3/python/test/microservices/__init__.py create mode 100644 openc3/python/test/microservices/test_microservice.py create mode 100644 openc3/python/test/models/__init__.py create mode 100644 openc3/python/test/models/test_microservice_model.py create mode 100644 openc3/python/test/models/test_microservice_status_model.py diff --git a/openc3/lib/openc3/microservices/microservice.rb b/openc3/lib/openc3/microservices/microservice.rb index 1b2672140..6ec1e2408 100644 --- a/openc3/lib/openc3/microservices/microservice.rb +++ b/openc3/lib/openc3/microservices/microservice.rb @@ -35,7 +35,6 @@ OpenC3.require_file 'tmpdir' module OpenC3 - class Microservice attr_accessor :microservice_status_thread attr_accessor :name attr_accessor :state diff --git a/openc3/python/openc3/microservices/__init__.py b/openc3/python/openc3/microservices/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openc3/python/openc3/microservices/microservice.py b/openc3/python/openc3/microservices/microservice.py new file mode 100644 index 000000000..8a8fa1998 --- /dev/null +++ b/openc3/python/openc3/microservices/microservice.py @@ -0,0 +1,205 @@ +# Copyright 2023 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +import os +import sys +import atexit +import tempfile +import threading +from openc3.system.system import System +from openc3.utilities.bucket import Bucket +from openc3.utilities.sleeper import Sleeper +from openc3.utilities.logger import Logger +from openc3.environment import OPENC3_CONFIG_BUCKET +from openc3.models.microservice_model import MicroserviceModel +from openc3.models.microservice_status_model import MicroserviceStatusModel + +# OpenC3.require_file 'openc3/utilities/zip' +# OpenC3.require_file 'openc3/utilities/secrets' +# OpenC3.require_file 'openc3/utilities/open_telemetry' + +openc3_scope = "DEFAULT" + + +class Microservice: + @classmethod + def run(cls, name=None): + if name is None: + name = os.environ.get("OPENC3_MICROSERVICE_NAME") + microservice = cls(name) + try: + MicroserviceStatusModel.set( + microservice.as_json(), scope=microservice.scope + ) + microservice.state = "RUNNING" + microservice.irun() + microservice.state = "FINISHED" + except Exception as err: + # if SystemExit === err or SignalException === err: + # microservice.state = 'KILLED' + # else: + microservice.error = err + microservice.state = "DIED_ERROR" + Logger.fatal(f"Microservice {name} dying from exception\n{repr(err)}") + finally: + MicroserviceStatusModel.set( + microservice.as_json(), scope=microservice.scope + ) + + def as_json(self): + json = { + "name": self.name, + "state": self.state, + "count": self.count, + "plugin": self.plugin, + } + if self.error is not None: + json["error"] = repr(self.error) + if self.custom is not None: + json["custom"] = self.custom.as_json() + return json + + def __init__(self, name, is_plugin=False): + Logger.info( + f"Microservice running from: python {__file__} {' '.join(sys.argv)}" + ) + if name is None: + raise RuntimeError("Microservice must be named") + + self.name = name + split_name = name.split("__") + if len(split_name) != 3: + raise RuntimeError( + f"Name {name} doesn't match convention of SCOPE__TYPE__NAME" + ) + + self.scope = split_name[0] + global openc3_scope + openc3_scope = self.scope + self.cancel_thread = False + # self.metric = Metric(microservice: self.name, scope: self.scope) + Logger.scope = self.scope + Logger.microservice_name = self.name + self.logger = Logger() + self.logger.scope = self.scope + self.logger.microservice_name = self.name + # self.secrets = Secrets.getClient + + # OpenC3.setup_open_telemetry(self.name, False) + + # Create temp folder for this microservice + self.temp_dir = tempfile.TemporaryDirectory() + + # Get microservice configuration from Redis + self.config = MicroserviceModel.get(self.name, scope=self.scope) + if self.config: + self.topics = self.config["topics"] + self.plugin = self.config["plugin"] + if self.config["secrets"]: + self.secrets.setup(self.config["secrets"]) + else: + self.config = {} + self.plugin = None + self.logger.info(f"Microservice initialized with config:\n{self.config}") + if not hasattr(self, "topics") or self.topics is None: + self.topics = [] + + # Get configuration for any targets + self.target_names = self.config.get("target_names") + if self.target_names is None: + self.target_names = [] + if not is_plugin: + System.setup_targets(self.target_names, self.temp_dir, scope=self.scope) + + # Use atexit to shutdown cleanly no matter how we die + atexit.register(self.shutdown) + + self.count = 0 + self.error = None + self.custom = None + self.state = "INITIALIZED" + self.work_dir = self.config.get("work_dir") + + if is_plugin: + self.config["cmd"] + + # Get Microservice files from bucket storage + temp_dir = tempfile.TemporaryDirectory() + bucket = OPENC3_CONFIG_BUCKET + client = Bucket.getClient() + + prefix = f"{self.scope}/microservices/{self.name}/" + file_count = 0 + for object in client.list_objects(bucket=bucket, prefix=prefix): + response_target = os.path.join(temp_dir, object.key.split(prefix)[-1]) + os.makedirs(os.path.dirname(response_target), exist_ok=True) + client.get_object(bucket=bucket, key=object.key, path=response_target) + file_count += 1 + + # Adjust @work_dir to microservice files downloaded if files and a relative path + if file_count > 0 and self.work_dir[0] != "/": + self.work_dir = os.path.join(temp_dir, self.work_dir) + + # TODO: Check Syntax on any python files + # ruby_filename = None + # for part in cmd_array: + # if /\.rb$/.match?(part): + # ruby_filename = part + # break + # if ruby_filename: + # OpenC3.set_working_dir(self.work_dir) do + # if os.path.exist(ruby_filename): + # # Run ruby syntax so we can log those + # syntax_check, _ = Open3.capture2e("ruby -c {ruby_filename}") + # if /Syntax OK/.match?(syntax_check): + # self.logger.info("Ruby microservice {self.name} file {ruby_filename} passed syntax check\n", scope: self.scope) + # else: + # self.logger.error("Ruby microservice {self.name} file {ruby_filename} failed syntax check\n{syntax_check}", scope: self.scope) + # else: + # self.logger.error("Ruby microservice {self.name} file {ruby_filename} does not exist", scope: self.scope) + else: + self.microservice_status_sleeper = Sleeper() + self.microservice_status_period_seconds = 5 + self.microservice_status_thread = threading.Thread( + target=self._status_thread + ) + + # Must be implemented by a subclass + def irun(self): + self.shutdown() + + def shutdown(self): + self.logger.info(f"Shutting down microservice: {self.name}") + self.cancel_thread = True + if self.microservice_status_sleeper: + self.microservice_status_sleeper.cancel() + MicroserviceStatusModel.set(self.as_json(), scope=self.scope) + if self.temp_dir is not None: + self.temp_dir.cleanup() + # self.metric.shutdown() + self.logger.info(f"Shutting down microservice complete: {self.name}") + + def _status_thread(self): + while not self.cancel_thread: + try: + MicroserviceStatusModel.set(self.as_json(), scope=self.scope) + if self.microservice_status_sleeper.sleep( + self.microservice_status_period_seconds + ): + break + except RuntimeError as error: + self.logger.error(f"{self.name} status thread died: {repr(error)}") + raise error diff --git a/openc3/python/openc3/models/microservice_model.py b/openc3/python/openc3/models/microservice_model.py new file mode 100644 index 000000000..97d141025 --- /dev/null +++ b/openc3/python/openc3/models/microservice_model.py @@ -0,0 +1,213 @@ +# Copyright 2023 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +from openc3.models.model import Model + +# require 'openc3/models/metric_model' +from openc3.utilities.bucket import Bucket +from openc3.config.config_parser import ConfigParser + + +class MicroserviceModel(Model): + PRIMARY_KEY = "openc3_microservices" + + # NOTE: The following three class methods are used by the ModelController + # and are reimplemented to enable various Model class methods to work + @classmethod + def get(cls, name, scope=None): + return super().get(MicroserviceModel.PRIMARY_KEY, name) + + @classmethod + def names(cls, scope=None): + scoped = [] + unscoped = super().names(MicroserviceModel.PRIMARY_KEY) + for name in unscoped: + if scope is None or name.split("__")[0] == scope: + scoped.append(name) + return scoped + + @classmethod + def all(cls, scope=None): + scoped = {} + unscoped = super().all(MicroserviceModel.PRIMARY_KEY) + for name, json in unscoped.items(): + if scope is None or name.split("__")[0] == scope: + scoped[name] = json + return scoped + + # Called by the PluginModel to allow this class to validate it's top-level keyword: "MICROSERVICE" + @classmethod + def handle_config( + cls, + parser, + keyword, + parameters, + plugin=None, + needs_dependencies=False, + scope=None, + ): + match keyword: + case "MICROSERVICE": + parser.verify_num_parameters(2, 2, f"{keyword} ") + # Create name by adding scope and type 'USER' to indicate where this microservice came from + return cls( + f"{scope}__USER__{parameters[1].upper()}", + folder_name=parameters[0], + plugin=plugin, + needs_dependencies=needs_dependencies, + scope=scope, + ) + case _: + raise ConfigParser.Error( + parser, + f"Unknown keyword and parameters for Microservice: {keyword} {' '.join(parameters)}", + ) + + # Create a microservice model to be deployed to bucket storage + def __init__( + self, + name, + folder_name=None, + cmd=[], + work_dir=".", + ports=[], + env={}, + topics=[], + target_names=[], + options=[], + parent=None, + container=None, + updated_at=None, + plugin=None, + needs_dependencies=False, + secrets=[], + prefix=None, + scope=None, + ): + parts = name.split("__") + if len(parts) != 3: + raise RuntimeError(f"name '{name}' must be formatted as SCOPE__TYPE__NAME") + if parts[0] != scope: + raise RuntimeError( + f"name '{name}' scope '{parts[0]}' doesn't match scope parameter '{scope}'" + ) + + super().__init__( + MicroserviceModel.PRIMARY_KEY, + name=name, + updated_at=updated_at, + plugin=plugin, + scope=scope, + ) + self.folder_name = folder_name + self.cmd = cmd + self.work_dir = work_dir + self.ports = ports + self.env = env + self.topics = topics + self.target_names = target_names + self.options = options + self.parent = parent + self.container = container + self.needs_dependencies = needs_dependencies + self.secrets = secrets + self.prefix = prefix + self.bucket = Bucket.getClient() + + def as_json(self): + return { + "name": self.name, + "folder_name": self.folder_name, + "cmd": self.cmd, + "work_dir": self.work_dir, + "ports": self.ports, + "env": self.env, + "topics": self.topics, + "target_names": self.target_names, + "options": self.options, + "parent": self.parent, + "container": self.container, + "updated_at": self.updated_at, + "plugin": self.plugin, + "needs_dependencies": self.needs_dependencies, + "secrets": self.secrets, # .as_json(), + "prefix": self.prefix, + } + + def handle_keyword(self, parser, keyword, parameters): + match keyword: + case "ENV": + parser.verify_num_parameters(2, 2, f"{keyword} ") + self.env[parameters[0]] = parameters[1] + case "WORK_DIR": + parser.verify_num_parameters(1, 1, f"{keyword} ") + self.work_dir = parameters[0] + case "PORT": + usage = "PORT 1: + protocol = ConfigParser.handle_none(parameters[1]) + # Per https://kubernetes.io/docs/concepts/services-networking/service/#protocol-support + if protocol.upper() in ["TCP", "UDP", "SCTP"]: + self.ports[-1].append(protocol.upper()) + else: + raise ConfigParser.Error( + parser, f"Unknown port protocol: {parameters[1]}", usage + ) + else: + self.ports[-1].append("TCP") + case "TOPIC": + parser.verify_num_parameters(1, 1, f"{keyword} ") + self.topics.append(parameters[0]) + case "TARGET_NAME": + parser.verify_num_parameters(1, 1, f"{keyword} ") + self.target_names.append(parameters[0]) + case "CMD": + parser.verify_num_parameters(1, None, f"{keyword} ") + self.cmd = parameters[:] + case "OPTION": + parser.verify_num_parameters( + 2, None, f"{keyword}