Skip to content

Commit

Permalink
Merge pull request #1776 from claire-simpson/1603-unit-tests
Browse files Browse the repository at this point in the history
Add unit tests for 5 models
  • Loading branch information
emlys authored Feb 7, 2025
2 parents f1c0bb6 + 952ef9f commit 347c7a0
Show file tree
Hide file tree
Showing 5 changed files with 871 additions and 3 deletions.
146 changes: 144 additions & 2 deletions tests/test_annual_water_yield.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,49 @@
import unittest

import numpy
from shapely.geometry import Polygon

import pandas
import pygeoprocessing
from osgeo import gdal
from osgeo import gdal, ogr, osr

REGRESSION_DATA = os.path.join(
os.path.dirname(__file__), '..', 'data', 'invest-test-data', 'annual_water_yield')
SAMPLE_DATA = os.path.join(REGRESSION_DATA, 'input')
gdal.UseExceptions()


def make_watershed_vector(path_to_shp):
"""
Generate watershed results shapefile with two polygons
Args:
path_to_shp (str): path to store watershed results vector
Outputs:
None
"""
shapely_geometry_list = [
Polygon([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]),
Polygon([(2, 2), (3, 2), (3, 3), (2, 3), (2, 2)])
]
projection_wkt = osr.GetUserInputAsWKT("EPSG:4326")
vector_format = "ESRI Shapefile"
fields = {"hp_energy": ogr.OFTReal, "hp_val": ogr.OFTReal,
"ws_id": ogr.OFTReal, "rsupply_vl": ogr.OFTReal,
"wyield_mn": ogr.OFTReal, "wyield_vol": ogr.OFTReal,
"consum_mn": ogr.OFTReal, "consum_vol": ogr.OFTReal}
attribute_list = [
{"hp_energy": 1, "hp_val": 1, "ws_id": 0, "rsupply_vl": 2},
{"hp_energy": 11, "hp_val": 3, "ws_id": 1, "rsupply_vl": 52}
]

pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp, projection_wkt,
vector_format, fields,
attribute_list)


class AnnualWaterYieldTests(unittest.TestCase):
"""Regression Tests for Annual Water Yield Model."""

Expand Down Expand Up @@ -367,7 +401,6 @@ def test_validation(self):
self.assertTrue(
'but are not found in the valuation table' in
actual_message, actual_message)

# if the demand table is missing but the valuation table is present,
# make sure we have a validation error.
args_missing_demand_table = args.copy()
Expand All @@ -380,3 +413,112 @@ def test_validation(self):
self.assertEqual(
validation_warnings[0],
(['demand_table_path'], 'Input is required but has no value'))

def test_fractp_op(self):
"""Test `fractp_op`"""
from natcap.invest.annual_water_yield import fractp_op

# generate fake data
kc = numpy.array([[1, .1, .1], [.6, .6, .1]])
eto = numpy.array([[1000, 900, 900], [1100, 1005, 1000]])
precip = numpy.array([[100, 1000, 10], [500, 800, 1100]])
root = numpy.array([[99, 300, 400], [5, 500, 800]])
soil = numpy.array([[600, 700, 700], [800, 900, 600]])
pawc = numpy.array([[.11, .11, .12], [.55, .55, .19]])
veg = numpy.array([[1, 1, 0], [0, 1, 0]])
nodata_dict = {'eto': None, 'precip': None, 'depth_root': None,
'pawc': None, 'out_nodata': None}
seasonality_constant = 6

actual_fractp = fractp_op(kc, eto, precip, root, soil, pawc, veg,
nodata_dict, seasonality_constant)

# generated by running fractp_op
expected_fractp = numpy.array([[0.9345682, 0.06896508, 1.],
[1., 0.6487423, 0.09090909]],
dtype=numpy.float32)

numpy.testing.assert_allclose(actual_fractp, expected_fractp,
err_msg="Fractp does not match expected")

def test_compute_watershed_valuation(self):
"""Test `compute_watershed_valuation`, `compute_rsupply_volume`
and `compute_water_yield_volume`"""
from natcap.invest import annual_water_yield

def _create_watershed_results_vector(path_to_shp):
"""Generate a fake watershed results vector file."""
shapely_geometry_list = [
Polygon([(0, 0), (1, 0), (1, 1), (0, 1), (0, 0)]),
Polygon([(2, 2), (3, 2), (3, 3), (2, 3), (2, 2)])
]
projection_wkt = osr.GetUserInputAsWKT("EPSG:4326")
vector_format = "ESRI Shapefile"
fields = {"ws_id": ogr.OFTReal, "wyield_mn": ogr.OFTReal,
"consum_mn": ogr.OFTReal, "consum_vol": ogr.OFTReal}
attribute_list = [{"ws_id": 0, "wyield_mn": 990000,
"consum_mn": 500, "consum_vol": 50},
{"ws_id": 1, "wyield_mn": 800000,
"consum_mn": 600, "consum_vol": 70}]

