-
Notifications
You must be signed in to change notification settings - Fork 0
/
update_fmi.py
157 lines (125 loc) · 6.08 KB
/
update_fmi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import argparse
import urllib
import getpass
import requests
import pystac_client
import pandas as pd
import rasterio
import time
import json
from urllib.parse import urljoin
from pystac import Collection, Item
from utils.json_convert import convert_json_to_geoserver
from utils.retry_errors import retry_errors
def _load_collection(href):
    """Load a STAC Collection from *href*, patching a malformed temporal extent.

    Some collections have wrongly configured temporal extents, which makes
    ``Collection.from_file`` raise ``ValueError``. In that case the raw JSON
    is downloaded, ``extent.temporal.interval`` is wrapped in an outer list
    and the collection is built from the patched dictionary instead.
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.request`` submodule has been loaded.
    import urllib.request

    try:
        return Collection.from_file(href)
    except ValueError:
        with urllib.request.urlopen(href) as response:
            data = json.load(response)
        temporal = data["extent"]["temporal"]
        temporal["interval"] = [temporal["interval"]]
        return Collection.from_dict(data)


def _collect_fmi_items(fmi_collection):
    """Return the Items linked from every child collection of *fmi_collection*."""
    sub_collections = [_load_collection(link.target) for link in fmi_collection.get_child_links()]
    # A set removes item links that are listed by more than one sub-collection.
    item_links = list({link.target for sub in sub_collections for link in sub.get_item_links()})

    items = []
    errors = []
    for href in item_links:
        try:
            items.append(Item.from_file(href))
        except Exception as e:  # broad on purpose: failed hrefs are retried below
            print(f" ! {e} on {href}")
            errors.append(href)

    # If there were connection errors during the item making process, the item
    # generation for the failed links is retried.
    # NOTE(review): retry_errors is assumed to add the recovered items to
    # ``items`` in place - confirm against utils.retry_errors.
    if errors:
        retry_errors(items, errors)
        print(" * All errors fixed")
    return items


def _prepare_item(item):
    """Add raster metadata read from the item's first asset and normalise the item in place."""
    first_asset = next(iter(item.assets.values()))
    with rasterio.open(first_asset.href) as src:
        item.extra_fields["gsd"] = src.res[0]
        epsg = src.crs.to_epsg()
        # 9391 EPSG code is false, replace by the standard 3067
        item.properties["proj:epsg"] = 3067 if epsg == 9391 else epsg
        transform = src.transform
        # All nine coefficients of the affine transform, a through i.
        item.properties["proj:transform"] = [getattr(transform, name) for name in "abcdefghi"]

    # Roles must be a list: wrap any non-list value, but leave real lists
    # alone (the previous ``roles is not list`` check was always true and
    # nested lists that were already correct).
    for asset in item.assets.values():
        if not isinstance(asset.roles, list):
            asset.roles = [asset.roles]

    # Items without a license field must not abort the whole update.
    item.extra_fields.pop("license", None)
    item.remove_links("license")


def update_catalog(app_host, csc_catalog_client):
    """
    The main updating function of the script. Checks the collection items in the FMI catalog
    and compares them to the ones in the CSC catalog. Items missing from the CSC catalog are
    posted to the REST API, after which the collection extent is updated from the FMI collection.

    app_host - The REST API path for updating the collections. It is combined with relative
               paths through urljoin, so it has to end with a "/".
    csc_catalog_client - The STAC API client for checking which items are already in the collections

    The REST API password is read from the module-level variable ``pwd``, which the script
    entry point sets before calling this function.

    Raises requests.HTTPError if the REST API rejects an item or a collection update.
    """
    log_headers = {"User-Agent": "update-script"}  # Added for easy log-filtering

    # The session is used as a context manager so its connections are released at the end.
    with requests.Session() as session:
        session.auth = ("admin", pwd)

        # Get all FMI collections from the CSC catalog
        csc_collections = [col for col in csc_catalog_client.get_collections() if col.id.endswith("at_fmi")]

        for collection in csc_collections:
            derived_from = [link.target for link in collection.links if link.rel == "derived_from"]
            if not derived_from:
                # Without a source link there is nothing to compare against.
                print(f" ! Collection {collection.id} has no derived_from link, skipping")
                continue

            fmi_collection = _load_collection(derived_from[0])
            fmi_collection.id = collection.id
            print(f"# Checking collection {collection.id}:")

            items = _collect_fmi_items(fmi_collection)
            csc_item_ids = {item.id for item in collection.get_items()}
            print(f" * Number of items in CSC STAC and FMI: {len(csc_item_ids)}/{len(items)}")

            for item in items:
                if item.id in csc_item_ids:
                    continue
                fmi_collection.add_item(item)
                _prepare_item(item)

                converted_item = convert_json_to_geoserver(item.to_dict())
                request_point = f"collections/{collection.id}/products"
                r = session.post(urljoin(app_host, request_point), headers=log_headers, json=converted_item)
                r.raise_for_status()
                print(f" + Added item {item.id}")

            print(" * All items present")

            # Update the extents from the FMI collection
            collection.extent = fmi_collection.extent
            converted_collection = convert_json_to_geoserver(collection.to_dict())
            request_point = f"collections/{collection.id}/"
            r = session.put(urljoin(app_host, request_point), headers=log_headers, json=converted_collection)
            r.raise_for_status()
            print(" * Updated collection")
if __name__ == "__main__":
    # The first check for the REST API password is from a password file.
    # If a password file is not found, the script prompts the user to give a
    # password through the CLI.
    # NOTE: ``pwd`` has to stay a module-level name, because update_catalog
    # reads it as a global.
    pw_filename = '../passwords.txt'

    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help="Hostname of the selected STAC API", required=True)
    args = parser.parse_args()

    try:
        # dtype=str and keep_default_na=False keep the password verbatim:
        # without them pandas would turn e.g. "0123" into the number 123 and
        # "NA" into NaN.
        pw_file = pd.read_csv(pw_filename, header=None, dtype=str, keep_default_na=False)
        pwd = pw_file.at[0, 0]
    except FileNotFoundError:
        print("Password not given as an argument and no password file found")
        pwd = getpass.getpass()

    start = time.time()

    # Drop trailing slashes so the URLs built below never contain "//".
    host = args.host.rstrip("/")
    app_host = f"{host}/geoserver/rest/oseo/"
    csc_catalog_client = pystac_client.Client.open(
        f"{host}/geoserver/ogc/stac/v1/",
        headers={"User-Agent": "update-script"},
    )

    print(f"Updating STAC Catalog at {args.host}")
    update_catalog(app_host, csc_catalog_client)

    end = time.time()
    print(f"Script took {end-start:.2f} seconds")