chore(doc): add some examples for SparkSqlOnK8SOperator operator #112

Merged
merged 1 commit on Dec 4, 2024
14 changes: 14 additions & 0 deletions examples/airflow/iceberg_query.sql
@@ -0,0 +1,14 @@
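-- Create an Iceberg table partitioned by category.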
CREATE TABLE IF NOT EXISTS prod.db.sample (
    id bigint,
    data string,
    category string
)
USING iceberg
PARTITIONED BY (category);

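-- Upsert: update matching rows, insert new ones, and delete target rows missing from the source.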
MERGE INTO prod.db.sample t
USING (SELECT * FROM prod.db.another_sample WHERE category = 'foo') s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE;
14 changes: 14 additions & 0 deletions examples/airflow/query.sql
@@ -0,0 +1,14 @@
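-- Create a Parquet table partitioned by student_id.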
CREATE TABLE students (
    name VARCHAR(64),
    address VARCHAR(64),
    added_at TIMESTAMP
)
USING PARQUET
PARTITIONED BY (student_id INT);

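-- '{{ ts }}' is an Airflow template, rendered to the DAG run's logical timestamp at execution time.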
INSERT INTO students VALUES
('Amy Smith', '123 Park Ave, San Jose', '{{ ts }}', 111111);

INSERT INTO students VALUES
('Bob Brown', '456 Taylor St, Cupertino', '{{ ts }}', 222222),
('Cathy Johnson', '789 Race Ave, Palo Alto', '{{ ts }}', 333333);
87 changes: 87 additions & 0 deletions examples/airflow/spark_sql_dag.py
@@ -0,0 +1,87 @@
from __future__ import annotations

from datetime import datetime

from spark_on_k8s.airflow.operators import SparkSqlOnK8SOperator
from spark_on_k8s.client import ExecutorInstances, PodResources

from airflow.models.dag import DAG

with DAG(
    dag_id="spark_sql_on_k8s",
    schedule=None,
    start_date=datetime(2024, 1, 1),
) as dag:
    """
    This DAG executes Spark SQL queries on Kubernetes using the SparkSqlOnK8SOperator.
    """

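    # Run a single SQL statement passed inline.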
    SparkSqlOnK8SOperator(
        task_id="spark_sql_application",
        sql="CREATE TABLE test_table AS SELECT 1 AS key, 'value' AS value",
        namespace="spark",
        image="pyspark-job",
        image_pull_policy="Never",
        app_name="spark-sql-job-example",
        service_account="spark",
        app_waiter="log",
        driver_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
        executor_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
        executor_instances=ExecutorInstances(min=0, max=5, initial=5),
        ui_reverse_proxy=True,
    )

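    # Run several semicolon-separated statements in one job; Jinja templates
    # like '{{ ds }}' are rendered by Airflow before submission.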
    SparkSqlOnK8SOperator(
        task_id="spark_multiple_sql_application",
        sql="SELECT 1; SELECT 2; SELECT '{{ ds }}'",
        namespace="spark",
        image="pyspark-job",
        image_pull_policy="Never",
        app_name="spark-sql-job-example",
        service_account="spark",
        app_waiter="log",
        driver_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
        executor_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
        executor_instances=ExecutorInstances(min=0, max=5, initial=5),
        ui_reverse_proxy=True,
    )

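    # Read the statements from query.sql; Airflow templates .sql files, so the
    # file contents are rendered before the job is submitted.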
    SparkSqlOnK8SOperator(
        task_id="spark_sql_from_file_application",
        sql="query.sql",
        namespace="spark",
        image="pyspark-job",
        image_pull_policy="Never",
        app_name="spark-sql-job-example",
        service_account="spark",
        app_waiter="log",
        driver_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
        executor_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
        executor_instances=ExecutorInstances(min=0, max=5, initial=5),
        ui_reverse_proxy=True,
    )

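    # Run Iceberg DDL/DML: the extra package and spark_conf enable the Iceberg
    # SQL extensions and configure a REST catalog named "prod".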
    SparkSqlOnK8SOperator(
        task_id="spark_iceberg_sql_application",
        sql="iceberg_query.sql",
        # It's better to use a custom image with the iceberg-spark-runtime
        # package preinstalled than to fetch it via `packages` at runtime.
        packages=["org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.0"],
        spark_conf={
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
            "spark.sql.catalog.spark_catalog.type": "hive",
            "spark.sql.catalog.prod": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.prod.type": "rest",
            "spark.sql.catalog.prod.uri": "http://localhost:8080",
        },
        namespace="spark",
        image="pyspark-job",
        image_pull_policy="Never",
        app_name="spark-sql-job-example",
        service_account="spark",
        app_waiter="log",
        driver_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
        executor_resources=PodResources(cpu=1, memory=1024, memory_overhead=512),
        executor_instances=ExecutorInstances(min=0, max=5, initial=5),
        ui_reverse_proxy=True,
    )