Skip to content

Commit

Permalink
typelevel#787 - use shim'd joinWith - requires extra implicit
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-twiner committed Oct 2, 2024
1 parent 13177e7 commit a549807
Showing 1 changed file with 25 additions and 26 deletions.
51 changes: 25 additions & 26 deletions dataset/src/main/scala/frameless/TypedDataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,16 @@ import java.util
import frameless.functions.CatalystExplodableCollection
import frameless.ops._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Column, DataFrame, Dataset, SparkSession }
import org.apache.spark.sql.catalyst.expressions.{
Attribute,
AttributeReference,
Literal
}
import org.apache.spark.sql.catalyst.plans.logical.{ Join, JoinHint }
import org.apache.spark.sql.{Column, DataFrame, Dataset, ShimUtils, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.ShimUtils.column
import org.apache.spark.sql.types.StructType
import shapeless._
import shapeless.labelled.FieldType
import shapeless.ops.hlist.{
Diff,
IsHCons,
Mapper,
Prepend,
ToTraversable,
Tupler
}
import shapeless.ops.record.{ Keys, Modifier, Remover, Values }
import shapeless.ops.hlist.{Diff, IsHCons, Mapper, Prepend, ToTraversable, Tupler}
import shapeless.ops.record.{Keys, Modifier, Remover, Values}

import scala.language.experimental.macros

Expand Down Expand Up @@ -767,7 +756,8 @@ class TypedDataset[T] protected[frameless] (
e: TypedEncoder[(T, U)]
): TypedDataset[(T, U)] =
new TypedDataset(
self.dataset.joinWith(other.dataset, column(Literal(true)), "cross")
ShimUtils.joinWith(dataset, other.dataset, column(Literal(true)), "cross")(TypedExpressionEncoder[(T, U)])
//self.dataset.joinWith(other.dataset, column(Literal(true)), "cross")
)

/**
Expand All @@ -778,14 +768,17 @@ class TypedDataset[T] protected[frameless] (
other: TypedDataset[U]
)(condition: TypedColumn[T with U, Boolean]
)(implicit
e: TypedEncoder[(Option[T], Option[U])]
e: TypedEncoder[(Option[T], Option[U])],
to: TypedEncoder[(T, U)]
): TypedDataset[(Option[T], Option[U])] =
new TypedDataset(
self.dataset
ShimUtils.joinWith(dataset, other.dataset, condition.untyped, "full")(TypedExpressionEncoder[(T, U)])
.as[(Option[T], Option[U])](TypedExpressionEncoder[(Option[T], Option[U])])
/*self.dataset
.joinWith(other.dataset, condition.untyped, "full")
.as[(Option[T], Option[U])](
TypedExpressionEncoder[(Option[T], Option[U])]
)
)*/
)

/**
Expand Down Expand Up @@ -820,12 +813,15 @@ class TypedDataset[T] protected[frameless] (
other: TypedDataset[U]
)(condition: TypedColumn[T with U, Boolean]
)(implicit
e: TypedEncoder[(T, Option[U])]
e: TypedEncoder[(T, Option[U])],
to: TypedEncoder[(T, U)]
): TypedDataset[(T, Option[U])] =
new TypedDataset(
self.dataset
.joinWith(other.dataset, condition.untyped, "left_outer")
ShimUtils.joinWith(dataset, other.dataset, condition.untyped, "left_outer")(TypedExpressionEncoder[(T, U)])
.as[(T, Option[U])](TypedExpressionEncoder[(T, Option[U])])
/*self.dataset
.joinWith(other.dataset, condition.untyped, "left_outer")
.as[(T, Option[U])](TypedExpressionEncoder[(T, Option[U])])*/
)

/**
Expand Down Expand Up @@ -864,12 +860,15 @@ class TypedDataset[T] protected[frameless] (
other: TypedDataset[U]
)(condition: TypedColumn[T with U, Boolean]
)(implicit
e: TypedEncoder[(Option[T], U)]
e: TypedEncoder[(Option[T], U)],
to: TypedEncoder[(T, U)]
): TypedDataset[(Option[T], U)] =
new TypedDataset(
self.dataset
.joinWith(other.dataset, condition.untyped, "right_outer")
ShimUtils.joinWith( self.dataset, other.dataset, condition.untyped, "right_outer")(TypedExpressionEncoder[(T, U)])
.as[(Option[T], U)](TypedExpressionEncoder[(Option[T], U)])
/*self.dataset
.joinWith(other.dataset, condition.untyped, "right_outer")
.as[(Option[T], U)](TypedExpressionEncoder[(Option[T], U)])*/
)

private def disambiguate(join: Join): Join = {
Expand Down

0 comments on commit a549807

Please sign in to comment.