Skip to content

Commit c72f381

Browse files
authored
Fix/delta merge builder instance check for connect + util fix (#130)
<!--- Provide a general summary of your changes in the Title above --> ## Description <!--- Describe your changes in detail --> Additonal fixes for #101 and #102 ## Related Issue <!--- This project only accepts pull requests related to open issues --> <!--- If suggesting a new feature or change, please discuss it in an issue first --> <!--- If fixing a bug, there should be an issue describing it with steps to reproduce --> <!--- Please link to the issue here: --> #101, #102 ## Motivation and Context <!--- Why is this change required? What problem does it solve? --> ## How Has This Been Tested? <!--- Please describe in detail how you tested your changes. --> <!--- Include details of your testing environment, and the tests you ran to --> <!--- see how your change affects other areas of the code, etc. --> ## Screenshots (if appropriate): ## Types of changes <!--- What types of changes does your code introduce? Put an `x` in all the boxes that apply: --> - [x] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Checklist: <!--- Go over all the following points, and put an `x` in all the boxes that apply. --> <!--- If you're unsure about any of these, don't hesitate to ask. We're here to help! --> - [x] My code follows the code style of this project. - [ ] My change requires a change to the documentation. - [ ] I have updated the documentation accordingly. - [x] I have read the **CONTRIBUTING** document. - [ ] I have added tests to cover my changes. - [x] All new and existing tests passed. --------- Co-authored-by: Danny Meijer <[email protected]>
1 parent ea2d15e commit c72f381

File tree

2 files changed

+9
-3
lines changed

2 files changed

+9
-3
lines changed

src/koheesio/spark/utils/connect.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ def is_remote_session(spark: Optional[SparkSession] = None) -> bool:
1414
result = False
1515

1616
if (_spark := spark or get_active_session()) and check_if_pyspark_connect_is_supported():
17-
result = True if _spark.conf.get("spark.remote", None) else False # type: ignore
17+
# result = True if _spark.conf.get("spark.remote", None) else False # type: ignore
18+
from pyspark.sql.connect.session import SparkSession as ConnectSparkSession
19+
20+
result = isinstance(_spark, ConnectSparkSession)
1821

1922
return result

src/koheesio/spark/writers/delta/batch.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,10 @@ def _validate_params(cls, params: dict) -> dict:
323323
clause = merge_conf.get("clause")
324324
if clause not in valid_clauses:
325325
raise ValueError(f"Invalid merge clause '{clause}' provided")
326-
elif not isinstance(merge_builder, DeltaMergeBuilder):
326+
elif (
327+
not isinstance(merge_builder, DeltaMergeBuilder)
328+
or not type(merge_builder).__name__ == "DeltaMergeBuilder"
329+
):
327330
raise ValueError("merge_builder must be a list or merge clauses or a DeltaMergeBuilder instance")
328331

329332
return params
@@ -378,7 +381,7 @@ def execute(self) -> Writer.Output:
378381
if self.table.create_if_not_exists and not self.table.exists:
379382
_writer = _writer.options(**self.table.default_create_properties)
380383

381-
if isinstance(_writer, DeltaMergeBuilder):
384+
if isinstance(_writer, DeltaMergeBuilder) or type(_writer).__name__ == "DeltaMergeBuilder":
382385
_writer.execute()
383386
else:
384387
if options := self.params:

0 commit comments

Comments
 (0)