#800: #787 - Move encoder implementation details to external shim library (not dependent on the Spark 4 release)

Status: Open. chris-twiner wants to merge 74 commits into typelevel:master from chris-twiner:temp/787_shim.
Commits (74, all by chris-twiner):
- d3ddaf1  #755 - correct version number in readme for non 3.5 build
- a435adc  Merge branch 'master' of github.com:typelevel/frameless
- 24bde95  Merge branch 'master' of github.com:typelevel/frameless
- cb259fa  #787 - base required for shim and 14.3.dbr
- b8d4f05  #787 - [Un]WrapOption, Invoke, NewInstance, GetStructField, ifisnull,…
- 7944fe9  #787 - [Un]WrapOption, Invoke, NewInstance, GetStructField, ifisnull,…
- 71bb38c  #787 - forced reformatting
- c843c6a  #787 - forced reformatting
- 9a0c55b  #787 - forced reformatting
- 0616953  #787 - mima MapGroups removal
- a70d5c3  #787 - Spark 4 starter pack
- 1ef1d9b  #787 - Spark 4 starter pack
- 7a96748  #787 - Spark 4 starter pack
- 7d0e131  #787 - Spark 4 starter pack, doh
- c6a4341  #787 - resolve conflict for auto merge
- 025ee64  Merge branch 'master' into temp/787_shim
- 7b17009  Merge branch 'temp/787_shim' of github.com:chris-twiner/frameless int…
- 6717e4b  Merge remote-tracking branch 'frameless/master' into temp/787_shim
- 4933a90  #787 - reduce the case class api usage even further and CreateStruct,…
- 0f9b7cf  #787 - disable local maven again
- 059a8e6  #787 - remove all sql package private code
- 9c506df  #787 - remove all sql package private code
- 11aece0  #787 - ml internals removal - all public - #300
- 089cb3a  #787 - ml internals removal - all public - #300 - use rc1
- c7fa1c7  #787 - ml internals removal - all public - #300 - use rc1, so1 not a …
- 28071ff  #787 - ml internals removal - all public - #300 - use rc2
- 3806e03  #787 - ml internals removal - all public - #300 - use rc2
- 1c1d370  #787 - ml internals removal - all public - #300 - use rc2
- 728c935  #787 - ml internals removal - all public - #300 - use rc2
- 768d467  #787 - rc2
- 5a30614  #787 - rc2
- 95c66cc  #787 - rc2 - seems each sub object needs adding
- 2e11b6d  #787 - rc2 - doc is now an issue?
- 1888f4e  #787 - rc2 - mc reflection
- d146f00  #787 - rc2 - add test artefacts
- 692475f  #787 - allow testing of all frameless logic
- 1008b85  #787 - compilation issue on interface
- dd10cee  #787 - fix test to run on dbr 14.3
- f253d45  #787 #803 - rc4 usage and fix udf with expressionproxy
- 7c1e603  #787 #803 - rc4 usage and fix udf with expressionproxy - deeply neste…
- b161067  #787 #803 - rc4 usage and fix udf with expressionproxy - deeply neste…
- 2fa1bb0  (cherry picked from commit 955ba829779010d43b9f37ec438f0c8eaea76e0e)
- ee38804  #804 - starter fix, set needed
- fb1c109  #804 - encoding for Set derivatives as well - test build
- ae8b69a  #804 - encoding for Set derivatives as well - test build
- 0435c3a  #804 - encoding for Set derivatives as well - test build
- 52034b2  #804 - encoding for Set derivatives as well - test build, hashtrieset…
- 9e45d92  #804 - encoding for Set derivatives as well - test build, hashtrieset…
- e7881c0  #804 - encoding for Set derivatives as well - test build, 2.13 forced…
- 594fceb  #804 - encoding for Set derivatives as well - test build, 2.13 forced…
- 5a01976  #804 - encoding for Set derivatives as well - test build, 2.13 forced…
- 365b21f  #804 - encoding for Set derivatives as well - test build, 2.13 forced…
- 4395c16  #804 - encoding for Set derivatives as well - test build, 2.13 forced…
- c792c05  Merge remote-tracking branch 'upstream/master' into temp/804_clean
- f0d5f16  #804 - rebased
- 3bdb8ad  #803 - clean udf from #804, no shim start
- c2f3492  #803 - clean udf eval needs #804
- 08d7c3d  #803 - clean udf eval needs #804
- b82d266  (cherry picked from commit 3bdb8ad8af5e3eb77f24c65fbde6d603c58b11ab)
- e36eac2  (cherry picked from commit c2f349299caebc74dd3370ee82edd9134c45eb32)
- aa1e6de  #787 - merge #803 / #804
- be4c35e  #787 - Seq can be stream, fails on dbr, do the same as for arb
- b880261  #787 #804 - stream
- f793fc7  #787 - tests have ordering and precision issues when run on clusters
- e582962  #787 - tests have ordering and precision issues when run on clusters
- 986891a  #787 - tests have ordering and precision issues when run on clusters …
- 66b31e9  #787 - attempt to solve all but covar_pop and kurtosis
- 80de4f2  #787 - attempt covar_pop and kurtosis through tolerances
- a89542e  #787 - tolerance on map members and on vectors for cluster runs
- 271e953  #787 - pivottest was random ordering
- fa75889  #787 - ensure last/first are run on a single partition - 15.0 databri…
- b6189b1  #787 - ensure last/first are run on a single partition - 15.0 databri…
- 25cc5c3  #787 - ensure last/first are run on a single partition - 15.0 databri…
- 52a2bcf  #787 - spark4 test compat
Modified file:

```diff
@@ -7,7 +7,7 @@ import _root_.cats.syntax.all._
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkConf, SparkContext => SC}
+import org.apache.spark.{ SparkConf, SparkContext => SC }
 
 import org.scalatest.compatible.Assertion
 import org.scalactic.anyvals.PosInt
@@ -21,24 +21,39 @@ import org.scalatest.matchers.should.Matchers
 import org.scalatest.propspec.AnyPropSpec
 
 trait SparkTests {
-  val appID: String = new java.util.Date().toString + math.floor(math.random() * 10E4).toLong.toString
+
+  val appID: String = new java.util.Date().toString + math
+    .floor(math.random() * 10e4)
+    .toLong
+    .toString
 
   val conf: SparkConf = new SparkConf()
     .setMaster("local[*]")
     .setAppName("test")
     .set("spark.ui.enabled", "false")
     .set("spark.app.id", appID)
 
-  implicit def session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
+  implicit def session: SparkSession =
+    SparkSession.builder().config(conf).getOrCreate()
   implicit def sc: SparkContext = session.sparkContext
 
-  implicit class seqToRdd[A: ClassTag](seq: Seq[A])(implicit sc: SC) {
+  implicit class seqToRdd[A: ClassTag](
+      seq: Seq[A]
+    )(implicit
+      sc: SC) {
     def toRdd: RDD[A] = sc.makeRDD(seq)
   }
 }
 
 object Tests {
-  def innerPairwise(mx: Map[String, Int], my: Map[String, Int], check: (Any, Any) => Assertion)(implicit sc: SC): Assertion = {
+
+  def innerPairwise(
+      mx: Map[String, Int],
+      my: Map[String, Int],
+      check: (Any, Any) => Assertion
+    )(implicit
+      sc: SC
+    ): Assertion = {
     import frameless.cats.implicits._
     import frameless.cats.inner._
     val xs = sc.parallelize(mx.toSeq)
@@ -63,21 +78,31 @@ object Tests {
   }
 }
 
-class Test extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks with SparkTests {
+class Test
+    extends AnyPropSpec
+    with Matchers
+    with ScalaCheckPropertyChecks
+    with SparkTests {
 
   implicit override val generatorDrivenConfig =
     PropertyCheckConfiguration(minSize = PosInt(10))
 
   property("spark is working") {
-    sc.parallelize(Seq(1, 2, 3)).collect() shouldBe Array(1,2,3)
+    sc.parallelize(Seq(1, 2, 3)).collect() shouldBe Array(1, 2, 3)
   }
 
   property("inner pairwise monoid") {
     // Make sure we have non-empty map
-    forAll { (xh: (String, Int), mx: Map[String, Int], yh: (String, Int), my: Map[String, Int]) =>
-      Tests.innerPairwise(mx + xh, my + yh, _ shouldBe _)
+    forAll {
+      (xh: (String, Int),
+       mx: Map[String, Int],
+       yh: (String, Int),
+       my: Map[String, Int]
+      ) => Tests.innerPairwise(mx + xh, my + yh, _ shouldBe _)
     }
   }
 
   org.scalatestplus.scalacheck.Checkers
   property("rdd simple numeric commutative semigroup") {
     import frameless.cats.implicits._
@@ -110,7 +135,8 @@ class Test extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks with
   property("rdd tuple commutative semigroup example") {
     import frameless.cats.implicits._
     forAll { seq: List[(Int, Int)] =>
-      val expectedSum = if (seq.isEmpty) None else Some(Foldable[List].fold(seq))
+      val expectedSum =
+        if (seq.isEmpty) None else Some(Foldable[List].fold(seq))
 
       val rdd = seq.toRdd
 
       rdd.csum shouldBe expectedSum.getOrElse(0 -> 0)
@@ -120,10 +146,22 @@
 
   property("pair rdd numeric commutative semigroup example") {
     import frameless.cats.implicits._
-    val seq = Seq( ("a",2), ("b",3), ("d",6), ("b",2), ("d",1) )
+    val seq = Seq(("a", 2), ("b", 3), ("d", 6), ("b", 2), ("d", 1))
     val rdd = seq.toRdd
-    rdd.cminByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",2), ("d",1) )
-    rdd.cmaxByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",3), ("d",6) )
-    rdd.csumByKey.collect().toSeq should contain theSameElementsAs Seq( ("a",2), ("b",5), ("d",7) )
+    rdd.cminByKey.collect().toSeq should contain theSameElementsAs Seq(
+      ("a", 2),
+      ("b", 2),
+      ("d", 1)
+    )
+    rdd.cmaxByKey.collect().toSeq should contain theSameElementsAs Seq(
+      ("a", 2),
+      ("b", 3),
+      ("d", 6)
+    )
+    rdd.csumByKey.collect().toSeq should contain theSameElementsAs Seq(
+      ("a", 2),
+      ("b", 5),
+      ("d", 7)
+    )
   }
 }
```

Review comment on lines +138 to +139 (the reformatted `expectedSum`): "fmt is super weird"
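The `csumByKey` assertion above amounts to a per-key semigroup fold: pairs sharing a key have their values summed. A minimal plain-Scala sketch of that semantics (no Spark or cats involved; `MapMonoidSketch` and `combinePerKey` are hypothetical names for illustration, not frameless API):

```scala
// Sketch: combine two maps by summing values per key, the same semantics
// the cats-based csumByKey test asserts on RDDs.
object MapMonoidSketch {

  def combinePerKey(
      mx: Map[String, Int],
      my: Map[String, Int]
    ): Map[String, Int] =
    (mx.keySet ++ my.keySet).map { k =>
      // a key missing from one side contributes the monoid identity, 0
      k -> (mx.getOrElse(k, 0) + my.getOrElse(k, 0))
    }.toMap

  def main(args: Array[String]): Unit = {
    // per-key sums: a -> 2, b -> 5, d -> 1
    println(combinePerKey(Map("a" -> 2, "b" -> 3), Map("b" -> 2, "d" -> 1)))
  }
}
```

This mirrors what the distributed `csumByKey` does after a shuffle groups values by key, which is why the test only asserts `theSameElementsAs` rather than ordering.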
New file (67 lines added):

```scala
package frameless

import frameless.TypedEncoder.CollectionConversion
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{
  CodegenContext,
  CodegenFallback,
  ExprCode
}
import org.apache.spark.sql.catalyst.expressions.{ Expression, UnaryExpression }
import org.apache.spark.sql.types.{ DataType, ObjectType }

case class CollectionCaster[F[_], C[_], Y](
    child: Expression,
    conversion: CollectionConversion[F, C, Y])
    extends UnaryExpression
    with CodegenFallback {

  protected def withNewChildInternal(newChild: Expression): Expression =
    copy(child = newChild)

  override def eval(input: InternalRow): Any = {
    val o = child.eval(input).asInstanceOf[Object]
    o match {
      case col: F[Y] @unchecked =>
        conversion.convert(col)
      case _ => o
    }
  }

  override def dataType: DataType = child.dataType
}

case class SeqCaster[C[X] <: Iterable[X], Y](child: Expression)
    extends UnaryExpression {

  protected def withNewChildInternal(newChild: Expression): Expression =
    copy(child = newChild)

  // eval on interpreted works, fallback on codegen does not, e.g. with
  // ColumnTests.asCol and Vectors, the code generated still has child of type
  // Vector but child eval returns X2, which is not good
  override def eval(input: InternalRow): Any = {
    val o = child.eval(input).asInstanceOf[Object]
    o match {
      case col: Set[Y] @unchecked =>
        col.toSeq
      case _ => o
    }
  }

  def toSeqOr[T](isSet: => T, or: => T): T =
    child.dataType match {
      case ObjectType(cls)
          if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
        isSet
      case t => or
    }

  override def dataType: DataType =
    toSeqOr(ObjectType(classOf[scala.collection.Seq[_]]), child.dataType)

  override protected def doGenCode(
      ctx: CodegenContext,
      ev: ExprCode
    ): ExprCode =
    defineCodeGen(ctx, ev, c => toSeqOr(s"$c.toVector()", s"$c"))

}
```
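For intuition, the `CollectionConversion[F, C, Y]` instance that `CollectionCaster` evaluates can be read as a typeclass adapting one collection shape to another at runtime. A minimal standalone sketch of that idea, with no Spark dependency (`ConversionSketch` and its instances are hypothetical names, not frameless's actual API):

```scala
import scala.language.higherKinds

// Sketch of a runtime conversion typeclass: adapt a collection of shape F
// into shape C while keeping the element type Y.
trait ConversionSketch[F[_], C[_], Y] {
  def convert(c: F[Y]): C[Y]
}

object ConversionSketch {

  // Seq -> Vector: the kind of cast an encoder needs when the runtime value
  // arrives as a generic Seq but the target type is Vector.
  def seqToVector[Y]: ConversionSketch[Seq, Vector, Y] =
    new ConversionSketch[Seq, Vector, Y] {
      def convert(c: Seq[Y]): Vector[Y] = c.toVector
    }

  // Set -> Seq: mirrors the fallback performed in SeqCaster.eval above.
  def setToSeq[Y]: ConversionSketch[Set, Seq, Y] =
    new ConversionSketch[Set, Seq, Y] {
      def convert(c: Set[Y]): Seq[Y] = c.toSeq
    }
}
```

Keeping the conversion behind a typeclass is what lets the expression tree stay agnostic about concrete collection classes until evaluation, which is the point of routing these casts through a shim instead of Spark internals.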
Review comment: "I wonder, is it how scalafmt formats it?"

chris-twiner: "the builds won't run unless scalafmt is run on files, so yes, those settings lead to some hideous looking code."
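The wrapping seen in the diff (dangling closing parentheses, one parameter per line) is consistent with a narrow-column scalafmt setup. A hypothetical `.scalafmt.conf` that produces output in this style; these values are illustrative guesses, not the repository's actual configuration:

```hocon
// illustrative only -- check the repo's .scalafmt.conf for the real settings
version = "3.7.17"
runner.dialect = scala213
maxColumn = 80                     // forces the multi-line parameter lists
danglingParentheses.preset = true  // closing ")" on its own line
align.preset = none
```

Since CI rejects unformatted files, every contributor gets exactly this shape regardless of personal style, which is the trade-off the reply above is pointing at.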