Skip to content

Commit

Permalink
chore(doc): add some examples for SparkSqlOnK8SOperator operator (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Dec 4, 2024
1 parent 7fdcfc9 commit 99da937
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 0 deletions.
14 changes: 14 additions & 0 deletions examples/airflow/iceberg_query.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Create an Iceberg table in the `prod` REST catalog, partitioned by category.
-- IF NOT EXISTS keeps the script idempotent across repeated DAG runs.
CREATE TABLE IF NOT EXISTS prod.db.sample (
id bigint,
data string,
category string
)
USING iceberg
PARTITIONED BY (category);

-- Synchronise the 'foo' slice of another_sample into sample:
--   matched ids        -> overwrite all columns (UPDATE SET *)
--   new ids            -> insert the whole row  (INSERT *)
--   ids gone from src  -> delete from target
-- NOTE(review): WHEN NOT MATCHED BY SOURCE requires a recent Spark/Iceberg
-- (Spark 3.4+ semantics) — confirm against the cluster's runtime version.
MERGE INTO prod.db.sample t
USING (SELECT * FROM prod.db.another_sample WHERE category = 'foo') s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE THEN DELETE;
14 changes: 14 additions & 0 deletions examples/airflow/query.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Create a Parquet-backed table. Note: student_id is declared inside
-- PARTITIONED BY, so the table's full schema is
-- (name, address, added_at, student_id) — four columns, not three.
CREATE TABLE students (
name VARCHAR(64),
address VARCHAR(64),
added_at TIMESTAMP
)
USING PARQUET
PARTITIONED BY (student_id INT);

-- {{ ts }} is an Airflow Jinja macro that renders to an ISO-8601 timestamp
-- string at run time; it must be quoted to form a valid TIMESTAMP literal.
-- An explicit column list makes the statement robust to schema evolution.
INSERT INTO students (name, address, added_at, student_id) VALUES
('Amy Smith', '123 Park Ave, San Jose', TIMESTAMP '{{ ts }}', 111111);

-- Multi-row insert: rows are separated by commas and the whole statement
-- ends with a single semicolon (the original had a stray `;,` after the
-- first row, which is a syntax error).
INSERT INTO students (name, address, added_at, student_id) VALUES
('Bob Brown', '456 Taylor St, Cupertino', TIMESTAMP '{{ ts }}', 222222),
('Cathy Johnson', '789 Race Ave, Palo Alto', TIMESTAMP '{{ ts }}', 333333);
87 changes: 87 additions & 0 deletions examples/airflow/spark_sql_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

from datetime import datetime

from spark_on_k8s.airflow.operators import SparkSqlOnK8SOperator
from spark_on_k8s.client import ExecutorInstances, PodResources

from airflow.models.dag import DAG

def _common_task_kwargs() -> dict:
    """Operator kwargs shared by every example task.

    Returns a fresh dict with fresh ``PodResources``/``ExecutorInstances``
    objects on each call, so the four tasks never share mutable
    configuration instances.
    """
    return {
        "namespace": "spark",
        "image": "pyspark-job",
        "image_pull_policy": "Never",
        # NOTE(review): all four examples reuse the same app_name; presumably
        # the operator suffixes it to keep pod names unique — confirm.
        "app_name": "spark-sql-job-example",
        "service_account": "spark",
        "app_waiter": "log",  # stream driver logs until the app finishes
        "driver_resources": PodResources(cpu=1, memory=1024, memory_overhead=512),
        "executor_resources": PodResources(cpu=1, memory=1024, memory_overhead=512),
        "executor_instances": ExecutorInstances(min=0, max=5, initial=5),
        "ui_reverse_proxy": True,
    }


with DAG(
    dag_id="spark_sql_on_k8s",
    schedule=None,
    start_date=datetime(2024, 1, 1),
) as dag:
    """
    This DAG executes Spark SQL queries on Kubernetes using the SparkSqlOnK8SOperator.
    """

    # 1. A single inline SQL statement.
    SparkSqlOnK8SOperator(
        task_id="spark_sql_application",
        sql="CREATE TABLE test_table (key INT, value STRING) AS SELECT 1 AS key, 'value' AS value",
        **_common_task_kwargs(),
    )

    # 2. Several semicolon-separated statements. Airflow renders {{ ds }}
    # before submission; it must be quoted — unquoted, the rendered value
    # (e.g. 2024-12-04) is parsed by Spark as integer subtraction
    # (2024 - 12 - 4 = 2008), not as a date.
    SparkSqlOnK8SOperator(
        task_id="spark_multiple_sql_application",
        sql="SELECT 1; SELECT 2; SELECT '{{ ds }}'",
        **_common_task_kwargs(),
    )

    # 3. SQL loaded (and Jinja-templated) from a .sql file shipped with the DAG.
    SparkSqlOnK8SOperator(
        task_id="spark_sql_from_file_application",
        sql="query.sql",
        **_common_task_kwargs(),
    )

    # 4. Iceberg example: the runtime jar is fetched via `packages` and the
    # catalogs are wired up through spark_conf.
    SparkSqlOnK8SOperator(
        task_id="spark_iceberg_sql_application",
        sql="iceberg_query.sql",
        # it's better to use a custom image with the iceberg-spark-runtime package installed
        packages=["org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.0"],
        spark_conf={
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
            "spark.sql.catalog.spark_catalog.type": "hive",
            "spark.sql.catalog.prod": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.prod.type": "rest",
            "spark.sql.catalog.prod.uri": "http://localhost:8080",
        },
        **_common_task_kwargs(),
    )

0 comments on commit 99da937

Please sign in to comment.