From 2fa1bb0b63f47967b7b6d9b3a16f9f4210e86409 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 12:51:05 +0100 Subject: [PATCH 01/16] (cherry picked from commit 955ba829779010d43b9f37ec438f0c8eaea76e0e) --- .../test/scala/frameless/EncoderTests.scala | 16 +++++ .../src/test/scala/frameless/package.scala | 67 ++++++++++++++++++- 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/dataset/src/test/scala/frameless/EncoderTests.scala b/dataset/src/test/scala/frameless/EncoderTests.scala index 4ebf5d93..494ec112 100644 --- a/dataset/src/test/scala/frameless/EncoderTests.scala +++ b/dataset/src/test/scala/frameless/EncoderTests.scala @@ -10,6 +10,8 @@ object EncoderTests { case class InstantRow(i: java.time.Instant) case class DurationRow(d: java.time.Duration) case class PeriodRow(p: java.time.Period) + + case class VectorOfObject(a: Vector[X1[Int]]) } class EncoderTests extends TypedDatasetSuite with Matchers { @@ -32,4 +34,18 @@ class EncoderTests extends TypedDatasetSuite with Matchers { test("It should encode java.time.Period") { implicitly[TypedEncoder[PeriodRow]] } + + test("It should encode a Vector of Objects") { + forceInterpreted { + implicit val e = implicitly[TypedEncoder[VectorOfObject]] + implicit val te = TypedExpressionEncoder[VectorOfObject] + implicit val xe = implicitly[TypedEncoder[X1[VectorOfObject]]] + implicit val xte = TypedExpressionEncoder[X1[VectorOfObject]] + val v = (1 to 20).map(X1(_)).toVector + val ds = { + sqlContext.createDataset(Seq(X1[VectorOfObject](VectorOfObject(v)))) + } + ds.head.a.a shouldBe v + } + } } diff --git a/dataset/src/test/scala/frameless/package.scala b/dataset/src/test/scala/frameless/package.scala index 82ff375c..ab870c68 100644 --- a/dataset/src/test/scala/frameless/package.scala +++ b/dataset/src/test/scala/frameless/package.scala @@ -1,6 +1,8 @@ +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode +import org.apache.spark.sql.internal.SQLConf + import java.time.format.DateTimeFormatter import java.time.{LocalDateTime => JavaLocalDateTime} - import org.scalacheck.{Arbitrary, Gen} package object frameless { @@ -35,6 +37,14 @@ package object frameless { def vectorGen[A: Arbitrary]: Gen[Vector[A]] = arbVector[A].arbitrary + implicit def arbSeq[A]( + implicit + A: Arbitrary[A] + ): Arbitrary[scala.collection.Seq[A]] = + Arbitrary(Gen.listOf(A.arbitrary).map(_.toVector.toSeq)) + + def seqGen[A: Arbitrary]: Gen[scala.collection.Seq[A]] = arbSeq[A].arbitrary + implicit val arbUdtEncodedClass: Arbitrary[UdtEncodedClass] = Arbitrary { for { int <- Arbitrary.arbitrary[Int] @@ -139,4 +149,59 @@ package object frameless { } res } + + // from Quality, which is from Spark test versions + + // if this blows then debug on CodeGenerator 1294, 1299 and grab code.body + def forceCodeGen[T](f: => T): T = { + val codegenMode = CodegenObjectFactoryMode.CODEGEN_ONLY.toString + + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenMode) { + f + } + } + + def forceInterpreted[T](f: => T): T = { + val codegenMode = CodegenObjectFactoryMode.NO_CODEGEN.toString + + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenMode) { + f + } + } + + /** + * runs the same test with both eval and codegen, then does the same again using resolveWith + * + * @param f + * @tparam T + * @return + */ + def evalCodeGens[T](f: => T): (T, T) = + (forceCodeGen(f), forceInterpreted(f)) + + /** + * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL + * configurations. 
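+   * A minimal usage sketch, mirroring how `forceCodeGen` / `forceInterpreted` below
+   * use it (the key and value shown are the ones those helpers pass in):
+   * {{{
+   * withSQLConf(
+   *   SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.NO_CODEGEN.toString
+   * ) {
+   *   // assertions here run with codegen disabled; the previous value
+   *   // (or unset state) of the conf key is restored afterwards
+   * }
+   * }}}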
+ */ + protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = { + val conf = SQLConf.get + val (keys, values) = pairs.unzip + val currentValues = keys.map { key => + if (conf.contains(key)) { + Some(conf.getConfString(key)) + } else { + None + } + } + (keys, values).zipped.foreach { (k, v) => + conf.setConfString(k, v) + } + try f finally { + keys.zip(currentValues).foreach { + case (key, Some(value)) => conf.setConfString(key, value) + case (key, None) => conf.unsetConf(key) + } + } + } + } From ee388043e39dd3e9b9afd0021f27afc73f0e067c Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 13:30:58 +0100 Subject: [PATCH 02/16] #804 - starter fix, set needed (cherry picked from commit caed43956187a52f1a37d8192b70bd9ecf297a1f) --- .../scala/frameless/CollectionCaster.scala | 24 +++++++++++++++++++ .../main/scala/frameless/TypedEncoder.scala | 23 ++++++++++++++++-- 2 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 dataset/src/main/scala/frameless/CollectionCaster.scala diff --git a/dataset/src/main/scala/frameless/CollectionCaster.scala b/dataset/src/main/scala/frameless/CollectionCaster.scala new file mode 100644 index 00000000..8eb712a2 --- /dev/null +++ b/dataset/src/main/scala/frameless/CollectionCaster.scala @@ -0,0 +1,24 @@ +package frameless + +import frameless.TypedEncoder.SeqConversion +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.types.DataType + +case class CollectionCaster[C[_]](child: Expression, conversion: SeqConversion[C]) extends UnaryExpression with CodegenFallback { + protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild) + + override def eval(input: InternalRow): Any = { + val o = child.eval(input).asInstanceOf[Object] + o match { + case seq: scala.collection.Seq[_] => + conversion.convertSeq(seq) + case set: scala.collection.Set[_] => + o + case _ => o + } + } + + override def dataType: DataType = child.dataType +} diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index b42b026e..ee762903 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -501,10 +501,27 @@ object TypedEncoder { override def toString: String = s"arrayEncoder($jvmRepr)" } + trait SeqConversion[C[_]] extends Serializable { + def convertSeq[Y](c: Seq[Y]): C[Y] + } + + object SeqConversion { + implicit val seqToSeq = new SeqConversion[Seq] { + override def convertSeq[Y](c: Seq[Y]): Seq[Y] = c + } + implicit val seqToVector = new SeqConversion[Vector] { + override def convertSeq[Y](c: Seq[Y]): Vector[Y] = c.toVector + } + implicit val seqToList = new SeqConversion[List] { + override def convertSeq[Y](c: Seq[Y]): List[Y] = c.toList + } + } + implicit def collectionEncoder[C[X] <: Seq[X], T]( implicit i0: Lazy[RecordFieldEncoder[T]], - i1: ClassTag[C[T]] + i1: ClassTag[C[T]], + i2: SeqConversion[C] ): TypedEncoder[C[T]] = new TypedEncoder[C[T]] { private lazy val encodeT = i0.value.encoder @@ -526,13 +543,15 @@ object TypedEncoder { } def fromCatalyst(path: Expression): Expression = - MapObjects( + CollectionCaster( + MapObjects( i0.value.fromCatalyst, path, encodeT.catalystRepr, encodeT.nullable, Some(i1.runtimeClass) // This will cause MapObjects to build a collection of type C[_] directly ) + , 
implicitly[SeqConversion[C]]) override def toString: String = s"collectionEncoder($jvmRepr)" } From fb1c109165ab145cb6f7b12c779f3512c0b39de1 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 19:34:11 +0100 Subject: [PATCH 03/16] #804 - encoding for Set derivatives as well - test build --- .../scala/frameless/CollectionCaster.scala | 44 +++++++-- .../main/scala/frameless/TypedEncoder.scala | 91 ++++++++++++------- .../test/scala/frameless/EncoderTests.scala | 21 ++++- 3 files changed, 112 insertions(+), 44 deletions(-) diff --git a/dataset/src/main/scala/frameless/CollectionCaster.scala b/dataset/src/main/scala/frameless/CollectionCaster.scala index 8eb712a2..55e7ca7d 100644 --- a/dataset/src/main/scala/frameless/CollectionCaster.scala +++ b/dataset/src/main/scala/frameless/CollectionCaster.scala @@ -1,24 +1,52 @@ package frameless -import frameless.TypedEncoder.SeqConversion +import frameless.TypedEncoder.CollectionConversion import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.{DataType, ObjectType} -case class CollectionCaster[C[_]](child: Expression, conversion: SeqConversion[C]) extends UnaryExpression with CodegenFallback { +case class CollectionCaster[F[_],C[_],Y](child: Expression, conversion: CollectionConversion[F,C,Y]) extends UnaryExpression with CodegenFallback { protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild) override def eval(input: InternalRow): Any = { val o = child.eval(input).asInstanceOf[Object] o match { - case seq: scala.collection.Seq[_] => - conversion.convertSeq(seq) - case set: scala.collection.Set[_] => - o + case col: F[Y] @unchecked => + conversion.convert(col) case _ => o } } override def dataType: DataType = child.dataType } + +case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression) extends UnaryExpression { + protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild) + + // eval on interpreted works, fallback on codegen does not, e.g. 
with ColumnTests.asCol and Vectors, the code generated still has child of type Vector but child eval returns X2, which is not good + override def eval(input: InternalRow): Any = { + val o = child.eval(input).asInstanceOf[Object] + o match { + case col: Set[Y]@unchecked => + col.toSeq + case _ => o + } + } + + def toSeqOr[T](isSet: => T, or: => T): T = + child.dataType match { + case ObjectType(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) => + isSet + case t => or + } + + override def dataType: DataType = + toSeqOr(ObjectType(classOf[scala.collection.Seq[_]]), child.dataType) + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = + defineCodeGen(ctx, ev, c => + toSeqOr(s"$c.toSeq()", s"$c") + ) + +} \ No newline at end of file diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index ee762903..f6f9b2e0 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -1,31 +1,24 @@ package frameless import java.math.BigInteger - import java.util.Date - -import java.time.{ Duration, Instant, Period, LocalDate } - +import java.time.{Duration, Instant, LocalDate, Period} import java.sql.Timestamp - import scala.reflect.ClassTag - import org.apache.spark.sql.FramelessInternals import org.apache.spark.sql.FramelessInternals.UserDefinedType -import org.apache.spark.sql.{ reflection => ScalaReflection } +import org.apache.spark.sql.{reflection => ScalaReflection} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.objects._ -import org.apache.spark.sql.catalyst.util.{ - ArrayBasedMapData, - DateTimeUtils, - GenericArrayData -} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String - import shapeless._ import shapeless.ops.hlist.IsHCons +import scala.collection.generic.CanBuildFrom +import scala.collection.immutable.TreeSet + abstract class TypedEncoder[T]( implicit val classTag: ClassTag[T]) @@ -501,27 +494,57 @@ object TypedEncoder { override def toString: String = s"arrayEncoder($jvmRepr)" } - trait SeqConversion[C[_]] extends Serializable { - def convertSeq[Y](c: Seq[Y]): C[Y] + /** + * Per #804 - when MapObjects is used in interpreted mode the type returned is Seq, not the derived type used in compilation + * + * This type class offers extensible conversion for more specific types. By default Seq, List and Vector are supported. 
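+   *
+   * A sketch of what a user-supplied instance for a further target type could look
+   * like (the `Queue` target is illustrative and not provided here):
+   * {{{
+   * import scala.collection.immutable.Queue
+   *
+   * implicit def seqToQueue[Y]: CollectionConversion[Seq, Queue, Y] =
+   *   new CollectionConversion[Seq, Queue, Y] {
+   *     override def convert(c: Seq[Y]): Queue[Y] = Queue(c: _*)
+   *   }
+   * }}}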
+ * + * @tparam C + */ + trait CollectionConversion[F[_], C[_], Y] extends Serializable { + def convert(c: F[Y]): C[Y] } - object SeqConversion { - implicit val seqToSeq = new SeqConversion[Seq] { - override def convertSeq[Y](c: Seq[Y]): Seq[Y] = c + object CollectionConversion { + implicit def seqToSeq[Y](implicit cbf: CanBuildFrom[Nothing, Y, Seq[Y]]) = new CollectionConversion[Seq, Seq, Y] { + override def convert(c: Seq[Y]): Seq[Y] = c + } + implicit def seqToVector[Y](implicit cbf: CanBuildFrom[Nothing, Y, Vector[Y]]) = new CollectionConversion[Seq, Vector, Y] { + override def convert(c: Seq[Y]): Vector[Y] = c.toVector + } + implicit def seqToList[Y](implicit cbf: CanBuildFrom[Nothing, Y, List[Y]]) = new CollectionConversion[Seq, List, Y] { + override def convert(c: Seq[Y]): List[Y] = c.toList } - implicit val seqToVector = new SeqConversion[Vector] { - override def convertSeq[Y](c: Seq[Y]): Vector[Y] = c.toVector + implicit def setToSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, Set[Y]]) = new CollectionConversion[Set, Set, Y] { + override def convert(c: Set[Y]): Set[Y] = c } - implicit val seqToList = new SeqConversion[List] { - override def convertSeq[Y](c: Seq[Y]): List[Y] = c.toList + implicit def setToTreeSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, TreeSet[Y]]) = new CollectionConversion[Set, TreeSet, Y] { + override def convert(c: Set[Y]): TreeSet[Y] = c.to[TreeSet] } } - implicit def collectionEncoder[C[X] <: Seq[X], T]( + implicit def seqEncoder[C[X] <: Seq[X], T]( + implicit + i0: Lazy[RecordFieldEncoder[T]], + i1: ClassTag[C[T]], + i2: CollectionConversion[Seq, C, T], + i3: CanBuildFrom[Nothing, T, C[T]] + ) = collectionEncoder[Seq, C, T] + + implicit def setEncoder[C[X] <: Set[X], T]( + implicit + i0: Lazy[RecordFieldEncoder[T]], + i1: ClassTag[C[T]], + i2: CollectionConversion[Set, C, T], + i3: CanBuildFrom[Nothing, T, C[T]] + ) = collectionEncoder[Set, C, T] + + def collectionEncoder[O[_], C[X], T]( implicit i0: Lazy[RecordFieldEncoder[T]], i1: ClassTag[C[T]], - i2: SeqConversion[C] + i2: CollectionConversion[O, C, T], + i3: CanBuildFrom[Nothing, T, C[T]] ): TypedEncoder[C[T]] = new TypedEncoder[C[T]] { private lazy val encodeT = i0.value.encoder @@ -538,20 +561,20 @@ object TypedEncoder { if (ScalaReflection.isNativeType(enc.jvmRepr)) { NewInstance(classOf[GenericArrayData], path :: Nil, catalystRepr) } else { - MapObjects(enc.toCatalyst, path, enc.jvmRepr, encodeT.nullable) + // converts to Seq, both Set and Seq handling must convert to Seq first + MapObjects(enc.toCatalyst, SeqCaster(path), enc.jvmRepr, encodeT.nullable) } } def fromCatalyst(path: Expression): Expression = - CollectionCaster( + CollectionCaster[O, C, T]( MapObjects( i0.value.fromCatalyst, path, encodeT.catalystRepr, encodeT.nullable, - Some(i1.runtimeClass) // This will cause MapObjects to build a collection of type C[_] directly - ) - , implicitly[SeqConversion[C]]) + Some(i1.runtimeClass) // This will cause MapObjects to build a collection of type C[_] directly when compiling + ), implicitly[CollectionConversion[O,C,T]]) // This will convert Seq to the appropriate C[_] when eval'ing. override def toString: String = s"collectionEncoder($jvmRepr)" } @@ -561,16 +584,18 @@ object TypedEncoder { * @param i2 implicit `ClassTag[Set[T]]` to provide runtime information about the set type. * @tparam T the element type of the set. * @return a `TypedEncoder` instance for `Set[T]`. 
- */ - implicit def setEncoder[T]( + + implicit def setEncoder[C[X] <: Seq[X], T]( implicit i1: shapeless.Lazy[RecordFieldEncoder[T]], - i2: ClassTag[Set[T]] + i2: ClassTag[Set[T]], + i3: CollectionConversion[Set, C, T], + i4: CanBuildFrom[Nothing, T, C[T]] ): TypedEncoder[Set[T]] = { implicit val inj: Injection[Set[T], Seq[T]] = Injection(_.toSeq, _.toSet) TypedEncoder.usingInjection - } + }*/ /** * @tparam A the key type diff --git a/dataset/src/test/scala/frameless/EncoderTests.scala b/dataset/src/test/scala/frameless/EncoderTests.scala index 494ec112..fe144281 100644 --- a/dataset/src/test/scala/frameless/EncoderTests.scala +++ b/dataset/src/test/scala/frameless/EncoderTests.scala @@ -1,7 +1,6 @@ package frameless -import scala.collection.immutable.Set - +import scala.collection.immutable.{Set, TreeSet} import org.scalatest.matchers.should.Matchers object EncoderTests { @@ -12,6 +11,8 @@ object EncoderTests { case class PeriodRow(p: java.time.Period) case class VectorOfObject(a: Vector[X1[Int]]) + + case class TreeSetOfObjects(a: TreeSet[X1[Int]]) } class EncoderTests extends TypedDatasetSuite with Matchers { @@ -36,7 +37,7 @@ class EncoderTests extends TypedDatasetSuite with Matchers { } test("It should encode a Vector of Objects") { - forceInterpreted { + evalCodeGens { implicit val e = implicitly[TypedEncoder[VectorOfObject]] implicit val te = TypedExpressionEncoder[VectorOfObject] implicit val xe = implicitly[TypedEncoder[X1[VectorOfObject]]] @@ -48,4 +49,18 @@ class EncoderTests extends TypedDatasetSuite with Matchers { ds.head.a.a shouldBe v } } + + test("It should encode a TreeSet of Objects") { + evalCodeGens { + implicit val e = implicitly[TypedEncoder[TreeSetOfObjects]] + implicit val te = TypedExpressionEncoder[TreeSetOfObjects] + implicit val xe = implicitly[TypedEncoder[X1[TreeSetOfObjects]]] + implicit val xte = TypedExpressionEncoder[X1[TreeSetOfObjects]] + val v = (1 to 20).map(X1(_)).to[TreeSet] + val ds = { + sqlContext.createDataset(Seq(X1[TreeSetOfObjects](TreeSetOfObjects(v)))) + } + ds.head.a.a shouldBe v + } + } } From ae8b69a585fa89aba8a673df4aaed819934b9913 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 19:40:07 +0100 Subject: [PATCH 04/16] #804 - encoding for Set derivatives as well - test build --- dataset/src/main/scala/frameless/TypedEncoder.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index f6f9b2e0..312bd019 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -17,7 +17,8 @@ import shapeless._ import shapeless.ops.hlist.IsHCons import scala.collection.generic.CanBuildFrom -import scala.collection.immutable.TreeSet +import scala.collection.immutable.HashSet.HashTrieSet +import scala.collection.immutable.{ListSet, TreeSet} abstract class TypedEncoder[T]( implicit @@ -497,7 +498,7 @@ object TypedEncoder { /** * Per #804 - when MapObjects is used in interpreted mode the type returned is Seq, not the derived type used in compilation * - * This type class offers extensible conversion for more specific types. By default Seq, List and Vector are supported. + * This type class offers extensible conversion for more specific types. By default Seq, List and Vector for Seq's and Set, TreeSet and HashTrieSet are supported. 
* * @tparam C */ @@ -521,6 +522,12 @@ object TypedEncoder { implicit def setToTreeSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, TreeSet[Y]]) = new CollectionConversion[Set, TreeSet, Y] { override def convert(c: Set[Y]): TreeSet[Y] = c.to[TreeSet] } + implicit def setToListSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, ListSet[Y]]) = new CollectionConversion[Set, ListSet, Y] { + override def convert(c: Set[Y]): ListSet[Y] = c.to[ListSet] + } + implicit def setToTrieSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, HashTrieSet[Y]]) = new CollectionConversion[Set, HashTrieSet, Y] { + override def convert(c: Set[Y]): HashTrieSet[Y] = c.to[HashTrieSet] + } } implicit def seqEncoder[C[X] <: Seq[X], T]( From 0435c3a0e728beaea5e7fde0b71f334168d30131 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 19:46:34 +0100 Subject: [PATCH 05/16] #804 - encoding for Set derivatives as well - test build --- .../scala/frameless/CollectionCaster.scala | 43 ++++-- .../main/scala/frameless/TypedEncoder.scala | 124 ++++++++++++------ .../test/scala/frameless/EncoderTests.scala | 2 +- .../src/test/scala/frameless/package.scala | 56 ++++---- 4 files changed, 145 insertions(+), 80 deletions(-) diff --git a/dataset/src/main/scala/frameless/CollectionCaster.scala b/dataset/src/main/scala/frameless/CollectionCaster.scala index 55e7ca7d..d253901a 100644 --- a/dataset/src/main/scala/frameless/CollectionCaster.scala +++ b/dataset/src/main/scala/frameless/CollectionCaster.scala @@ -2,12 +2,22 @@ package frameless import frameless.TypedEncoder.CollectionConversion import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} -import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} -import org.apache.spark.sql.types.{DataType, ObjectType} +import org.apache.spark.sql.catalyst.expressions.codegen.{ + CodegenContext, + CodegenFallback, + ExprCode +} +import org.apache.spark.sql.catalyst.expressions.{ Expression, UnaryExpression } +import org.apache.spark.sql.types.{ DataType, ObjectType } + +case class CollectionCaster[F[_], C[_], Y]( + child: Expression, + conversion: CollectionConversion[F, C, Y]) + extends UnaryExpression + with CodegenFallback { -case class CollectionCaster[F[_],C[_],Y](child: Expression, conversion: CollectionConversion[F,C,Y]) extends UnaryExpression with CodegenFallback { - protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild) + protected def withNewChildInternal(newChild: Expression): Expression = + copy(child = newChild) override def eval(input: InternalRow): Any = { val o = child.eval(input).asInstanceOf[Object] @@ -21,14 +31,17 @@ case class CollectionCaster[F[_],C[_],Y](child: Expression, conversion: Collecti override def dataType: DataType = child.dataType } -case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression) extends UnaryExpression { - protected def withNewChildInternal(newChild: Expression): Expression = copy(child = newChild) +case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression) + extends UnaryExpression { + + protected def withNewChildInternal(newChild: Expression): Expression = + copy(child = newChild) // eval on interpreted works, fallback on codegen does not, e.g. 
with ColumnTests.asCol and Vectors, the code generated still has child of type Vector but child eval returns X2, which is not good override def eval(input: InternalRow): Any = { val o = child.eval(input).asInstanceOf[Object] o match { - case col: Set[Y]@unchecked => + case col: Set[Y] @unchecked => col.toSeq case _ => o } @@ -36,7 +49,8 @@ case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression) extends UnaryExp def toSeqOr[T](isSet: => T, or: => T): T = child.dataType match { - case ObjectType(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) => + case ObjectType(cls) + if classOf[scala.collection.Set[_]].isAssignableFrom(cls) => isSet case t => or } @@ -44,9 +58,10 @@ case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression) extends UnaryExp override def dataType: DataType = toSeqOr(ObjectType(classOf[scala.collection.Seq[_]]), child.dataType) - override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = - defineCodeGen(ctx, ev, c => - toSeqOr(s"$c.toSeq()", s"$c") - ) + override protected def doGenCode( + ctx: CodegenContext, + ev: ExprCode + ): ExprCode = + defineCodeGen(ctx, ev, c => toSeqOr(s"$c.toSeq()", s"$c")) -} \ No newline at end of file +} diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index 312bd019..864d6710 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -2,15 +2,19 @@ package frameless import java.math.BigInteger import java.util.Date -import java.time.{Duration, Instant, LocalDate, Period} +import java.time.{ Duration, Instant, LocalDate, Period } import java.sql.Timestamp import scala.reflect.ClassTag import org.apache.spark.sql.FramelessInternals import org.apache.spark.sql.FramelessInternals.UserDefinedType -import org.apache.spark.sql.{reflection => ScalaReflection} +import org.apache.spark.sql.{ reflection => ScalaReflection } import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.objects._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.catalyst.util.{ + ArrayBasedMapData, + DateTimeUtils, + GenericArrayData +} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import shapeless._ @@ -18,7 +22,7 @@ import shapeless.ops.hlist.IsHCons import scala.collection.generic.CanBuildFrom import scala.collection.immutable.HashSet.HashTrieSet -import scala.collection.immutable.{ListSet, TreeSet} +import scala.collection.immutable.{ ListSet, TreeSet } abstract class TypedEncoder[T]( implicit @@ -507,44 +511,72 @@ object TypedEncoder { } object CollectionConversion { - implicit def seqToSeq[Y](implicit cbf: CanBuildFrom[Nothing, Y, Seq[Y]]) = new CollectionConversion[Seq, Seq, Y] { + + implicit def seqToSeq[Y]( + implicit + cbf: CanBuildFrom[Nothing, Y, Seq[Y]] + ) = new CollectionConversion[Seq, Seq, Y] { override def convert(c: Seq[Y]): Seq[Y] = c } - implicit def seqToVector[Y](implicit cbf: CanBuildFrom[Nothing, Y, Vector[Y]]) = new CollectionConversion[Seq, Vector, Y] { + + implicit def seqToVector[Y]( + implicit + cbf: CanBuildFrom[Nothing, Y, Vector[Y]] + ) = new CollectionConversion[Seq, Vector, Y] { override def convert(c: Seq[Y]): Vector[Y] = c.toVector } - implicit def seqToList[Y](implicit cbf: CanBuildFrom[Nothing, Y, List[Y]]) = new CollectionConversion[Seq, List, Y] { + + implicit def seqToList[Y]( + implicit + cbf: 
CanBuildFrom[Nothing, Y, List[Y]] + ) = new CollectionConversion[Seq, List, Y] { override def convert(c: Seq[Y]): List[Y] = c.toList } - implicit def setToSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, Set[Y]]) = new CollectionConversion[Set, Set, Y] { + + implicit def setToSet[Y]( + implicit + cbf: CanBuildFrom[Nothing, Y, Set[Y]] + ) = new CollectionConversion[Set, Set, Y] { override def convert(c: Set[Y]): Set[Y] = c } - implicit def setToTreeSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, TreeSet[Y]]) = new CollectionConversion[Set, TreeSet, Y] { + + implicit def setToTreeSet[Y]( + implicit + cbf: CanBuildFrom[Nothing, Y, TreeSet[Y]] + ) = new CollectionConversion[Set, TreeSet, Y] { override def convert(c: Set[Y]): TreeSet[Y] = c.to[TreeSet] } - implicit def setToListSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, ListSet[Y]]) = new CollectionConversion[Set, ListSet, Y] { + + implicit def setToListSet[Y]( + implicit + cbf: CanBuildFrom[Nothing, Y, ListSet[Y]] + ) = new CollectionConversion[Set, ListSet, Y] { override def convert(c: Set[Y]): ListSet[Y] = c.to[ListSet] } - implicit def setToTrieSet[Y](implicit cbf: CanBuildFrom[Nothing, Y, HashTrieSet[Y]]) = new CollectionConversion[Set, HashTrieSet, Y] { + + implicit def setToTrieSet[Y]( + implicit + cbf: CanBuildFrom[Nothing, Y, HashTrieSet[Y]] + ) = new CollectionConversion[Set, HashTrieSet, Y] { override def convert(c: Set[Y]): HashTrieSet[Y] = c.to[HashTrieSet] } } implicit def seqEncoder[C[X] <: Seq[X], T]( - implicit - i0: Lazy[RecordFieldEncoder[T]], - i1: ClassTag[C[T]], - i2: CollectionConversion[Seq, C, T], - i3: CanBuildFrom[Nothing, T, C[T]] - ) = collectionEncoder[Seq, C, T] + implicit + i0: Lazy[RecordFieldEncoder[T]], + i1: ClassTag[C[T]], + i2: CollectionConversion[Seq, C, T], + i3: CanBuildFrom[Nothing, T, C[T]] + ) = collectionEncoder[Seq, C, T] implicit def setEncoder[C[X] <: Set[X], T]( - implicit - i0: Lazy[RecordFieldEncoder[T]], - i1: ClassTag[C[T]], - i2: CollectionConversion[Set, C, T], - i3: CanBuildFrom[Nothing, T, C[T]] - ) = collectionEncoder[Set, C, T] + implicit + i0: Lazy[RecordFieldEncoder[T]], + i1: ClassTag[C[T]], + i2: CollectionConversion[Set, C, T], + i3: CanBuildFrom[Nothing, T, C[T]] + ) = collectionEncoder[Set, C, T] def collectionEncoder[O[_], C[X], T]( implicit @@ -569,19 +601,26 @@ object TypedEncoder { NewInstance(classOf[GenericArrayData], path :: Nil, catalystRepr) } else { // converts to Seq, both Set and Seq handling must convert to Seq first - MapObjects(enc.toCatalyst, SeqCaster(path), enc.jvmRepr, encodeT.nullable) + MapObjects( + enc.toCatalyst, + SeqCaster(path), + enc.jvmRepr, + encodeT.nullable + ) } } def fromCatalyst(path: Expression): Expression = CollectionCaster[O, C, T]( MapObjects( - i0.value.fromCatalyst, - path, - encodeT.catalystRepr, - encodeT.nullable, - Some(i1.runtimeClass) // This will cause MapObjects to build a collection of type C[_] directly when compiling - ), implicitly[CollectionConversion[O,C,T]]) // This will convert Seq to the appropriate C[_] when eval'ing. + i0.value.fromCatalyst, + path, + encodeT.catalystRepr, + encodeT.nullable, + Some(i1.runtimeClass) // This will cause MapObjects to build a collection of type C[_] directly when compiling + ), + implicitly[CollectionConversion[O, C, T]] + ) // This will convert Seq to the appropriate C[_] when eval'ing. override def toString: String = s"collectionEncoder($jvmRepr)" } @@ -591,18 +630,19 @@ object TypedEncoder { * @param i2 implicit `ClassTag[Set[T]]` to provide runtime information about the set type. 
* @tparam T the element type of the set. * @return a `TypedEncoder` instance for `Set[T]`. - - implicit def setEncoder[C[X] <: Seq[X], T]( - implicit - i1: shapeless.Lazy[RecordFieldEncoder[T]], - i2: ClassTag[Set[T]], - i3: CollectionConversion[Set, C, T], - i4: CanBuildFrom[Nothing, T, C[T]] - ): TypedEncoder[Set[T]] = { - implicit val inj: Injection[Set[T], Seq[T]] = Injection(_.toSeq, _.toSet) - - TypedEncoder.usingInjection - }*/ + * + * implicit def setEncoder[C[X] <: Seq[X], T]( + * implicit + * i1: shapeless.Lazy[RecordFieldEncoder[T]], + * i2: ClassTag[Set[T]], + * i3: CollectionConversion[Set, C, T], + * i4: CanBuildFrom[Nothing, T, C[T]] + * ): TypedEncoder[Set[T]] = { + * implicit val inj: Injection[Set[T], Seq[T]] = Injection(_.toSeq, _.toSet) + * + * TypedEncoder.usingInjection + * } + */ /** * @tparam A the key type diff --git a/dataset/src/test/scala/frameless/EncoderTests.scala b/dataset/src/test/scala/frameless/EncoderTests.scala index fe144281..b2f57876 100644 --- a/dataset/src/test/scala/frameless/EncoderTests.scala +++ b/dataset/src/test/scala/frameless/EncoderTests.scala @@ -1,6 +1,6 @@ package frameless -import scala.collection.immutable.{Set, TreeSet} +import scala.collection.immutable.{ Set, TreeSet } import org.scalatest.matchers.should.Matchers object EncoderTests { diff --git a/dataset/src/test/scala/frameless/package.scala b/dataset/src/test/scala/frameless/package.scala index ab870c68..527a4b9a 100644 --- a/dataset/src/test/scala/frameless/package.scala +++ b/dataset/src/test/scala/frameless/package.scala @@ -2,10 +2,11 @@ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.internal.SQLConf import java.time.format.DateTimeFormatter -import java.time.{LocalDateTime => JavaLocalDateTime} -import org.scalacheck.{Arbitrary, Gen} +import java.time.{ LocalDateTime => JavaLocalDateTime } +import org.scalacheck.{ Arbitrary, Gen } package object frameless { + /** Fixed decimal point to avoid precision problems specific to Spark */ implicit val arbBigDecimal: Arbitrary[BigDecimal] = Arbitrary { for { @@ -32,15 +33,18 @@ package object frameless { } // see issue with scalacheck non serializable Vector: https://github.com/rickynils/scalacheck/issues/315 - implicit def arbVector[A](implicit A: Arbitrary[A]): Arbitrary[Vector[A]] = + implicit def arbVector[A]( + implicit + A: Arbitrary[A] + ): Arbitrary[Vector[A]] = Arbitrary(Gen.listOf(A.arbitrary).map(_.toVector)) def vectorGen[A: Arbitrary]: Gen[Vector[A]] = arbVector[A].arbitrary implicit def arbSeq[A]( - implicit - A: Arbitrary[A] - ): Arbitrary[scala.collection.Seq[A]] = + implicit + A: Arbitrary[A] + ): Arbitrary[scala.collection.Seq[A]] = Arbitrary(Gen.listOf(A.arbitrary).map(_.toVector.toSeq)) def seqGen[A: Arbitrary]: Gen[scala.collection.Seq[A]] = arbSeq[A].arbitrary @@ -52,7 +56,8 @@ package object frameless { } yield new UdtEncodedClass(int, doubles.toArray) } - val dateTimeFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm") + val dateTimeFormatter: DateTimeFormatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm") implicit val localDateArb: Arbitrary[JavaLocalDateTime] = Arbitrary { for { @@ -82,11 +87,10 @@ package object frameless { def anyCauseHas(t: Throwable, f: Throwable => Boolean): Boolean = if (f(t)) true + else if (t.getCause ne null) + anyCauseHas(t.getCause, f) else - if (t.getCause ne null) - anyCauseHas(t.getCause, f) - else - false + false /** * Runs up to maxRuns and outputs the number of failures (times thrown) @@ 
-95,11 +99,11 @@ package object frameless { * @tparam T * @return the last passing thunk, or null */ - def runLoads[T](maxRuns: Int = 1000)(thunk: => T): T ={ + def runLoads[T](maxRuns: Int = 1000)(thunk: => T): T = { var i = 0 var r = null.asInstanceOf[T] var passed = 0 - while(i < maxRuns){ + while (i < maxRuns) { i += 1 try { r = thunk @@ -108,29 +112,36 @@ package object frameless { println(s"run $i successful") } } catch { - case t: Throwable => System.err.println(s"failed unexpectedly on run $i - ${t.getMessage}") + case t: Throwable => + System.err.println(s"failed unexpectedly on run $i - ${t.getMessage}") } } if (passed != maxRuns) { - System.err.println(s"had ${maxRuns - passed} failures out of $maxRuns runs") + System.err.println( + s"had ${maxRuns - passed} failures out of $maxRuns runs" + ) } r } - /** + /** * Runs a given thunk up to maxRuns times, restarting the thunk if tolerantOf the thrown Throwable is true * @param tolerantOf * @param maxRuns default of 20 * @param thunk * @return either a successful run result or the last error will be thrown */ - def tolerantRun[T](tolerantOf: Throwable => Boolean, maxRuns: Int = 20)(thunk: => T): T ={ + def tolerantRun[T]( + tolerantOf: Throwable => Boolean, + maxRuns: Int = 20 + )(thunk: => T + ): T = { var passed = false var i = 0 var res: T = null.asInstanceOf[T] var thrown: Throwable = null - while((i < maxRuns) && !passed) { + while ((i < maxRuns) && !passed) { try { i += 1 res = thunk @@ -193,13 +204,12 @@ package object frameless { None } } - (keys, values).zipped.foreach { (k, v) => - conf.setConfString(k, v) - } - try f finally { + (keys, values).zipped.foreach { (k, v) => conf.setConfString(k, v) } + try f + finally { keys.zip(currentValues).foreach { case (key, Some(value)) => conf.setConfString(key, value) - case (key, None) => conf.unsetConf(key) + case (key, None) => conf.unsetConf(key) } } } From 52034b234f0926dbf91405525033f09aa93d1cac Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 19:51:26 +0100 Subject: [PATCH 06/16] #804 - encoding for Set derivatives as well - test build, hashtrieset no longer there on 2.13 --- dataset/src/main/scala/frameless/TypedEncoder.scala | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index 864d6710..cd934e62 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -502,7 +502,7 @@ object TypedEncoder { /** * Per #804 - when MapObjects is used in interpreted mode the type returned is Seq, not the derived type used in compilation * - * This type class offers extensible conversion for more specific types. By default Seq, List and Vector for Seq's and Set, TreeSet and HashTrieSet are supported. + * This type class offers extensible conversion for more specific types. By default Seq, List and Vector for Seq's and Set, TreeSet and ListSet are supported. 
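+   *
+   * For example, the TreeSet instance is only resolvable given an element ordering
+   * (a sketch, relying on the default `Ordering[Int]`):
+   * {{{
+   * implicitly[CollectionConversion[Set, TreeSet, Int]]
+   *   .convert(Set(3, 1, 2)) // TreeSet(1, 2, 3)
+   * }}}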
* * @tparam C */ @@ -553,13 +553,6 @@ object TypedEncoder { ) = new CollectionConversion[Set, ListSet, Y] { override def convert(c: Set[Y]): ListSet[Y] = c.to[ListSet] } - - implicit def setToTrieSet[Y]( - implicit - cbf: CanBuildFrom[Nothing, Y, HashTrieSet[Y]] - ) = new CollectionConversion[Set, HashTrieSet, Y] { - override def convert(c: Set[Y]): HashTrieSet[Y] = c.to[HashTrieSet] - } } implicit def seqEncoder[C[X] <: Seq[X], T]( From 9e45d929746bb2e27e7afc28320a64e7ca185dca Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 19:55:28 +0100 Subject: [PATCH 07/16] #804 - encoding for Set derivatives as well - test build, hashtrieset no longer there on 2.13 --- dataset/src/main/scala/frameless/TypedEncoder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index cd934e62..b900662e 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -21,7 +21,7 @@ import shapeless._ import shapeless.ops.hlist.IsHCons import scala.collection.generic.CanBuildFrom -import scala.collection.immutable.HashSet.HashTrieSet + import scala.collection.immutable.{ ListSet, TreeSet } abstract class TypedEncoder[T]( From e7881c0780a0442809ba51e0e8a65f6d3b6c1618 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 21:18:19 +0100 Subject: [PATCH 08/16] #804 - encoding for Set derivatives as well - test build, 2.13 forced changes, compilation issue with toSeq():GenSeq --- .../scala/frameless/CollectionCaster.scala | 2 +- .../main/scala/frameless/TypedEncoder.scala | 61 +++-------------- .../test/scala/frameless/EncoderTests.scala | 67 ++++++++++++------- .../src/test/scala/frameless/package.scala | 2 +- 4 files changed, 54 insertions(+), 78 deletions(-) diff --git a/dataset/src/main/scala/frameless/CollectionCaster.scala b/dataset/src/main/scala/frameless/CollectionCaster.scala index d253901a..bf329992 100644 --- a/dataset/src/main/scala/frameless/CollectionCaster.scala +++ b/dataset/src/main/scala/frameless/CollectionCaster.scala @@ -62,6 +62,6 @@ case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression) ctx: CodegenContext, ev: ExprCode ): ExprCode = - defineCodeGen(ctx, ev, c => toSeqOr(s"$c.toSeq()", s"$c")) + defineCodeGen(ctx, ev, c => toSeqOr(s"$c.toVector()", s"$c")) } diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index b900662e..904c6ff2 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -20,8 +20,6 @@ import org.apache.spark.unsafe.types.UTF8String import shapeless._ import shapeless.ops.hlist.IsHCons -import scala.collection.generic.CanBuildFrom - import scala.collection.immutable.{ ListSet, TreeSet } abstract class TypedEncoder[T]( @@ -512,46 +510,31 @@ object TypedEncoder { object CollectionConversion { - implicit def seqToSeq[Y]( - implicit - cbf: CanBuildFrom[Nothing, Y, Seq[Y]] - ) = new CollectionConversion[Seq, Seq, Y] { + implicit def seqToSeq[Y] = new CollectionConversion[Seq, Seq, Y] { override def convert(c: Seq[Y]): Seq[Y] = c } - implicit def seqToVector[Y]( - implicit - cbf: CanBuildFrom[Nothing, Y, Vector[Y]] - ) = new CollectionConversion[Seq, Vector, Y] { + implicit def seqToVector[Y] = new CollectionConversion[Seq, Vector, Y] { override def convert(c: Seq[Y]): Vector[Y] = c.toVector } - implicit def seqToList[Y]( - 
implicit - cbf: CanBuildFrom[Nothing, Y, List[Y]] - ) = new CollectionConversion[Seq, List, Y] { + implicit def seqToList[Y] = new CollectionConversion[Seq, List, Y] { override def convert(c: Seq[Y]): List[Y] = c.toList } - implicit def setToSet[Y]( - implicit - cbf: CanBuildFrom[Nothing, Y, Set[Y]] - ) = new CollectionConversion[Set, Set, Y] { + implicit def setToSet[Y] = new CollectionConversion[Set, Set, Y] { override def convert(c: Set[Y]): Set[Y] = c } implicit def setToTreeSet[Y]( implicit - cbf: CanBuildFrom[Nothing, Y, TreeSet[Y]] + ordering: Ordering[Y] ) = new CollectionConversion[Set, TreeSet, Y] { - override def convert(c: Set[Y]): TreeSet[Y] = c.to[TreeSet] + override def convert(c: Set[Y]): TreeSet[Y] = TreeSet.newBuilder.++=(c).result() } - implicit def setToListSet[Y]( - implicit - cbf: CanBuildFrom[Nothing, Y, ListSet[Y]] - ) = new CollectionConversion[Set, ListSet, Y] { - override def convert(c: Set[Y]): ListSet[Y] = c.to[ListSet] + implicit def setToListSet[Y] = new CollectionConversion[Set, ListSet, Y] { + override def convert(c: Set[Y]): ListSet[Y] = ListSet.newBuilder.++=(c).result() } } @@ -559,24 +542,21 @@ object TypedEncoder { implicit i0: Lazy[RecordFieldEncoder[T]], i1: ClassTag[C[T]], - i2: CollectionConversion[Seq, C, T], - i3: CanBuildFrom[Nothing, T, C[T]] + i2: CollectionConversion[Seq, C, T] ) = collectionEncoder[Seq, C, T] implicit def setEncoder[C[X] <: Set[X], T]( implicit i0: Lazy[RecordFieldEncoder[T]], i1: ClassTag[C[T]], - i2: CollectionConversion[Set, C, T], - i3: CanBuildFrom[Nothing, T, C[T]] + i2: CollectionConversion[Set, C, T] ) = collectionEncoder[Set, C, T] def collectionEncoder[O[_], C[X], T]( implicit i0: Lazy[RecordFieldEncoder[T]], i1: ClassTag[C[T]], - i2: CollectionConversion[O, C, T], - i3: CanBuildFrom[Nothing, T, C[T]] + i2: CollectionConversion[O, C, T] ): TypedEncoder[C[T]] = new TypedEncoder[C[T]] { private lazy val encodeT = i0.value.encoder @@ -618,25 +598,6 @@ object TypedEncoder { override def toString: String = s"collectionEncoder($jvmRepr)" } - /** - * @param i1 implicit lazy `RecordFieldEncoder[T]` to encode individual elements of the set. - * @param i2 implicit `ClassTag[Set[T]]` to provide runtime information about the set type. - * @tparam T the element type of the set. - * @return a `TypedEncoder` instance for `Set[T]`. 
- * - * implicit def setEncoder[C[X] <: Seq[X], T]( - * implicit - * i1: shapeless.Lazy[RecordFieldEncoder[T]], - * i2: ClassTag[Set[T]], - * i3: CollectionConversion[Set, C, T], - * i4: CanBuildFrom[Nothing, T, C[T]] - * ): TypedEncoder[Set[T]] = { - * implicit val inj: Injection[Set[T], Seq[T]] = Injection(_.toSeq, _.toSet) - * - * TypedEncoder.usingInjection - * } - */ - /** * @tparam A the key type * @tparam B the value type diff --git a/dataset/src/test/scala/frameless/EncoderTests.scala b/dataset/src/test/scala/frameless/EncoderTests.scala index b2f57876..b8335270 100644 --- a/dataset/src/test/scala/frameless/EncoderTests.scala +++ b/dataset/src/test/scala/frameless/EncoderTests.scala @@ -1,6 +1,6 @@ package frameless -import scala.collection.immutable.{ Set, TreeSet } +import scala.collection.immutable.{ListSet, Set, TreeSet} import org.scalatest.matchers.should.Matchers object EncoderTests { @@ -10,9 +10,7 @@ object EncoderTests { case class DurationRow(d: java.time.Duration) case class PeriodRow(p: java.time.Period) - case class VectorOfObject(a: Vector[X1[Int]]) - - case class TreeSetOfObjects(a: TreeSet[X1[Int]]) + case class ContainerOf[CC[X] <: Iterable[X]](a: CC[X1[Int]]) } class EncoderTests extends TypedDatasetSuite with Matchers { @@ -36,31 +34,48 @@ class EncoderTests extends TypedDatasetSuite with Matchers { implicitly[TypedEncoder[PeriodRow]] } - test("It should encode a Vector of Objects") { - evalCodeGens { - implicit val e = implicitly[TypedEncoder[VectorOfObject]] - implicit val te = TypedExpressionEncoder[VectorOfObject] - implicit val xe = implicitly[TypedEncoder[X1[VectorOfObject]]] - implicit val xte = TypedExpressionEncoder[X1[VectorOfObject]] - val v = (1 to 20).map(X1(_)).toVector - val ds = { - sqlContext.createDataset(Seq(X1[VectorOfObject](VectorOfObject(v)))) - } - ds.head.a.a shouldBe v + def performCollection[C[X] <: Iterable[X]](toType: Seq[X1[Int]] => C[X1[Int]])(implicit ce: TypedEncoder[C[X1[Int]]]): (Unit,Unit) = evalCodeGens { + + implicit val cte = TypedExpressionEncoder[C[X1[Int]]] + implicit val e = implicitly[TypedEncoder[ContainerOf[C]]] + implicit val te = TypedExpressionEncoder[ContainerOf[C]] + implicit val xe = implicitly[TypedEncoder[X1[ContainerOf[C]]]] + implicit val xte = TypedExpressionEncoder[X1[ContainerOf[C]]] + val v = toType((1 to 20).map(X1(_))) + val ds = { + sqlContext.createDataset(Seq(X1[ContainerOf[C]](ContainerOf[C](v)))) } + ds.head.a.a shouldBe v + () + } + + test("It should serde a Seq of Objects") { + performCollection[Seq](_) } - test("It should encode a TreeSet of Objects") { - evalCodeGens { - implicit val e = implicitly[TypedEncoder[TreeSetOfObjects]] - implicit val te = TypedExpressionEncoder[TreeSetOfObjects] - implicit val xe = implicitly[TypedEncoder[X1[TreeSetOfObjects]]] - implicit val xte = TypedExpressionEncoder[X1[TreeSetOfObjects]] - val v = (1 to 20).map(X1(_)).to[TreeSet] - val ds = { - sqlContext.createDataset(Seq(X1[TreeSetOfObjects](TreeSetOfObjects(v)))) - } - ds.head.a.a shouldBe v + test("It should serde a Set of Objects") { + performCollection[Set](_) + } + + test("It should serde a Vector of Objects") { + performCollection[Vector](_.toVector) + } + + test("It should serde a TreeSet of Objects") { + // only needed for 2.12 + implicit val ordering = new Ordering[X1[Int]] { + val intordering = implicitly[Ordering[Int]] + override def compare(x: X1[Int], y: X1[Int]): Int = intordering.compare(x.a, y.a) } + + performCollection[TreeSet](TreeSet.newBuilder.++=(_).result()) + } + + test("It should 
serde a List of Objects") { + performCollection[List](_.toList) + } + + test("It should serde a ListSet of Objects") { + performCollection[ListSet](ListSet.newBuilder.++=(_).result()) } } diff --git a/dataset/src/test/scala/frameless/package.scala b/dataset/src/test/scala/frameless/package.scala index 527a4b9a..06b92d99 100644 --- a/dataset/src/test/scala/frameless/package.scala +++ b/dataset/src/test/scala/frameless/package.scala @@ -188,7 +188,7 @@ package object frameless { * @return */ def evalCodeGens[T](f: => T): (T, T) = - (forceCodeGen(f), forceInterpreted(f)) + (forceInterpreted(f), forceCodeGen(f)) /** * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL From 594fcebd86b26b95606a4600883b8b5196d5d910 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 21:19:03 +0100 Subject: [PATCH 09/16] #804 - encoding for Set derivatives as well - test build, 2.13 forced changes, compilation issue with toSeq():GenSeq --- dataset/src/test/scala/frameless/EncoderTests.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dataset/src/test/scala/frameless/EncoderTests.scala b/dataset/src/test/scala/frameless/EncoderTests.scala index b8335270..7d700f6f 100644 --- a/dataset/src/test/scala/frameless/EncoderTests.scala +++ b/dataset/src/test/scala/frameless/EncoderTests.scala @@ -1,6 +1,6 @@ package frameless -import scala.collection.immutable.{ListSet, Set, TreeSet} +import scala.collection.immutable.{ ListSet, Set, TreeSet } import org.scalatest.matchers.should.Matchers object EncoderTests { @@ -34,7 +34,11 @@ class EncoderTests extends TypedDatasetSuite with Matchers { implicitly[TypedEncoder[PeriodRow]] } - def performCollection[C[X] <: Iterable[X]](toType: Seq[X1[Int]] => C[X1[Int]])(implicit ce: TypedEncoder[C[X1[Int]]]): (Unit,Unit) = evalCodeGens { + def performCollection[C[X] <: Iterable[X]]( + toType: Seq[X1[Int]] => C[X1[Int]] + )(implicit + ce: TypedEncoder[C[X1[Int]]] + ): (Unit, Unit) = evalCodeGens { implicit val cte = TypedExpressionEncoder[C[X1[Int]]] implicit val e = implicitly[TypedEncoder[ContainerOf[C]]] @@ -65,7 +69,8 @@ class EncoderTests extends TypedDatasetSuite with Matchers { // only needed for 2.12 implicit val ordering = new Ordering[X1[Int]] { val intordering = implicitly[Ordering[Int]] - override def compare(x: X1[Int], y: X1[Int]): Int = intordering.compare(x.a, y.a) + override def compare(x: X1[Int], y: X1[Int]): Int = + intordering.compare(x.a, y.a) } performCollection[TreeSet](TreeSet.newBuilder.++=(_).result()) From 5a0197648783e529e4300763beb24c8f1e7cc221 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 21:24:00 +0100 Subject: [PATCH 10/16] #804 - encoding for Set derivatives as well - test build, 2.13 forced changes, compilation issue with toSeq():GenSeq --- dataset/src/main/scala/frameless/TypedEncoder.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index 904c6ff2..8467ecd3 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -530,11 +530,15 @@ object TypedEncoder { implicit ordering: Ordering[Y] ) = new CollectionConversion[Set, TreeSet, Y] { - override def convert(c: Set[Y]): TreeSet[Y] = TreeSet.newBuilder.++=(c).result() + + override def convert(c: Set[Y]): TreeSet[Y] = + TreeSet.newBuilder.++=(c).result() } implicit def setToListSet[Y] = new 
CollectionConversion[Set, ListSet, Y] { - override def convert(c: Set[Y]): ListSet[Y] = ListSet.newBuilder.++=(c).result() + + override def convert(c: Set[Y]): ListSet[Y] = + ListSet.newBuilder.++=(c).result() } } From 365b21f6e7b906e56eb867c47d0db6cce35f1e83 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 21:38:52 +0100 Subject: [PATCH 11/16] #804 - encoding for Set derivatives as well - test build, 2.13 forced changes, compilation issue with toSeq():GenSeq --- build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 84653d53..f00b3d44 100644 --- a/build.sbt +++ b/build.sbt @@ -240,7 +240,8 @@ lazy val datasetSettings = mc("frameless.functions.FramelessLit"), mc(f"frameless.functions.FramelessLit$$"), dmm("frameless.functions.package.litAggr"), - dmm("org.apache.spark.sql.FramelessInternals.column") + dmm("org.apache.spark.sql.FramelessInternals.column"), + dmm("frameless.TypedEncoder.collectionEncoder") ) }, coverageExcludedPackages := "org.apache.spark.sql.reflection", From 4395c16ac3156216fb286490a214f47edf2d1a41 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 21:54:15 +0100 Subject: [PATCH 12/16] #804 - encoding for Set derivatives as well - test build, 2.13 forced changes, compilation issue with toSeq():GenSeq --- build.sbt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index f00b3d44..d52e3216 100644 --- a/build.sbt +++ b/build.sbt @@ -241,7 +241,8 @@ lazy val datasetSettings = mc(f"frameless.functions.FramelessLit$$"), dmm("frameless.functions.package.litAggr"), dmm("org.apache.spark.sql.FramelessInternals.column"), - dmm("frameless.TypedEncoder.collectionEncoder") + dmm("frameless.TypedEncoder.collectionEncoder"), + dmm("frameless.TypedEncoder.setEncoder") ) }, coverageExcludedPackages := "org.apache.spark.sql.reflection", From f0d5f16ce47c8105e95ceb954dc4ea5670037158 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 20 Mar 2024 22:16:40 +0100 Subject: [PATCH 13/16] #804 - rebased --- dataset/src/test/scala/frameless/EncoderTests.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/dataset/src/test/scala/frameless/EncoderTests.scala b/dataset/src/test/scala/frameless/EncoderTests.scala index 7d700f6f..ab1f3581 100644 --- a/dataset/src/test/scala/frameless/EncoderTests.scala +++ b/dataset/src/test/scala/frameless/EncoderTests.scala @@ -69,6 +69,7 @@ class EncoderTests extends TypedDatasetSuite with Matchers { // only needed for 2.12 implicit val ordering = new Ordering[X1[Int]] { val intordering = implicitly[Ordering[Int]] + override def compare(x: X1[Int], y: X1[Int]): Int = intordering.compare(x.a, y.a) } From 3bdb8ad8af5e3eb77f24c65fbde6d603c58b11ab Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Thu, 21 Mar 2024 10:01:06 +0100 Subject: [PATCH 14/16] #803 - clean udf from #804, no shim start --- .../main/scala/frameless/functions/Udf.scala | 316 +++++++++------ .../scala/frameless/functions/UdfTests.scala | 373 +++++++++++------- 2 files changed, 418 insertions(+), 271 deletions(-) diff --git a/dataset/src/main/scala/frameless/functions/Udf.scala b/dataset/src/main/scala/frameless/functions/Udf.scala index 93ba7f11..f5e5cb7a 100644 --- a/dataset/src/main/scala/frameless/functions/Udf.scala +++ b/dataset/src/main/scala/frameless/functions/Udf.scala @@ -2,132 +2,175 @@ package frameless package functions import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, 
NonSQLExpression} +import org.apache.spark.sql.catalyst.expressions.{ + Expression, + LeafExpression, + NonSQLExpression +} import org.apache.spark.sql.catalyst.expressions.codegen._ import Block._ +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.types.DataType import shapeless.syntax.std.tuple._ -/** Documentation marked "apache/spark" is thanks to apache/spark Contributors - * at https://github.com/apache/spark, licensed under Apache v2.0 available at - * http://www.apache.org/licenses/LICENSE-2.0 - */ +/** + * Documentation marked "apache/spark" is thanks to apache/spark Contributors + * at https://github.com/apache/spark, licensed under Apache v2.0 available at + * http://www.apache.org/licenses/LICENSE-2.0 + */ trait Udf { - /** Defines a user-defined function of 1 arguments as user-defined function (UDF). - * The data types are automatically inferred based on the function's signature. - * - * apache/spark - */ - def udf[T, A, R: TypedEncoder](f: A => R): - TypedColumn[T, A] => TypedColumn[T, R] = { + /** + * Defines a user-defined function of 1 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. + * + * apache/spark + */ + def udf[T, A, R: TypedEncoder](f: A => R): TypedColumn[T, A] => TypedColumn[T, R] = { u => - val scalaUdf = FramelessUdf(f, List(u), TypedEncoder[R]) + val scalaUdf = FramelessUdf( + f, + List(u), + TypedEncoder[R], + s => f(s.head.asInstanceOf[A]) + ) new TypedColumn[T, R](scalaUdf) } - /** Defines a user-defined function of 2 arguments as user-defined function (UDF). - * The data types are automatically inferred based on the function's signature. - * - * apache/spark - */ - def udf[T, A1, A2, R: TypedEncoder](f: (A1,A2) => R): - (TypedColumn[T, A1], TypedColumn[T, A2]) => TypedColumn[T, R] = { + /** + * Defines a user-defined function of 2 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. + * + * apache/spark + */ + def udf[T, A1, A2, R: TypedEncoder](f: (A1, A2) => R): ( + TypedColumn[T, A1], + TypedColumn[T, A2] + ) => TypedColumn[T, R] = { case us => - val scalaUdf = FramelessUdf(f, us.toList[UntypedExpression[T]], TypedEncoder[R]) + val scalaUdf = + FramelessUdf( + f, + us.toList[UntypedExpression[T]], + TypedEncoder[R], + s => f(s.head.asInstanceOf[A1], s(1).asInstanceOf[A2]) + ) new TypedColumn[T, R](scalaUdf) - } + } - /** Defines a user-defined function of 3 arguments as user-defined function (UDF). - * The data types are automatically inferred based on the function's signature. - * - * apache/spark - */ - def udf[T, A1, A2, A3, R: TypedEncoder](f: (A1,A2,A3) => R): - (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3]) => TypedColumn[T, R] = { + /** + * Defines a user-defined function of 3 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. 
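+   *
+   * A usage sketch (the dataset `ds` of some type `T` and its `Int` columns `'x`,
+   * `'y`, `'z` are illustrative):
+   * {{{
+   * val add3 = udf[T, Int, Int, Int, Int]((a, b, c) => a + b + c)
+   * val total: TypedColumn[T, Int] = add3(ds('x), ds('y), ds('z))
+   * }}}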
+ * + * apache/spark + */ + def udf[T, A1, A2, A3, R: TypedEncoder](f: (A1, A2, A3) => R): ( + TypedColumn[T, A1], + TypedColumn[T, A2], + TypedColumn[T, A3] + ) => TypedColumn[T, R] = { case us => - val scalaUdf = FramelessUdf(f, us.toList[UntypedExpression[T]], TypedEncoder[R]) + val scalaUdf = + FramelessUdf( + f, + us.toList[UntypedExpression[T]], + TypedEncoder[R], + s => + f( + s.head.asInstanceOf[A1], + s(1).asInstanceOf[A2], + s(2).asInstanceOf[A3] + ) + ) new TypedColumn[T, R](scalaUdf) - } + } - /** Defines a user-defined function of 4 arguments as user-defined function (UDF). - * The data types are automatically inferred based on the function's signature. - * - * apache/spark - */ - def udf[T, A1, A2, A3, A4, R: TypedEncoder](f: (A1,A2,A3,A4) => R): - (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4]) => TypedColumn[T, R] = { + /** + * Defines a user-defined function of 4 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. + * + * apache/spark + */ + def udf[T, A1, A2, A3, A4, R: TypedEncoder](f: (A1, A2, A3, A4) => R): (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4]) => TypedColumn[T, R] = { case us => - val scalaUdf = FramelessUdf(f, us.toList[UntypedExpression[T]], TypedEncoder[R]) + val scalaUdf = + FramelessUdf( + f, + us.toList[UntypedExpression[T]], + TypedEncoder[R], + s => + f( + s.head.asInstanceOf[A1], + s(1).asInstanceOf[A2], + s(2).asInstanceOf[A3], + s(3).asInstanceOf[A4] + ) + ) new TypedColumn[T, R](scalaUdf) - } + } - /** Defines a user-defined function of 5 arguments as user-defined function (UDF). - * The data types are automatically inferred based on the function's signature. - * - * apache/spark - */ - def udf[T, A1, A2, A3, A4, A5, R: TypedEncoder](f: (A1,A2,A3,A4,A5) => R): - (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4], TypedColumn[T, A5]) => TypedColumn[T, R] = { + /** + * Defines a user-defined function of 5 arguments as user-defined function (UDF). + * The data types are automatically inferred based on the function's signature. + * + * apache/spark + */ + def udf[T, A1, A2, A3, A4, A5, R: TypedEncoder](f: (A1, A2, A3, A4, A5) => R): (TypedColumn[T, A1], TypedColumn[T, A2], TypedColumn[T, A3], TypedColumn[T, A4], TypedColumn[T, A5]) => TypedColumn[T, R] = { case us => - val scalaUdf = FramelessUdf(f, us.toList[UntypedExpression[T]], TypedEncoder[R]) + val scalaUdf = + FramelessUdf( + f, + us.toList[UntypedExpression[T]], + TypedEncoder[R], + s => + f( + s.head.asInstanceOf[A1], + s(1).asInstanceOf[A2], + s(2).asInstanceOf[A3], + s(3).asInstanceOf[A4], + s(4).asInstanceOf[A5] + ) + ) new TypedColumn[T, R](scalaUdf) - } + } } /** - * NB: Implementation detail, isn't intended to be directly used. - * - * Our own implementation of `ScalaUDF` from Catalyst compatible with [[TypedEncoder]]. - */ + * NB: Implementation detail, isn't intended to be directly used. + * + * Our own implementation of `ScalaUDF` from Catalyst compatible with [[TypedEncoder]]. 
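+ * In interpreted mode `eval` applies the supplied `evalFunction` to the already
+ * evaluated children and serializes the result back to Catalyst through the
+ * derived `ExpressionEncoder` (see `eval` below); the codegen path still goes
+ * through `doGenCode`.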
 /**
- * NB: Implementation detail, isn't intended to be directly used.
- *
- * Our own implementation of `ScalaUDF` from Catalyst compatible with [[TypedEncoder]].
- */
+ * NB: Implementation detail, isn't intended to be directly used.
+ *
+ * Our own implementation of `ScalaUDF` from Catalyst compatible with [[TypedEncoder]].
+ */
 case class FramelessUdf[T, R](
-  function: AnyRef,
-  encoders: Seq[TypedEncoder[_]],
-  children: Seq[Expression],
-  rencoder: TypedEncoder[R]
-) extends Expression with NonSQLExpression {
+    function: AnyRef,
+    encoders: Seq[TypedEncoder[_]],
+    children: Seq[Expression],
+    rencoder: TypedEncoder[R],
+    evalFunction: Seq[Any] => Any)
+    extends Expression
+    with NonSQLExpression {

   override def nullable: Boolean = rencoder.nullable

-  override def toString: String = s"FramelessUdf(${children.mkString(", ")})"
-
-  lazy val evalCode = {
-    val ctx = new CodegenContext()
-    val eval = genCode(ctx)
-    val codeBody = s"""
-    public scala.Function1 generate(Object[] references) {
-      return new FramelessUdfEvalImpl(references);
-    }
+  override def toString: String = s"FramelessUdf(${children.mkString(", ")})"

-    class FramelessUdfEvalImpl extends scala.runtime.AbstractFunction1 {
-      private final Object[] references;
-      ${ctx.declareMutableStates()}
-      ${ctx.declareAddedFunctions()}
-
-      public FramelessUdfEvalImpl(Object[] references) {
-        this.references = references;
-        ${ctx.initMutableStates()}
-      }
-
-      public java.lang.Object apply(java.lang.Object z) {
-        InternalRow ${ctx.INPUT_ROW} = (InternalRow) z;
-        ${eval.code}
-        return ${eval.isNull} ? ((Object)null) : ((Object)${eval.value});
-      }
-    }
-    """
+  lazy val typedEnc =
+    TypedExpressionEncoder[R](rencoder).asInstanceOf[ExpressionEncoder[R]]

-    val code = CodeFormatter.stripOverlappingComments(
-      new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
+  def eval(input: InternalRow): Any = {
+    val jvmTypes = children.map(_.eval(input))

-    val (clazz, _) = CodeGenerator.compile(code)
-    val codegen = clazz.generate(ctx.references.toArray).asInstanceOf[InternalRow => AnyRef]
+    val returnJvm = evalFunction(jvmTypes).asInstanceOf[R]

-    codegen
-  }
+    val returnCatalyst = typedEnc.createSerializer().apply(returnJvm)
+    val retval =
+      if (returnCatalyst == null)
+        null
+      else if (typedEnc.isSerializedAsStructForTopLevel)
+        returnCatalyst
+      else
+        returnCatalyst.get(0, dataType)

-  def eval(input: InternalRow): Any = {
-    evalCode(input)
+    retval
   }

   def dataType: DataType = rencoder.catalystRepr
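The interpreted path above leans on Spark's `ExpressionEncoder`: a serializer always produces an `InternalRow`, so a result that is not serialized as a struct at the top level has to be unwrapped from column 0 — which is exactly what the `retval` branches do. Sketched standalone (Spark 3.x API; the encoder and value are illustrative):

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

    val enc = ExpressionEncoder[Int]()
    val row = enc.createSerializer().apply(42)               // a one-column InternalRow
    val catalystValue = row.get(0, enc.schema.head.dataType) // unwrap the non-struct value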
@@ -139,29 +182,45 @@ case class FramelessUdf[T, R](
     val framelessUdfClassName = classOf[FramelessUdf[_, _]].getName
     val funcClassName = s"scala.Function${children.size}"
     val funcExpressionIdx = ctx.references.size - 1
-    val funcTerm = ctx.addMutableState(funcClassName, ctx.freshName("udf"),
-      v => s"$v = ($funcClassName)((($framelessUdfClassName)references" +
-        s"[$funcExpressionIdx]).function());")
-
-    val (argsCode, funcArguments) = encoders.zip(children).map {
-      case (encoder, child) =>
-        val eval = child.genCode(ctx)
-        val codeTpe = CodeGenerator.boxedType(encoder.jvmRepr)
-        val argTerm = ctx.freshName("arg")
-        val convert = s"${eval.code}\n$codeTpe $argTerm = ${eval.isNull} ? (($codeTpe)null) : (($codeTpe)(${eval.value}));"
+    val funcTerm = ctx.addMutableState(
+      funcClassName,
+      ctx.freshName("udf"),
+      v =>
+        s"$v = ($funcClassName)((($framelessUdfClassName)references" +
+          s"[$funcExpressionIdx]).function());"
+    )

-        (convert, argTerm)
-    }.unzip
+    val (argsCode, funcArguments) = encoders
+      .zip(children)
+      .map {
+        case (encoder, child) =>
+          val eval = child.genCode(ctx)
+          val codeTpe = CodeGenerator.boxedType(encoder.jvmRepr)
+          val argTerm = ctx.freshName("arg")
+          val convert =
+            s"${eval.code}\n$codeTpe $argTerm = ${eval.isNull} ? (($codeTpe)null) : (($codeTpe)(${eval.value}));"
+
+          (convert, argTerm)
+      }
+      .unzip

     val internalTpe = CodeGenerator.boxedType(rencoder.jvmRepr)
-    val internalTerm = ctx.addMutableState(internalTpe, ctx.freshName("internal"))
-    val internalNullTerm = ctx.addMutableState("boolean", ctx.freshName("internalNull"))
+    val internalTerm =
+      ctx.addMutableState(internalTpe, ctx.freshName("internal"))
+    val internalNullTerm =
+      ctx.addMutableState("boolean", ctx.freshName("internalNull"))

     // CTw - can't inject the term, may have to duplicate old code for parity
-    val internalExpr = Spark2_4_LambdaVariable(internalTerm, internalNullTerm, rencoder.jvmRepr, true)
+    val internalExpr = Spark2_4_LambdaVariable(
+      internalTerm,
+      internalNullTerm,
+      rencoder.jvmRepr,
+      true
+    )
     val resultEval = rencoder.toCatalyst(internalExpr).genCode(ctx)

-    ev.copy(code = code"""
+    ev.copy(
+      code = code"""
       ${argsCode.mkString("\n")}

       $internalTerm =
@@ -175,21 +234,28 @@ case class FramelessUdf[T, R](
   )
 }

-  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy(children = newChildren)
+  protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]
+    ): Expression = copy(children = newChildren)
 }
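`Spark2_4_LambdaVariable`, defined next, gives the `rencoder.toCatalyst` conversion above something to read the UDF result from: in interpreted mode it behaves like the single field of a one-field row, read through `InternalRow.getAccessor`. That accessor is simply a getter specialized to a data type (Spark 3.x; the values are illustrative):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.IntegerType

    val accessor = InternalRow.getAccessor(IntegerType)
    val row = InternalRow(42)       // a one-field row, as the assert below requires
    assert(accessor(row, 0) == 42)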
 case class Spark2_4_LambdaVariable(
-  value: String,
-  isNull: String,
-  dataType: DataType,
-  nullable: Boolean = true) extends LeafExpression with NonSQLExpression {
+    value: String,
+    isNull: String,
+    dataType: DataType,
+    nullable: Boolean = true)
+    extends LeafExpression
+    with NonSQLExpression {

-  private val accessor: (InternalRow, Int) => Any = InternalRow.getAccessor(dataType)
+  private val accessor: (InternalRow, Int) => Any =
+    InternalRow.getAccessor(dataType)

   // Interpreted execution of `LambdaVariable` always get the 0-index element from input row.
   override def eval(input: InternalRow): Any = {
-    assert(input.numFields == 1,
-      "The input row of interpreted LambdaVariable should have only 1 field.")
+    assert(
+      input.numFields == 1,
+      "The input row of interpreted LambdaVariable should have only 1 field."
+    )
     if (nullable && input.isNullAt(0)) {
       null
     } else {
@@ -197,7 +263,10 @@ case class Spark2_4_LambdaVariable(
     }
   }

-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+  override protected def doGenCode(
+      ctx: CodegenContext,
+      ev: ExprCode
+    ): ExprCode = {
     val isNullValue = if (nullable) {
       JavaCode.isNullVariable(isNull)
     } else {
@@ -208,15 +277,18 @@ case class Spark2_4_LambdaVariable(
 }

 object FramelessUdf {
+
   // Spark needs case class with `children` field to mutate it
   def apply[T, R](
-    function: AnyRef,
-    cols: Seq[UntypedExpression[T]],
-    rencoder: TypedEncoder[R]
-  ): FramelessUdf[T, R] = FramelessUdf(
+      function: AnyRef,
+      cols: Seq[UntypedExpression[T]],
+      rencoder: TypedEncoder[R],
+      evalFunction: Seq[Any] => Any
+    ): FramelessUdf[T, R] = FramelessUdf(
     function = function,
     encoders = cols.map(_.uencoder).toList,
     children = cols.map(x => x.uencoder.fromCatalyst(x.expr)).toList,
-    rencoder = rencoder
+    rencoder = rencoder,
+    evalFunction = evalFunction
   )
 }
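Note how `apply` builds `children` with `uencoder.fromCatalyst`, so by the time `eval` runs each argument is already in its JVM representation — which is what lets `evalFunction` cast positionally. Hand-wiring the expression that the `udf` methods normally build for you (a sketch; `X1` is the test fixture used throughout this series, and an implicit SparkSession is assumed):

    val ds = TypedDataset.create(Seq(X1(1)))
    val col = ds.col[Int]('a)
    val inc = (i: Int) => i + 1

    val expr = FramelessUdf(
      function = inc,
      cols = Seq(col),
      rencoder = TypedEncoder[Int],
      evalFunction = s => inc(s.head.asInstanceOf[Int])
    )
    // this is the expression that udf[X1[Int], Int, Int](inc)(col) wraps in a TypedColumn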
diff --git a/dataset/src/test/scala/frameless/functions/UdfTests.scala b/dataset/src/test/scala/frameless/functions/UdfTests.scala
index 10e65180..af452cba 100644
--- a/dataset/src/test/scala/frameless/functions/UdfTests.scala
+++ b/dataset/src/test/scala/frameless/functions/UdfTests.scala
@@ -4,182 +4,257 @@ package functions
 import org.scalacheck.Prop
 import org.scalacheck.Prop._

+import scala.collection.immutable.{ ListSet, TreeSet }
+
 class UdfTests extends TypedDatasetSuite {

   test("one argument udf") {
-    def prop[A: TypedEncoder, B: TypedEncoder](data: Vector[X1[A]], f1: A => B): Prop = {
-      val dataset: TypedDataset[X1[A]] = TypedDataset.create(data)
-      val u1 = udf[X1[A], A, B](f1)
-      val u2 = dataset.makeUDF(f1)
-      val A = dataset.col[A]('a)
-
-      // filter forces whole codegen
-      val codegen = dataset.deserialized.filter((_:X1[A]) => true).select(u1(A)).collect().run().toVector
-
-      // otherwise it uses local relation
-      val local = dataset.select(u2(A)).collect().run().toVector
-
-      val d = data.map(x => f1(x.a))
-
-      (codegen ?= d) && (local ?= d)
+    evalCodeGens {
+      def prop[A: TypedEncoder, B: TypedEncoder](
+          data: Vector[X1[A]],
+          f1: A => B
+        ): Prop = {
+        val dataset: TypedDataset[X1[A]] = TypedDataset.create(data)
+        val u1 = udf[X1[A], A, B](f1)
+        val u2 = dataset.makeUDF(f1)
+        val A = dataset.col[A]('a)
+
+        // filter forces whole codegen
+        val codegen = dataset.deserialized
+          .filter((_: X1[A]) => true)
+          .select(u1(A))
+          .collect()
+          .run()
+          .toVector
+
+        // otherwise it uses local relation
+        val local = dataset.select(u2(A)).collect().run().toVector
+
+        val d = data.map(x => f1(x.a))
+
+        (codegen ?= d) && (local ?= d)
+      }
+
+      check(forAll(prop[Int, Int] _))
+      check(forAll(prop[String, String] _))
+      check(forAll(prop[Option[Int], Option[Int]] _))
+      check(forAll(prop[X1[Int], X1[Int]] _))
+      check(forAll(prop[X1[Option[Int]], X1[Option[Int]]] _))
+
+      // TODO doesn't work for the same reason as `collect`
+      // check(forAll(prop[X1[Option[X1[Int]]], X1[Option[X1[Option[Int]]]]] _))
+
+      // Vector/List isn't supported by MapObjects, not all collections are equal see #804
+      check(forAll(prop[Option[Seq[String]], Option[Seq[String]]] _))
+      check(forAll(prop[Option[List[String]], Option[List[String]]] _))
+      check(forAll(prop[Option[Vector[String]], Option[Vector[String]]] _))
+
+      // ListSet/TreeSet weren't supported before #804
+      check(forAll(prop[Option[Set[String]], Option[Set[String]]] _))
+      check(forAll(prop[Option[ListSet[String]], Option[ListSet[String]]] _))
+      check(forAll(prop[Option[TreeSet[String]], Option[TreeSet[String]]] _))
+
+      def prop2[A: TypedEncoder, B: TypedEncoder](f: A => B)(a: A): Prop =
+        prop(Vector(X1(a)), f)
+
+      check(
+        forAll(
+          prop2[Int, Option[Int]](x => if (x % 2 == 0) Some(x) else None) _
+        )
+      )
+      check(forAll(prop2[Option[Int], Int](x => x getOrElse 0) _))
     }
-
-    check(forAll(prop[Int, Int] _))
-    check(forAll(prop[String, String] _))
-    check(forAll(prop[Option[Int], Option[Int]] _))
-    check(forAll(prop[X1[Int], X1[Int]] _))
-    check(forAll(prop[X1[Option[Int]], X1[Option[Int]]] _))
-
-    // TODO doesn't work for the same reason as `collect`
-    // check(forAll(prop[X1[Option[X1[Int]]], X1[Option[X1[Option[Int]]]]] _))
-
-    check(forAll(prop[Option[Vector[String]], Option[Vector[String]]] _))
-
-    def prop2[A: TypedEncoder, B: TypedEncoder](f: A => B)(a: A): Prop = prop(Vector(X1(a)), f)
-
-    check(forAll(prop2[Int, Option[Int]](x => if (x % 2 == 0) Some(x) else None) _))
-    check(forAll(prop2[Option[Int], Int](x => x getOrElse 0) _))
   }

   test("multiple one argument udf") {
-    def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder]
-      (data: Vector[X3[A, B, C]], f1: A => A, f2: B => B, f3: C => C): Prop = {
-      val dataset = TypedDataset.create(data)
-      val u11 = udf[X3[A, B, C], A, A](f1)
-      val u21 = udf[X3[A, B, C], B, B](f2)
-      val u31 = udf[X3[A, B, C], C, C](f3)
-      val u12 = dataset.makeUDF(f1)
-      val u22 = dataset.makeUDF(f2)
-      val u32 = dataset.makeUDF(f3)
-      val A = dataset.col[A]('a)
-      val B = dataset.col[B]('b)
-      val C = dataset.col[C]('c)
-
-      val dataset21 = dataset.select(u11(A), u21(B), u31(C)).collect().run().toVector
-      val dataset22 = dataset.select(u12(A), u22(B), u32(C)).collect().run().toVector
-      val d = data.map(x => (f1(x.a), f2(x.b), f3(x.c)))
-
-      (dataset21 ?= d) && (dataset22 ?= d)
+    evalCodeGens {
+      def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder](
+          data: Vector[X3[A, B, C]],
+          f1: A => A,
+          f2: B => B,
+          f3: C => C
+        ): Prop = {
+        val dataset = TypedDataset.create(data)
+        val u11 = udf[X3[A, B, C], A, A](f1)
+        val u21 = udf[X3[A, B, C], B, B](f2)
+        val u31 = udf[X3[A, B, C], C, C](f3)
+        val u12 = dataset.makeUDF(f1)
+        val u22 = dataset.makeUDF(f2)
+        val u32 = dataset.makeUDF(f3)
+        val A = dataset.col[A]('a)
+        val B = dataset.col[B]('b)
+        val C = dataset.col[C]('c)
+
+        val dataset21 =
+          dataset.select(u11(A), u21(B), u31(C)).collect().run().toVector
+        val dataset22 =
+          dataset.select(u12(A), u22(B), u32(C)).collect().run().toVector
+        val d = data.map(x => (f1(x.a), f2(x.b), f3(x.c)))
+
+        (dataset21 ?= d) && (dataset22 ?= d)
+      }
+
+      check(forAll(prop[Int, Int, Int] _))
+      check(forAll(prop[String, Int, Int] _))
+      check(forAll(prop[X3[Int, String, Boolean], Int, Int] _))
+      check(forAll(prop[X3U[Int, String, Boolean], Int, Int] _))
     }
-
-    check(forAll(prop[Int, Int, Int] _))
-    check(forAll(prop[String, Int, Int] _))
-    check(forAll(prop[X3[Int, String, Boolean], Int, Int] _))
-    check(forAll(prop[X3U[Int, String, Boolean], Int, Int] _))
   }

   test("two argument udf") {
-    def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder]
-      (data: Vector[X3[A, B, C]], f1: (A, B) => C): Prop = {
-      val dataset = TypedDataset.create(data)
-      val u1 = udf[X3[A, B, C], A, B, C](f1)
-      val u2 = dataset.makeUDF(f1)
-      val A = dataset.col[A]('a)
-      val B = dataset.col[B]('b)
-
-      val dataset21 = dataset.select(u1(A, B)).collect().run().toVector
-      val dataset22 = dataset.select(u2(A, B)).collect().run().toVector
-      val d = data.map(x => f1(x.a, x.b))
-
-      (dataset21 ?= d) && (dataset22 ?= d)
+    evalCodeGens {
+      def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder](
+          data: Vector[X3[A, B, C]],
+          f1: (A, B) => C
+        ): Prop = {
+        val dataset = TypedDataset.create(data)
+        val u1 = udf[X3[A, B, C], A, B, C](f1)
+        val u2 = dataset.makeUDF(f1)
+        val A = dataset.col[A]('a)
+        val B = dataset.col[B]('b)
+
+        val dataset21 = dataset.select(u1(A, B)).collect().run().toVector
+        val dataset22 = dataset.select(u2(A, B)).collect().run().toVector
+        val d = data.map(x => f1(x.a, x.b))
+
+        (dataset21 ?= d) && (dataset22 ?= d)
+      }
+
+      check(forAll(prop[Int, Int, Int] _))
+      check(forAll(prop[String, Int, Int] _))
     }
-
-    check(forAll(prop[Int, Int, Int] _))
-    check(forAll(prop[String, Int, Int] _))
   }

   test("multiple two argument udf") {
-    def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder]
-      (data: Vector[X3[A, B, C]], f1: (A, B) => C, f2: (B, C) => A): Prop = {
-      val dataset = TypedDataset.create(data)
-      val u11 = udf[X3[A, B, C], A, B, C](f1)
-      val u12 = dataset.makeUDF(f1)
-      val u21 = udf[X3[A, B, C], B, C, A](f2)
-      val u22 = dataset.makeUDF(f2)
-
-      val A = dataset.col[A]('a)
-      val B = dataset.col[B]('b)
-      val C = dataset.col[C]('c)
-
-      val dataset21 = dataset.select(u11(A, B), u21(B, C)).collect().run().toVector
-      val dataset22 = dataset.select(u12(A, B), u22(B, C)).collect().run().toVector
-      val d = data.map(x => (f1(x.a, x.b), f2(x.b, x.c)))
-
-      (dataset21 ?= d) && (dataset22 ?= d)
+    evalCodeGens {
+      def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder](
+          data: Vector[X3[A, B, C]],
+          f1: (A, B) => C,
+          f2: (B, C) => A
+        ): Prop = {
+        val dataset = TypedDataset.create(data)
+        val u11 = udf[X3[A, B, C], A, B, C](f1)
+        val u12 = dataset.makeUDF(f1)
+        val u21 = udf[X3[A, B, C], B, C, A](f2)
+        val u22 = dataset.makeUDF(f2)
+
+        val A = dataset.col[A]('a)
+        val B = dataset.col[B]('b)
+        val C = dataset.col[C]('c)
+
+        val dataset21 =
+          dataset.select(u11(A, B), u21(B, C)).collect().run().toVector
+        val dataset22 =
+          dataset.select(u12(A, B), u22(B, C)).collect().run().toVector
+        val d = data.map(x => (f1(x.a, x.b), f2(x.b, x.c)))
+
+        (dataset21 ?= d) && (dataset22 ?= d)
+      }
+
+      check(forAll(prop[Int, Int, Int] _))
+      check(forAll(prop[String, Int, Int] _))
     }
-
-    check(forAll(prop[Int, Int, Int] _))
-    check(forAll(prop[String, Int, Int] _))
   }

   test("three argument udf") {
-    def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder]
-      (data: Vector[X3[A, B, C]], f: (A, B, C) => C): Prop = {
-      val dataset = TypedDataset.create(data)
-      val u1 = udf[X3[A, B, C], A, B, C, C](f)
-      val u2 = dataset.makeUDF(f)
-
-      val A = dataset.col[A]('a)
-      val B = dataset.col[B]('b)
-      val C = dataset.col[C]('c)
-
-      val dataset21 = dataset.select(u1(A, B, C)).collect().run().toVector
-      val dataset22 = dataset.select(u2(A, B, C)).collect().run().toVector
-      val d = data.map(x => f(x.a, x.b, x.c))
-
-      (dataset21 ?= d) && (dataset22 ?= d)
+    evalCodeGens {
+      forceInterpreted {
+        def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder](
+            data: Vector[X3[A, B, C]],
+            f: (A, B, C) => C
+          ): Prop = {
+          val dataset = TypedDataset.create(data)
+          val u1 = udf[X3[A, B, C], A, B, C, C](f)
+          val u2 = dataset.makeUDF(f)
+
+          val A = dataset.col[A]('a)
+          val B = dataset.col[B]('b)
+          val C = dataset.col[C]('c)
+
+          val dataset21 = dataset.select(u1(A, B, C)).collect().run().toVector
+          val dataset22 = dataset.select(u2(A, B, C)).collect().run().toVector
+          val d = data.map(x => f(x.a, x.b, x.c))
+
+          (dataset21 ?= d) && (dataset22 ?= d)
+        }
+
+        check(forAll(prop[Int, Int, Int] _))
+        check(forAll(prop[String, Int, Int] _))
+      }
     }
-
-    check(forAll(prop[Int, Int, Int] _))
-    check(forAll(prop[String, Int, Int] _))
   }

   test("four argument udf") {
-    def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder, D: TypedEncoder]
-      (data: Vector[X4[A, B, C, D]], f: (A, B, C, D) => C): Prop = {
-      val dataset = TypedDataset.create(data)
-      val u1 = udf[X4[A, B, C, D], A, B, C, D, C](f)
-      val u2 = dataset.makeUDF(f)
-
-      val A = dataset.col[A]('a)
-      val B = dataset.col[B]('b)
-      val C = dataset.col[C]('c)
-      val D = dataset.col[D]('d)
-
-      val dataset21 = dataset.select(u1(A, B, C, D)).collect().run().toVector
-      val dataset22 = dataset.select(u2(A, B, C, D)).collect().run().toVector
-      val d = data.map(x => f(x.a, x.b, x.c, x.d))
-
-      (dataset21 ?= d) && (dataset22 ?= d)
+    evalCodeGens {
+      forceInterpreted {
+        def prop[
+            A: TypedEncoder,
+            B: TypedEncoder,
+            C: TypedEncoder,
+            D: TypedEncoder
+          ](data: Vector[X4[A, B, C, D]],
+            f: (A, B, C, D) => C
+          ): Prop = {
+          val dataset = TypedDataset.create(data)
+          val u1 = udf[X4[A, B, C, D], A, B, C, D, C](f)
+          val u2 = dataset.makeUDF(f)
+
+          val A = dataset.col[A]('a)
+          val B = dataset.col[B]('b)
+          val C = dataset.col[C]('c)
+          val D = dataset.col[D]('d)
+
+          val dataset21 =
+            dataset.select(u1(A, B, C, D)).collect().run().toVector
+          val dataset22 =
+            dataset.select(u2(A, B, C, D)).collect().run().toVector
+          val d = data.map(x => f(x.a, x.b, x.c, x.d))
+
+          (dataset21 ?= d) && (dataset22 ?= d)
+        }
+
+        check(forAll(prop[Int, Int, Int, Int] _))
+        check(forAll(prop[String, Int, Int, String] _))
+        check(forAll(prop[String, String, String, String] _))
+        check(forAll(prop[String, Long, String, String] _))
+        check(forAll(prop[String, Boolean, Boolean, String] _))
+      }
     }
-
-    check(forAll(prop[Int, Int, Int, Int] _))
-    check(forAll(prop[String, Int, Int, String] _))
-    check(forAll(prop[String, String, String, String] _))
-    check(forAll(prop[String, Long, String, String] _))
-    check(forAll(prop[String, Boolean, Boolean, String] _))
   }

   test("five argument udf") {
-    def prop[A: TypedEncoder, B: TypedEncoder, C: TypedEncoder, D: TypedEncoder, E: TypedEncoder]
-      (data: Vector[X5[A, B, C, D, E]], f: (A, B, C, D, E) => C): Prop = {
-      val dataset = TypedDataset.create(data)
-      val u1 = udf[X5[A, B, C, D, E], A, B, C, D, E, C](f)
-      val u2 = dataset.makeUDF(f)
-
-      val A = dataset.col[A]('a)
-      val B = dataset.col[B]('b)
-      val C = dataset.col[C]('c)
-      val D = dataset.col[D]('d)
-      val E = dataset.col[E]('e)
-
-      val dataset21 = dataset.select(u1(A, B, C, D, E)).collect().run().toVector
-      val dataset22 = dataset.select(u2(A, B, C, D, E)).collect().run().toVector
-      val d = data.map(x => f(x.a, x.b, x.c, x.d, x.e))
-
-      (dataset21 ?= d) && (dataset22 ?= d)
+    evalCodeGens {
+      forceInterpreted {
+        def prop[
+            A: TypedEncoder,
+            B: TypedEncoder,
+            C: TypedEncoder,
+            D: TypedEncoder,
+            E: TypedEncoder
+          ](data: Vector[X5[A, B, C, D, E]],
+            f: (A, B, C, D, E) => C
+          ): Prop = {
+          val dataset = TypedDataset.create(data)
+          val u1 = udf[X5[A, B, C, D, E], A, B, C, D, E, C](f)
+          val u2 = dataset.makeUDF(f)
+
+          val A = dataset.col[A]('a)
+          val B = dataset.col[B]('b)
+          val C = dataset.col[C]('c)
+          val D = dataset.col[D]('d)
+          val E = dataset.col[E]('e)
+
+          val dataset21 =
+            dataset.select(u1(A, B, C, D, E)).collect().run().toVector
+          val dataset22 =
+            dataset.select(u2(A, B, C, D, E)).collect().run().toVector
+          val d = data.map(x => f(x.a, x.b, x.c, x.d, x.e))
+
+          (dataset21 ?= d) && (dataset22 ?= d)
+        }
+
+        check(forAll(prop[Int, Int, Int, Int, Int] _))
+      }
     }
-
-    check(forAll(prop[Int, Int, Int, Int, Int] _))
   }
 }
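A note on the pattern above: the 3-, 4- and 5-argument tests nest `forceInterpreted` inside `evalCodeGens`, and since `withSQLConf` sets the configuration on entry and restores the previous value on exit, the innermost setting wins for as long as its block runs — both legs of `evalCodeGens` therefore execute interpreted here. Sketched with the helpers this series defines in the test package (illustrative):

    forceCodeGen {        // CODEGEN_FACTORY_MODE = CODEGEN_ONLY
      forceInterpreted {  // temporarily NO_CODEGEN: the inner setting wins
        // evaluation in here is interpreted
      }
      // CODEGEN_ONLY again: withSQLConf restored the prior value on exit
    }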
From c2f349299caebc74dd3370ee82edd9134c45eb32 Mon Sep 17 00:00:00 2001
From: Chris Twiner
Date: Thu, 21 Mar 2024 10:27:48 +0100
Subject: [PATCH 15/16] #803 - clean udf eval needs #804

---
 .../main/scala/frameless/functions/Udf.scala  |  6 +-
 .../src/test/scala/frameless/package.scala    | 60 +++++++++++++++++--
 2 files changed, 61 insertions(+), 5 deletions(-)

diff --git a/dataset/src/main/scala/frameless/functions/Udf.scala b/dataset/src/main/scala/frameless/functions/Udf.scala
index f5e5cb7a..c34e8561 100644
--- a/dataset/src/main/scala/frameless/functions/Udf.scala
+++ b/dataset/src/main/scala/frameless/functions/Udf.scala
@@ -140,6 +140,7 @@ trait Udf {
  *
  * Our own implementation of `ScalaUDF` from Catalyst compatible with [[TypedEncoder]].
  */
+// Possibly add UserDefinedExpression trait to stop the functions being registered and used as aggregates
 case class FramelessUdf[T, R](
     function: AnyRef,
     encoders: Seq[TypedEncoder[_]],
@@ -156,6 +157,9 @@ case class FramelessUdf[T, R](
   lazy val typedEnc =
     TypedExpressionEncoder[R](rencoder).asInstanceOf[ExpressionEncoder[R]]

+  lazy val isSerializedAsStructForTopLevel =
+    typedEnc.isSerializedAsStructForTopLevel
+
   def eval(input: InternalRow): Any = {
     val jvmTypes = children.map(_.eval(input))

@@ -165,7 +169,7 @@ case class FramelessUdf[T, R](
     val retval =
       if (returnCatalyst == null)
         null
-      else if (typedEnc.isSerializedAsStructForTopLevel)
+      else if (isSerializedAsStructForTopLevel)
         returnCatalyst
       else
         returnCatalyst.get(0, dataType)
diff --git a/dataset/src/test/scala/frameless/package.scala b/dataset/src/test/scala/frameless/package.scala
index 06b92d99..601613c8 100644
--- a/dataset/src/test/scala/frameless/package.scala
+++ b/dataset/src/test/scala/frameless/package.scala
@@ -1,9 +1,10 @@
+import java.time.format.DateTimeFormatter
+import java.time.{ LocalDateTime => JavaLocalDateTime }
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.internal.SQLConf
+import org.scalacheck.{ Arbitrary, Cogen, Gen }

-import java.time.format.DateTimeFormatter
-import java.time.{ LocalDateTime => JavaLocalDateTime }
-import org.scalacheck.{ Arbitrary, Gen }
+import scala.collection.immutable.{ ListSet, TreeSet }

 package object frameless {

@@ -49,6 +50,46 @@ package object frameless {

   def seqGen[A: Arbitrary]: Gen[scala.collection.Seq[A]] = arbSeq[A].arbitrary

+  implicit def arbList[A](
+      implicit
+      A: Arbitrary[A]
+    ): Arbitrary[List[A]] =
+    Arbitrary(Gen.listOf(A.arbitrary).map(_.toList))
+
+  def listGen[A: Arbitrary]: Gen[List[A]] = arbList[A].arbitrary
+
+  implicit def arbSet[A](
+      implicit
+      A: Arbitrary[A]
+    ): Arbitrary[Set[A]] =
+    Arbitrary(Gen.listOf(A.arbitrary).map(Set.newBuilder.++=(_).result()))
+
+  def setGen[A: Arbitrary]: Gen[Set[A]] = arbSet[A].arbitrary
+
+  implicit def cogenListSet[A: Cogen: Ordering]: Cogen[ListSet[A]] =
+    Cogen.it(_.toVector.sorted.iterator)
+
+  implicit def arbListSet[A](
+      implicit
+      A: Arbitrary[A]
+    ): Arbitrary[ListSet[A]] =
+    Arbitrary(Gen.listOf(A.arbitrary).map(ListSet.newBuilder.++=(_).result()))
+
+  def listSetGen[A: Arbitrary]: Gen[ListSet[A]] = arbListSet[A].arbitrary
+
+  implicit def cogenTreeSet[A: Cogen: Ordering]: Cogen[TreeSet[A]] =
+    Cogen.it(_.toVector.sorted.iterator)
+
+  implicit def arbTreeSet[A](
+      implicit
+      A: Arbitrary[A],
+      o: Ordering[A]
+    ): Arbitrary[TreeSet[A]] =
+    Arbitrary(Gen.listOf(A.arbitrary).map(TreeSet.newBuilder.++=(_).result()))
+
+  def treeSetGen[A: Arbitrary: Ordering]: Gen[TreeSet[A]] =
+    arbTreeSet[A].arbitrary
+
 implicit val arbUdtEncodedClass: Arbitrary[UdtEncodedClass] = Arbitrary {
for { int <- Arbitrary.arbitrary[Int] @@ -76,7 +117,18 @@ package object frameless { localDate <- listOfDates } yield localDate.format(dateTimeFormatter) - val TEST_OUTPUT_DIR = "target/test-output" + private var outputDir: String = _ + + /** allow usage on non-build environments */ + def setOutputDir(path: String): Unit = { + outputDir = path + } + + lazy val TEST_OUTPUT_DIR = + if (outputDir ne null) + outputDir + else + "target/test-output" /** * Will dive down causes until either the cause is true or there are no more causes From 08d7c3dd834002424ab8bd6df4d83436d89bd879 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Thu, 21 Mar 2024 11:14:12 +0100 Subject: [PATCH 16/16] #803 - clean udf eval needs #804 --- build.sbt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 9724561c..3881a35b 100644 --- a/build.sbt +++ b/build.sbt @@ -242,7 +242,12 @@ lazy val datasetSettings = dmm("frameless.functions.package.litAggr"), dmm("org.apache.spark.sql.FramelessInternals.column"), dmm("frameless.TypedEncoder.collectionEncoder"), - dmm("frameless.TypedEncoder.setEncoder") + dmm("frameless.TypedEncoder.setEncoder"), + dmm("frameless.functions.FramelessUdf.evalCode"), + dmm("frameless.functions.FramelessUdf.copy"), + dmm("frameless.functions.FramelessUdf.this"), + dmm("frameless.functions.FramelessUdf.apply"), + imt("frameless.functions.FramelessUdf.apply") ) }, coverageExcludedPackages := "org.apache.spark.sql.reflection",
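For readers who don't have the full build.sbt in view: `dmm` and `imt` look like local shorthands for MiMa problem filters — their definitions sit outside this diff, so the expansion below is an assumption rather than quoted from the build:

    // assumed expansions of the dmm/imt helpers used above
    import com.typesafe.tools.mima.core._

    def dmm(s: String) = ProblemFilters.exclude[DirectMissingMethodProblem](s)
    def imt(s: String) = ProblemFilters.exclude[IncompatibleMethTypeProblem](s)

Either way the intent of the new entries is clear: the `FramelessUdf` constructor, `apply` and `copy` changed shape when `evalFunction` was added, and the removed `evalCode` member has to be excluded from binary-compatibility checks.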