Feature automate data transformation #137

Open · wants to merge 8 commits into base: staging

Changes from 7 commits
9 changes: 9 additions & 0 deletions data_transformation_plugins/README.md
@@ -0,0 +1,9 @@
## Information about the folder
This folder is part of the `automation pipeline using DAG`. It contains the functions essential for transforming a given dataset into COGs. There is one Python file per dataset, and each file is used as a plugin in the pipeline.

## How this folder fits in the automation pipeline
These functions/files are created by the `developer` to transform a single data file into a COG. Once the COGs are validated, these Python scripts are pushed to the `GHGC-SMCE S3` bucket. The files are then fetched by the `SM2A DAG` to transform the entire dataset automatically, as sketched below.
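
As a minimal sketch of how the DAG side might invoke one of these plugins (the S3 key, nodata value, and COG-writing step here are assumptions for illustration, not part of this PR):

```python
# Hypothetical driver code: run one plugin on one file of a dataset.
import s3fs

from ecco_darwin_transformation import ecco_darwin_transformation

fs = s3fs.S3FileSystem()
name = "bucket/ecco_darwin/ECCO_Darwin_CO2_flux_2022_01.nc"  # hypothetical key
with fs.open(name) as file_obj:
    # Each plugin returns {cog_filename: xarray.DataArray}
    cogs = ecco_darwin_transformation(file_obj, name, nodata=-9999)

for cog_filename, data_array in cogs.items():
    # Writing COGs this way assumes rioxarray is installed
    data_array.rio.to_raster(cog_filename, driver="COG")
```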

## Naming convention for the transformation files in the folder
- `name of python file` - `collectionname_transformation.py`
  `collectionname` refers to the STAC collection name of the dataset, followed by the word `transformation`. Make sure the `collectionname` within the filename matches the `collectionname` passed as a `parameter` to the DAG. For example, the plugin for the `ecco_darwin` collection is named `ecco_darwin_transformation.py`.
Empty file added (`data_transformation_plugins/__init__.py`).
47 changes: 47 additions & 0 deletions data_transformation_plugins/ecco_darwin_transformation.py
@@ -0,0 +1,47 @@
import re

import xarray


def ecco_darwin_transformation(file_obj, name, nodata):
    """Transformation function for the ECCO-Darwin dataset.

    Args:
        file_obj (s3fs object): s3fs file object for one file of the dataset
        name (str): name of the file to be transformed
        nodata (int): nodata value as specified by the data provider

    Returns:
        dict: Dictionary with the COG name and its corresponding data array.
    """
    var_data_netcdf = {}
    xds = xarray.open_dataset(file_obj)
    xds = xds.rename({"y": "latitude", "x": "longitude"})
    # Scale grid indices to geographic coordinates:
    # x (0..1440) -> longitude (-180..180), y (0..721) -> latitude (-90..90)
    xds = xds.assign_coords(longitude=((xds.longitude / 1440) * 360) - 180).sortby(
        "longitude"
    )
    xds = xds.assign_coords(latitude=((xds.latitude / 721) * 180) - 90).sortby(
        "latitude"
    )

    variable = [var for var in xds.data_vars]

    for _ in xds.time.values:
        # Skip the first two data variables
        for var in variable[2:]:
            filename = name.split("/")[-1]

> **Review comment:** Replace this with a Python function like
> `filename = os.path.basename(name)`

> **Reply (Collaborator Author):** But `name` is a parameter that is passed to the function.
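
For context on this thread: `os.path.basename` operates on any path string, including one received as a parameter, and agrees with the `str.split` approach for forward-slash keys. A quick sketch with a hypothetical key:

```python
import os

# Hypothetical S3 key, for illustration only
name = "ecco_darwin/monthly/ECCO_Darwin_CO2_flux_2022_01.nc"
assert os.path.basename(name) == name.split("/")[-1]
```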

            filename_elements = re.split("[_ .]", filename)
            data = xds[var]

            # Flip latitude so the array is oriented north-up
            data = data.reindex(latitude=list(reversed(data.latitude)))
            data = data.where(data != nodata, -9999)
            data.rio.set_spatial_dims("longitude", "latitude", inplace=True)
            data.rio.write_crs("epsg:4326", inplace=True)
            data.rio.write_nodata(-9999, inplace=True)

            # insert date of generated COG into filename: drop the extension,
            # then merge the last two elements into one
            filename_elements.pop()
            filename_elements[-1] = filename_elements[-2] + filename_elements[-1]
            filename_elements.pop(-2)
            cog_filename = "_".join(filename_elements)
            # add extension
            cog_filename = f"{cog_filename}.tif"
            var_data_netcdf[cog_filename] = data
    return var_data_netcdf
38 changes: 38 additions & 0 deletions data_transformation_plugins/geos_oco2_transformation.py
@@ -0,0 +1,38 @@
import re

import xarray


def geos_oco2_transformation(file_obj, name, nodata):
    """Transformation function for the GEOS OCO-2 dataset.

    Args:
        file_obj (s3fs object): s3fs file object for one file of the dataset
        name (str): name of the file to be transformed
        nodata (int): nodata value as specified by the data provider

    Returns:
        dict: Dictionary with the COG name and its corresponding data array.
    """
    var_data_netcdf = {}
    xds = xarray.open_dataset(file_obj)
    # Shift longitudes from 0..360 to -180..180
    xds = xds.assign_coords(lon=(((xds.lon + 180) % 360) - 180)).sortby("lon")
    variable = [var for var in xds.data_vars]
    for time_increment in range(0, len(xds.time)):
        for var in variable:
            filename = name.split("/")[-1]
            filename_elements = re.split("[_ .]", filename)
            data = getattr(xds.isel(time=time_increment), var)
            # Flip latitude so the array is oriented north-up
            data = data.isel(lat=slice(None, None, -1))
            data = data.where(data != nodata, -9999)
            data.rio.set_spatial_dims("lon", "lat", inplace=True)
            data.rio.write_crs("epsg:4326", inplace=True)
            data.rio.write_nodata(-9999, inplace=True)
            # insert date of generated COG into filename
            filename_elements[-1] = filename_elements[-3]
            filename_elements.insert(2, var)
            filename_elements.pop(-3)
            cog_filename = "_".join(filename_elements)
            # add extension
            cog_filename = f"{cog_filename}.tif"
            var_data_netcdf[cog_filename] = data

    return var_data_netcdf
36 changes: 36 additions & 0 deletions data_transformation_plugins/gosat_ch4_transformation.py
@@ -0,0 +1,36 @@
import re

import xarray


def gosat_ch4_transformation(file_obj, name, nodata):
    """Transformation function for the GOSAT CH4 dataset.

    Args:
        file_obj (s3fs object): s3fs file object for one file of the dataset
        name (str): name of the file to be transformed
        nodata (int): nodata value as specified by the data provider

    Returns:
        dict: Dictionary with the COG name and its corresponding data array.
    """
    var_data_netcdf = {}
    ds = xarray.open_dataset(file_obj)
    variable = [var for var in ds.data_vars]

    for var in variable:
        filename = name.split("/")[-1]
        filename_elements = re.split("[_ .]", filename)
        data = ds[var]
        # drop the extension and insert the variable name into the COG name
        filename_elements.pop()
        filename_elements.insert(2, var)
        cog_filename = "_".join(filename_elements)
        # add extension
        cog_filename = f"{cog_filename}.tif"

        # Flip latitude so the array is oriented north-up
        data = data.reindex(lat=list(reversed(data.lat)))
        data = data.where(data != nodata, -9999)
        data.rio.write_nodata(-9999, inplace=True)

        data.rio.set_spatial_dims("lon", "lat", inplace=True)
        data.rio.write_crs("epsg:4326", inplace=True)
        var_data_netcdf[cog_filename] = data
    return var_data_netcdf
33 changes: 33 additions & 0 deletions data_transformation_plugins/gpw_transformation.py
@@ -0,0 +1,33 @@
import re

import xarray


def gpw_transformation(file_obj, name, nodata):
    """Transformation function for the gridded population (GPW) dataset.

    Args:
        file_obj (s3fs object): s3fs file object for one file of the dataset
        name (str): name of the file to be transformed
        nodata (int): nodata value as specified by the data provider

    Returns:
        dict: Dictionary with the COG name and its corresponding data array.
    """

    var_data_netcdf = {}
    # engine="rasterio" requires rioxarray to be installed
    xds = xarray.open_dataarray(file_obj, engine="rasterio")

    filename = name.split("/")[-1]
    filename_elements = re.split("[_ .]", filename)
    # insert date of generated COG into filename
    filename_elements.pop()
    filename_elements.append(filename_elements[-3])
    xds = xds.where(xds != nodata, -9999)
    xds.rio.set_spatial_dims("x", "y", inplace=True)
    xds.rio.write_crs("epsg:4326", inplace=True)
    xds.rio.write_nodata(-9999, inplace=True)

    cog_filename = "_".join(filename_elements)
    # add extension
    cog_filename = f"{cog_filename}.tif"
    var_data_netcdf[cog_filename] = xds
    return var_data_netcdf
47 changes: 47 additions & 0 deletions data_transformation_plugins/push_to_s3.py
@@ -0,0 +1,47 @@
import os

import boto3


def upload_files_to_s3(folder_path, bucket_name, s3_folder, exclude_files):
    """
    Uploads all files in a folder to a specified S3 folder, skipping files that
    already exist in S3 and excluding specified files.

    Parameters:
    - folder_path (str): Path to the local folder containing files to upload.
    - bucket_name (str): Name of the S3 bucket.
    - s3_folder (str): Destination folder path in the S3 bucket.
    - exclude_files (list): List of file names to exclude from uploading.
    """
    # Initialize S3 client
    s3 = boto3.client("s3")

    # Loop through files in the local folder
    for file_name in os.listdir(folder_path):
        file_path = os.path.join(folder_path, file_name)

        # Check if it's a file and not in the exclude list
        if os.path.isfile(file_path) and file_name not in exclude_files:
            s3_key = os.path.join(s3_folder, file_name)

            try:
                # Check if the file already exists in S3
                s3.head_object(Bucket=bucket_name, Key=s3_key)
                print(f"Skipped {file_name} (already exists in S3)")
            except s3.exceptions.ClientError as e:
                # If the file does not exist, upload it
                if e.response["Error"]["Code"] == "404":
                    try:
                        s3.upload_file(file_path, bucket_name, s3_key)
                        print(f"Uploaded {file_name} to {s3_key}")
                    except Exception as upload_error:
                        print(f"Error uploading {file_name}: {upload_error}")
                else:
                    print(f"Error checking existence of {file_name}: {e}")


# Example usage:
# upload_files_to_s3("path/to/local/folder", "my-s3-bucket", "my/s3/folder", ["exclude1.ext", "exclude2.ext"])
if __name__ == "__main__":
    upload_files_to_s3(
        "data_transformation_plugins",
        "ghgc-data-store-develop",
        "data_transformation_plugins",
        ["__init__.py", "push_to_s3.py", "README.md"],
    )
44 changes: 44 additions & 0 deletions data_transformation_plugins/tm54dvar_ch4flux_mask_monthgrid_v5_transformation.py
@@ -0,0 +1,44 @@
import re
from datetime import datetime

import xarray


def tm54dvar_ch4flux_mask_monthgrid_v5_transformation(file_obj, name, nodata):
    """Transformation function for the TM5 CH4 flux dataset.

    Args:
        file_obj (s3fs object): s3fs file object for one file of the dataset
        name (str): name of the file to be transformed
        nodata (int): nodata value as specified by the data provider

    Returns:
        dict: Dictionary with the COG name and its corresponding data array.
    """

    var_data_netcdf = {}
    xds = xarray.open_dataset(file_obj)
    xds = xds.rename({"latitude": "lat", "longitude": "lon"})
    # Shift longitudes from 0..360 to -180..180
    xds = xds.assign_coords(lon=(((xds.lon + 180) % 360) - 180)).sortby("lon")
    variable = [var for var in xds.data_vars if "global" not in var]

    for time_increment in range(0, len(xds.months)):
        filename = name.split("/")[-1]
        filename_elements = re.split("[_ .]", filename)
        # Build the month date from the year in the filename (assumed to sit at
        # elements[-2]) and the month index
        start_time = datetime(int(filename_elements[-2]), time_increment + 1, 1)
        for var in variable:
            data = getattr(xds.isel(months=time_increment), var)
            # Flip latitude so the array is oriented north-up
            data = data.isel(lat=slice(None, None, -1))
            data = data.where(data != nodata, -9999)
            data.rio.set_spatial_dims("lon", "lat", inplace=True)
            data.rio.write_crs("epsg:4326", inplace=True)
            data.rio.write_nodata(-9999, inplace=True)

            # insert date of generated COG into filename; work on a copy so
            # repeated mutation does not corrupt names for later variables
            elements = filename_elements.copy()
            elements.pop()
            elements[-1] = start_time.strftime("%Y%m")
            elements.insert(2, var)
            cog_filename = "_".join(elements)
            # add extension
            cog_filename = f"{cog_filename}.tif"
            var_data_netcdf[cog_filename] = data

    return var_data_netcdf