pygeoprocessing.shapely_geometry_to_vector(shapely_geometry_list,
path_to_shp,
projection_wkt,
vector_format, fields,
attribute_list)

def _validate_fields(vector_path, field_name, expected_values, error_msg):
"""
Validate a specific field in the watershed results vector
by comparing actual to expected values. Expected values generated
by running the function.
Args:
vector path (str): path to watershed shapefile
field_name (str): attribute field to check
expected values (list): list of expected values for field
error_msg (str): what to print if assertion fails
Returns:
None
"""
with gdal.OpenEx(vector_path, gdal.OF_VECTOR | gdal.GA_Update) as ws_ds:
ws_layer = ws_ds.GetLayer()
actual_values = [ws_feat.GetField(field_name)
for ws_feat in ws_layer]
self.assertEqual(actual_values, expected_values, msg=error_msg)

# generate fake watershed results vector
watershed_results_vector_path = os.path.join(self.workspace_dir,
"watershed_results.shp")
_create_watershed_results_vector(watershed_results_vector_path)

# generate fake val_df
val_df = pandas.DataFrame({'efficiency': [.7, .8], 'height': [12, 50],
'fraction': [.9, .7], 'discount': [60, 20],
'time_span': [10, 10], 'cost': [100, 200],
'kw_price': [15, 20]})

# test water yield volume
annual_water_yield.compute_water_yield_volume(
watershed_results_vector_path)
_validate_fields(watershed_results_vector_path, "wyield_vol",
[990.0, 800.0],
"Error with water yield volume calculation.")

# test rsupply volume
annual_water_yield.compute_rsupply_volume(
watershed_results_vector_path)
_validate_fields(watershed_results_vector_path, "rsupply_vl",
[940.0, 730.0],
"Error calculating total realized water supply volume.")

# test compute watershed valuation
annual_water_yield.compute_watershed_valuation(
watershed_results_vector_path, val_df)
_validate_fields(watershed_results_vector_path, "hp_energy",
[19.329408, 55.5968],
"Error calculating energy.")
_validate_fields(watershed_results_vector_path, "hp_val",
[501.9029748723, 4587.91946857059],
"Error calculating net present value.")
63 changes: 63 additions & 0 deletions tests/test_carbon.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,69 @@ def test_carbon_full_undefined_nodata(self):
assert_raster_equal_value(
os.path.join(args['workspace_dir'], 'npv_redd.tif'), -0.4602106)

def test_generate_carbon_map(self):
"""Test `_generate_carbon_map`"""
from natcap.invest.carbon import _generate_carbon_map

def _make_simple_lulc_raster(base_raster_path):
"""Create a raster on designated path with arbitrary values.
Args:
base_raster_path (str): the raster path for making the new raster.
Returns:
None.
"""

array = numpy.array([[1, 1], [2, 3]], dtype=numpy.int32)

# UTM Zone 10N
srs = osr.SpatialReference()
srs.ImportFromEPSG(26910)
projection_wkt = srs.ExportToWkt()

origin = (461251, 4923245)
pixel_size = (1, 1)
no_data = -999

pygeoprocessing.numpy_array_to_raster(
array, no_data, pixel_size, origin, projection_wkt,
base_raster_path)

# generate a fake lulc raster
lulc_path = os.path.join(self.workspace_dir, "lulc.tif")
_make_simple_lulc_raster(lulc_path)

# make fake carbon pool dict
carbon_pool_by_type = {1: 5000, 2: 60, 3: 120}

out_carbon_stock_path = os.path.join(self.workspace_dir,
"carbon_stock.tif")

_generate_carbon_map(lulc_path, carbon_pool_by_type,
out_carbon_stock_path)

# open output carbon stock raster and check values
actual_carbon_stock = gdal.Open(out_carbon_stock_path)
band = actual_carbon_stock.GetRasterBand(1)
actual_carbon_stock = band.ReadAsArray()

expected_carbon_stock = numpy.array([[0.5, 0.5], [0.006, 0.012]],
dtype=numpy.float32)

numpy.testing.assert_array_equal(actual_carbon_stock,
expected_carbon_stock)

def test_calculate_valuation_constant(self):
"""Test `_calculate_valuation_constant`"""
from natcap.invest.carbon import _calculate_valuation_constant

valuation_constant = _calculate_valuation_constant(lulc_cur_year=2010,
lulc_fut_year=2012,
discount_rate=50,
rate_change=5,
price_per_metric_ton_of_c=50)
expected_valuation = 40.87302
self.assertEqual(round(valuation_constant, 5), expected_valuation)


class CarbonValidationTests(unittest.TestCase):
"""Tests for the Carbon Model MODEL_SPEC and validation."""
Expand Down
Loading

0 comments on commit 347c7a0

Please sign in to comment.