
Add support for Hive (Spark) backends #32

Merged: daniel-thom merged 8 commits into main from dt/hive-support on Jan 17, 2025
Conversation

daniel-thom (Collaborator) commented on Dec 28, 2024:

  1. Add support for PyHive backends as an optional dependency. dsgrid will use it. Handling of timestamps when creating tables is not perfect (there are workarounds); I'd like to move forward as-is and make improvements later. (A sketch of connecting to a Hive backend follows this list.)
  2. Add an option to write the destination table of a map-table operation to a Parquet file instead of the database. This is primarily for dsgrid, but other users might want it.
  3. Add options to skip time checks on the mapped table. dsgrid won't want to do this extra work on huge tables. It shouldn't be required, but we can talk about it.
  4. Refactor commit/rollback handling in ingest_table. There were corner cases that weren't covered, especially with SQLite.
  5. Refactor write_database and read_database. The addition of Hive necessitated some reorganization. I found that we really don't need Polars anymore, so I removed it from the repo.
  6. Add ignore_columns to TableSchema. This allows users to include columns that chronify ignores.
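For item 1, a rough sketch of what using a Hive backend could look like. The hive:// URL is the standard PyHive/SQLAlchemy form with the Thrift server's default port; constructing a Store directly from a SQLAlchemy engine is an assumption about the API here, not a confirmed signature:

# Hypothetical usage sketch. Store(engine=...) is assumed; the hive://
# dialect is registered by PyHive, and 10000 is Thrift's default port.
from sqlalchemy import create_engine

from chronify.store import Store

engine = create_engine("hive://localhost:10000/default")
store = Store(engine=engine)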

codecov-commenter commented on Dec 28, 2024:

Codecov Report

Attention: Patch coverage is 92.41935% with 47 lines in your changes missing coverage. Please review.

Project coverage is 94.04%. Comparing base (ea97feb) to head (6068b97).

Files with missing lines                  Patch %   Lines
src/chronify/sqlalchemy/functions.py      81.08%    21 Missing ⚠️
src/chronify/store.py                     87.41%    18 Missing ⚠️
src/chronify/time_series_mapper_base.py   86.84%     5 Missing ⚠️
src/chronify/schema_manager.py            94.59%     2 Missing ⚠️
tests/conftest.py                         97.29%     1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #32      +/-   ##
==========================================
+ Coverage   93.17%   94.04%   +0.86%     
==========================================
  Files          34       37       +3     
  Lines        2214     2520     +306     
==========================================
+ Hits         2063     2370     +307     
+ Misses        151      150       -1     


@daniel-thom force-pushed the dt/hive-support branch 4 times, most recently from 90478ce to 9770b7e on December 30, 2024 at 17:25.
tar -xzf spark-3.5.4-bin-hadoop3.tgz
export SPARK_HOME=$(pwd)/spark-3.5.4-bin-hadoop3
export PATH=$SPARK_HOME/sbin:$PATH
start-master.sh
daniel-thom (Collaborator, Author) commented:

Starting a real cluster is not needed. Starting Thrift by itself will run an in-process Spark cluster.
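For anyone reproducing this locally, a minimal sketch of that approach (host and port are Thrift's defaults; adjust for your setup):

# Launch only the Thrift JDBC/ODBC server; no start-master.sh needed,
# since it runs an in-process Spark cluster by itself:
#   $SPARK_HOME/sbin/start-thriftserver.sh
from pyhive import hive

# Quick connectivity check against the Thrift server's default port.
conn = hive.Connection(host="localhost", port=10000)
cursor = conn.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchall())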

@daniel-thom force-pushed the dt/hive-support branch 4 times, most recently from 15ef17d to 97b6837 on January 7, 2025 at 16:39.
if isinstance(config, DatetimeRange):
if isinstance(df2[config.time_column].dtype, DatetimeTZDtype):
# Spark doesn't like ns.
# TODO: is there a better way to change from ns to us?
daniel-thom (Collaborator, Author) commented:

@lixiliu If you have suggestions, let me know.

lixiliu (Collaborator) commented:

No; since pandas doesn't support other unit types at the moment, there's little we can do.

https://pandas.pydata.org/docs/reference/api/pandas.DatetimeTZDtype.html
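For what it's worth, pandas 2.x did add non-nanosecond resolution. If the project can require pandas >= 2.0, one way to do the ns-to-us conversion is Series.dt.as_unit (a sketch, assuming a tz-aware column):

import pandas as pd

# datetime64[ns, UTC] -> datetime64[us, UTC]; .dt.as_unit requires pandas >= 2.0.
s = pd.Series(pd.to_datetime(["2020-01-01 00:00:00"], utc=True))
s_us = s.dt.as_unit("us")
print(s_us.dtype)  # datetime64[us, UTC]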

@daniel-thom marked this pull request as ready for review on January 7, 2025 at 16:40.
lixiliu (Collaborator) left a review:

Minor comments, I think.

to_schema: TableSchema,
scratch_dir: Optional[Path] = None,
output_file: Optional[Path] = None,
check_mapped_timestamps: bool = True,
lixiliu (Collaborator) commented on Jan 15, 2025:

I am inclined to suggest defaulting this to False for speed. While it's useful to have this feature, we already have many default checks to ensure the mapping process works. For example, the mapping creation starts with the to_time_config, so we know the timestamps will at least match up in terms of unique values.

daniel-thom (Collaborator, Author) commented:

Agreed on setting this to False. We should make sure that all tests set this to True (see the sketch below).
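A hypothetical call site for reference: the method name and the from_schema argument are assumptions, and only the keyword parameters below appear in the signature excerpt above.

# Hypothetical sketch; map_table and from_schema are assumed names.
from pathlib import Path

store.map_table(
    from_schema,
    to_schema,
    output_file=Path("mapped_table.parquet"),  # item 2: write to Parquet instead of the DB
    check_mapped_timestamps=True,  # tests should pass True explicitly once the default is False
)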

@@ -29,6 +29,10 @@ class TableSchemaBase(ChronifyBaseModel):
"Should not include time columns."
),
]
ignore_columns: list[str] = Field(
lixiliu (Collaborator) commented:

I like this. We probably don't need to do check_name here because we'll never query these columns via chronify, right?

daniel-thom (Collaborator, Author) commented:

I'm not checking that the ignore_columns are actually present. Looking again, this field has no effect other than providing possible clarity to the user. I should change it to one of these:

  1. Remove the field. Chronify only looks at the columns explicitly defined in the schema; any other columns are implicitly ignored.
  2. Keep the field but check that those columns are present.

lixiliu (Collaborator) commented:

This is similar to the mapping table config model, which has an "other_columns" field for this purpose.

We need [2] because model.list_columns() is used in the mapping process for schema consistency and to ensure all the required columns are output.

daniel-thom (Collaborator, Author) commented:

Go with 1.
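To illustrate option 1, a sketch: the import path, value_column, and the DatetimeRange arguments are assumptions for illustration; time_column and time_array_id_columns appear in this PR's diff excerpts.

# Sketch of option 1: any column not named in the schema is implicitly ignored.
from datetime import datetime, timedelta

from chronify.models import DatetimeRange, TableSchema  # import path assumed

schema = TableSchema(
    name="devices",
    time_config=DatetimeRange(
        time_column="timestamp",
        start=datetime(2020, 1, 1),
        length=24,
        resolution=timedelta(hours=1),
    ),
    time_array_id_columns=["device_id"],
    value_column="value",
    # A column such as "notes" in the underlying table needs no mention here;
    # with option 1 it is ignored without any ignore_columns field.
)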

@@ -45,7 +47,12 @@ def _check_source_table_has_time_zone(self) -> None:
msg = f"time_zone is required for tz-aware representative time mapping and it is missing from source table: {self._from_schema.name}"
lixiliu (Collaborator) commented:

@daniel-thom - I think we need to check time_zone a different way here since list_columns() no longer captures all table columns.

lixiliu (Collaborator) commented on Jan 17, 2025:

Actually, we should enforce that time_zone is in the time_array_id_columns, so this check is still correct.

lixiliu (Collaborator) left a review:

One remaining issue to be resolved on Teams.

@daniel-thom merged commit 3885a65 into main on Jan 17, 2025. 4 checks passed.
@daniel-thom deleted the dt/hive-support branch on January 17, 2025 at 19:52.
github-actions bot pushed a commit that referenced this pull request on Jan 17, 2025:
* Add support for Hive (Spark) backends
* Run a local Spark cluster in CI
* Add handling of DuckDB types
* Add option to write mapped tables to Parquet