Skip to content

Commit

Permalink
[SPARK-49549][SQL] Assign a name to the error conditions _LEGACY_ERRO…
Browse files Browse the repository at this point in the history
…R_TEMP_3055, 3146
  • Loading branch information
mrk-andreev committed Oct 8, 2024
1 parent b6681fb commit 943845a
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 17 deletions.
20 changes: 10 additions & 10 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3975,6 +3975,16 @@
],
"sqlState" : "21000"
},
"SCALAR_FUNCTION_NOT_COMPATIBLE" : {
"message" : [
"ScalarFunction <scalarFunc> not overrides method 'produceResult(InternalRow)' with custom implementation."
]
},
"SCALAR_FUNCTION_NOT_FULLY_IMPLEMENTED" : {
"message" : [
"ScalarFunction <scalarFunc> not implements or overrides method 'produceResult(InternalRow)'."
]
},
"SCHEMA_ALREADY_EXISTS" : {
"message" : [
"Cannot create schema <schemaName> because it already exists.",
Expand Down Expand Up @@ -7911,11 +7921,6 @@
"<expr> is not currently supported"
]
},
"_LEGACY_ERROR_TEMP_3055" : {
"message" : [
"ScalarFunction <scalarFunc> neither implement magic method nor override 'produceResult'"
]
},
"_LEGACY_ERROR_TEMP_3056" : {
"message" : [
"Unexpected row-level read relations (allow multiple = <allowMultipleReads>): <other>"
Expand Down Expand Up @@ -8274,11 +8279,6 @@
"Partitions truncate is not supported"
]
},
"_LEGACY_ERROR_TEMP_3146" : {
"message" : [
"Cannot find a compatible ScalarFunction#produceResult"
]
},
"_LEGACY_ERROR_TEMP_3147" : {
"message" : [
"<description>: Batch scan are not supported"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public interface ScalarFunction<R> extends BoundFunction {
* @return a result value
*/
default R produceResult(InternalRow input) {
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3146");
throw new SparkUnsupportedOperationException("SCALAR_FUNCTION_NOT_COMPATIBLE");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
import org.apache.spark.sql.connector.expressions.{BucketTransform, Expression => V2Expression, FieldReference, IdentityTransform, Literal => V2Literal, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.ArrayImplicits._
Expand Down Expand Up @@ -182,8 +183,8 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
ApplyFunctionExpression(scalarFunc, arguments)
case _ =>
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3055",
messageParameters = Map("scalarFunc" -> scalarFunc.name()))
errorClass = "SCALAR_FUNCTION_NOT_FULLY_IMPLEMENTED",
messageParameters = Map("scalarFunc" -> toSQLId(scalarFunc.name())))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,8 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
new JavaStrLen(new JavaStrLenNoImpl))
checkError(
exception = intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')").collect()),
condition = "_LEGACY_ERROR_TEMP_3055",
parameters = Map("scalarFunc" -> "strlen"),
condition = "SCALAR_FUNCTION_NOT_FULLY_IMPLEMENTED",
parameters = Map("scalarFunc" -> "`strlen`"),
context = ExpectedContext(
fragment = "testcat.ns.strlen('abc')",
start = 7,
Expand Down Expand Up @@ -448,8 +448,8 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
addFunction(Identifier.of(Array("ns"), "add"), new JavaLongAdd(new JavaLongAddMismatchMagic))
checkError(
exception = intercept[AnalysisException](sql("SELECT testcat.ns.add(1L, 2L)").collect()),
condition = "_LEGACY_ERROR_TEMP_3055",
parameters = Map("scalarFunc" -> "long_add_mismatch_magic"),
condition = "SCALAR_FUNCTION_NOT_FULLY_IMPLEMENTED",
parameters = Map("scalarFunc" -> "`long_add_mismatch_magic`"),
context = ExpectedContext(
fragment = "testcat.ns.add(1L, 2L)",
start = 7,
Expand All @@ -458,6 +458,23 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
)
}

test("SPARK-49549: scalar function w/ mismatch a compatible ScalarFunction#produceResult") {
case object CharLength extends ScalarFunction[Int] {
override def inputTypes(): Array[DataType] = Array(StringType)
override def resultType(): DataType = IntegerType
override def name(): String = "CHAR_LENGTH"
}

catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
addFunction(Identifier.of(Array("ns"), "my_strlen"), StrLen(CharLength))
checkError(
exception = intercept[SparkUnsupportedOperationException]
(sql("SELECT testcat.ns.my_strlen('abc')").collect()),
condition = "SCALAR_FUNCTION_NOT_COMPATIBLE",
parameters = Map()
)
}

test("SPARK-35390: scalar function w/ type coercion") {
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
addFunction(Identifier.of(Array("ns"), "add"), new JavaLongAdd(new JavaLongAddDefault(false)))
Expand Down

0 comments on commit 943845a

Please sign in to comment.