# compileresults.py
import os
import simplejson as json
import logging
import sys
from string import Formatter
from itertools import zip_longest
from argparse import ArgumentParser
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy import bindparam
from sqlalchemy import String
from sqlalchemy import MetaData
from sqlalchemy import Table
from sqlalchemy import select
from sqlalchemy import insert
from sqlalchemy import BLANK_SCHEMA


def load_sql(name, db_type, classifiers, batch=None):
    '''
    Loads SQL from a file, using the specialized version if available (i.e. 00_setup_pg.sql instead
    of 00_setup.sql for Postgres databases), and adds the simulation classifier columns in one of
    several different formats for queries that need them.
    '''
    logging.info(f" {name}")
    sql_dir = os.path.join(os.path.dirname(__file__), "sql")
    for filename in (os.path.join(sql_dir, f"{name}_{db_type}.sql"),
                     os.path.join(sql_dir, f"{name}.sql")):
        if os.path.exists(filename):
            params = {"table_suffix": f"_{batch}" if batch else ""}
            queries = []
            for sql in open(filename, "r").read().split(";"):
                if sql and not sql.isspace():
                    sql_params = {key for _, key, _, _ in Formatter().parse(sql) if key}
                    for param in sql_params:
                        if "classifiers" not in param:
                            continue

                        # Queries can contain format strings to be replaced by classifier names:
                        #   classifiers_select[_<table name>[_<alias suffix>]]
                        #   classifiers_join_<table1>_<table2>
                        #   classifiers_ddl
                        parts = param.split("_")
                        if "select" in parts:
                            table = parts[2] if len(parts) >= 3 else None
                            suffix = parts[3] if len(parts) == 4 else None
                            params[param] = ", ".join(
                                (f"{table}.{c} AS {c}_{suffix}" if suffix
                                 else f"{table}.{c}" for c in classifiers)
                                if table else classifiers)
                        elif "join" in parts:
                            _, _, lhs_table, rhs_table = parts
                            params[param] = " AND ".join((
                                (
                                    f"CASE WHEN ({lhs_table}.{c} = {rhs_table}.{c}) "
                                    f"OR ({lhs_table}.{c} IS NULL AND {rhs_table}.{c} IS NULL) "
                                    "THEN 0 ELSE 1 END = 0"
                                ) for c in classifiers))
                        elif "ddl" in parts:
                            params[param] = ", ".join((f"{c} VARCHAR" for c in classifiers))

                    queries.append(sql.format(**params))

            return queries

    raise IOError(f"Couldn't find file for query '{name}'")


def find_project_classifiers(conn, batch=None):
    '''
    Finds the classifier columns used by the simulation by inspecting the
    classifiersetdimension table - every column except id and jobid.
    '''
    table_suffix = f"_{batch}" if batch else ""
    results = conn.execute(text(f"SELECT * FROM classifiersetdimension{table_suffix} LIMIT 1"))
    classifiers = [col for col in results.keys() if col.lower() not in ("id", "jobid")]

    return classifiers


def executemany(conn, queries, values):
    '''
    Executes each query in the list once per dict of named parameter values,
    binding every parameter as a string.
    '''
    params = values[0].keys()
    for sql in queries:
        stmt = text(sql)
        stmt = stmt.bindparams(*[bindparam(param, type_=String) for param in params])
        conn.execute(stmt, values)
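
# For reference, a minimal sketch of the indicator configuration that compile_results (below)
# expects, normally loaded from compileresults.json - the section names come from the code,
# but the pool, flux, and indicator names here are purely illustrative:
#
#     {
#         "pool_collections": {"Biomass": ["SoftwoodMerch", "HardwoodMerch"],
#                              "Air":     ["CO2", "CH4"]},
#         "fluxes":           {"Decay Emissions": {"source": "Annual Process",
#                                                  "from": "Biomass", "to": "Air"}},
#         "flux_collections": {"All Emissions": ["Decay Emissions"]},
#         "stock_changes":    {"Net Emissions": {"+": ["All Emissions"], "-": []}},
#         "pool_indicators":  {"Total Biomass": "Biomass"}
#     }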


def compile_results(conn, db_type, indicators, batch=None, cleanup=False):
    '''
    Builds the reporting tables and views from the raw GCBM output tables, using the
    indicator configuration to define pool collections, fluxes, stock changes, and pool
    indicators. An optional batch number selects the suffixed raw tables from a
    distributed run, and cleanup drops the raw tables afterward.
    '''
    logging.info("Compiling results tables...")

    # Get the classifier columns used by the simulation - varies by project.
    classifiers = find_project_classifiers(conn, batch)

    for query in (
        "00_setup", "01_reporting_table_ddl", "01a_create_all_pools_collection",
        "02_create_location_area_view"
    ):
        for sql in load_sql(query, db_type, classifiers, batch):
            conn.execute(text(sql))

    # Fluxes are the movement of carbon between pool collections as a result of
    # annual processes, disturbance, or both combined.
    sql = load_sql("03_populate_change_type_categories", db_type, classifiers, batch)
    executemany(conn, sql, [
        {"name": "Annual Process"},
        {"name": "Disturbance"},
        {"name": "Combined"}
    ])

    # Individual pools are grouped into named collections that are used to calculate fluxes.
    sql = load_sql("04_populate_pool_collections", db_type, classifiers, batch)
    executemany(conn, sql, [{"name": name} for name in indicators["pool_collections"]])

    sql = load_sql("05_populate_pool_collection_pools", db_type, classifiers, batch)[0]
    for name, pools in indicators["pool_collections"].items():
        params = {f"pool{i}": pool for i, pool in enumerate(pools)}
        pool_param_names = ", ".join((f":{param}" for param in params))
        conn.execute(text(sql.format(pools=pool_param_names)).bindparams(name=name, **params))

    # A flux is a movement of carbon from any pool in the source collection to any pool in the
    # sink collection as a result of annual processes, disturbances, or both combined.
    sql = load_sql("06_populate_flux_indicators", db_type, classifiers, batch)
    executemany(conn, sql, [{
        "name"       : name,
        "change_type": details["source"],
        "source"     : details["from"],
        "sink"       : details["to"]}
        for name, details in indicators["fluxes"].items()
    ])

    sql = load_sql("07_create_flux_indicator_view", db_type, classifiers, batch)[0]
    conn.execute(text(sql))

    # Fluxes are grouped into named collections that are used as indicators themselves, and also
    # to calculate stock change indicators where fluxes are added or subtracted from each other.
    sql = load_sql("08_populate_flux_indicator_collections", db_type, classifiers, batch)
    executemany(conn, sql, [{"name": name} for name in indicators["flux_collections"]])

    sql = load_sql("09_populate_flux_indicator_collection_flux_indicators", db_type, classifiers, batch)[0]
    for name, fluxes in indicators["flux_collections"].items():
        params = {f"flux{i}": flux for i, flux in enumerate(fluxes)}
        flux_param_names = ", ".join((f":{param}" for param in params))
        conn.execute(text(sql.format(fluxes=flux_param_names)).bindparams(name=name, **params))

    # Stock change indicators are groups of fluxes that are added or subtracted from each other.
    sql = load_sql("10_populate_stock_changes", db_type, classifiers, batch)[0]
    for name, details in indicators["stock_changes"].items():
        add_flux_params = {f"add_flux{i}": flux for i, flux in enumerate(details.get("+") or ["_"])}
        add_flux_param_names = ", ".join((f":{param}" for param in add_flux_params))
        sub_flux_params = {f"sub_flux{i}": flux for i, flux in enumerate(details.get("-") or ["_"])}
        sub_flux_param_names = ", ".join((f":{param}" for param in sub_flux_params))

        params = {}
        params.update(add_flux_params)
        params.update(sub_flux_params)

        conn.execute(
            text(sql.format(add_fluxes=add_flux_param_names, subtract_fluxes=sub_flux_param_names))
                .bindparams(name=name, **params))

    for query in ("11_create_flux_indicator_aggregates_view", "12_create_stock_change_indicators_view"):
        for sql in load_sql(query, db_type, classifiers, batch):
            conn.execute(text(sql))

    # Pool indicators report on named collections of pools.
    sql = load_sql("13_populate_pool_indicators", db_type, classifiers, batch)
    executemany(conn, sql, [{"name": name, "pools": pools}
                            for name, pools in indicators["pool_indicators"].items()])

    for query in ("14_create_pool_indicators_view", "15_create_dist_indicators_view",
                  "16_create_age_indicators_view", "17_create_error_indicators_view",
                  "18_create_total_disturbed_areas_view", "20_create_disturbance_fluxes_view"):
        for sql in load_sql(query, db_type, classifiers, batch):
            conn.execute(text(sql))

    # The density view template is applied once per indicator table.
    density_sql = load_sql("19_create_density_views", db_type, classifiers, batch)
    if density_sql:
        sql = density_sql[0]
        for table in ("v_flux_indicators", "v_flux_indicator_aggregates", "v_stock_change_indicators"):
            conn.execute(text(sql.format(indicator_table=table)))

    # Optionally drop the raw simulation output tables now that the reporting tables are built.
    if cleanup:
        table_suffix = f"_{batch}" if batch else ""
        for sql in [
            "DROP TABLE IF EXISTS agearea{} CASCADE;",
            "DROP TABLE IF EXISTS ageclassdimension{} CASCADE;",
            "DROP TABLE IF EXISTS classifiersetdimension{} CASCADE;",
            "DROP TABLE IF EXISTS datedimension{} CASCADE;",
            "DROP TABLE IF EXISTS disturbancedimension{} CASCADE;",
            "DROP TABLE IF EXISTS disturbancetypedimension{} CASCADE;",
            "DROP TABLE IF EXISTS errordimension{} CASCADE;",
            "DROP TABLE IF EXISTS fluxes{} CASCADE;",
            "DROP TABLE IF EXISTS landclassdimension{} CASCADE;",
            "DROP TABLE IF EXISTS locationdimension{} CASCADE;",
            "DROP TABLE IF EXISTS locationerrordimension{} CASCADE;",
            "DROP TABLE IF EXISTS moduleinfodimension{} CASCADE;",
            "DROP TABLE IF EXISTS pools{} CASCADE;",
            "DROP TABLE IF EXISTS r_change_type_categories{} CASCADE;",
            "DROP TABLE IF EXISTS r_pool_collections{} CASCADE;",
            "DROP TABLE IF EXISTS r_pool_collection_pools{} CASCADE;",
            "DROP TABLE IF EXISTS r_flux_indicators{} CASCADE;",
            "DROP TABLE IF EXISTS r_flux_indicator_collections{} CASCADE;",
            "DROP TABLE IF EXISTS r_flux_indicator_collection_flux_indicators{} CASCADE;",
            "DROP TABLE IF EXISTS r_stock_changes{} CASCADE;",
            "DROP TABLE IF EXISTS r_pool_indicators{} CASCADE;",
            "DROP TABLE IF EXISTS r_location{} CASCADE;",
            "DROP TABLE IF EXISTS r_stand_area{} CASCADE;",
        ]:
            conn.execute(text(sql.format(table_suffix)))

    conn.commit()
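
# A sketch of calling compile_results directly for one batch of a distributed run; the
# connection string and batch number are illustrative. With batch=1 the queries read the
# raw tables with a "_1" suffix (e.g. fluxes_1), and cleanup=True drops the raw tables
# once the reporting tables are built:
#
#     engine = create_engine("postgresql://user:password@localhost/gcbm_results", future=True)
#     with engine.connect() as batch_conn:
#         batch_indicators = json.load(open("compileresults.json"))
#         compile_results(batch_conn, "pg", batch_indicators, batch=1, cleanup=True)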


def copy_reporting_tables(from_conn, from_schema, to_conn, to_schema=None):
    '''
    Copies the final v_* reporting views from the results database into tables in the
    output database, optionally into a specific schema.
    '''
    logging.info("Copying reporting tables to output database...")
    md = MetaData()
    md.reflect(bind=from_conn, schema=from_schema, views=True,
               only=lambda table_name, _: table_name.startswith("v_"))

    output_md = MetaData(schema=to_schema)
    for fqn, table in md.tables.items():
        logging.info(f" {fqn}")
        table.to_metadata(output_md, schema=None)
        output_table = Table(table.name, output_md)
        output_table.drop(to_conn, checkfirst=True)
        output_table.create(to_conn)

        # Copy rows across in batches to limit memory use.
        batch = []
        for i, row in enumerate(from_conn.execute(select(table))):
            batch.append({k: v for k, v in row._mapping.items()})
            if i % 10000 == 0:
                to_conn.execute(insert(output_table), batch)
                batch = []

        if batch:
            to_conn.execute(insert(output_table), batch)

    to_conn.commit()
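
# A sketch of copying the finished v_* reporting views into a separate output database;
# the connection strings and schema name are illustrative:
#
#     results_conn = create_engine("postgresql://user:password@localhost/gcbm_results", future=True).connect()
#     reporting_conn = create_engine("sqlite:///reporting.db", future=True).connect()
#     copy_reporting_tables(results_conn, "results", reporting_conn)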


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(asctime)s %(message)s",
                        datefmt="%m/%d %H:%M:%S")

    parser = ArgumentParser(description="Produce reporting tables from raw GCBM results. For connection strings, "
                                        "see https://docs.sqlalchemy.org/en/latest/core/engines.html#database-urls")
    parser.add_argument("results_db", help="connection string for the simulation results database")
    parser.add_argument("--results_schema", help="name of the schema containing the simulation results", type=str.lower)
    parser.add_argument("--output_db", help="connection string for the database to copy final reporting tables to")
    parser.add_argument("--output_schema", help="name of the schema to copy final reporting tables to", type=str.lower)
    parser.add_argument("--indicator_config", help="indicator configuration file - defaults to a generic set")
    args = parser.parse_args()

    results_db_engine = create_engine(args.results_db, future=True)
    conn = results_db_engine.connect()
    if args.results_schema:
        conn.execute(text(f"SET SEARCH_PATH={args.results_schema}"))

    # Some queries are specialized for distributed runs using Postgres; these are the
    # files with the _pg suffix in the sql directory.
    db_type = "pg" if args.results_db.startswith("postgres") else ""

    indicators = json.load(open(
        args.indicator_config
        or os.path.join(os.path.dirname(__file__), "compileresults.json")))

    compile_results(conn, db_type, indicators)

    if args.output_db:
        output_db_engine = create_engine(args.output_db, future=True)
        output_conn = output_db_engine.connect()
        if args.output_schema:
            output_conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {args.output_schema}"))

        copy_reporting_tables(conn, args.results_schema, output_conn, args.output_schema)
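
# Example command lines (the connection strings and file paths are illustrative - see the
# SQLAlchemy URL documentation referenced in the argument parser above):
#
#     python compileresults.py sqlite:///compiled_gcbm_output.db
#
#     python compileresults.py postgresql://user:password@localhost/gcbm_results \
#         --results_schema results --output_db sqlite:///reporting.db \
#         --indicator_config compileresults.json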