-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathrefresh_webmap_org_location_dag.py
67 lines (52 loc) · 2 KB
/
refresh_webmap_org_location_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# This DAG refreshes the webmap database organization_location materialized view.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import psycopg2.extras
from airflow.utils.dates import days_ago
from lib.utils import on_failure_callback
# Shared operator defaults applied to every task in this DAG.
# Any individual task may override a value when its operator is constructed.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['[email protected]'],
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    retry_delay=timedelta(minutes=5),
    # Needs to be set in default_args to work correctly:
    # https://github.com/apache/airflow/issues/26760
    on_failure_callback=on_failure_callback,
)
with DAG(
    dag_id='refresh_webmap_db_org_location_view',
    default_args=default_args,
    description='Refresh the material view in the webmap database.',
    schedule_interval="@daily",
    start_date=days_ago(2),
    catchup=False,
    tags=['webmap'],
) as dag:
    # Airflow connection id handed to PostgresHook below.
    postgresConnId = "postgres_default"

    def view_refresh(ds, **kwargs):
        """Refresh the webmap.organization_location materialized view.

        Opens a connection through PostgresHook, runs the REFRESH statement,
        commits, and always closes the connection afterwards.

        Args:
            ds: Unused. Accepted so the callable matches what the operator
                passes in (presumably the Airflow date string - confirm).
            **kwargs: Unused; absorbs any additional context arguments.

        Raises:
            Exception: Any error raised while executing or committing the
                statement is propagated unchanged, so the task is marked
                failed instead of silently succeeding.
        """
        db = PostgresHook(postgres_conn_id=postgresConnId)
        conn = db.get_conn()
        sql = 'REFRESH MATERIALIZED VIEW webmap.organization_location'
        try:
            # The cursor context manager closes the cursor on both the
            # success and the error path.
            with conn.cursor() as cur:
                cur.execute(sql)
            conn.commit()
        finally:
            # Closing without a prior commit discards the open transaction,
            # so no explicit rollback is needed when execute() fails.
            conn.close()

    task = PythonOperator(
        task_id='refresh_webmap_organization_location',
        python_callable=view_refresh,
    )

    # Single-task DAG: there are no dependencies to declare.
    task