diff --git a/README.md b/README.md index 2c666a79..b271ffe2 100644 --- a/README.md +++ b/README.md @@ -25,11 +25,11 @@ If you are already signed to GitHub in your project, just add any of these lines to add Prinz dependencies to your `.sbt` project ```sbt -"pl.touk.nussknacker.prinz" %% "prinz" % "1.0.0-SNAPSHOT" -"pl.touk.nussknacker.prinz" %% "prinz-mlflow" % "1.0.0-SNAPSHOT" -"pl.touk.nussknacker.prinz" %% "prinz-pmml" % "1.0.0-SNAPSHOT" -"pl.touk.nussknacker.prinz" %% "prinz-h2o" % "1.0.0-SNAPSHOT" -"pl.touk.nussknacker.prinz" %% "prinz-proxy" % "1.0.0-SNAPSHOT" +"pl.touk.nussknacker.prinz" %% "prinz" % "1.2.0-preview-staging" +"pl.touk.nussknacker.prinz" %% "prinz-mlflow" % "1.2.0-preview-staging" +"pl.touk.nussknacker.prinz" %% "prinz-pmml" % "1.2.0-preview-staging" +"pl.touk.nussknacker.prinz" %% "prinz-h2o" % "1.2.0-preview-staging" +"pl.touk.nussknacker.prinz" %% "prinz-proxy" % "1.2.0-preview-staging" ``` ## Authors diff --git a/build.sbt b/build.sbt index 83e5f7bf..73536596 100755 --- a/build.sbt +++ b/build.sbt @@ -1,17 +1,17 @@ import sbtassembly.MergeStrategy -val prinzV = "1.0.0-SNAPSHOT" +val prinzV = "1.2.0-preview-staging" val prinzOrg = "pl.touk.nussknacker.prinz" val repositoryOwner = "prinz-nussknacker" val repositoryName = "prinz" // Dependency versions val scalaV = "2.12.10" -val nussknackerV = "1.0.0" +val nussknackerV = "1.2.0-staging-2021-12-09-5983-f15ac11992e3a82d32f651ef4e05b5d9458f46aa-SNAPSHOT" val sttpV = "3.0.0-RC7" val scalatestV = "3.2.2" val minioS3V = "8.0.0" -val circeV = "0.11.1" +val circeV = "0.14.1" val circeYamlV = "0.11.0-M1" val jpmmlV = "1.5.11" val jpmmlTranspilerV = "1.1.7" @@ -69,7 +69,8 @@ lazy val commonSettings = Seq( Seq( "ch.qos.logback" % "logback-classic" % logbackV, "com.typesafe.scala-logging" %% "scala-logging" % typesafeLogV, - "pl.touk.nussknacker" %% "nussknacker-process" % nussknackerV, + "pl.touk.nussknacker" %% "nussknacker-util" % nussknackerV, + "pl.touk.nussknacker" %% "nussknacker-api" % nussknackerV, ) }, @@ -200,13 +201,11 @@ lazy val prinz_sample = (project in file("prinz_sample")) // "pl.touk.nussknacker.prinz" %% "prinz-pmml" % prinzV, // "pl.touk.nussknacker.prinz" %% "prinz-h2o" % prinzV, - "pl.touk.nussknacker" %% "nussknacker-process" % nussknackerV, - "pl.touk.nussknacker" %% "nussknacker-model-flink-util" % nussknackerV, - "pl.touk.nussknacker" %% "nussknacker-kafka-flink-util" % nussknackerV, + "pl.touk.nussknacker" %% "nussknacker-flink-engine" % nussknackerV, + "pl.touk.nussknacker" %% "nussknacker-flink-kafka-util" % nussknackerV, "pl.touk.nussknacker" %% "nussknacker-ui" % nussknackerV, "pl.touk.nussknacker" %% "nussknacker-flink-manager" % nussknackerV, "pl.touk.nussknacker" %% "nussknacker-flink-api" % nussknackerV, - "pl.touk.nussknacker" %% "nussknacker-flink-util" % nussknackerV, ) }, // add GitHub packages resolver dependency with GitHub token declared to download prinz diff --git a/dev-environment/create_environment.sh b/dev-environment/create_environment.sh index 6e0fe70f..dfee8ea4 100755 --- a/dev-environment/create_environment.sh +++ b/dev-environment/create_environment.sh @@ -1,7 +1,7 @@ #!/bin/bash scalaV="2.12" -prinzV="1.0.0-SNAPSHOT" +prinzV="1.2.0-preview-staging" COMP_FILES="" ENV_FILE="-f docker-compose-env.yaml" diff --git a/dev-environment/docker-compose-env.yaml b/dev-environment/docker-compose-env.yaml index f6969c80..9a91cd98 100644 --- a/dev-environment/docker-compose-env.yaml +++ b/dev-environment/docker-compose-env.yaml @@ -22,7 +22,7 @@ services: # this is needed to be able to verify savepoints during deployments - storage_flink:/opt/flink/data - ./nussknacker/opt/prinz-sample/prinz.conf:/opt/nussknacker/conf/prinz.conf - - ./nussknacker/opt/prinz-sample/prinz-sample-assembly-1.0.0-SNAPSHOT.jar:/opt/prinz-sample/prinz-sample.jar + - ./nussknacker/opt/prinz-sample/prinz-sample-assembly-1.2.0-preview-staging.jar:/opt/prinz-sample/prinz-sample.jar networks: - dev-bridge-net diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzEnricher.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzEnricher.scala index b740b749..bb7e8347 100644 --- a/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzEnricher.scala +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/engine/PrinzEnricher.scala @@ -1,38 +1,37 @@ package pl.touk.nussknacker.prinz.engine import com.typesafe.scalalogging.LazyLogging -import pl.touk.nussknacker.engine.api.definition.{Parameter, ServiceWithExplicitMethod} +import pl.touk.nussknacker.engine.api.definition.Parameter import pl.touk.nussknacker.engine.api.test.InvocationCollectors import pl.touk.nussknacker.engine.api.typed.typing import pl.touk.nussknacker.engine.api.{ContextId, MetaData} +import pl.touk.nussknacker.engine.util.service.ServiceWithStaticParametersAndReturnType import pl.touk.nussknacker.prinz.model.Model import pl.touk.nussknacker.prinz.model.ModelInstance.ModelInputData import pl.touk.nussknacker.prinz.util.collection.immutable.VectorMultimap -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} final case class PrinzEnricher(private val model: Model) - extends ServiceWithExplicitMethod + extends ServiceWithStaticParametersAndReturnType with LazyLogging { private lazy val modelInstance = { model.toModelInstance } - override def invokeService(params: List[AnyRef]) - (implicit ec: ExecutionContext, - collector: InvocationCollectors.ServiceInvocationCollector, - metaData: MetaData, - contextId: ContextId): Future[AnyRef] = { - val inputMap = createInputMap(params) + def invoke(params: Map[String, Any])(implicit ec: ExecutionContext, + collector: InvocationCollectors.ServiceInvocationCollector, + contextId: ContextId, + metaData: MetaData): Future[Any] = { + val inputMap = createInputMap(params.values.toList) modelInstance.run(inputMap).map { case Right(runResult) => runResult case Left(exc) => throw exc - } + }(ec) } - override def parameterDefinition: List[Parameter] = + override def parameters: List[Parameter] = model .getMetadata .signature @@ -44,6 +43,6 @@ final case class PrinzEnricher(private val model: Model) .signature .toOutputTypedObjectTypingResult - def createInputMap(inputs: List[AnyRef]): ModelInputData = - VectorMultimap(parameterDefinition.map(_.name) zip inputs) + def createInputMap(inputs: List[Any]): ModelInputData = + VectorMultimap(parameters.map(_.name) zip inputs) } diff --git a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelInstance.scala b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelInstance.scala index c41d1f49..a8205150 100644 --- a/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelInstance.scala +++ b/prinz/src/main/scala/pl/touk/nussknacker/prinz/model/ModelInstance.scala @@ -36,5 +36,5 @@ object ModelInstance { type ModelRunResult = Future[Either[ModelRunException, JMap[String, _]]] - type ModelInputData = VectorMultimap[String, AnyRef] + type ModelInputData = VectorMultimap[String, Any] } diff --git a/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/ApiIntegrationSpec.scala b/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/ApiIntegrationSpec.scala index ce183bbd..e667f86b 100644 --- a/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/ApiIntegrationSpec.scala +++ b/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/ApiIntegrationSpec.scala @@ -42,7 +42,7 @@ trait ApiIntegrationSpec extends UnitTest with TestModelsManager { val model = getModel().get val instance = getModelInstance().get val signature = model.getMetadata.signature - val sampleInput = constructInputMap(0.415.asInstanceOf[AnyRef], signature) + val sampleInput = constructInputMap(0.415.asInstanceOf[Any], signature) val response = Await.result(instance.run(sampleInput), awaitTimeout) response.toOption.isDefined shouldBe true @@ -82,7 +82,7 @@ trait ApiIntegrationSpec extends UnitTest with TestModelsManager { ("gender", "F"), ("category", "es_transportation"), ("amount", 800.0), - ).mapValues(_.asInstanceOf[AnyRef]) + ).mapValues(_.asInstanceOf[Any]) val response = Await.result(instance.run(sampleInput), awaitTimeout) response.toOption.isDefined shouldBe true diff --git a/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/TestModelsManager.scala b/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/TestModelsManager.scala index 14f24504..7bf82f48 100644 --- a/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/TestModelsManager.scala +++ b/prinz/src/test/scala/pl/touk/nussknacker/prinz/container/TestModelsManager.scala @@ -32,7 +32,7 @@ trait TestModelsManager { def getElasticnetWineModelModel(modelId: Int)(models: List[Model]): Model = models.filter(_.getMetadata.modelName.toString.contains("ElasticnetWineModel-" + modelId)).head - def constructInputMap(value: AnyRef, signature: ModelSignature): VectorMultimap[String, AnyRef] = { + def constructInputMap(value: Any, signature: ModelSignature): VectorMultimap[String, Any] = { val names = signature.getSignatureInputs.map(_.signatureName.name) val data = List.fill(names.length)(value) VectorMultimap(names.zip(data)) diff --git a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/converter/H2ODataConverter.scala b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/converter/H2ODataConverter.scala index 6a076420..8645b2ce 100644 --- a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/converter/H2ODataConverter.scala +++ b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/converter/H2ODataConverter.scala @@ -12,20 +12,20 @@ object H2ODataConverter extends LazyLogging { def inputToTypedModelInput(input: ModelInputData, signature: ModelSignature): ModelInputData = input.mapValuesWithKeys(wrappedByDefinitionFrom(signature)) - private def wrappedByDefinitionFrom(signature: ModelSignature)(key: String, value: AnyRef): AnyRef = + private def wrappedByDefinitionFrom(signature: ModelSignature)(key: String, value: Any): Any = signature.getInputValueType(SignatureName(key)) match { case Some(signatureType) => mapValueBySignature(value, signatureType) case None => throw new IllegalStateException(s"Found data column not defined in signature with name: $key") } - private def mapValueBySignature(value: AnyRef, signatureType: SignatureType): AnyRef = + private def mapValueBySignature(value: Any, signatureType: SignatureType): Any = signatureType.typingResult match { case t: TypingResult if t.canBeSubclassOf(Typed[String]) => wrapStringInput(value) case t: TypingResult if t.canBeSubclassOf(Typed[Double]) => value case t: TypingResult => throw new IllegalStateException(s"Found not expected type in signature of H2O model: $t") } - private def wrapStringInput(input: AnyRef): AnyRef = { + private def wrapStringInput(input: Any): Any = { input match { case stringValue: String => val beginFixed = if (stringValue.startsWith(STRING_WRAPPER)) stringValue else STRING_WRAPPER + stringValue diff --git a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/model/H2OModelInstance.scala b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/model/H2OModelInstance.scala index 118b2dbb..87981c0e 100644 --- a/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/model/H2OModelInstance.scala +++ b/prinz_h2o/src/main/scala/pl/touk/nussknacker/prinz/h2o/model/H2OModelInstance.scala @@ -33,9 +33,9 @@ case class H2OModelInstance(private val modelWrapper: EasyPredictModelWrapper, } } - private def evaluateRow(row: Map[String, AnyRef]): AbstractPrediction = { + private def evaluateRow(row: Map[String, Any]): AbstractPrediction = { val rowData = new RowData() - rowData.putAll(row.asJava) + rowData.putAll(row.map{case (key, value) => key -> value.asInstanceOf[AnyRef]}.asJava) val result = modelWrapper.predict(rowData) result } diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFDataConverter.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFDataConverter.scala index 05ca643c..abd6c18b 100644 --- a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFDataConverter.scala +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFDataConverter.scala @@ -32,6 +32,6 @@ object MLFDataConverter extends LazyLogging { Dataframe(columns, data) } - private def isMultimapConvertible(multimap: VectorMultimap[String, AnyRef]): Boolean = + private def isMultimapConvertible(multimap: VectorMultimap[String, Any]): Boolean = multimap.values.map(_.size).toSet.size <= 1 } diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFInputDataTypeWrapper.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFInputDataTypeWrapper.scala index f5f9452e..be8ab987 100644 --- a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFInputDataTypeWrapper.scala +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/converter/MLFInputDataTypeWrapper.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult} import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder -case class MLFInputDataTypeWrapper private(typing: TypingResult, dataValue: AnyRef) { +case class MLFInputDataTypeWrapper private(typing: TypingResult, dataValue: Any) { override def toString: String = s"MLFInputDataTypeWrapper($dataValue: ${typing.display})" } @@ -23,7 +23,7 @@ object MLFInputDataTypeWrapper { case _ => throw new IllegalArgumentException(s"Unknown mlflow data type wrapper type: ${data.typing}") } - def apply(signature: ModelSignature, columnName: String, value: AnyRef): MLFInputDataTypeWrapper = { + def apply(signature: ModelSignature, columnName: String, value: Any): MLFInputDataTypeWrapper = { val columnType = extractColumnType(signature, columnName) new MLFInputDataTypeWrapper(columnType, value) } diff --git a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/model/api/MLFModelInstance.scala b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/model/api/MLFModelInstance.scala index ba5c35e5..c0ad427a 100644 --- a/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/model/api/MLFModelInstance.scala +++ b/prinz_mlflow/src/main/scala/pl/touk/nussknacker/prinz/mlflow/model/api/MLFModelInstance.scala @@ -17,7 +17,7 @@ case class MLFModelInstance(config: MLFConfig, private val invokeRestClient = MLFInvokeRestClient(config.servedModelsUrl.toString, model) - override protected def runVerified(inputMap: VectorMultimap[String, AnyRef]): ModelRunResult = { + override protected def runVerified(inputMap: VectorMultimap[String, Any]): ModelRunResult = { val dataframe = MLFDataConverter.inputToDataframe(inputMap, model.getMetadata.signature) invokeRestClient.invoke(dataframe, model.getMetadata.signature, config.modelLocationStrategy) .map { response => diff --git a/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/converter/EncodeInputTest.scala b/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/converter/EncodeInputTest.scala index 72313468..807432ec 100644 --- a/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/converter/EncodeInputTest.scala +++ b/prinz_mlflow/src/test/scala/pl/touk/nussknacker/prinz/mlflow/converter/EncodeInputTest.scala @@ -103,13 +103,13 @@ class EncodeInputTest extends UnitTest { ModelSignature(input, List()) } - private def createInputMultimap(input: List[AnyRef], signature: ModelSignature): VectorMultimap[String, AnyRef] = { + private def createInputMultimap(input: List[AnyRef], signature: ModelSignature): VectorMultimap[String, Any] = { VectorMultimap(signature.getSignatureInputs.map(_.signatureName.name).zip(input)) } - private def buildMultipleInput(inputs: List[List[AnyRef]], signature: ModelSignature): VectorMultimap[String, AnyRef] = { + private def buildMultipleInput(inputs: List[List[AnyRef]], signature: ModelSignature): VectorMultimap[String, Any] = { val colValuesList = signature.getSignatureInputs.map(_.signatureName.name).zip(inputs) - val colValues = colValuesList.flatMap { case (col, colValues) => colValues.map((col, _)) } + val colValues = colValuesList.flatMap { case (col, colValues) => colValues.map(v => (col, v.asInstanceOf[Any])) } VectorMultimap(colValues) } } diff --git a/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/model/PMMLModelInstance.scala b/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/model/PMMLModelInstance.scala index acdb4c52..686e4bab 100644 --- a/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/model/PMMLModelInstance.scala +++ b/prinz_pmml/src/main/scala/pl/touk/nussknacker/prinz/pmml/model/PMMLModelInstance.scala @@ -27,7 +27,7 @@ case class PMMLModelInstance(private val evaluator: Evaluator, } } - def evaluateRow(row: Map[String, AnyRef]): Map[String, _] = { + def evaluateRow(row: Map[String, Any]): Map[String, _] = { val args = EvaluatorUtil.encodeKeys(row.asJava) val results = evaluator.evaluate(args) val decodeResult = EvaluatorUtil.decodeAll(results).asScala.toMap diff --git a/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/api/ProxiedInputModel.scala b/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/api/ProxiedInputModel.scala index 3d021ccf..bcfd2837 100644 --- a/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/api/ProxiedInputModel.scala +++ b/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/api/ProxiedInputModel.scala @@ -19,7 +19,7 @@ class ProxiedInputModel private(originalModel: Model, def this(model: Model, proxiedParams: Iterable[ProxiedModelInputParam], - compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]]) { + compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]]) { this( model, CompositeProxiedInputModelName(model), @@ -76,7 +76,7 @@ object ProxiedInputModel { def apply(model: Model, proxiedParams: Iterable[ProxiedModelInputParam], - compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]]): ProxiedInputModel = + compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]]): ProxiedInputModel = new ProxiedInputModel(model, proxiedParams, compositeProxiedParams) def apply(model: Model, @@ -89,14 +89,14 @@ object ProxiedInputModel { new ProxiedInputModel(model, transformer) private def collectRemovedParams(proxiedParams: Iterable[ProxiedModelInputParam], - compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]]): Iterable[SignatureName] = { + compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]]): Iterable[SignatureName] = { val proxiedNames = proxiedParams.map(_.paramName) val composedProxiedNames = compositeProxiedParams.flatMap(_.proxiedParams) proxiedNames ++ composedProxiedNames } private def filteredTransform(proxiedParams: Iterable[ProxiedModelInputParam], - compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]] + compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]] ): FilteredSignatureTransformer = new FilteredSignatureTransformer(collectRemovedParams(proxiedParams, compositeProxiedParams)) } diff --git a/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/build/ProxiedInputModelBuilder.scala b/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/build/ProxiedInputModelBuilder.scala index b6295c44..3356eee5 100644 --- a/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/build/ProxiedInputModelBuilder.scala +++ b/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/build/ProxiedInputModelBuilder.scala @@ -12,7 +12,7 @@ class ProxiedInputModelBuilder(private val model: Model) { protected val params: mutable.Map[SignatureName, ProxiedModelInputParam] = mutable.Map[SignatureName, ProxiedModelInputParam]() - protected val composedParams: mutable.MutableList[ProxiedModelCompositeInputParam[_ <: AnyRef]] = mutable.MutableList() + protected val composedParams: mutable.MutableList[ProxiedModelCompositeInputParam[_ <: Any]] = mutable.MutableList() def proxyParam(paramName: String)(paramSupplier: ParamSupplier): this.type = { val signatureName = SignatureName(paramName) @@ -21,7 +21,7 @@ class ProxiedInputModelBuilder(private val model: Model) { this } - def proxyComposedParam[T <: AnyRef](paramSupplier: ComposedParamsSupplier[T], + def proxyComposedParam[T <: Any](paramSupplier: ComposedParamsSupplier[T], paramsExtractor: ParamsExtractor[T], proxiedParams: Iterable[SignatureName]): this.type = { val createdInputParam = ProxiedModelCompositeInputParam(paramSupplier,paramsExtractor, proxiedParams) diff --git a/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/composite/ProxiedInputModelInstance.scala b/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/composite/ProxiedInputModelInstance.scala index 317139c6..22def49c 100644 --- a/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/composite/ProxiedInputModelInstance.scala +++ b/prinz_proxy/src/main/scala/pl/touk/nussknacker/prinz/model/proxy/composite/ProxiedInputModelInstance.scala @@ -12,23 +12,23 @@ class ProxiedInputModelInstance(originalModelMetadata: ModelMetadata, originalModelInstance: ModelInstance, proxiedModel: ProxiedInputModel, proxiedParams: Iterable[ProxiedModelInputParam], - compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: AnyRef]]) + compositeProxiedParams: Iterable[ProxiedModelCompositeInputParam[_ <: Any]]) extends ModelInstance(proxiedModel) { - override protected def runVerified(inputMap: VectorMultimap[String, AnyRef]): ModelRunResult = { + override protected def runVerified(inputMap: VectorMultimap[String, Any]): ModelRunResult = { val addInputParams = supplyNonProvidedInputs(inputMap) val addComposedParams = addInputParams.flatMap(supplyNonProvidedComposedInputs) addComposedParams.flatMap(originalModelInstance.run) } - private def supplyNonProvidedInputs(inputMap: VectorMultimap[String, AnyRef]): Future[VectorMultimap[String, AnyRef]] = + private def supplyNonProvidedInputs(inputMap: VectorMultimap[String, Any]): Future[VectorMultimap[String, Any]] = Future.sequence( proxiedParams .filter(inputsToBeReplacedIn(inputMap)) .map(_.supplyParamValue(originalModelMetadata)) ).map(addExtraInputsTo(inputMap)) - private def supplyNonProvidedComposedInputs(inputMap: VectorMultimap[String, AnyRef]): Future[VectorMultimap[String, AnyRef]] = + private def supplyNonProvidedComposedInputs(inputMap: VectorMultimap[String, Any]): Future[VectorMultimap[String, Any]] = Future.sequence(compositeProxiedParams .map(_.supplyCompositeParamValues(originalModelMetadata)) ) @@ -36,9 +36,9 @@ class ProxiedInputModelInstance(originalModelMetadata: ModelMetadata, addExtraInputsTo(acc)(composedValues) }) - private def inputsToBeReplacedIn(inputMap: VectorMultimap[String, AnyRef]): ProxiedModelInputParam => Boolean = + private def inputsToBeReplacedIn(inputMap: VectorMultimap[String, Any]): ProxiedModelInputParam => Boolean = param => !inputMap.containsKey(param.paramName.name) - private def addExtraInputsTo(inputMap: VectorMultimap[String, AnyRef])(extraInputs: Iterable[(String, AnyRef)]): VectorMultimap[String, AnyRef] = + private def addExtraInputsTo(inputMap: VectorMultimap[String, Any])(extraInputs: Iterable[(String, Any)]): VectorMultimap[String, Any] = extraInputs.foldLeft(inputMap) { (acc, value) => acc.add(value._1, value._2) } } diff --git a/prinz_proxy/src/test/scala/pl/touk/nussknacker/prinz/proxy/ModelsProxySpec.scala b/prinz_proxy/src/test/scala/pl/touk/nussknacker/prinz/proxy/ModelsProxySpec.scala index fa5d096d..f45bea13 100644 --- a/prinz_proxy/src/test/scala/pl/touk/nussknacker/prinz/proxy/ModelsProxySpec.scala +++ b/prinz_proxy/src/test/scala/pl/touk/nussknacker/prinz/proxy/ModelsProxySpec.scala @@ -27,7 +27,7 @@ trait ModelsProxySpec extends UnitTest val sampleInput = VectorMultimap( ("age", "4"), ("category", "es_transportation"), - ).mapValues(_.asInstanceOf[AnyRef]) + ).mapValues(_.asInstanceOf[Any]) val response = Await.result(instance.run(sampleInput), awaitTimeout) assertRunResult(response) @@ -57,7 +57,7 @@ trait ModelsProxySpec extends UnitTest val sampleInput = VectorMultimap( ("age", "4"), ("category", "es_transportation"), - ).mapValues(_.asInstanceOf[AnyRef]) + ).mapValues(_.asInstanceOf[Any]) val response = Await.result(instance.run(sampleInput), awaitTimeout) assertRunResult(response) @@ -75,7 +75,7 @@ trait ModelsProxySpec extends UnitTest (s"${tableName}_id", 1), ("age", "4"), ("category", "es_transportation"), - ).mapValues(_.asInstanceOf[AnyRef]) + ) val response = Await.result(instance.run(sampleInput), awaitTimeout) assertRunResult(response) diff --git a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSink.scala b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSink.scala deleted file mode 100644 index 5f1b6aeb..00000000 --- a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSink.scala +++ /dev/null @@ -1,19 +0,0 @@ -package pl.touk.nussknacker.prinz.sample - -import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.streaming.api.functions.sink.SinkFunction -import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context -import pl.touk.nussknacker.engine.flink.api.process.BasicFlinkSink - -object LoggingSink extends BasicFlinkSink with LazyLogging { - - override def testDataOutput: Option[Any => String] = - Some(value => s"$value") - - override def toFlinkFunction: SinkFunction[Any] = new SinkFunction[Any] with Serializable { - override def invoke(value: Any, context: Context): Unit = { - val loggedMessage = s"$value" - logger.info(loggedMessage) - } - } -} diff --git a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSinkFunction.scala b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSinkFunction.scala new file mode 100644 index 00000000..63c68f03 --- /dev/null +++ b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/LoggingSinkFunction.scala @@ -0,0 +1,11 @@ +package pl.touk.nussknacker.prinz.sample + +import com.typesafe.scalalogging.LazyLogging +import org.apache.flink.streaming.api.functions.sink.SinkFunction + +object LoggingSinkFunction extends SinkFunction[AnyRef] with LazyLogging { + override def invoke(value: AnyRef, context: SinkFunction.Context): Unit = { + val loggedMessage = s"$value" + logger.info(loggedMessage) + } +} diff --git a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/PeriodicRandomGaussianDoubleSourceFactory.scala b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/PeriodicRandomGaussianDoubleSourceFactory.scala index 6797186a..d2ff263e 100644 --- a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/PeriodicRandomGaussianDoubleSourceFactory.scala +++ b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/PeriodicRandomGaussianDoubleSourceFactory.scala @@ -7,20 +7,20 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.functions.source.SourceFunction import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName} -import pl.touk.nussknacker.engine.api.process.Source -import pl.touk.nussknacker.engine.flink.api.process.{BasicFlinkSource, FlinkSourceFactory} +import pl.touk.nussknacker.engine.api.process.{Source, SourceFactory} +import pl.touk.nussknacker.engine.flink.api.process.BasicFlinkSource import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler import scala.math.sqrt import scala.util.Random -object PeriodicRandomGaussianDoubleSourceFactory extends FlinkSourceFactory[Double] { +object PeriodicRandomGaussianDoubleSourceFactory extends SourceFactory { @MethodToInvoke def create(@ParamName("period") period: Duration, @ParamName("mean") @Nullable nullableMean: Double, @ParamName("variance") @Nullable @Min(0) nullableVariance: Double, - @ParamName("count") @Nullable @Min(1) nullableCount: Integer): Source[_] = + @ParamName("count") @Nullable @Min(1) nullableCount: Integer): Source = new BasicFlinkSource[Double] { override def flinkSourceFunction: SourceFunction[Double] = { diff --git a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/SampleConfigCreator.scala b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/SampleConfigCreator.scala index 5930fffa..053666b6 100644 --- a/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/SampleConfigCreator.scala +++ b/prinz_sample/src/main/scala/pl/touk/nussknacker/prinz/sample/SampleConfigCreator.scala @@ -1,29 +1,22 @@ package pl.touk.nussknacker.prinz.sample import pl.touk.nussknacker.engine.api.Service -import pl.touk.nussknacker.engine.api.exception.ExceptionHandlerFactory import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, SinkFactory, SourceFactory, WithCategories} -import pl.touk.nussknacker.engine.flink.util.exception.VerboselyLoggingExceptionHandler -import pl.touk.nussknacker.engine.flink.util.sink.EmptySink -import pl.touk.nussknacker.engine.flink.util.transformer.PeriodicSourceFactory +import pl.touk.nussknacker.engine.flink.util.sink.{EmptySink, SingleValueSinkFactory} import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator class SampleConfigCreator extends EmptyProcessConfigCreator { protected def allCategories[T](obj: T): WithCategories[T] = WithCategories(obj, "FraudDetection", "Recommendations") - override def sourceFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SourceFactory[_]]] = Map( - "periodic" -> allCategories(PeriodicSourceFactory), + override def sourceFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SourceFactory]] = Map( "periodicGaussianDouble" -> allCategories(PeriodicRandomGaussianDoubleSourceFactory) ) override def sinkFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SinkFactory]] = Map( "empty" -> allCategories(SinkFactory.noParam(EmptySink)), - "logMessage" -> allCategories(SinkFactory.noParam(LoggingSink)) + "logMessage" -> allCategories(new SingleValueSinkFactory(LoggingSinkFunction)) ) override def services(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[Service]] = Map() - - override def exceptionHandlerFactory(processObjectDependencies: ProcessObjectDependencies): ExceptionHandlerFactory = - ExceptionHandlerFactory.noParams(VerboselyLoggingExceptionHandler(_)) }