From cad48b582ceb5c97210033e310779c4aab0376da Mon Sep 17 00:00:00 2001 From: Maciej Procyk Date: Wed, 20 Oct 2021 21:07:13 +0200 Subject: [PATCH] Update Nussknacker version to 1.0.0 (refactor to component provider) (#184) * update nussknacker version * added component providers * naming fix * fix config accessing * fix configs names in tests * fix configs names in tests * add categories, version update * print version in proper way * simplify signature * simplify signature * fix clash by rebase * Update to Nussknacker 1.0.0 and its dependencies Define explicit minio image version * Fix compatibility checking * Update prinz version --- README.md | 10 +-- build.sbt | 4 +- dev-environment/README.md | 4 +- dev-environment/create_environment.sh | 2 +- dev-environment/docker-compose-env.yaml | 82 +++++++++++------- dev-environment/flink/flink-conf.yaml | 57 ++++++++----- dev-environment/minio/Dockerfile | 2 +- dev-environment/nussknacker/Dockerfile | 5 -- .../opt/prinz-sample/base-application.conf | 83 ------------------- .../opt/prinz-sample/prinz-application.conf | 70 ---------------- .../nussknacker/opt/prinz-sample/prinz.conf | 72 ++++++++++++++++ dev-environment/telegraf/telegraf.conf | 13 ++- .../prinz/engine/PrinzComponentProvider.scala | 28 +++++++ .../{enrichers => engine}/PrinzEnricher.scala | 14 ++-- .../touk/nussknacker/prinz/model/Model.scala | 5 +- .../prinz/model/ModelInstance.scala | 2 +- .../prinz/model/ModelSignature.scala | 16 +--- .../model/repository/ModelRepository.scala | 2 +- .../prinz/util/config/ConfigReader.scala | 2 +- .../prinz/container/ApiIntegrationSpec.scala | 8 +- .../prinz/container/TestModelsManager.scala | 2 +- .../h2o/DefaultH2OComponentProvider.scala | 18 ++++ .../nussknacker/prinz/h2o/H2OConfig.scala | 2 +- .../prinz/h2o/model/H2OModel.scala | 5 +- .../prinz/h2o/container/H2OHttpApiTest.scala | 2 +- .../h2o/container/H2OLocalFileApiTest.scala | 2 +- .../mlflow/DefaultMLFComponentProvider.scala | 18 ++++ .../nussknacker/prinz/mlflow/MLFConfig.scala | 2 +- .../mlflow/converter/MLFDataConverter.scala | 2 +- .../converter/MLFInputDataTypeWrapper.scala | 2 +- .../converter/MLFOutputDataTypeWrapper.scala | 2 +- .../mlflow/model/api/MLFRegisteredModel.scala | 5 +- .../mlflow/container/MLFRestApiTest.scala | 2 +- .../mlflow/converter/EncodeInputTest.scala | 4 +- .../pmml/DefaultPMMLComponentProvider.scala | 18 ++++ .../nussknacker/prinz/pmml/PMMLConfig.scala | 2 +- .../prinz/pmml/model/PMMLModel.scala | 5 +- .../pmml/container/PMMLHttpApiTest.scala | 2 +- .../pmml/container/PMMLLocalFileApiTest.scala | 2 +- ...ker.engine.api.component.ComponentProvider | 3 + .../prinz/sample/LoggingSink.scala | 4 +- ...dicRandomGaussianDoubleSourceFactory.scala | 76 +++++------------ .../prinz/sample/SampleConfigCreator.scala | 45 +--------- 43 files changed, 338 insertions(+), 368 deletions(-) delete mode 100644 dev-environment/nussknacker/Dockerfile delete mode 100644 dev-environment/nussknacker/opt/prinz-sample/base-application.conf delete mode 100644 dev-environment/nussknacker/opt/prinz-sample/prinz-application.conf create mode 100644 dev-environment/nussknacker/opt/prinz-sample/prinz.conf create mode 100644 prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzComponentProvider.scala rename prinz/src/main/scala/pl/touk/nussknacker/prinz/{enrichers => engine}/PrinzEnricher.scala (72%) create mode 100644 prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/DefaultH2OComponentProvider.scala create mode 100644 prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/DefaultMLFComponentProvider.scala create mode 100644 prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/DefaultPMMLComponentProvider.scala create mode 100644 prinz_sample/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider diff --git a/README.md b/README.md index 911d77e0..2c666a79 100644 --- a/README.md +++ b/README.md @@ -25,11 +25,11 @@ If you are already signed to GitHub in your project, just add any of these lines to add Prinz dependencies to your `.sbt` project ```sbt -"pl.touk.nussknacker.prinz" %% "prinz" % "0.1.0-SNAPSHOT" -"pl.touk.nussknacker.prinz" %% "prinz-mlflow" % "0.1.0-SNAPSHOT" -"pl.touk.nussknacker.prinz" %% "prinz-pmml" % "0.1.0-SNAPSHOT" -"pl.touk.nussknacker.prinz" %% "prinz-h2o" % "0.1.0-SNAPSHOT" -"pl.touk.nussknacker.prinz" %% "prinz-proxy" % "0.1.0-SNAPSHOT" +"pl.touk.nussknacker.prinz" %% "prinz" % "1.0.0-SNAPSHOT" +"pl.touk.nussknacker.prinz" %% "prinz-mlflow" % "1.0.0-SNAPSHOT" +"pl.touk.nussknacker.prinz" %% "prinz-pmml" % "1.0.0-SNAPSHOT" +"pl.touk.nussknacker.prinz" %% "prinz-h2o" % "1.0.0-SNAPSHOT" +"pl.touk.nussknacker.prinz" %% "prinz-proxy" % "1.0.0-SNAPSHOT" ``` ## Authors diff --git a/build.sbt b/build.sbt index 084bc562..83e5f7bf 100755 --- a/build.sbt +++ b/build.sbt @@ -1,13 +1,13 @@ import sbtassembly.MergeStrategy -val prinzV = "0.1.0-SNAPSHOT" +val prinzV = "1.0.0-SNAPSHOT" val prinzOrg = "pl.touk.nussknacker.prinz" val repositoryOwner = "prinz-nussknacker" val repositoryName = "prinz" // Dependency versions val scalaV = "2.12.10" -val nussknackerV = "0.3.0" +val nussknackerV = "1.0.0" val sttpV = "3.0.0-RC7" val scalatestV = "3.2.2" val minioS3V = "8.0.0" diff --git a/dev-environment/README.md b/dev-environment/README.md index 581b9ef2..c64f9e3b 100644 --- a/dev-environment/README.md +++ b/dev-environment/README.md @@ -7,9 +7,9 @@ To use it on CI the `secrets.GITHUB_TOKEN` is used to authenticate to repository To use it on your local machine you have to generate Personal Access Token on [this website](https://github.com/settings/tokens) and save it to `~/token.txt` file. Then use ``` -cat ~/token.txt | docker login https://docker.pkg.github.com -u YOUR_GITHUB_USERNAME --password-stdin +cat ~/token.txt | docker login https://ghcr.io -u YOUR_GITHUB_USERNAME --password-stdin ``` -to login to Github Docker Registry and use the prepared version. +to login to GitHub Docker Registry and use the prepared version. ## Publish updated image To publish the updated version of any image you should build a new version locally with command diff --git a/dev-environment/create_environment.sh b/dev-environment/create_environment.sh index e5eceb91..6e0fe70f 100755 --- a/dev-environment/create_environment.sh +++ b/dev-environment/create_environment.sh @@ -1,7 +1,7 @@ #!/bin/bash scalaV="2.12" -prinzV="0.1.0-SNAPSHOT" +prinzV="1.0.0-SNAPSHOT" COMP_FILES="" ENV_FILE="-f docker-compose-env.yaml" diff --git a/dev-environment/docker-compose-env.yaml b/dev-environment/docker-compose-env.yaml index a8131768..f6969c80 100644 --- a/dev-environment/docker-compose-env.yaml +++ b/dev-environment/docker-compose-env.yaml @@ -2,19 +2,29 @@ version: '3.8' services: - nussknacker: - container_name: nussknacker - build: ./nussknacker + designer: + container_name: nussknacker_designer + image: touk/nussknacker:1.0.0 ports: - - "3000:3000" - - "8080:8080" + - "3081:8080" environment: - NUSSKNACKER_CONFIG_FILE: >- - ${NUSSKNACKER_CONFIG_FILE:-/opt/prinz-sample/prinz-application.conf} + # multiple, comma separated, config files can be used. They will be merged in order, via HOCON fallback mechanism + # https://github.com/lightbend/config/blob/master/HOCON.md#config-object-merging-and-file-merging + CONFIG_FILE: ${NUSSKNACKER_CONFIG_FILE-/opt/nussknacker/conf/application.conf,/opt/nussknacker/conf/prinz.conf} + JDK_JAVA_OPTIONS: -Xmx256M + FLINK_REST_URL: http://jobmanager:8081 + FLINK_QUERYABLE_STATE_PROXY_URL: taskmanager:9069 + KAFKA_ADDRESS: kafka:9092 + SCHEMA_REGISTRY_URL: http://schemaregistry:8081 + COUNTS_URL: http://influxdb:8086/query + volumes: + - storage_designer:/opt/nussknacker/storage + # this is needed to be able to verify savepoints during deployments + - storage_flink:/opt/flink/data + - ./nussknacker/opt/prinz-sample/prinz.conf:/opt/nussknacker/conf/prinz.conf + - ./nussknacker/opt/prinz-sample/prinz-sample-assembly-1.0.0-SNAPSHOT.jar:/opt/prinz-sample/prinz-sample.jar networks: - dev-bridge-net - depends_on: - - jobmanager zookeeper: container_name: nussknacker_zookeeper @@ -23,8 +33,8 @@ services: ZOO_MY_ID: 1 JVMFLAGS: "-Xms64m -Xmx128m" volumes: - - nussknacker_storage_zookeeper_datalog:/datalog - - nussknacker_storage_zookeeper_data:/data + - storage_zookeeper_datalog:/datalog + - storage_zookeeper_data:/data networks: - dev-bridge-net @@ -35,7 +45,7 @@ services: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_HOST_NAME: schemaregistry ports: - - 3082:8081 + - "3082:8081" depends_on: - zookeeper - kafka @@ -49,11 +59,13 @@ services: container_name: nussknacker_kafka image: wurstmeister/kafka:2.12-2.3.0 ports: - - "3032:9092" + - "3032:3032" environment: HOSTNAME_COMMAND: "docker info | grep ^Name: | cut -d' ' -f 2" - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:3032 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:3032 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "processedEvents:1:1" KAFKA_BROKER_ID: 1 @@ -61,7 +73,7 @@ services: KAFKA_HEAP_OPTS: -Xms128m -Xmx512m volumes: - /var/run/docker.sock:/var/run/docker.sock - - nussknacker_storage_kafka_data:/kafka + - storage_kafka_data:/kafka networks: - dev-bridge-net depends_on: @@ -69,12 +81,14 @@ services: influxdb: container_name: nussknacker_influxdb - image: influxdb:1.7.10-alpine + image: influxdb:1.8.4-alpine + ports: + - "3086:8086" environment: INFLUXDB_DB: esp volumes: - ./influxdb/influxdb.conf:/etc/influxdb/influxdb.conf - - nussknacker_storage_influxdb:/var/lib/influxdb + - storage_influxdb:/var/lib/influxdb networks: - dev-bridge-net @@ -88,11 +102,16 @@ services: jobmanager: container_name: nussknacker_jobmanager - image: flink:1.11.2-scala_2.12-java11 + image: flink:1.13.1-scala_2.12-java11 ports: - "3031:8081" entrypoint: /flink-entrypoint.sh command: jobmanager + environment: + # those environment variables are duplicated here and in designer service, + # in case of setup with designer run in other network than flink + KAFKA_ADDRESS: kafka:9092 + SCHEMA_REGISTRY_URL: http://schemaregistry:8081 depends_on: - zookeeper - kafka @@ -100,19 +119,20 @@ services: volumes: - ./flink/flink-conf.yaml:/tmp/flink-conf.yaml - ./flink/flink-entrypoint.sh:/flink-entrypoint.sh - - nussknacker_storage_jobmanager:/opt/flink/data networks: - dev-bridge-net taskmanager: container_name: nussknacker_taskmanager - image: flink:1.11.2-scala_2.12-java11 + image: flink:1.13.1-scala_2.12-java11 ports: - "3063:9069" - "9009:9009" - "9008:9008" entrypoint: /flink-entrypoint.sh command: taskmanager + environment: + TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 40 depends_on: - zookeeper - kafka @@ -121,7 +141,7 @@ services: volumes: - ./flink/flink-conf.yaml:/tmp/flink-conf.yaml - ./flink/flink-entrypoint.sh:/flink-entrypoint.sh - - nussknacker_storage_taskmanager:/opt/flink/data + - storage_flink:/opt/flink/data ulimits: nproc: 70000 nofile: @@ -136,9 +156,15 @@ networks: external: true volumes: - nussknacker_storage_zookeeper_datalog: - nussknacker_storage_zookeeper_data: - nussknacker_storage_kafka_data: - nussknacker_storage_jobmanager: - nussknacker_storage_taskmanager: - nussknacker_storage_influxdb: + storage_zookeeper_datalog: + name: nussknacker_storage_zookeeper_datalog + storage_zookeeper_data: + name: nussknacker_storage_zookeeper_data + storage_kafka_data: + name: nussknacker_storage_kafka_data + storage_flink: + name: nussknacker_storage_flink + storage_influxdb: + name: nussknacker_storage_influxdb + storage_designer: + name: nussknacker_storage_designer diff --git a/dev-environment/flink/flink-conf.yaml b/dev-environment/flink/flink-conf.yaml index 787d7eb0..f7c0fa99 100644 --- a/dev-environment/flink/flink-conf.yaml +++ b/dev-environment/flink/flink-conf.yaml @@ -17,14 +17,13 @@ ################################################################################ -################################################################################ +#============================================================================== # Common -################################################################################ +#============================================================================== -# The host on which the JobManager runs. Only used in -# non-high-availability mode. The JobManager process will use -# this hostname to bind the listening servers to. The TaskManagers -# will try to connect to the JobManager on that host. +# The host on which the JobManager runs. Only used in non-high-availability mode. +# The JobManager process will use this hostname to bind the listening servers to. +# The TaskManagers will try to connect to the JobManager on that host. taskmanager.hostname: taskmanager @@ -41,10 +40,9 @@ env.java.opts.taskmanager: " -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" -# The number of task slots that each TaskManager offers. -# Each slot runs one parallel pipeline. +# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. -taskmanager.numberOfTaskSlots: 40 +taskmanager.numberOfTaskSlots: jobmanager.rpc.address: jobmanager @@ -60,9 +58,9 @@ jobmanager.heap.size: 256m parallelism.default: 8 -################################################################################ +#============================================================================== # Web Frontend -################################################################################ +#============================================================================== # The port under which the web-based runtime monitor listens. # A value of -1 deactivates the web server. @@ -73,7 +71,7 @@ web.log.path: /opt/flink/data/logs high-availability: zookeeper high-availability.zookeeper.quorum: zookeeper:2181 -high-availability.zookeeper.path.root: /demo +high-availability.zookeeper.path.root: /flink high-availability.cluster-id: nussknacker high-availability.storageDir: file:///opt/flink/data/storage @@ -81,25 +79,38 @@ state.backend: filesystem state.checkpoints.dir: file:///opt/flink/data/checkpoints state.savepoints.dir: file:///opt/flink/data/savepoints +#Below are base settings for rocksdb metrics, that can be used for grafana dashboards +state.backend.rocksdb.metrics.estimate-num-keys: true +state.backend.rocksdb.metrics.estimate-live-data-size: true +state.backend.rocksdb.metrics.cur-size-all-mem-tables: true +state.backend.rocksdb.metrics.size-all-mem-tables: true + +#We can have many jobs per cluster, in such setting managed memory is not easy to tune +state.backend.rocksdb.memory.managed: false + +#For frequent writes increase the value as needed. Currently RocksDB settings can only be changed per Flink cluster +state.backend.rocksdb.writebuffer.size: 256m + + metrics.reporters: influxdb_reporter -metrics.reporter.influxdb_reporter.class: - org.apache.flink.metrics.influxdb.InfluxdbReporter +metrics.reporter.influxdb_reporter.class: org.apache.flink.metrics.influxdb.InfluxdbReporter metrics.reporter.influxdb_reporter.host: telegraf metrics.reporter.influxdb_reporter.port: 8087 metrics.reporter.influxdb_reporter.db: esp -metrics.scope.jm: demo..jobmanagerGlobal -metrics.scope.jm.job: demo..jobmanagerJob. -metrics.scope.tm: demo..taskmanagerGlobal. -metrics.scope.tm.job: demo..taskmanagerJob.. -metrics.scope.task: - demo..taskmanagerTask.... -metrics.scope.operator: - demo..taskmanagerTask.... +metrics.scope.jm: local..jobmanagerGlobal +metrics.scope.jm.job: local..jobmanagerJob. +metrics.scope.tm: local..taskmanagerGlobal. +metrics.scope.tm.job: local..taskmanagerJob.. +metrics.scope.task: local..taskmanagerTask.... +metrics.scope.operator: local..taskmanagerTask.... -classloader.resolve-order: parent-first +#Uncomment if you encounter problems with classloading with complex custom components +#classloader.resolve-order: parent-first akka.framesize: 209715200b query.server.port: 6125 blob.server.port: 6124 + +queryable-state.enable: true diff --git a/dev-environment/minio/Dockerfile b/dev-environment/minio/Dockerfile index bee2e0f2..4c65f7dd 100644 --- a/dev-environment/minio/Dockerfile +++ b/dev-environment/minio/Dockerfile @@ -1,4 +1,4 @@ -FROM minio/minio:latest +FROM minio/minio:RELEASE.2021-06-17T00-10-46Z-28-gac7697426 ENV MINIO_HOME /opt/minio ENV BUCKET_NAME mlflow diff --git a/dev-environment/nussknacker/Dockerfile b/dev-environment/nussknacker/Dockerfile deleted file mode 100644 index 0c866ae8..00000000 --- a/dev-environment/nussknacker/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM touk/nussknacker:0.3.0 - -COPY opt/ /opt/ - -CMD ["/opt/nussknacker/bin/nussknacker-entrypoint.sh"] diff --git a/dev-environment/nussknacker/opt/prinz-sample/base-application.conf b/dev-environment/nussknacker/opt/prinz-sample/base-application.conf deleted file mode 100644 index 2dc3c120..00000000 --- a/dev-environment/nussknacker/opt/prinz-sample/base-application.conf +++ /dev/null @@ -1,83 +0,0 @@ -# This config is a copy of /nussknacker-dist/src/universal/conf/base-application.conf -storageDir: ./storage -storageDir: ${?STORAGE_DIR} - -dbFilePath: ${storageDir}"/db" -dbFilePath: ${?DB_FILE_PATH} - -dbName: "db" -dbName: ${?DB_NAME} - -dbUser: "sa" -dbUser: ${?DB_USER} - -dbPassword: "" -dbPassword: ${?DB_PASSWORD} - -dbUrl: "jdbc:hsqldb:hsql://localhost/db;sql.syntax_ora=true" -dbUrl: ${?DB_URL} - -dbDriver: "org.hsqldb.jdbc.JDBCDriver" -dbDriver: ${?DB_DRIVER} - -dbConnectionTimeout: 30000 -dbConnectionTimeout: ${?DB_CONNECTION_TIMEOUT} - -jdbcServerEnabled: true -jdbcServerEnabled: ${?JDBC_SERVER_ENABLED} - -jdbcServer: { - enabled: ${jdbcServerEnabled} - dbFilePath: ${dbFilePath} - dbName: ${dbName} - user: ${dbUser} - password: ${dbPassword} -} - -db { - url: ${dbUrl} - driver: ${dbDriver} - password: ${dbPassword} - user: ${dbUser} - connectionTimeout: ${dbConnectionTimeout} -} - -commentSettings: { - matchExpression: "(issues/[0-9]*)" - link: "https://github.com/TouK/nussknacker/$1" -} - -developmentMode: ${?DEVELOPMENT_MODE} - -attachmentsPath: ${storageDir}"/attachments" - -proxyUrl: "http://localhost:8081" -proxyUrl: ${?PROXY_URL} - -grafanaUrl: ${proxyUrl}"/grafana" -grafanaUrl: ${?GRAFANA_URL} - -#TODO: Figure out the defaults. It's tricky part, because influxUrl and grafanaUrl can point to same location, but one link is used -#from browser and the other - from UI backend and in docker/nginx setups they *will* be different... -countsUrl: ${grafanaUrl}"/api/datasources/proxy/1/query" -countsUrl: ${?COUNTS_URL} - -countsSettings { - user: "admin" - password: "admin" - influxUrl: ${countsUrl} -} - -# Base streaming configuration -flinkRestUrl: "http://jobmanager:8081" -flinkRestUrl: ${?FLINK_REST_URL} - -flinkQueryableStateProxyUrl: "taskmanager:9069" -flinkQueryableStateProxyUrl: ${?FLINK_QUERYABLE_STATE_PROXY_URL} - -flinkEngineConfig { - jobManagerTimeout: 1m - type: "flinkStreaming" - restUrl: ${flinkRestUrl} - queryableStateProxyUrl: ${flinkQueryableStateProxyUrl} -} diff --git a/dev-environment/nussknacker/opt/prinz-sample/prinz-application.conf b/dev-environment/nussknacker/opt/prinz-sample/prinz-application.conf deleted file mode 100644 index 34f639fb..00000000 --- a/dev-environment/nussknacker/opt/prinz-sample/prinz-application.conf +++ /dev/null @@ -1,70 +0,0 @@ -# This config is a modified copy of /nussknacker-dist/src/universal/conf/docker-application.conf -base: { include "base-application.conf" } - -environment: "demo" - -categoriesConfig { - "FraudDetection": "streaming", - "Recommendations": "streaming" -} - -streamingModelConfig { - mlflow { - serverUrl: "http://mlflow-proxy:5000" - servedModelsUrl: "http://mlflow-proxy:5000" - s3AccessKey: "mlflow-key" - s3SecretKey: "mlflow-secret" - s3Url: "http://mlflow-proxy:9000" - s3ModelRelativePath: "/model/MLmodel" - s3BucketName: "mlflow" - } - - pmml { - fileExtension: ".pmml" - modelsDirectory: "http://pmml-proxy:5100" - modelDirectoryHrefSelector: "body > ul > li > a" - modelVersionSeparator: "-v" - } - - h2o { - fileExtension: ".zip" - modelsDirectory: "http://h2o-proxy:5200" - cachingStrategy: "MEMORY" - modelDirectoryHrefSelector: "body > ul > li > a" - modelVersionSeparator: "-v" - } - - timeout: 10s - - asyncExecutionConfig { - bufferSize: 200 - workers: 8 - } -} - -processTypes { - "streaming": { - engineConfig: ${base.flinkEngineConfig} - modelConfig: { - classPath: [ - "/opt/prinz-sample/prinz-sample-assembly-0.1.0-SNAPSHOT.jar" - ] - } ${streamingModelConfig} - } -} - -metricsSettings: { - url: ${base.grafanaUrl}"/dashboard/db/$dashboard?theme=dark&var-processName=$process&var-env="${environment} - defaultDashboard: "flink-esp" - processingTypeToDashboard: { - "streaming": "flink-esp" - } -} - -# TODO: lightbend config can't include files on root level - move nussknacker config on nk level and get rid of this below -jdbcServer: ${base.jdbcServer} -db: ${base.db} - -commentSettings: ${base.commentSettings} -attachmentsPath: ${base.attachmentsPath} -countsSettings: ${base.countsSettings} diff --git a/dev-environment/nussknacker/opt/prinz-sample/prinz.conf b/dev-environment/nussknacker/opt/prinz-sample/prinz.conf new file mode 100644 index 00000000..d46a73b8 --- /dev/null +++ b/dev-environment/nussknacker/opt/prinz-sample/prinz.conf @@ -0,0 +1,72 @@ +environment: "demo" + +categoriesConfig { + "FraudDetection": "streaming", + "Recommendations": "streaming" +} + +streamingModelConfig { + timeout: 10s + asyncExecutionConfig { + bufferSize: 200 + workers: 8 + } +} + +flinkEngineConfig { + jobManagerTimeout: 1m + type: "flinkStreaming" + restUrl: "http://jobmanager:8081" + queryableStateProxyUrl: "taskmanager:9069" +} + +scenarioTypes { + "streaming": { + deploymentConfig: ${flinkEngineConfig} + modelConfig: { + classPath: [ + "/opt/prinz-sample/prinz-sample.jar" + ] + + components { + prinzMLflow { + categories: ["FraudDetection", "Recommendations"] + mlfConfig { + serverUrl: "http://mlflow-proxy:5000" + servedModelsUrl: "http://mlflow-proxy:5000" + s3AccessKey: "mlflow-key" + s3SecretKey: "mlflow-secret" + s3Url: "http://mlflow-proxy:9000" + s3ModelRelativePath: "/model/MLmodel" + s3BucketName: "mlflow" + } + } + + prinzH2O { + categories: ["FraudDetection", "Recommendations"] + h2oConfig { + fileExtension: ".zip" + modelsDirectory: "http://h2o-proxy:5200" + cachingStrategy: "MEMORY" + modelDirectoryHrefSelector: "body > ul > li > a" + modelVersionSeparator: "-v" + } + } + + prinzPMML { + categories: ["FraudDetection", "Recommendations"] + pmmlConfig { + fileExtension: ".pmml" + modelsDirectory: "http://pmml-proxy:5100" + modelDirectoryHrefSelector: "body > ul > li > a" + modelVersionSeparator: "-v" + } + } + } + + rocksDB: { + enable: true + } + } ${streamingModelConfig} + } +} diff --git a/dev-environment/telegraf/telegraf.conf b/dev-environment/telegraf/telegraf.conf index e62957e8..b068b6b3 100644 --- a/dev-environment/telegraf/telegraf.conf +++ b/dev-environment/telegraf/telegraf.conf @@ -1,16 +1,17 @@ # Flink reporter for InfluxDB is somewhat limited: # - no possibility of adding own tags -# - no possitility of removing Flink internal tags (like job_id, task_id, etc.) +# - no possibility of removing Flink internal tags (like job_id, task_id, etc.) # - metric name has all tags encoded inside [global_tags] - env = "demo" + env = "local" [[inputs.influxdb_listener]] service_address = "0.0.0.0:8087" read_timeout = "7s" write_timeout = "7s" - tagexclude = ["tm_id", "job_id", "task_id", "task_attempt_id", "operator_id", "task_attempt_num", "task_name", "operator_name"] + #We exclude often changing ids to avoid excessive amount of series in InfluxDB + tagexclude = ["tm_id", "job_id", "task_id", "task_attempt_id", "operator_id", "task_attempt_num", "task_name"] [[processors.rename]] @@ -42,3 +43,9 @@ urls = ["http://influxdb:8086"] skip_database_creation = true database = "esp" + +[agent] + metric_batch_size = 10000 + metric_buffer_limit = 100000 + interval = "10s" + flush_interval = "10s" diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzComponentProvider.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzComponentProvider.scala new file mode 100644 index 00000000..9b52edbb --- /dev/null +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzComponentProvider.scala @@ -0,0 +1,28 @@ +package pl.touk.nussknacker.prinz.engine + +import com.typesafe.config.Config +import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion} +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.prinz.model.repository.ModelRepository + +import scala.math.Ordered.orderingToOrdered + +abstract class PrinzComponentProvider extends ComponentProvider { + + def getComponentRepository(config: Config, dependencies: ProcessObjectDependencies): ModelRepository + + override final def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = { + val repository = getComponentRepository(config, dependencies) + repository.listModels match { + case Right(models) => models.map(model => + ComponentDefinition(model.toString, PrinzEnricher(model)) + ) + case Left(exc) => throw exc + } + } + + override def isCompatible(version: NussknackerVersion): Boolean = + version.value >= NussknackerVersion.current.value + + override def resolveConfigForExecution(config: Config): Config = config +} diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/enrichers/PrinzEnricher.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzEnricher.scala similarity index 72% rename from prinz/src/main/scala/pl/touk/nussknacker/prinz/enrichers/PrinzEnricher.scala rename to prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzEnricher.scala index 17cc8356..b740b749 100644 --- a/prinz/src/main/scala/pl/touk/nussknacker/prinz/enrichers/PrinzEnricher.scala +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzEnricher.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.prinz.enrichers +package pl.touk.nussknacker.prinz.engine import com.typesafe.scalalogging.LazyLogging import pl.touk.nussknacker.engine.api.definition.{Parameter, ServiceWithExplicitMethod} @@ -12,15 +12,19 @@ import pl.touk.nussknacker.prinz.util.collection.immutable.VectorMultimap import scala.concurrent.ExecutionContext import scala.concurrent.Future -case class PrinzEnricher(private val model: Model) extends ServiceWithExplicitMethod with LazyLogging { +final case class PrinzEnricher(private val model: Model) + extends ServiceWithExplicitMethod + with LazyLogging { private lazy val modelInstance = { model.toModelInstance } override def invokeService(params: List[AnyRef]) - (implicit ec: ExecutionContext, collector: InvocationCollectors.ServiceInvocationCollector, - metaData: MetaData, contextId: ContextId): Future[AnyRef] = { + (implicit ec: ExecutionContext, + collector: InvocationCollectors.ServiceInvocationCollector, + metaData: MetaData, + contextId: ContextId): Future[AnyRef] = { val inputMap = createInputMap(params) modelInstance.run(inputMap).map { case Right(runResult) => runResult @@ -38,7 +42,7 @@ case class PrinzEnricher(private val model: Model) extends ServiceWithExplicitMe model .getMetadata .signature - .getOutputDefinition + .toOutputTypedObjectTypingResult def createInputMap(inputs: List[AnyRef]): ModelInputData = VectorMultimap(parameterDefinition.map(_.name) zip inputs) diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/Model.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/Model.scala index dd67a910..5554a128 100644 --- a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/Model.scala +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/Model.scala @@ -29,4 +29,7 @@ class ModelName(name: String) { override def toString: String = name } -trait ModelVersion +trait ModelVersion { + + def toString: String +} diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelInstance.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelInstance.scala index 069cb918..c41d1f49 100644 --- a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelInstance.scala +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelInstance.scala @@ -12,7 +12,7 @@ abstract class ModelInstance(val model: Model) { protected def verify(inputMap: ModelInputData): Either[ModelRunException, ModelInputData] = { val inputColumnNames = inputMap.keys.toSet val signature = model.getMetadata.signature - val signatureColumnNames = signature.getInputNames.map(_.name).toSet + val signatureColumnNames = signature.getSignatureInputs.map(_.signatureName.name).toSet if (inputColumnNames.equals(signatureColumnNames)) { Right(inputMap) } diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelSignature.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelSignature.scala index bfc36466..d961aeed 100644 --- a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelSignature.scala +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelSignature.scala @@ -11,15 +11,11 @@ case class ModelSignature private(private val signatureInputs: List[SignatureFie private val signatureOutputMap = signatureOutputs.groupBy(_.signatureName).mapValues(_.head.signatureType) - def getInputDefinition: TypedObjectTypingResult = - TypedObjectTypingResult(signatureInputMap.map(kv => (kv._1.name, kv._2.typingResult))) + def toInputParameterDefinition: List[Parameter] = + signatureInputs.map(field => field.toNussknackerParameter) - def getOutputDefinition: TypedObjectTypingResult = - TypedObjectTypingResult(signatureOutputMap.map(kv => (kv._1.name, kv._2.typingResult))) - - def getInputNames: List[SignatureName] = signatureInputs.map(_.signatureName) - - def getOutputNames: List[SignatureName] = signatureOutputs.map(_.signatureName) + def toOutputTypedObjectTypingResult: TypedObjectTypingResult = + TypedObjectTypingResult(signatureOutputMap.map(kv => (kv._1.name, kv._2.typingResult)).toList) def getSignatureInputs: List[SignatureField] = signatureInputs @@ -29,10 +25,6 @@ case class ModelSignature private(private val signatureInputs: List[SignatureFie def getOutputValueType(valueName: SignatureName): Option[SignatureType] = signatureOutputMap.get(valueName) - def toInputParameterDefinition: List[Parameter] = signatureInputs.map(field => field.toNussknackerParameter) - - def toOutputParameterDefinition: List[Parameter] = signatureOutputs.map(field => field.toNussknackerParameter) - override def toString: String = s"${getClass.getSimpleName}(\ninputs: $signatureInputs\noutputs: $signatureOutputs\n)" } diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/repository/ModelRepository.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/repository/ModelRepository.scala index 71e1690b..9bde3edc 100644 --- a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/repository/ModelRepository.scala +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/repository/ModelRepository.scala @@ -1,6 +1,6 @@ package pl.touk.nussknacker.prinz.model.repository -import pl.touk.nussknacker.prinz.model.{Model, ModelName} +import pl.touk.nussknacker.prinz.model.Model trait ModelRepository { diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/util/config/ConfigReader.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/util/config/ConfigReader.scala index fe68225f..2ec60a7c 100644 --- a/prinz/src/main/scala/pl/touk/nussknacker/prinz/util/config/ConfigReader.scala +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/util/config/ConfigReader.scala @@ -13,7 +13,7 @@ object ConfigReader extends LazyLogging { } def getOptionConfigValue[T](path: String, extractor: (Config, String) => T)(implicit config: Config, basePath: String): Option[T] = { - val fullPath = s"$basePath$path" + val fullPath = s"$basePath.$path" if (config.hasPath(fullPath)) { val extracted = extractor(config, fullPath) logger.info("Config value {} defined with value {}", fullPath, extracted) diff --git a/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/ApiIntegrationSpec.scala b/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/ApiIntegrationSpec.scala index fe76e94d..ce183bbd 100644 --- a/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/ApiIntegrationSpec.scala +++ b/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/ApiIntegrationSpec.scala @@ -91,8 +91,8 @@ trait ApiIntegrationSpec extends UnitTest with TestModelsManager { it should "have wine model with proper model signature" in { val model = getModel(getElasticnetWineModelModel(1)) val signature = model.map(_.getMetadata.signature).get - val inputNames = signature.getInputNames - val outputNames = signature.getOutputNames + val inputNames = signature.getSignatureInputs.map(_.signatureName) + val outputNames = signature.getSignatureOutputs.map(_.signatureName) assertSignatureFields(inputNames, expectedWineInputs)(signature.getInputValueType) assertSignatureFields(outputNames, expectedWineOutputs)(signature.getOutputValueType) @@ -101,8 +101,8 @@ trait ApiIntegrationSpec extends UnitTest with TestModelsManager { it should "have fraud detection model with proper model signature" in { val model = getModel(getFraudDetectionModel) val signature = model.map(_.getMetadata.signature).get - val inputNames = signature.getInputNames - val outputNames = signature.getOutputNames + val inputNames = signature.getSignatureInputs.map(_.signatureName) + val outputNames = signature.getSignatureOutputs.map(_.signatureName) assertSignatureFields(inputNames, expectedFraudInputs)(signature.getInputValueType) assertSignatureFields(outputNames, expectedFraudOutputs)(signature.getOutputValueType) diff --git a/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/TestModelsManager.scala b/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/TestModelsManager.scala index 8b93db62..14f24504 100644 --- a/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/TestModelsManager.scala +++ b/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/TestModelsManager.scala @@ -33,7 +33,7 @@ trait TestModelsManager { models.filter(_.getMetadata.modelName.toString.contains("ElasticnetWineModel-" + modelId)).head def constructInputMap(value: AnyRef, signature: ModelSignature): VectorMultimap[String, AnyRef] = { - val names = signature.getInputNames.map(_.name) + val names = signature.getSignatureInputs.map(_.signatureName.name) val data = List.fill(names.length)(value) VectorMultimap(names.zip(data)) } diff --git a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/DefaultH2OComponentProvider.scala b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/DefaultH2OComponentProvider.scala new file mode 100644 index 00000000..23dabdde --- /dev/null +++ b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/DefaultH2OComponentProvider.scala @@ -0,0 +1,18 @@ +package pl.touk.nussknacker.prinz.h2o + +import com.typesafe.config.Config +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.prinz.engine.PrinzComponentProvider +import pl.touk.nussknacker.prinz.h2o.repository.H2OModelRepository +import pl.touk.nussknacker.prinz.model.repository.ModelRepository + +class DefaultH2OComponentProvider extends PrinzComponentProvider { + + override def providerName: String = "prinzH2O" + + override final def getComponentRepository(config: Config, dependencies: ProcessObjectDependencies): ModelRepository = { + implicit val implicitConfig: Config = config + implicit val h2oConfig: H2OConfig = H2OConfig() + new H2OModelRepository() + } +} diff --git a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/H2OConfig.scala b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/H2OConfig.scala index 77cfd338..ad9c41ce 100644 --- a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/H2OConfig.scala +++ b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/H2OConfig.scala @@ -12,7 +12,7 @@ final case class H2OConfig(protected implicit val config: Config) with ModelVersionConfig with LazyLogging { - override protected implicit def baseConfigPath: String = "h2o." + override protected implicit def baseConfigPath: String = "h2oConfig" val cachingStrategy: CachingStrategy = getCachingStrategyForName(getConfigValue("cachingStrategy", getString)) diff --git a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/model/H2OModel.scala b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/model/H2OModel.scala index 2a47e41e..f9aaf0fe 100644 --- a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/model/H2OModel.scala +++ b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/model/H2OModel.scala @@ -26,6 +26,9 @@ final class H2OModel(payload: H2OModelPayload, cachingStrategy: CachingStrategy) final case class H2OModelName(name: String) extends ModelName(name) -final case class H2OModelVersion(version: String) extends ModelVersion +final case class H2OModelVersion(version: String) extends ModelVersion { + + override def toString: String = version +} final case class H2OModelSignatureLocationMetadata(genModel: GenModel) extends ModelSignatureLocationMetadata diff --git a/prinz_h2o/src/test/scala/pl/touk/nussknacker/prinz/h2o/container/H2OHttpApiTest.scala b/prinz_h2o/src/test/scala/pl/touk/nussknacker/prinz/h2o/container/H2OHttpApiTest.scala index 75711e1a..01e74571 100644 --- a/prinz_h2o/src/test/scala/pl/touk/nussknacker/prinz/h2o/container/H2OHttpApiTest.scala +++ b/prinz_h2o/src/test/scala/pl/touk/nussknacker/prinz/h2o/container/H2OHttpApiTest.scala @@ -20,7 +20,7 @@ class H2OHttpApiTest extends H2OContainerUnitTest private implicit val config: Config = ConfigFactory.parseString( s""" - | h2o { + | h2oConfig { | fileExtension: ".zip" | modelsDirectory: "http://localhost:${readEnv("H2O_SAMPLES_PORT")}" | cachingStrategy: "MEMORY" diff --git a/prinz_h2o/src/test/scala/pl/touk/nussknacker/prinz/h2o/container/H2OLocalFileApiTest.scala b/prinz_h2o/src/test/scala/pl/touk/nussknacker/prinz/h2o/container/H2OLocalFileApiTest.scala index 29aed9e6..81144b9a 100644 --- a/prinz_h2o/src/test/scala/pl/touk/nussknacker/prinz/h2o/container/H2OLocalFileApiTest.scala +++ b/prinz_h2o/src/test/scala/pl/touk/nussknacker/prinz/h2o/container/H2OLocalFileApiTest.scala @@ -20,7 +20,7 @@ class H2OLocalFileApiTest extends H2OContainerUnitTest private implicit val config: Config = ConfigFactory.parseString( s""" - | h2o { + | h2oConfig { | fileExtension: ".zip" | modelsDirectory: "file://${readEnv("REPOSITORY_ABSOLUTE_ROOT")}/dev-environment/h2o/exports" | cachingStrategy: "MEMORY" diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/DefaultMLFComponentProvider.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/DefaultMLFComponentProvider.scala new file mode 100644 index 00000000..3ff5a8dc --- /dev/null +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/DefaultMLFComponentProvider.scala @@ -0,0 +1,18 @@ +package pl.touk.nussknacker.prinz.mlflow + +import com.typesafe.config.Config +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.prinz.engine.PrinzComponentProvider +import pl.touk.nussknacker.prinz.mlflow.repository.MLFModelRepository +import pl.touk.nussknacker.prinz.model.repository.ModelRepository + +class DefaultMLFComponentProvider extends PrinzComponentProvider { + + override def providerName: String = "prinzMLflow" + + override final def getComponentRepository(config: Config, dependencies: ProcessObjectDependencies): ModelRepository = { + implicit val implicitConfig: Config = config + implicit val mlfConfig: MLFConfig = MLFConfig() + new MLFModelRepository() + } +} diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/MLFConfig.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/MLFConfig.scala index 0b3db7a3..44f18b75 100644 --- a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/MLFConfig.scala +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/MLFConfig.scala @@ -9,7 +9,7 @@ import java.net.URL final case class MLFConfig(modelLocationStrategy: MLFModelLocationStrategy = LocalMLFModelLocationStrategy) (private implicit val config: Config) { - private implicit val BASE_CONFIG_PATH: String = "mlflow." + private implicit val BASE_CONFIG_PATH: String = "mlfConfig" val baseApiPath: String = "/api/2.0/mlflow" diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFDataConverter.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFDataConverter.scala index 2ae1e525..05ca643c 100644 --- a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFDataConverter.scala +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFDataConverter.scala @@ -10,7 +10,7 @@ import pl.touk.nussknacker.prinz.util.collection.immutable.VectorMultimapUtils.V object MLFDataConverter extends LazyLogging { def outputToResultMap(output: MLFOutputDataTypeWrapper, signature: ModelSignature): Map[String, _] = { - val outNames = signature.getOutputNames.map(_.name) + val outNames = signature.getSignatureOutputs.map(_.signatureName.name) outNames.zip(output.outputData) .toMap } diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFInputDataTypeWrapper.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFInputDataTypeWrapper.scala index 46021297..f5f9452e 100644 --- a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFInputDataTypeWrapper.scala +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFInputDataTypeWrapper.scala @@ -12,7 +12,7 @@ case class MLFInputDataTypeWrapper private(typing: TypingResult, dataValue: AnyR object MLFInputDataTypeWrapper { - private val subclassedNumberEncoder = BestEffortJsonEncoder(failOnUnkown = true) + private val subclassedNumberEncoder = BestEffortJsonEncoder(failOnUnkown = true, classLoader = getClass.getClassLoader) implicit val encodeMLFDataType: Encoder[MLFInputDataTypeWrapper] = (data: MLFInputDataTypeWrapper) => data.typing match { diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFOutputDataTypeWrapper.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFOutputDataTypeWrapper.scala index 4ff8e777..95a1814e 100644 --- a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFOutputDataTypeWrapper.scala +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFOutputDataTypeWrapper.scala @@ -24,7 +24,7 @@ object MLFOutputDataTypeWrapper { } private def getDecoderForIndex(index: Int): Decoder[_] = { - val outputName = signature.getOutputNames(index) + val outputName = signature.getSignatureOutputs(index).signatureName val outputType = signature.getOutputValueType(outputName).get.typingResult getDecoderForType(outputType) } diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/model/api/MLFRegisteredModel.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/model/api/MLFRegisteredModel.scala index 801f6983..7f23c6ba 100644 --- a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/model/api/MLFRegisteredModel.scala +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/model/api/MLFRegisteredModel.scala @@ -52,7 +52,10 @@ final case class MLFRegisteredModelVersion(name: String, currentStage: String, source: String, runId: String, - status: String) extends ModelVersion + status: String) extends ModelVersion { + + override def toString: String = version +} final case class MLFModelSignatureLocationMetadata(name: MLFRegisteredModelName, version: MLFRegisteredModelVersion) diff --git a/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/container/MLFRestApiTest.scala b/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/container/MLFRestApiTest.scala index 9ea97356..5141094d 100644 --- a/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/container/MLFRestApiTest.scala +++ b/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/container/MLFRestApiTest.scala @@ -23,7 +23,7 @@ class MLFRestApiTest extends MLFContainerUnitTest private implicit val config: Config = ConfigFactory.parseString( s""" - | mlflow { + | mlfConfig { | serverUrl: "http://localhost:${readEnv("MLFLOW_SERVER_PORT")}" | servedModelsUrl: "http://localhost:${readEnv("MLFLOW_SERVER_PORT")}" | s3AccessKey: "mlflow-key" diff --git a/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/converter/EncodeInputTest.scala b/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/converter/EncodeInputTest.scala index a2c14241..72313468 100644 --- a/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/converter/EncodeInputTest.scala +++ b/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/converter/EncodeInputTest.scala @@ -104,11 +104,11 @@ class EncodeInputTest extends UnitTest { } private def createInputMultimap(input: List[AnyRef], signature: ModelSignature): VectorMultimap[String, AnyRef] = { - VectorMultimap(signature.getInputNames.map(_.name).zip(input)) + VectorMultimap(signature.getSignatureInputs.map(_.signatureName.name).zip(input)) } private def buildMultipleInput(inputs: List[List[AnyRef]], signature: ModelSignature): VectorMultimap[String, AnyRef] = { - val colValuesList = signature.getInputNames.map(_.name).zip(inputs) + val colValuesList = signature.getSignatureInputs.map(_.signatureName.name).zip(inputs) val colValues = colValuesList.flatMap { case (col, colValues) => colValues.map((col, _)) } VectorMultimap(colValues) } diff --git a/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/DefaultPMMLComponentProvider.scala b/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/DefaultPMMLComponentProvider.scala new file mode 100644 index 00000000..a312a238 --- /dev/null +++ b/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/DefaultPMMLComponentProvider.scala @@ -0,0 +1,18 @@ +package pl.touk.nussknacker.prinz.pmml + +import com.typesafe.config.Config +import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies +import pl.touk.nussknacker.prinz.engine.PrinzComponentProvider +import pl.touk.nussknacker.prinz.model.repository.ModelRepository +import pl.touk.nussknacker.prinz.pmml.repository.PMMLModelRepository + +class DefaultPMMLComponentProvider extends PrinzComponentProvider { + + override def providerName: String = "prinzPMML" + + override final def getComponentRepository(config: Config, dependencies: ProcessObjectDependencies): ModelRepository = { + implicit val implicitConfig: Config = config + implicit val pmmlConfig: PMMLConfig = PMMLConfig() + new PMMLModelRepository() + } +} diff --git a/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/PMMLConfig.scala b/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/PMMLConfig.scala index 753dc97f..396eb4be 100644 --- a/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/PMMLConfig.scala +++ b/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/PMMLConfig.scala @@ -8,5 +8,5 @@ final case class PMMLConfig(override protected implicit val config: Config) extends RepositoryClientConfig with ModelVersionConfig { - override protected implicit def baseConfigPath: String = "pmml." + override protected implicit def baseConfigPath: String = "pmmlConfig" } diff --git a/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/model/PMMLModel.scala b/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/model/PMMLModel.scala index c24441c5..00b38d37 100644 --- a/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/model/PMMLModel.scala +++ b/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/model/PMMLModel.scala @@ -28,7 +28,10 @@ final class PMMLModel(payload: PMMLModelPayload) extends Model { final case class PMMLModelName(name: String) extends ModelName(name) -final case class PMMLModelVersion(version: String) extends ModelVersion +final case class PMMLModelVersion(version: String) extends ModelVersion { + + override def toString: String = version +} final case class PMMLModelSignatureLocationMetadata(payload: PMMLModelPayload) extends ModelSignatureLocationMetadata diff --git a/prinz_pmml/src/test/scala/pl/touk/nussknacker/prinz/pmml/container/PMMLHttpApiTest.scala b/prinz_pmml/src/test/scala/pl/touk/nussknacker/prinz/pmml/container/PMMLHttpApiTest.scala index 779d511e..b18311a3 100644 --- a/prinz_pmml/src/test/scala/pl/touk/nussknacker/prinz/pmml/container/PMMLHttpApiTest.scala +++ b/prinz_pmml/src/test/scala/pl/touk/nussknacker/prinz/pmml/container/PMMLHttpApiTest.scala @@ -20,7 +20,7 @@ class PMMLHttpApiTest extends PMMLContainerUnitTest private implicit val config: Config = ConfigFactory.parseString( s""" - | pmml { + | pmmlConfig { | fileExtension: ".pmml" | modelsDirectory: "http://localhost:${readEnv("PMML_SAMPLES_PORT")}" | modelDirectoryHrefSelector: "body > ul > li > a" diff --git a/prinz_pmml/src/test/scala/pl/touk/nussknacker/prinz/pmml/container/PMMLLocalFileApiTest.scala b/prinz_pmml/src/test/scala/pl/touk/nussknacker/prinz/pmml/container/PMMLLocalFileApiTest.scala index 6769484d..bb6c9423 100644 --- a/prinz_pmml/src/test/scala/pl/touk/nussknacker/prinz/pmml/container/PMMLLocalFileApiTest.scala +++ b/prinz_pmml/src/test/scala/pl/touk/nussknacker/prinz/pmml/container/PMMLLocalFileApiTest.scala @@ -20,7 +20,7 @@ class PMMLLocalFileApiTest extends PMMLContainerUnitTest private implicit val config: Config = ConfigFactory.parseString( s""" - | pmml { + | pmmlConfig { | fileExtension: ".pmml" | modelsDirectory: "file://${readEnv("REPOSITORY_ABSOLUTE_ROOT")}/dev-environment/pmml-samples/exports" | modelVersionSeparator: "-v" diff --git a/prinz_sample/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider b/prinz_sample/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider new file mode 100644 index 00000000..5ac0a672 --- /dev/null +++ b/prinz_sample/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider @@ -0,0 +1,3 @@ +pl.touk.nussknacker.prinz.mlflow.DefaultMLFComponentProvider +pl.touk.nussknacker.prinz.pmml.DefaultPMMLComponentProvider +pl.touk.nussknacker.prinz.h2o.DefaultH2OComponentProvider diff --git a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSink.scala b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSink.scala index 3fc3e4b4..5f1b6aeb 100644 --- a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSink.scala +++ b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSink.scala @@ -5,13 +5,13 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context import pl.touk.nussknacker.engine.flink.api.process.BasicFlinkSink -case class LoggingSink() extends BasicFlinkSink with LazyLogging { +object LoggingSink extends BasicFlinkSink with LazyLogging { override def testDataOutput: Option[Any => String] = Some(value => s"$value") override def toFlinkFunction: SinkFunction[Any] = new SinkFunction[Any] with Serializable { - override def invoke(value: Any, context: Context[_]): Unit = { + override def invoke(value: Any, context: Context): Unit = { val loggedMessage = s"$value" logger.info(loggedMessage) } diff --git a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/PeriodicRandomGaussianDoubleSourceFactory.scala b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/PeriodicRandomGaussianDoubleSourceFactory.scala index 5a9b5f9e..6797186a 100644 --- a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/PeriodicRandomGaussianDoubleSourceFactory.scala +++ b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/PeriodicRandomGaussianDoubleSourceFactory.scala @@ -1,77 +1,50 @@ package pl.touk.nussknacker.prinz.sample import java.time.Duration -import java.{util => jul} - import javax.annotation.Nullable import javax.validation.constraints.Min import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ // scalastyle:ignore -import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.functions.source.SourceFunction -import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import pl.touk.nussknacker.engine.api.{Context, MethodToInvoke, ParamName} +import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName} import pl.touk.nussknacker.engine.api.process.Source -import pl.touk.nussknacker.engine.api.typed.{ReturningType, typing} -import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, FlinkSource, FlinkSourceFactory} -import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{LegacyTimestampWatermarkHandler, TimestampWatermarkHandler} +import pl.touk.nussknacker.engine.flink.api.process.{BasicFlinkSource, FlinkSourceFactory} +import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler -import scala.annotation.nowarn import scala.math.sqrt import scala.util.Random -object PeriodicRandomGaussianDoubleSourceFactory extends PeriodicRandomGaussianDoubleSourceFactory( - new LegacyTimestampWatermarkHandler(new MapAscendingTimestampExtractor(MapAscendingTimestampExtractor.DefaultTimestampField))) - -class PeriodicRandomGaussianDoubleSourceFactory(timestampAssigner: TimestampWatermarkHandler[Double]) extends FlinkSourceFactory[Double] { +object PeriodicRandomGaussianDoubleSourceFactory extends FlinkSourceFactory[Double] { @MethodToInvoke def create(@ParamName("period") period: Duration, @ParamName("mean") @Nullable nullableMean: Double, @ParamName("variance") @Nullable @Min(0) nullableVariance: Double, - @ParamName("count") @Nullable @Min(1) nullableCount: Integer): Source[_] = { - new FlinkSource[Double] with ReturningType { - - override def typeInformation: TypeInformation[Double] = implicitly[TypeInformation[Double]] + @ParamName("count") @Nullable @Min(1) nullableCount: Integer): Source[_] = + new BasicFlinkSource[Double] { - override def sourceStream(env: StreamExecutionEnvironment, flinkNodeContext: FlinkCustomNodeContext): DataStream[Double] = { - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - - val mean: Double = Option(nullableMean).getOrElse(0) - val variance: Double = Option(nullableVariance).getOrElse(1) - val count: Integer = Option(nullableCount).getOrElse(1) - val processId = flinkNodeContext.metaData.id - val stream = env - .addSource(new PeriodicFunction(period)) - .map(_ => Context(processId)) - .flatMap { _ => - 1.to(count).map(_ => { - val stdDev = sqrt(variance) - (Random.nextGaussian() * stdDev) + mean - }) - } - - timestampAssigner.assignTimestampAndWatermarks(stream) + override def flinkSourceFunction: SourceFunction[Double] = { + val mean = Option[Double](nullableMean).getOrElse(0.0) + val variance = Option[Double](nullableVariance).getOrElse(1.0) + new PeriodicGaussSourceFunction(period, mean, variance) } - override def timestampAssignerForTest: Option[TimestampWatermarkHandler[Double]] = Some(timestampAssigner) - - override val returnType: typing.TypingResult = typing.Typed[Double] + override def typeInformation: TypeInformation[Double] = implicitly[TypeInformation[Double]] + override def timestampAssigner: Option[TimestampWatermarkHandler[Double]] = None } - } - } -private class PeriodicFunction(duration: Duration) extends SourceFunction[Unit] { +private class PeriodicGaussSourceFunction(sleep: Duration, mean: Double, variance: Double) extends SourceFunction[Double] { @volatile private var isRunning = true - override def run(ctx: SourceFunction.SourceContext[Unit]): Unit = { + override def run(ctx: SourceFunction.SourceContext[Double]): Unit = { while (isRunning) { - ctx.collect(Unit) - Thread.sleep(duration.toMillis) + val stdDev = sqrt(variance) + val rnd = (Random.nextGaussian() * stdDev) + mean + ctx.collect(rnd) + Thread.sleep(sleep.toMillis) } } @@ -79,14 +52,3 @@ private class PeriodicFunction(duration: Duration) extends SourceFunction[Unit] isRunning = false } } - -@nowarn("deprecated") -private class MapAscendingTimestampExtractor(timestampField: String) extends AscendingTimestampExtractor[Double] { - override def extractAscendingTimestamp(element: Double): Long = { - System.currentTimeMillis() - } -} - -object MapAscendingTimestampExtractor { - val DefaultTimestampField = "timestamp" -} diff --git a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/SampleConfigCreator.scala b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/SampleConfigCreator.scala index 60a7b474..5930fffa 100644 --- a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/SampleConfigCreator.scala +++ b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/SampleConfigCreator.scala @@ -1,6 +1,5 @@ package pl.touk.nussknacker.prinz.sample -import com.typesafe.config.Config import pl.touk.nussknacker.engine.api.Service import pl.touk.nussknacker.engine.api.exception.ExceptionHandlerFactory import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, SinkFactory, SourceFactory, WithCategories} @@ -8,16 +7,6 @@ import pl.touk.nussknacker.engine.flink.util.exception.VerboselyLoggingException import pl.touk.nussknacker.engine.flink.util.sink.EmptySink import pl.touk.nussknacker.engine.flink.util.transformer.PeriodicSourceFactory import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator -import pl.touk.nussknacker.prinz.enrichers.PrinzEnricher -import pl.touk.nussknacker.prinz.h2o.H2OConfig -import pl.touk.nussknacker.prinz.h2o.repository.H2OModelRepository -import pl.touk.nussknacker.prinz.mlflow.MLFConfig -import pl.touk.nussknacker.prinz.mlflow.model.api.LocalMLFModelLocationStrategy -import pl.touk.nussknacker.prinz.mlflow.repository.MLFModelRepository -import pl.touk.nussknacker.prinz.model.Model -import pl.touk.nussknacker.prinz.model.repository.CompositeModelRepository -import pl.touk.nussknacker.prinz.pmml.PMMLConfig -import pl.touk.nussknacker.prinz.pmml.repository.PMMLModelRepository class SampleConfigCreator extends EmptyProcessConfigCreator { @@ -33,40 +22,8 @@ class SampleConfigCreator extends EmptyProcessConfigCreator { "logMessage" -> allCategories(SinkFactory.noParam(LoggingSink)) ) - override def services(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[Service]] = { - implicit val config: Config = processObjectDependencies.config - - val modelLocationStrategy = LocalMLFModelLocationStrategy - implicit val mlfConfig: MLFConfig = MLFConfig(modelLocationStrategy) - implicit val pmmlConfig: PMMLConfig = PMMLConfig() - implicit val h2oConfig: H2OConfig = H2OConfig() - - val mlfRepository = new MLFModelRepository() - val pmmlRepository = new PMMLModelRepository() - val h2oRepository = new H2OModelRepository() - - val repository = CompositeModelRepository( - mlfRepository, - pmmlRepository, - h2oRepository - ) - val modelsResult = repository.listModels - - val result = for { - models <- modelsResult - } yield models.foldLeft(Map.empty[String, WithCategories[Service]]) { - (services, model) => services + createModelEnricherRepresentation(model) - } - - result match { - case Right(services) => services - case Left(exception) => throw exception - } - } + override def services(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[Service]] = Map() override def exceptionHandlerFactory(processObjectDependencies: ProcessObjectDependencies): ExceptionHandlerFactory = ExceptionHandlerFactory.noParams(VerboselyLoggingExceptionHandler(_)) - - private def createModelEnricherRepresentation(model: Model): (String, WithCategories[Service]) = - model.getMetadata.modelName.internal -> allCategories(PrinzEnricher(model)) }