Add support for Hive (Spark) backends #32

Merged
8 commits, merged on Jan 17, 2025
13 changes: 10 additions & 3 deletions .github/workflows/ci.yml
@@ -16,7 +16,9 @@ jobs:
    strategy:
      matrix:
        python-version: ["3.11", "3.12"]
        os: [ubuntu-latest, windows-latest]
        # TODO: skip spark on Windows
        #os: [ubuntu-latest, windows-latest]
        os: [ubuntu-latest]

    steps:
      - uses: actions/checkout@v4
@@ -27,10 +29,15 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          python -m pip install ".[dev]"
          python -m pip install ".[dev,spark]"
          wget https://dlcdn.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
          tar -xzf spark-3.5.4-bin-hadoop3.tgz
          export SPARK_HOME=$(pwd)/spark-3.5.4-bin-hadoop3
          export PATH=$SPARK_HOME/sbin:$PATH
          start-thriftserver.sh
      - name: Run pytest with coverage
        run: |
          pytest -v --cov --cov-report=xml
          CHRONIFY_HIVE_URL=hive://localhost:10000/default pytest -v --cov --cov-report=xml
      - name: codecov
        uses: codecov/[email protected]
        if: ${{ matrix.os == env.DEFAULT_OS && matrix.python-version == env.DEFAULT_PYTHON }}
1 change: 1 addition & 0 deletions docs/how_tos/index.md
@@ -11,4 +11,5 @@
getting_started/index
ingest_multiple_tables
map_time_config
spark_backend
```
96 changes: 96 additions & 0 deletions docs/how_tos/spark_backend.md
@@ -0,0 +1,96 @@
# Apache Spark Backend
Download Spark from https://spark.apache.org/downloads.html and install it. Spark provides startup
scripts for UNIX operating systems (not Windows).

## Install chronify with Spark support
```
$ pip install chronify[spark]
```

## Installation on a development computer
Installation can be as simple as
```
$ tar -xzf spark-3.5.4-bin-hadoop3.tgz
$ export SPARK_HOME=$(pwd)/spark-3.5.4-bin-hadoop3
```

Start a Thrift server. This allows JDBC clients to send SQL queries to Spark. The command below
attaches to a standalone cluster master; omit the `--master` option to run an in-process Spark
instance in local mode.
```
$ $SPARK_HOME/sbin/start-thriftserver.sh --master=spark://$(hostname):7077
```

The URL to connect to this server is `hive://localhost:10000/default`.
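
To confirm that the server is reachable before using chronify, a minimal SQLAlchemy check looks
like the following (a sketch; it assumes the PyHive dialect installed by the `spark` extra):
```python
from sqlalchemy import create_engine, text

# The "hive" dialect is registered by pyhive, which chronify[spark] installs.
engine = create_engine("hive://localhost:10000/default")
with engine.connect() as conn:
    # Any trivial query confirms that the Thrift server is accepting connections.
    print(conn.execute(text("SHOW TABLES")).fetchall())
```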

## Installation on an HPC
The chronify development team uses these
[scripts](https://github.com/NREL/HPC/tree/master/applications/spark) to run Spark on NREL's HPC.

## Chronify Usage
This example creates a chronify Store with Spark as the backend and then creates a view of a
Parquet file. Chronify will run its normal time checks.

First, create the Parquet file and chronify schema.

```python
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from chronify import DatetimeRange, Store, TableSchema, CsvTableSchema

initial_time = datetime(2020, 1, 1)
end_time = datetime(2020, 12, 31, 23)
resolution = timedelta(hours=1)
timestamps = pd.date_range(initial_time, end_time, freq=resolution, unit="us")
dfs = []
for i in range(1, 4):
    df = pd.DataFrame(
        {
            "timestamp": timestamps,
            "id": i,
            "value": np.random.random(len(timestamps)),
        }
    )
    dfs.append(df)
df = pd.concat(dfs)
df.to_parquet("data.parquet", index=False)
schema = TableSchema(
    name="devices",
    value_column="value",
    time_config=DatetimeRange(
        time_column="timestamp",
        start=initial_time,
        length=len(timestamps),
        resolution=resolution,
    ),
    time_array_id_columns=["id"],
)
```

```python
from chronify import Store

store = Store.create_new_hive_store("hive://localhost:10000/default")
store.create_view_from_parquet("data.parquet")
```

Verify the data:
```python
store.read_table(schema.name).head()
```
```
             timestamp  id     value
0  2020-01-01 00:00:00   1  0.785399
1  2020-01-01 01:00:00   1  0.102756
2  2020-01-01 02:00:00   1  0.178587
3  2020-01-01 03:00:00   1  0.326194
4  2020-01-01 04:00:00   1  0.994851
```

## Time configuration mapping
The primary use case for Spark is mapping datasets that are too large for DuckDB to process on
one computer. In such a workflow, a user would call
```python
store.map_table_time_config(src_table_name, dst_schema, output_file="mapped_data.parquet")
```
20 changes: 20 additions & 0 deletions docs/index.md
@@ -22,6 +22,26 @@ Python-based modeling packages.
explanation/index
```

## Supported Backends
While chronify should work with any database supported by SQLAlchemy, it has been tested with
the following:

- DuckDB (default)
- SQLite
- Apache Spark, through the Spark Thrift Server

DuckDB and SQLite are fully supported.

Because of limitations in the backend software, chronify functionality with Spark is limited to
the following:

- Create a view into an existing Parquet file (or directory).
- Perform time series checks.
- Map between time configurations.
- Write output data to Parquet files.

There is no support for creating tables or ingesting data with Spark.
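
A hedged sketch of a Spark-backed workflow within those limits, reusing only the calls shown in
the Spark how-to (the file name, table name, and connection URL are placeholders):

```python
from chronify import Store

# Assumes a Spark Thrift server is running locally; see the Spark backend how-to.
store = Store.create_new_hive_store("hive://localhost:10000/default")

# Create a view into an existing Parquet file; chronify runs its time checks here.
store.create_view_from_parquet("data.parquet")

# Read the checked data back through the view ("devices" is the table name used in the how-to).
print(store.read_table("devices").head())
```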

## How to use this guide
- Refer to [How Tos](#how-tos-page) for step-by-step instructions for creating a store and ingesting data.
- Refer to [Tutorials](#tutorials-page) for examples of ingesting different types of data and mapping
11 changes: 9 additions & 2 deletions pyproject.toml
@@ -30,7 +30,6 @@ dependencies = [
    "duckdb_engine",
    "loguru",
    "pandas >= 2.2, < 3",
    "polars ~= 1.11.0",
    "pyarrow",
    "pydantic >= 2.7, < 3",
    "pytz",
@@ -39,6 +38,12 @@ dependencies = [
    "tzdata",
]
[project.optional-dependencies]
spark = [
    "pyhive @ git+https://github.com/apache/kyuubi.git#egg=pyhive&subdirectory=python",
    "thrift",
    "thrift_sasl",
]

dev = [
    "mypy",
    "pandas-stubs",
@@ -53,7 +58,6 @@ dev = [
    "autodoc_pydantic~=2.0",
    "sphinx-copybutton",
    "sphinx-tabs~=3.4",

]

[project.urls]
@@ -119,3 +123,6 @@ docstring-code-line-length = "dynamic"
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["E402", "F401"]
"**/{tests,docs,tools}/*" = ["E402"]

[tool.hatch]
metadata.allow-direct-references = true
37 changes: 36 additions & 1 deletion src/chronify/__init__.py
@@ -1,18 +1,53 @@
import importlib.metadata as metadata

from chronify.store import Store
from chronify.exceptions import (
    ChronifyExceptionBase,
    ConflictingInputsError,
    InvalidTable,
    InvalidOperation,
    InvalidParameter,
    MissingParameter,
    TableAlreadyExists,
    TableNotStored,
)
from chronify.models import (
    ColumnDType,
    CsvTableSchema,
    PivotedTableSchema,
    TableSchema,
)
from chronify.store import Store
from chronify.time import RepresentativePeriodFormat
from chronify.time_configs import (
    AnnualTimeRange,
    DatetimeRange,
    IndexTimeRange,
    RepresentativePeriodTime,
    TimeBaseModel,
    TimeBasedDataAdjustment,
)

__all__ = (
"AnnualTimeRange",
"ChronifyExceptionBase",
"ColumnDType",
"ConflictingInputsError",
"CsvTableSchema",
"DatetimeRange",
"IndexTimeRange",
"InvalidOperation",
"InvalidParameter",
"InvalidTable",
"MissingParameter",
"PivotedTableSchema",
"RepresentativePeriodFormat",
"RepresentativePeriodTime",
"Store",
"TableAlreadyExists",
"TableNotStored",
"TableSchema",
"TimeBaseModel",
"TimeBasedDataAdjustment",
)

__version__ = metadata.metadata("chronify")["Version"]
12 changes: 0 additions & 12 deletions src/chronify/duckdb/functions.py
@@ -1,6 +1,5 @@
from collections.abc import Iterable
from datetime import datetime, timedelta
from pathlib import Path

import duckdb
from duckdb import DuckDBPyRelation
@@ -30,17 +29,6 @@ def add_datetime_column(
# )


def make_write_parquet_query(table_or_view: str, file_path: Path | str) -> str:
    """Make an SQL string that can be used to write a Parquet file from a table or view."""
    # TODO: Hive partitioning?
    return f"""
        COPY
        (SELECT * FROM {table_or_view})
        TO '{file_path}'
        (FORMAT 'parquet');
    """


def unpivot(
    rel: DuckDBPyRelation,
    pivoted_columns: Iterable[str],
34 changes: 34 additions & 0 deletions src/chronify/hive_functions.py
@@ -0,0 +1,34 @@
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Optional

from sqlalchemy import Engine, MetaData, text


def create_materialized_view(
    query: str,
    dst_table: str,
    engine: Engine,
    metadata: MetaData,
    scratch_dir: Optional[Path] = None,
) -> None:
    """Create a materialized view with a Parquet file. This is a workaround for an undiagnosed
    problem with timestamps and time zones with hive.

    The Parquet file will be written to scratch_dir. Callers must ensure that the directory
    persists for the duration of the work.
    """
    with NamedTemporaryFile(dir=scratch_dir, suffix=".parquet") as f:
        f.close()
        output = Path(f.name)
    write_query = f"""
        INSERT OVERWRITE DIRECTORY
        '{output}'
        USING parquet
        ({query})
    """
    with engine.begin() as conn:
        conn.execute(text(write_query))
        view_query = f"CREATE VIEW {dst_table} AS SELECT * FROM parquet.`{output}`"
        conn.execute(text(view_query))
    metadata.reflect(engine, views=True)
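
A hedged usage sketch for this helper (the connection URL, query, and directory are illustrative;
the scratch directory must already exist because the temporary file is created inside it):

```python
from pathlib import Path

from sqlalchemy import MetaData, create_engine

from chronify.hive_functions import create_materialized_view

engine = create_engine("hive://localhost:10000/default")
metadata = MetaData()
scratch = Path("scratch")
scratch.mkdir(exist_ok=True)

# Materialize a filtered subset of a Parquet file as a Hive view backed by a new Parquet file.
create_materialized_view(
    query="SELECT * FROM parquet.`data.parquet` WHERE id = 1",
    dst_table="devices_id1",
    engine=engine,
    metadata=metadata,
    scratch_dir=scratch,
)
```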
32 changes: 13 additions & 19 deletions src/chronify/models.py
@@ -5,7 +5,7 @@
import pandas as pd
from duckdb.typing import DuckDBPyType
from pydantic import Field, field_validator, model_validator
from sqlalchemy import BigInteger, Boolean, DateTime, Double, Integer, String
from sqlalchemy import BigInteger, Boolean, DateTime, Double, Float, Integer, SmallInteger, String
from typing_extensions import Annotated

from chronify.base_models import ChronifyBaseModel
@@ -164,7 +164,9 @@ def list_columns(self) -> list[str]:
    duckdb.typing.BIGINT.id: BigInteger,  # type: ignore
    duckdb.typing.BOOLEAN.id: Boolean,  # type: ignore
    duckdb.typing.DOUBLE.id: Double,  # type: ignore
    duckdb.typing.FLOAT.id: Float,  # type: ignore
    duckdb.typing.INTEGER.id: Integer,  # type: ignore
    duckdb.typing.TINYINT.id: SmallInteger,  # type: ignore
    duckdb.typing.VARCHAR.id: String,  # type: ignore
    # Note: timestamp requires special handling because of timezone in sqlalchemy.
}
@@ -253,24 +255,16 @@ def fix_data_type(cls, data: dict[str, Any]) -> dict[str, Any]:
class CsvTableSchema(TableSchemaBase):
"""Defines the schema of data in a CSV file."""

pivoted_dimension_name: Annotated[
Optional[str],
Field(
default=None,
description="Only set if the table is pivoted. Use this name for the column "
"representing that dimension when unpivoting.",
),
]
column_dtypes: Annotated[
Optional[list[ColumnDType]],
Field(
default=None,
description="Column types. Will try to infer types of any column not listed.",
),
]
value_columns: Annotated[
list[str], Field(description="Columns in the table that contain values.")
]
pivoted_dimension_name: Optional[str] = Field(
default=None,
description="Only set if the table is pivoted. Use this name for the column "
"representing that dimension when unpivoting.",
)
column_dtypes: Optional[list[ColumnDType]] = Field(
default=None,
description="Column types. Will try to infer types of any column not listed.",
)
value_columns: list[str] = Field(description="Columns in the table that contain values.")
time_array_id_columns: list[str] = Field(
default=[],
description="Columns in the table that uniquely identify time arrays. "