
Commit f411bd1

Merge pull request #1 from schibsted/migration-fixes
Support serverless cluster and more DB Runtime environments, prefer spark cache by default if possible
2 parents: 4fc3353 + a72a70d

26 files changed: +3883 −2652 lines

Changelog.md

Lines changed: 7 additions & 0 deletions
@@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [v0.5.0] 2025-05-07
+
+- Add Spark caching by default for classic clusters (default to Spark cache instead of eager DBFS cache for `.cacheToDbfs()`)
+- Add support for `.cacheToDbfsIfTriggered()` and `caching.backup_spark_cached_to_dbfs()`; see the README
+- Support serverless clusters (although performance is poor)
+- Better handling of DataFrames from `createCachedDataFrame`
+
 ## [v0.4.9] 2025-04-29
 
 - Setup PyPI publishing
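
For context on the first two v0.5.0 entries, here is a minimal illustrative sketch (not part of the diff) of how the new classic-cluster defaults might be used. It assumes `extend_dataframe_methods()` has already been called as the README section below describes, and it assumes `.cacheToDbfsIfTriggered()` and `backup_spark_cached_to_dbfs()` can be called without arguments, which this diff does not confirm.

```python
# Illustrative sketch only; assumes extend_dataframe_methods() has already
# attached the caching methods on a classic (non-serverless) cluster.
from dbfs_spark_cache.caching import backup_spark_cached_to_dbfs

df = spark.read.table("some_db.some_table")  # placeholder input table

# New default on classic clusters: .cacheToDbfs() prefers Spark's in-memory
# cache instead of eagerly writing to DBFS.
df_cached = df.cacheToDbfs()

# Only cache if the estimated compute complexity exceeds the configured
# thresholds (no-argument call is an assumption; see the README below).
df_maybe_cached = df.cacheToDbfsIfTriggered()

# Before shutting the cluster down, persist whatever is still only in Spark cache.
backup_spark_cached_to_dbfs()
```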

Makefile

Lines changed: 1 addition & 3 deletions
@@ -57,7 +57,6 @@ validate-version:
 		echo "::error::Version $(PROJECT_VERSION) must match semantic format X.Y.Z"; \
 		exit 1; \
 	fi
-
 validate-release-version:
 	@if git rev-parse "v$(PROJECT_VERSION)" >/dev/null 2>&1; then \
 		echo "::error::Tag v$(PROJECT_VERSION) already exists. Have you updated the version in pyproject.toml?"; \
@@ -74,7 +73,6 @@ validate-release-version:
 	fi
 	@echo "Version v$(PROJECT_VERSION) is a valid new tag (latest is $${LATEST_TAG:-none})."
 
-
 validate-changelog:
 	@echo "--- Validating Changelog.md for v$(PROJECT_VERSION) ---"
 	@CHANGELOG_FILE="Changelog.md"; \
@@ -94,7 +92,7 @@ validate-changelog:
 validate: validate-version validate-changelog lint typecheck test
 	@echo "--- All validation steps passed ---"
 
-release: validate-version validate-changelog
+release: validate-version validate-changelog validate-release-version
 	@echo "--- Creating release for v$(PROJECT_VERSION) ---"
 	@# Ensure gh is installed and authenticated
 	@if ! command -v gh &> /dev/null; then \

README.md

Lines changed: 48 additions & 6 deletions
@@ -8,6 +8,7 @@ To some extent this library will trade some smaller amount of extra query latenc
 
 - **DataFrame caching**: Intelligent caching system using DBFS (Databricks File System).
 - **Query complexity estimation**: Tools to analyze and estimate Spark query complexity and trigger caching if above some set threshold.
+- **Hybrid Spark/DBFS caching**: On classic clusters, you can now prefer Spark's in-memory cache (`.cache()`) for fast iterative work, and only persist to DBFS when needed (see below).
 
 ## Installation
 
@@ -55,6 +56,8 @@ Requires Python 3.10 or higher. Install using pip:
 spark.createDataFrame = spark.createCachedDataFrame
 ```
 
+Note: serverless clusters do not support monkey patching (i.e. extending DataFrames with new methods), so `df.cacheToDbfs()` needs to be replaced with `cacheToDbfs(df)`, and similarly for `createCachedDataFrame(spark, ...)`. However, `cacheToDbfs` (imported from `dbfs_spark_cache.caching`) can be used with `df.transform(cacheToDbfs)`. Unfortunately, write (and read) performance is very poor on serverless clusters, so they can't be recommended for general use with this library.
+
 Default cache trigger thresholds can be set per notebook by calling
 ```python
 dbfs_cache.extend_dataframe_methods(
@@ -71,6 +74,49 @@ DBFS_CACHE_MULTIPLIER_THRESHOLD=1.01
 Set either threshold to None to disable that specific check.
 Caching occurs only if BOTH conditions are met (or the threshold is None).
 
+## Hybrid Spark/DBFS Caching and Backup
+
+Because Spark cache is faster than DBFS cache on clusters with enough memory or disk space (and fast SSD disks), it can be used for fast iterative work, persisting to DBFS only when needed, i.e. before shutting down the cluster.
+
+- **Backup of Spark-cached DataFrames**: Use `backup_spark_cached_to_dbfs()` to persist all Spark-cached DataFrames to DBFS before cluster termination, although the performance win of having them in Spark cache is not that big compared to rerunning everything with DBFS caching directly.
+- **Configurable caching mode**: The config `PREFER_SPARK_CACHE` (default: True) controls whether Spark in-memory cache is preferred on classic clusters. On serverless clusters, DBFS caching is always used.
+- **Automatic registry of Spark-cached DataFrames**: DataFrames cached via `.cacheToDbfs()` in Spark-cache mode are tracked and can be listed or backed up.
+
+By default (on classic clusters), calling `.cacheToDbfs()` will:
+- Use Spark's in-memory cache (`.cache()`) if no DBFS cache exists. `.wcd()` will cache with Spark or not based on the estimated compute complexity of the query.
+- If a DBFS cache exists, read it instead.
+- You can persist all Spark-cached DataFrames to DBFS at any time (e.g. before cluster shutdown) with `backup_spark_cached_to_dbfs()` (see below).
+
+To force always caching to DBFS, set:
+
+```python
+from dbfs_spark_cache.config import config
+config.PREFER_SPARK_CACHE = False
+```
+
+On serverless clusters, DBFS caching is always used regardless of this setting (Spark cache is not available). If you want to disable all calls to the extensions you can do:
+```python
+dbfs_cache.extend_dataframe_methods(disable_cache_and_display=True)
+```
+and it will keep the DataFrame unchanged. When using Spark cache you can persist to DBFS like this:
+
+```python
+from dbfs_spark_cache.caching import backup_spark_cached_to_dbfs
+# Backs up one or more specific DataFrames, e.g. the final result of your work and the DataFrames used with withCachedDisplay(), so you can pick up faster next time
+backup_spark_cached_to_dbfs(specific_dfs=[my_df_used_with_wcd, my_last_end_of_work_df])
+```
+
+Alternatively, if you don't care about cache invalidation you can just use this lookup, which avoids needing to rerun all query logic:
+```python
+from dbfs_spark_cache.caching import get_table_name_from_hash
+df = spark.read.table(get_table_name_from_hash("THE_HASH"))
+```
+where `"THE_HASH"` is the hash of the DataFrame you saved earlier via:
+```python
+from dbfs_spark_cache.caching import get_table_hash
+print(get_table_hash(df))
+```
+
 ### Dataframe cache invalidation techniques that trigger cache invalidation?
 
 Dataframe storage type|Query plan changes|Data changes
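
The hunk above references `.wcd()` and `withCachedDisplay()` without showing them in code, so here is a minimal, hypothetical sketch (not part of the diff). It assumes `extend_dataframe_methods()` has been called on a classic cluster, that both methods can be called without arguments, and that `.wcd()` is the short alias for `withCachedDisplay()`; none of those signatures are confirmed by this diff.

```python
# Illustrative sketch only; assumes the DataFrame methods have been attached
# via extend_dataframe_methods() and that argument-free calls are supported.
raw_df = spark.read.table("some_db.some_table")  # placeholder input

transformed_df = raw_df.groupBy("some_column").count()

# Display the result and cache it (with Spark or DBFS, depending on the
# estimated compute complexity and the PREFER_SPARK_CACHE setting above).
shown_df = transformed_df.withCachedDisplay()

# .wcd() appears above as the shorthand for the same behaviour.
shown_df2 = transformed_df.wcd()
```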
@@ -83,15 +129,11 @@ In-Memory|No not directly, but via conversion to DBFS table through createCached
 This library has been primarily tested under the following Databricks environment configuration, but anything supported by Databricks and the PySpark DataFrame API may work too:
 
 - **Databricks database**: Hive Metastore
-- **Databricks Runtime Version**: 15.4 LTS
+- **Databricks Runtime Version**: 15.4 LTS, client.1.13 (serverless cluster)
 - **Storage Layer**: DBFS and S3
 - **File Formats**: Parquet, JSON
 
-If you want to disable all calls to the extensions you can do:
-```python
-dbfs_cache.extend_dataframe_methods(disable_cache_and_display=True)
-```
-and it will keep the DataFrame unchanged.
+Note that serverless performance when writing to DBFS is currently abysmal, so serverless can only be used for limited testing on small datasets. You can use the file `serverless_env.yml` to automatically install the library on a serverless cluster.
 
 #### What is "Total compute complexity" anyway?
 
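Since serverless clusters cannot be monkey patched (per the note added earlier in this README diff), here is a minimal sketch (not part of the diff) of the function-style calls that note describes. The import path `dbfs_spark_cache.caching` and both call forms are taken directly from the note; `some_db.some_table` is a placeholder.

```python
# Illustrative sketch only: serverless-friendly, function-style usage that
# avoids the monkey-patched DataFrame methods.
from dbfs_spark_cache.caching import cacheToDbfs

df = spark.read.table("some_db.some_table")  # placeholder input

# Instead of df.cacheToDbfs():
df_cached = cacheToDbfs(df)

# Or, as the note suggests, via transform() so it still chains fluently:
df_cached_chained = df.select("*").transform(cacheToDbfs)
```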