From 186c5bc5d9aa506ddd85d881d2ae1a1e31494218 Mon Sep 17 00:00:00 2001 From: Mark Andreev Date: Thu, 26 Sep 2024 21:44:09 +0100 Subject: [PATCH] [SPARK-49549][SQL] Assign a name to the error conditions _LEGACY_ERROR_TEMP_3055, 3146 --- .../resources/error/error-conditions.json | 22 ++++++++-------- .../main/resources/error/error-states.json | 12 +++++++++ .../catalog/functions/ScalarFunction.java | 8 +++++- .../expressions/V2ExpressionUtils.scala | 5 ++-- .../connector/DataSourceV2FunctionSuite.scala | 25 ++++++++++++++++--- 5 files changed, 55 insertions(+), 17 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 3fcb53426eccf..bd01cd78f17ac 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -3975,6 +3975,18 @@ ], "sqlState" : "21000" }, + "SCALAR_FUNCTION_NOT_COMPATIBLE" : { + "message" : [ + "ScalarFunction not overrides method 'produceResult(InternalRow)' with custom implementation." + ], + "sqlState" : "42K0O" + }, + "SCALAR_FUNCTION_NOT_FULLY_IMPLEMENTED" : { + "message" : [ + "ScalarFunction not implements or overrides method 'produceResult(InternalRow)'." + ], + "sqlState" : "42K0P" + }, "SCHEMA_ALREADY_EXISTS" : { "message" : [ "Cannot create schema because it already exists.", @@ -7911,11 +7923,6 @@ " is not currently supported" ] }, - "_LEGACY_ERROR_TEMP_3055" : { - "message" : [ - "ScalarFunction neither implement magic method nor override 'produceResult'" - ] - }, "_LEGACY_ERROR_TEMP_3056" : { "message" : [ "Unexpected row-level read relations (allow multiple = ): " @@ -8274,11 +8281,6 @@ "Partitions truncate is not supported" ] }, - "_LEGACY_ERROR_TEMP_3146" : { - "message" : [ - "Cannot find a compatible ScalarFunction#produceResult" - ] - }, "_LEGACY_ERROR_TEMP_3147" : { "message" : [ ": Batch scan are not supported" diff --git a/common/utils/src/main/resources/error/error-states.json b/common/utils/src/main/resources/error/error-states.json index 87811fef9836e..9be97556c1076 100644 --- a/common/utils/src/main/resources/error/error-states.json +++ b/common/utils/src/main/resources/error/error-states.json @@ -4631,6 +4631,18 @@ "standard": "N", "usedBy": ["Spark"] }, + "42K0O": { + "description": "ScalarFunction not overrides method 'produceResult(InternalRow)' with custom implementation.", + "origin": "Spark", + "standard": "N", + "usedBy": ["Spark"] + }, + "42K0P": { + "description": "ScalarFunction not implements or overrides method 'produceResult(InternalRow)'.", + "origin": "Spark", + "standard": "N", + "usedBy": ["Spark"] + }, "42KD0": { "description": "Ambiguous name reference.", "origin": "Databricks", diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java index ca4ea5114c26b..c0078872bd843 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java @@ -20,8 +20,11 @@ import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.QuotingUtils; import org.apache.spark.sql.types.DataType; +import java.util.Map; + /** * Interface for a function that produces a result value for each input row. *

@@ -149,7 +152,10 @@ public interface ScalarFunction extends BoundFunction { * @return a result value */ default R produceResult(InternalRow input) { - throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3146"); + throw new SparkUnsupportedOperationException( + "SCALAR_FUNCTION_NOT_COMPATIBLE", + Map.of("scalarFunc", QuotingUtils.quoteIdentifier(name())) + ); } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala index 220920a5a3198..d14c8cb675387 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier} import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME import org.apache.spark.sql.connector.expressions.{BucketTransform, Expression => V2Expression, FieldReference, IdentityTransform, Literal => V2Literal, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform} +import org.apache.spark.sql.errors.DataTypeErrors.toSQLId import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types._ import org.apache.spark.util.ArrayImplicits._ @@ -182,8 +183,8 @@ object V2ExpressionUtils extends SQLConfHelper with Logging { ApplyFunctionExpression(scalarFunc, arguments) case _ => throw new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_3055", - messageParameters = Map("scalarFunc" -> scalarFunc.name())) + errorClass = "SCALAR_FUNCTION_NOT_FULLY_IMPLEMENTED", + messageParameters = Map("scalarFunc" -> toSQLId(scalarFunc.name()))) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala index d6599debd3b11..6b0fd6084099c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala @@ -414,8 +414,8 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { new JavaStrLen(new JavaStrLenNoImpl)) checkError( exception = intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')").collect()), - condition = "_LEGACY_ERROR_TEMP_3055", - parameters = Map("scalarFunc" -> "strlen"), + condition = "SCALAR_FUNCTION_NOT_FULLY_IMPLEMENTED", + parameters = Map("scalarFunc" -> "`strlen`"), context = ExpectedContext( fragment = "testcat.ns.strlen('abc')", start = 7, @@ -448,8 +448,8 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { addFunction(Identifier.of(Array("ns"), "add"), new JavaLongAdd(new JavaLongAddMismatchMagic)) checkError( exception = intercept[AnalysisException](sql("SELECT testcat.ns.add(1L, 2L)").collect()), - condition = "_LEGACY_ERROR_TEMP_3055", - parameters = Map("scalarFunc" -> "long_add_mismatch_magic"), + condition = "SCALAR_FUNCTION_NOT_FULLY_IMPLEMENTED", + parameters = Map("scalarFunc" -> "`long_add_mismatch_magic`"), context = ExpectedContext( fragment = "testcat.ns.add(1L, 2L)", start = 7, @@ -458,6 +458,23 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { ) } + test("SPARK-49549: scalar function w/ mismatch a compatible ScalarFunction#produceResult") { + case object CharLength extends ScalarFunction[Int] { + override def inputTypes(): Array[DataType] = Array(StringType) + override def resultType(): DataType = IntegerType + override def name(): String = "CHAR_LENGTH" + } + + catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) + addFunction(Identifier.of(Array("ns"), "my_strlen"), StrLen(CharLength)) + checkError( + exception = intercept[SparkUnsupportedOperationException] + (sql("SELECT testcat.ns.my_strlen('abc')").collect()), + condition = "SCALAR_FUNCTION_NOT_COMPATIBLE", + parameters = Map("scalarFunc" -> "`CHAR_LENGTH`") + ) + } + test("SPARK-35390: scalar function w/ type coercion") { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "add"), new JavaLongAdd(new JavaLongAddDefault(false)))