diff --git a/disco/cli/disco.py b/disco/cli/disco.py
index bbe823b4..58a3b670 100644
--- a/disco/cli/disco.py
+++ b/disco/cli/disco.py
@@ -19,6 +19,7 @@
 from disco.cli.create_pipeline import create_pipeline
 from disco.cli.ingest_tables import ingest_tables
 from disco.cli.make_summary_tables import make_summary_tables
+from disco.cli.select_time_points import select_time_points
 from disco.cli.summarize_hosting_capacity import summarize_hosting_capacity
 from disco.cli.config_generic_models import config_generic_models
 from disco.cli.upgrade_cost_analysis import upgrade_cost_analysis
@@ -52,6 +53,7 @@ def cli():
 cli.add_command(ingest_tables)
 cli.add_command(install_extensions)
 cli.add_command(make_summary_tables)
+cli.add_command(select_time_points)
 cli.add_command(summarize_hosting_capacity)
 cli.add_command(upgrade_cost_analysis)
 cli.add_command(hosting_capacity_by_timestep)
diff --git a/disco/cli/select_time_points.py b/disco/cli/select_time_points.py
new file mode 100644
index 00000000..70721a83
--- /dev/null
+++ b/disco/cli/select_time_points.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+
+"""
+Down-selects load shape time points in the circuit based on
+user-specified critical conditions.
+"""
+
+import logging
+import shutil
+import sys
+from pathlib import Path
+
+import click
+
+from jade.loggers import setup_logging
+from jade.utils.utils import get_cli_string
+
+from disco.preprocess.select_timepoints2 import (
+    CriticalCondition,
+    DemandCategory,
+    GenerationCategory,
+    main,
+)
+
+
+logger = logging.getLogger(__name__)
+
+
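+# Note: Click option callbacks receive (ctx, param, value); the lambdas below
+# index x[2] to pull out the value and coerce it (e.g., str -> Path, or the
+# raw choice strings -> enum members) before it reaches the command function.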
+@click.command()
+@click.argument("master_file", type=click.Path(exists=True), callback=lambda *x: Path(x[2]))
+@click.option(
+    "-c",
+    "--critical-conditions",
+    type=click.Choice([x.value for x in CriticalCondition]),
+    default=tuple(x.value for x in CriticalCondition),
+    show_default=True,
+    multiple=True,
+    callback=lambda *x: tuple(CriticalCondition(y) for y in x[2]),
+    help="Critical conditions to use for time-point selection.",
+)
+@click.option(
+    "-d",
+    "--demand-categories",
+    type=click.Choice([x.value for x in DemandCategory]),
+    default=tuple(x.value for x in DemandCategory),
+    show_default=True,
+    multiple=True,
+    callback=lambda *x: tuple(DemandCategory(y) for y in x[2]),
+    help="Demand-based devices to use in time-point selection algorithm.",
+)
+@click.option(
+    "-g",
+    "--generation-categories",
+    type=click.Choice([x.value for x in GenerationCategory]),
+    default=tuple(x.value for x in GenerationCategory),
+    show_default=True,
+    multiple=True,
+    callback=lambda *x: tuple(GenerationCategory(y) for y in x[2]),
+    help="Generation-based devices to use in time-point selection algorithm.",
+)
+@click.option(
+    "-o",
+    "--output",
+    default="output_time_points",
+    callback=lambda *x: Path(x[2]),
+    help="Output directory",
+)
+@click.option(
+    "--create-new-circuit/--no-create-new-circuit",
+    default=True,
+    is_flag=True,
+    show_default=True,
+    help="Create new circuit with down-selected time points.",
+)
+@click.option(
+    "--fix-master-file/--no-fix-master-file",
+    is_flag=True,
+    show_default=True,
+    default=False,
+    help="Remove commands in the Master.dss file that interfere with time-point selection.",
+)
+@click.option(
+    "-f",
+    "--force",
+    is_flag=True,
+    show_default=True,
+    default=False,
+    help="Delete output directory if it exists.",
+)
+@click.option(
+    "-v",
+    "--verbose",
+    is_flag=True,
+    show_default=True,
+    default=False,
+    help="Enable debug logging.",
+)
+def select_time_points(
+    master_file,
+    demand_categories,
+    generation_categories,
+    critical_conditions,
+    output,
+    create_new_circuit,
+    fix_master_file,
+    force,
+    verbose,
+):
+    """Select load shape time points in the circuit based on the specified critical conditions.
+
+    By default, the Master.dss file is not allowed to enable time-series mode. Specify
+    --fix-master-file to disable time-series mode and other disallowed parameters.
+
+    """
+    if output.exists():
+        if force:
+            shutil.rmtree(output)
+        else:
+            print(
+                f"Output directory {output} exists. Choose a different path or set --force.",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+    output.mkdir()
+    level = logging.DEBUG if verbose else logging.INFO
+    log_file = output / "disco.log"
+    setup_logging("disco", log_file, console_level=level, packages=["disco"])
+    logger.info(get_cli_string())
+    categories = {"demand": demand_categories, "generation": generation_categories}
+    main(
+        master_file,
+        categories=categories,
+        critical_conditions=critical_conditions,
+        destination_dir=output,
+        create_new_circuit=create_new_circuit,
+        fix_master_file=fix_master_file,
+        recreate_profiles=False,
+    )
diff --git a/disco/preprocess/__init__.py b/disco/preprocess/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/disco/preprocess/select_timepoints.py b/disco/preprocess/select_timepoints.py
new file mode 100644
index 00000000..a1f2bcae
--- /dev/null
+++ b/disco/preprocess/select_timepoints.py
@@ -0,0 +1,227 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Nov 25 07:43:46 2022
+
+@author: ksedzro
+"""
+
+import os
+import numpy as np
+import pandas as pd
+import time
+
+
+def get_parameter(line, parameter_name, func=None):
+    """Return the value of parameter_name in a .dss line; apply func to it if given."""
+    if func is not None:
+        val = func(line.split(f'{parameter_name}=')[1].split(' ')[0])
+    else:
+        val = line.split(f'{parameter_name}=')[1].split(' ')[0]
+        if '\n' in val:
+            val = val.strip('\n')
+    return val
+
+
+def collect_category(file_path, bus_data, category='demand'):
+    """
+    file_path can be:
+        loads.dss path
+        EVloads.dss path
+        PVsystems.dss path
+    category must be 'demand' or 'generation':
+        if file_path points to loads or EV loads, category must be 'demand'
+        if file_path points to PV systems or DERs, category must be 'generation'
+    """
+    assert category in ['demand', 'generation']
+    with open(file_path, 'r') as lr:
+        lines = lr.readlines()
+
+    for line in lines:
+        if line.lower().startswith('new'):
+            line = line.lower()
+            size = 0
+            profile_name = ''
+            bus = get_parameter(line, 'bus1')
+            if '.' in bus:
+                bus = bus.split('.')[0]
+            if bus not in bus_data:
+                bus_data[bus] = {'bus': bus, 'demand': [], 'generation': []}
+
+            if 'kw=' in line:
+                size = get_parameter(line, 'kw', float)
+            elif 'kva=' in line:
+                size = get_parameter(line, 'kva', float)
+            elif 'pmpp=' in line:
+                size = get_parameter(line, 'pmpp', float)
+            if 'yearly' in line:
+                profile_name = get_parameter(line, 'yearly')
+            elif 'daily' in line:
+                profile_name = get_parameter(line, 'daily')
+
+            bus_data[bus][category].append([size, profile_name])
+
+    return bus_data
+
+
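+# Example (illustrative values): for a lowercased line such as
+#   "new load.l1 bus1=b1.1.2 kw=5.0 yearly=shape_1",
+# get_parameter(line, 'bus1') -> 'b1.1.2' and get_parameter(line, 'kw', float) -> 5.0,
+# so collect_category records [5.0, 'shape_1'] under bus 'b1' in the 'demand' list.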
+def collect_profiles(profile_files):
+    """
+    This function builds a dictionary called profile_data.
+    The profile_data dictionary collects, for each timeseries profile:
+        the name
+        the path to the actual CSV profile data file
+        the numpy array of the profile timeseries data
+    It also builds and stores a new destination path where new reduced profile timeseries will be written.
+    INPUT:
+        profile_files: a list of paths pointing to .dss profile files such as LoadShapes.dss, PVshapes.dss, etc.
+    OUTPUT:
+        profile_data: a dictionary (see description above)
+    """
+    profile_data = {}
+    for file_path in profile_files:
+        base_path, _ = os.path.split(file_path)
+        with open(file_path, 'r') as lr:
+            lines = lr.readlines()
+        for line in lines:
+            if line.lower().startswith('new'):
+                line = line.lower()
+                profile_name = line.split('loadshape.')[1].split(' ')[0]
+                rel_path = line.split('file=')[1].split(')')[0]
+                profile_path = os.path.join(base_path, rel_path)
+                with open(profile_path) as pr:
+                    profile_array = np.loadtxt(pr, delimiter=",")
+                folder, filename = os.path.split(profile_path)
+                copy_dir = folder + '-new'
+                if not os.path.exists(copy_dir):
+                    os.mkdir(copy_dir)
+                new_profile_path = os.path.join(copy_dir, filename)
+                profile_data[profile_name] = {'profile_name': profile_name,
+                                              'profile_path': profile_path,
+                                              'new_profile_path': new_profile_path,
+                                              'time_series': profile_array,
+                                              }
+
+    return profile_data
+
+
+def aggregate_series(bus_data, profile_data, critical_conditions):
+    """Aggregate demand and generation timeseries per bus and find critical time indices."""
+    ag_series = {}
+    critical_time_indices = []
+    head_critical_time_indices = []
+    for bus, dic in bus_data.items():
+        ag_series[bus] = {'critical_time_idx': [], 'condition': []}
+
+        if dic['demand']:
+            for data in dic['demand']:
+                if 'demand' in ag_series[bus]:
+                    ag_series[bus]['demand'] += data[0] * profile_data[data[1]]['time_series']
+                else:
+                    ag_series[bus]['demand'] = data[0] * profile_data[data[1]]['time_series']
+            if 'max_demand' in critical_conditions:
+                max_demand_idx = np.where(ag_series[bus]['demand'] == np.amax(ag_series[bus]['demand']))[0].tolist()[0]
+                ag_series[bus]['critical_time_idx'].append(max_demand_idx)
+                ag_series[bus]['condition'].append("max_demand")
+
+            if 'min_demand' in critical_conditions:
+                min_demand_idx = np.where(ag_series[bus]['demand'] == np.amin(ag_series[bus]['demand']))[0].tolist()[0]
+                ag_series[bus]['critical_time_idx'].append(min_demand_idx)
+                ag_series[bus]['condition'].append("min_demand")
+
+        if dic['generation']:
+            for data in dic['generation']:
+                if 'generation' in ag_series[bus]:
+                    ag_series[bus]['generation'] += data[0] * profile_data[data[1]]['time_series']
+                else:
+                    ag_series[bus]['generation'] = data[0] * profile_data[data[1]]['time_series']
+            if 'max_generation' in critical_conditions:
+                max_gen_idx = np.where(ag_series[bus]['generation'] == np.amax(ag_series[bus]['generation']))[0].tolist()[0]
+                ag_series[bus]['critical_time_idx'].append(max_gen_idx)
+                ag_series[bus]['condition'].append("max_generation")
+            if 'demand' in ag_series[bus] and 'max_net_generation' in critical_conditions:
+                arr = ag_series[bus]['generation'] - ag_series[bus]['demand']
+                max_netgen_idx = np.where(arr == np.amax(arr))[0].tolist()[0]
+                ag_series[bus]['critical_time_idx'].append(max_netgen_idx)
+                ag_series[bus]['condition'].append("max_net_generation")
+
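+    # The feeder-head series below are elementwise sums of the per-bus arrays,
+    # so the indices they select capture circuit-wide extremes that no single
+    # bus may exhibit on its own.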
+    total_gen = sum([dic['generation'] for bus, dic in ag_series.items()
+                     if 'generation' in dic])
+    total_dem = sum([dic['demand'] for bus, dic in ag_series.items()
+                     if 'demand' in dic])
+    net_total_gen = total_gen - total_dem
+    if 'max_demand' in critical_conditions:
+        max_demand_idx = np.where(total_dem == np.amax(total_dem))[0].tolist()[0]
+        head_critical_time_indices.append(max_demand_idx)
+    if 'min_demand' in critical_conditions:
+        min_demand_idx = np.where(total_dem == np.amin(total_dem))[0].tolist()[0]
+        head_critical_time_indices.append(min_demand_idx)
+    if 'max_generation' in critical_conditions:
+        max_gen_idx = np.where(total_gen == np.amax(total_gen))[0].tolist()[0]
+        head_critical_time_indices.append(max_gen_idx)
+    if 'max_net_generation' in critical_conditions:
+        max_netgen_idx = np.where(net_total_gen == np.amax(net_total_gen))[0].tolist()[0]
+        head_critical_time_indices.append(max_netgen_idx)
+
+    critical_time_indices = [t for bus, dic in ag_series.items()
+                             for t in dic['critical_time_idx']
+                             if 'critical_time_idx' in dic]
+    critical_time_indices += head_critical_time_indices
+    critical_time_indices = list(set(critical_time_indices))
+    critical_time_indices.sort()
+    for profile, val in profile_data.items():
+        base_len = len(val['time_series'])
+        compression_rate = len(critical_time_indices) / base_len
+        data = val['time_series'][critical_time_indices]
+        pd.DataFrame(data).to_csv(val['new_profile_path'],
+                                  index=False, header=False)
+
+    return ag_series, head_critical_time_indices, critical_time_indices, compression_rate
+
+
+def main(category_path_dict,
+         profile_files,
+         critical_conditions=['max_demand', 'min_demand', 'max_generation', 'max_net_generation']):
+    """
+    INPUT:
+        category_path_dict: a dictionary where:
+            the keys are power conversion asset categories: "demand" and "generation"
+            the values are a list of paths pointing to the corresponding power conversion assets' .dss files such as "Loads.dss" and "PVSystems.dss" files
+            example: category_path_dict = {'demand': [LOAD_PATH, EVLOAD_PATH], 'generation': [PV_PATH]}
+        profile_files: a list of paths pointing to shape (profile) files such as "LoadShapes.dss"
+    OUTPUT:
+        bus_data: a dictionary mapping each bus name to its demand and generation assets as [size, profile_name] pairs
+        profile_data: the profile dictionary built by collect_profiles
+        ag_series: a dictionary mapping each bus to its aggregated demand/generation timeseries and critical time indices
+        head_time_indices: list of critical time indices when only feeder-head timeseries are considered
+        critical_time_indices: all critical time indices (individual buses as well as feeder-head considered)
+        compression_rate: ratio between the number of critical timepoints and the total number of timepoints in the timeseries
+    """
+    bus_data = {}
+    for category, file_paths in category_path_dict.items():
+        for file_path in file_paths:
+            bus_data = collect_category(file_path, bus_data, category=category)
+    profile_data = collect_profiles(profile_files)
+    ag_series, head_time_indices, critical_time_indices, compression_rate = \
+        aggregate_series(bus_data, profile_data, critical_conditions)
+    return bus_data, profile_data, ag_series, head_time_indices, critical_time_indices, compression_rate
+
+
+if __name__ == "__main__":
+    st = time.time()
+    PV_PATH = r"C:\Users\KSEDZRO\Documents\Projects\LA-equity-resilience\data\P12U\sb9_p12uhs3_1247_trans_264--p12udt8475\190\PVSystems.dss"
+    LOAD_PATH = r"C:\Users\KSEDZRO\Documents\Projects\LA-equity-resilience\data\P12U\sb9_p12uhs3_1247_trans_264--p12udt8475\Loads.dss"
+    LOADSHAPES_PATH = r"C:\Users\KSEDZRO\Documents\Projects\LA-equity-resilience\data\P12U\sb9_p12uhs3_1247_trans_264--p12udt8475\LoadShapes.dss"
+    PVSHAPES_PATH = r"C:\Users\KSEDZRO\Documents\Projects\LA-equity-resilience\data\P12U\sb9_p12uhs3_1247_trans_264--p12udt8475\PVShapes.dss"
+    category_path_dict = {'demand': [LOAD_PATH], 'generation': [PV_PATH]}
+    profile_files = [LOADSHAPES_PATH, PVSHAPES_PATH]
+    critical_conditions = ['max_demand', 'max_net_generation']
+
+    bus_data, profile_data, ag_series, head_time_indices, critical_time_indices, compression_rate = \
+        main(category_path_dict, profile_files, critical_conditions)
+    et = time.time()
+    elapsed_time = et - st
diff --git a/disco/preprocess/select_timepoints2.py b/disco/preprocess/select_timepoints2.py
new file mode 100644
index 00000000..e5f15936
--- /dev/null
+++ b/disco/preprocess/select_timepoints2.py
@@ -0,0 +1,562 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Dec 2 12:13:24 2022
+
+@author: ksedzro
+"""
+
+import filecmp
+import enum
+import logging
+import shutil
+import time
+from pathlib import Path
+
+import numpy as np
+import pandas as pd
+import opendssdirect as dss
+
+
+logger = logging.getLogger(__name__)
+
+
+_DISALLOWED_OPEN_DSS_COMMANDS = ("export", "plot", "show")
+_SOLVE_LINE = "Solve mode=snapshot\n"
+
+
+class CriticalCondition(enum.Enum):
+    """Possible critical conditions to use for time-point selection"""
+
+    MAX_DEMAND = "max_demand"
+    MIN_DEMAND = "min_demand"
+    MAX_GENERATION = "max_generation"
+    MAX_NET_GENERATION = "max_net_generation"
+
+
+class DemandCategory(enum.Enum):
+    """Demand-based device categories"""
+
+    LOAD = "load"
+
+
+class GenerationCategory(enum.Enum):
+    """Generation-based device categories"""
+
+    PV_SYSTEM = "pv_system"
+    STORAGE = "storage"
+
+
+class InvalidParameter(Exception):
+    """Raised when user input is invalid"""
+
+
+def load_feeder(path_to_master: Path, destination_dir: Path, fix_master_file: bool):
+    """Compile an OpenDSS circuit after first ensuring that time-series mode is disabled."""
+    if not path_to_master.exists():
+        raise FileNotFoundError(path_to_master)
+
+    if fix_master_file:
+        new_master = make_fixes_to_master(path_to_master)
+    else:
+        new_master = path_to_master
+        check_master_file(new_master)
+
+    try:
+        dss.Text.Command(f"redirect {new_master}")
+        shutil.copyfile(new_master, destination_dir / new_master.name)
+        logger.info("Redirected to %s", new_master)
+    finally:
+        if path_to_master != new_master:
+            new_master.unlink()
+
+
+def make_fixes_to_master(master_file):
+    suffix = master_file.suffix
+    new_master = master_file.parent / master_file.name.replace(suffix, f"_new{suffix}")
+
+    def has_invalid_command(line):
+        for command in _DISALLOWED_OPEN_DSS_COMMANDS:
+            if line.startswith(command):
+                return True
+        return False
+
+    with open(new_master, "w") as f_out:  # overwrite file if it exists
+        with open(master_file, "r") as f_in:
+            for line in f_in:
+                lowered = line.strip().lower()
+                if lowered.startswith("solve") or has_invalid_command(lowered):
+                    logger.warning("Removing line from new Master.dss: %s", line.strip())
+                else:
+                    f_out.write(line)
+            f_out.write(_SOLVE_LINE)
+
+    return new_master
+
+
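+# For illustration: given a Master.dss containing "Solve mode=yearly" or an
+# "export ..." line, make_fixes_to_master removes those lines (logging a warning
+# for each) and writes a sibling *_new.dss file that ends with a single
+# "Solve mode=snapshot" command.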
+def check_master_file(master_file: Path):
+    """Raise InvalidParameter if the Master.dss file contains disallowed commands."""
+    comment_chars = ("!", "#")
+
+    def check_invalid_command(line):
+        for invalid_command in _DISALLOWED_OPEN_DSS_COMMANDS:
+            if line.startswith(invalid_command):
+                raise InvalidParameter(f"The command {invalid_command} is not allowed.")
+
+    def is_commented(line):
+        for comment_char in comment_chars:
+            if line.startswith(comment_char):
+                return True
+        return False
+
+    with open(master_file) as f_in:
+        found_solve = False
+        for line in f_in:
+            lowered = line.strip().lower()
+            if not lowered or is_commented(line):
+                continue
+            check_invalid_command(lowered)
+            if "solve" in lowered:
+                if lowered not in ("solve", _SOLVE_LINE.strip().lower()):
+                    raise InvalidParameter(
+                        "The solve command cannot have parameters besides mode=snapshot: "
+                        f"{line.strip()}: {master_file}"
+                    )
+                if found_solve:
+                    raise InvalidParameter(
+                        f"Cannot have more than one call to Solve: {master_file}"
+                    )
+                found_solve = True
+
+
+def get_profile():
+    """Return the profile of the currently-selected OpenDSS element.
+
+    Returns
+    -------
+    str
+        Return "" if there is no load shape profile attached.
+
+    """
+    profile = dss.Properties.Value("yearly")
+    if not profile:
+        profile = dss.Properties.Value("daily")
+    if not profile:
+        profile = dss.Properties.Value("duty")
+    return profile
+
+
+def get_param_values(param_class, bus_data, category):
+    def get_bus():
+        bus = dss.Properties.Value("bus1")
+        if "." in bus:
+            bus = dss.Properties.Value("bus1").split(".")[0]
+        return bus
+
+    if param_class == DemandCategory.LOAD:
+        flag = dss.Loads.First()
+        while flag > 0:
+            bus = get_bus()
+            if bus not in bus_data:
+                bus_data[bus] = {"bus": bus, "demand": [], "generation": []}
+            capacity = 0
+            size = dss.Properties.Value("kW")
+            if not size:
+                size = dss.Properties.Value("kva")
+            if size:
+                capacity = float(size)
+            else:
+                raise Exception(f"Did not find size for {dss.CktElement.Name()}")
+
+            profile_name = get_profile()
+            if not profile_name:
+                raise Exception(f"Did not find profile name for {dss.CktElement.Name()}")
+            bus_data[bus][category].append([capacity, profile_name])
+            flag = dss.Loads.Next()
+
+    elif param_class == GenerationCategory.PV_SYSTEM:
+        flag = dss.PVsystems.First()
+        while flag > 0:
+            bus = get_bus()
+            if bus not in bus_data:
+                bus_data[bus] = {"bus": bus, "demand": [], "generation": []}
+            capacity = 0
+            size = dss.Properties.Value("pmpp")
+            if not size:
+                size = dss.Properties.Value("kva")
+            if size:
+                capacity = float(size)
+            else:
+                raise Exception(f"Did not find size for {dss.CktElement.Name()}")
+
+            profile_name = get_profile()
+            if not profile_name:
+                raise Exception(f"Did not find profile name for {dss.CktElement.Name()}")
+            bus_data[bus][category].append([capacity, profile_name])
+            flag = dss.PVsystems.Next()
+
+    elif param_class == GenerationCategory.STORAGE:
+        flag = dss.Storages.First()
+        while flag > 0:
+            bus = get_bus()
+            if bus not in bus_data:
+                bus_data[bus] = {"bus": bus, "demand": [], "generation": []}
+            capacity = 0
+            size = dss.Properties.Value("kwrated")
+            if not size:
+                size = dss.Properties.Value("kva")
+            if size:
+                capacity = float(size)
+            else:
+                raise Exception(f"Did not find size for {dss.CktElement.Name()}")
+
+            profile_name = get_profile()
+            if not profile_name:
+                raise Exception(f"Did not find profile name for {dss.CktElement.Name()}")
+            bus_data[bus][category].append([capacity, profile_name])
+            flag = dss.Storages.Next()
+
+    else:
+        raise Exception(f"Invalid param_class={param_class}")
+
+    return bus_data
+
+
+def reset_profile_data(used_profiles, critical_time_indices, profile_types=("active", "reactive")):
+    flag = dss.LoadShape.First()
+    while flag > 0:
+        name = dss.LoadShape.Name()
+        number_of_timepoints = len(critical_time_indices)
+        # TODO: Kwami, should there be error checking on profile_types?
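+        # Npts must be set before assigning the shortened multiplier arrays;
+        # OpenDSS sizes PMult/QMult to match Npts, so the selected values are
+        # written into arrays of exactly len(critical_time_indices) points.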
+        original_p_mult = None
+        original_q_mult = None
+
+        if name in used_profiles:
+            if "active" in profile_types:
+                original_p_mult = dss.LoadShape.PMult()
+
+            if "reactive" in profile_types:
+                original_q_mult = dss.LoadShape.QMult()
+
+            dss.LoadShape.Npts(number_of_timepoints)
+
+            if original_p_mult is not None and len(original_p_mult) > 1:
+                if len(original_p_mult) > max(critical_time_indices):
+                    dss.LoadShape.PMult(list(np.array(original_p_mult)[critical_time_indices]))
+                else:
+                    raise IndexError(f"critical time index out of range for LoadShape {name}")
+            if original_q_mult is not None and len(original_q_mult) > 1:
+                if len(original_q_mult) > max(critical_time_indices):
+                    dss.LoadShape.QMult(list(np.array(original_q_mult)[critical_time_indices]))
+                else:
+                    raise IndexError(f"critical time index out of range for LoadShape {name}")
+        flag = dss.LoadShape.Next()
+
+
+def save_circuit(output_folder: Path):
+    """Run the OpenDSS command to save a compiled circuit into a directory."""
+    dss.Text.Command(f"Save Circuit Dir={output_folder}")
+    logger.info("Saved circuit to %s", output_folder)
+
+
+def export_power_flow_results(path: Path):
+    """Export OpenDSS circuit element values into a directory."""
+    path.mkdir(exist_ok=True)
+    for export_type, filename in {
+        "currents": "currents.csv",
+        "capacity": "capacity.csv",
+        "loads": "loads.csv",
+        "powers [mva]": "powers.csv",
+        "voltages": "voltages.csv",
+    }.items():
+        dss.Text.Command(f"export {export_type} {path}/{filename}")
+
+
+def compare_power_flow_results(before_path, after_path):
+    """Compare the exported results from two directories.
+
+    Raises
+    ------
+    Exception
+        Raised if the results do not match.
+
+    """
+    before = {x.name: x for x in before_path.iterdir()}
+    after = {x.name: x for x in after_path.iterdir()}
+    assert sorted(before.keys()) == sorted(after.keys())
+    match = True
+    for name in before:
+        if not filecmp.cmp(before[name], after[name]):
+            logger.error("Files before=%s and after=%s do not match", before[name], after[name])
+            match = False
+
+    # TODO: csv comparisons have _mostly_ minor differences.
+    # if not match:
+    #     raise Exception("Before/after power flow results do not match. Refer to the log file.")
+
+
+def get_profile_data():
+    """
+    This function builds a dictionary called profile_data.
+    The profile_data dictionary collects, for each timeseries profile:
+        the name
+        the numpy array of the profile timeseries data
+    INPUT:
+        None
+    OUTPUT:
+        profile_data: a dictionary (see description above)
+    """
+    profile_data = {}
+    flag = dss.LoadShape.First()
+    while flag > 0:
+        profile_name = dss.LoadShape.Name()
+        profile_array = np.array(dss.LoadShape.PMult())
+        if profile_name in profile_data:
+            raise Exception(f"Detected duplicate profile name: {profile_name}")
+        profile_data[profile_name] = {
+            "profile_name": profile_name,
+            "time_series": profile_array,
+        }
+        flag = dss.LoadShape.Next()
+
+    return profile_data
+
+
+def aggregate_series(
+    bus_data,
+    profile_data,
+    critical_conditions,
+    recreate_profiles,
+    destination_dir,
+    create_new_circuit,
+):
+    ag_series = {}
+    critical_time_indices = []
+    head_critical_time_indices = []
+    used_profiles = []
+    for bus, dic in bus_data.items():
+        ag_series[bus] = {
+            "critical_time_idx": [],
+            "condition": [],
+        }
+
+        if dic["demand"]:
+            for data in dic["demand"]:
+                if "demand" in ag_series[bus]:
+                    ag_series[bus]["demand"] += data[0] * profile_data[data[1]]["time_series"]
+                else:
+                    ag_series[bus]["demand"] = data[0] * profile_data[data[1]]["time_series"]
+                used_profiles.append(data[1])
+            if CriticalCondition.MAX_DEMAND in critical_conditions:
+                max_demand_idx = np.where(
+                    ag_series[bus]["demand"] == np.amax(ag_series[bus]["demand"])
+                )[0].tolist()[0]
+                ag_series[bus]["critical_time_idx"].append(max_demand_idx)
+                ag_series[bus]["condition"].append(CriticalCondition.MAX_DEMAND)
+
+            if CriticalCondition.MIN_DEMAND in critical_conditions:
+                min_demand_idx = np.where(
+                    ag_series[bus]["demand"] == np.amin(ag_series[bus]["demand"])
+                )[0].tolist()[0]
+                ag_series[bus]["critical_time_idx"].append(min_demand_idx)
+                ag_series[bus]["condition"].append(CriticalCondition.MIN_DEMAND)
+
+        if dic["generation"]:
+            for data in dic["generation"]:
+                if "generation" in ag_series[bus]:
+                    ag_series[bus]["generation"] += data[0] * profile_data[data[1]]["time_series"]
+                else:
+                    ag_series[bus]["generation"] = data[0] * profile_data[data[1]]["time_series"]
+                used_profiles.append(data[1])
+            if CriticalCondition.MAX_GENERATION in critical_conditions:
+                max_gen_idx = np.where(
+                    ag_series[bus]["generation"] == np.amax(ag_series[bus]["generation"])
+                )[0].tolist()[0]
+                ag_series[bus]["critical_time_idx"].append(max_gen_idx)
+                ag_series[bus]["condition"].append(CriticalCondition.MAX_GENERATION)
+            if (
+                "demand" in ag_series[bus]
+                and CriticalCondition.MAX_NET_GENERATION in critical_conditions
+            ):
+                arr = ag_series[bus]["generation"] - ag_series[bus]["demand"]
+                max_netgen_idx = np.where(arr == np.amax(arr))[0].tolist()[0]
+                ag_series[bus]["critical_time_idx"].append(max_netgen_idx)
+                ag_series[bus]["condition"].append(CriticalCondition.MAX_NET_GENERATION)
+
+    total_gen = sum([dic["generation"] for bus, dic in ag_series.items() if "generation" in dic])
+    total_dem = sum([dic["demand"] for bus, dic in ag_series.items() if "demand" in dic])
+    net_total_gen = total_gen - total_dem
+    if CriticalCondition.MAX_DEMAND in critical_conditions:
+        max_demand_idx = np.where(total_dem == np.amax(total_dem))[0].tolist()[0]
+        head_critical_time_indices.append(max_demand_idx)
+    if CriticalCondition.MIN_DEMAND in critical_conditions:
+        min_demand_idx = np.where(total_dem == np.amin(total_dem))[0].tolist()[0]
+        head_critical_time_indices.append(min_demand_idx)
+    if CriticalCondition.MAX_GENERATION in critical_conditions:
+        max_gen_idx = np.where(total_gen == np.amax(total_gen))[0].tolist()[0]
+        head_critical_time_indices.append(max_gen_idx)
+    if CriticalCondition.MAX_NET_GENERATION in critical_conditions:
+        max_netgen_idx = np.where(net_total_gen == np.amax(net_total_gen))[0].tolist()[0]
+        head_critical_time_indices.append(max_netgen_idx)
+
+    critical_time_indices = [
+        t
+        for bus, dic in ag_series.items()
+        for t in dic["critical_time_idx"]
+        if "critical_time_idx" in dic
+    ]
+    critical_time_indices += head_critical_time_indices
+    critical_time_indices = sorted(set(critical_time_indices))
+    compression_rate = 0
+    if recreate_profiles:
+        destination_profile_dir = destination_dir / "new_profiles"
+        destination_profile_dir.mkdir()
+
+    for profile, val in profile_data.items():
+        if profile in used_profiles:
+            base_len = len(val["time_series"])
+            compression_rate = len(critical_time_indices) / base_len
+            if recreate_profiles:
+                data = val["time_series"][critical_time_indices]
+                new_profile_path = destination_profile_dir / f"{profile}.csv"
+                pd.DataFrame(data).to_csv(new_profile_path, index=False, header=False)
+
+    if create_new_circuit:
+        reset_profile_data(used_profiles, critical_time_indices)
+        before_path = destination_dir / "power_flow_results_before"
+        after_path = destination_dir / "power_flow_results_after"
+        destination_model_dir = destination_dir / "new_model"
+        destination_model_dir.mkdir()
+        save_circuit(destination_model_dir)
+        master_file = destination_model_dir / "Master.dss"
+        with open(master_file, "a") as f_out:
+            f_out.write("Calcvoltagebases\n")
+            f_out.write("Solve\n")
+
+    return ag_series, head_critical_time_indices, critical_time_indices, compression_rate
+
+
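+# get_metadata pivots the per-bus results into one row per selected time point:
+# the index is the time index and each row holds the list of buses and the list
+# of condition names that selected it, e.g. a row like
+#   42, ['bus1', 'feederhead'], ['max_demand', 'max_demand']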
+def get_metadata(ag_series, head_critical_time_indices, critical_conditions):
+    """
+    This function exports metadata associated with selected timepoints.
+    INPUT:
+        ag_series: Dictionary
+        head_critical_time_indices: List
+    OUTPUT:
+        metadata_df: DataFrame
+    """
+    buses_list = []
+    critical_timepoint_list = []
+    condition_list = []
+    for bus, dic in ag_series.items():
+        buses_list.append(bus)
+        critical_timepoint_list.append(dic["critical_time_idx"])
+        condition_list.append(dic["condition"])
+    buses_list.append("feederhead")
+    critical_timepoint_list.append(head_critical_time_indices)
+    condition_list.append(critical_conditions)
+    df = pd.DataFrame(
+        list(zip(buses_list, critical_timepoint_list, condition_list)),
+        columns=["buses_list", "critical_timepoint_list", "condition_list"],
+    )
+    expanded_df = df.explode(["critical_timepoint_list", "condition_list"])
+    expanded_df.loc[expanded_df["condition_list"].notnull(), "condition_list"] = expanded_df.loc[
+        expanded_df["condition_list"].notnull()
+    ]["condition_list"].apply(lambda x: x.value)
+    metadata_df = expanded_df.groupby("critical_timepoint_list").agg(list)
+    return metadata_df
+
+
+def main(
+    path_to_master: Path,
+    categories,
+    critical_conditions=tuple(x for x in CriticalCondition),
+    recreate_profiles=False,
+    destination_dir=None,
+    create_new_circuit=True,
+    fix_master_file=False,
+):
+    """
+    INPUT:
+        path_to_master: path to the Master.dss file for the circuit
+        categories: a dictionary where:
+            the keys are power conversion asset categories: "demand" and "generation"
+            the values are lists of DemandCategory or GenerationCategory members
+            example: categories = {'demand': [DemandCategory.LOAD], 'generation': [GenerationCategory.PV_SYSTEM]}
+        critical_conditions: the CriticalCondition members to use for time-point selection
+        recreate_profiles: if True, write the down-selected profiles as CSV files in destination_dir
+        destination_dir: directory where all output files are written; must exist
+        create_new_circuit: if True, save a new circuit with down-selected time points
+        fix_master_file: if True, remove commands from the Master.dss file that interfere with time-point selection
+    OUTPUT:
+        bus_data: a dictionary mapping each bus name to its demand and generation assets as [capacity, profile_name] pairs
+        profile_data: a dictionary mapping each profile name to its timeseries data
+        ag_series: a dictionary mapping each bus to its aggregated demand/generation timeseries and critical time indices
+        head_time_indices: list of critical time indices when only feeder-head timeseries are considered
+        critical_time_indices: all critical time indices (individual buses as well as feeder-head considered)
+        compression_rate: ratio between the number of critical timepoints and the total number of timepoints in the timeseries
+    """
+    critical_conditions = set(critical_conditions)
+    if not destination_dir.exists():
+        raise FileNotFoundError(f"destination_dir={destination_dir} does not exist")
+
+    bus_data = {}
+    load_feeder(path_to_master, destination_dir, fix_master_file)
+    for category, param_classes in categories.items():
+        for param_class in param_classes:
+            bus_data = get_param_values(param_class, bus_data, category)
+    profile_data = get_profile_data()
+    ag_series, head_time_indices, critical_time_indices, compression_rate = aggregate_series(
+        bus_data,
+        profile_data,
+        critical_conditions,
+        recreate_profiles,
+        destination_dir,
+        create_new_circuit,
+    )
+    metadata_df = get_metadata(ag_series, head_time_indices, critical_conditions)
+    metadata_df.to_csv(destination_dir / "metadata.csv")
+
+    logger.info("head_time_indices = %s length = %s", head_time_indices, len(head_time_indices))
+    logger.info(
+        "critical_time_indices = %s length = %s", critical_time_indices, len(critical_time_indices)
+    )
+
+    return (
+        bus_data,
+        profile_data,
+        ag_series,
+        head_time_indices,
+        critical_time_indices,
+        compression_rate,
+    )
+
+
+if __name__ == "__main__":
+    path_to_master = Path(
+        r"tests/data/generic-models/p1uhs23_1247/p1udt21301/PVDeployments/p1uhs23_1247__p1udt21301__random__2__15.dss"
+    )
+    destination = Path(r"test-output-timepoint")
+    destination.mkdir(exist_ok=True)
+    st = time.time()
+    category_class_dict = {
+        "demand": [DemandCategory.LOAD],
+        "generation": [GenerationCategory.PV_SYSTEM],
+    }
+    critical_conditions = [CriticalCondition.MAX_DEMAND, CriticalCondition.MAX_NET_GENERATION]
+
+    (
+        bus_data,
+        profile_data,
+        ag_series,
+        head_time_indices,
+        critical_time_indices,
+        compression_rate,
+    ) = main(
+        path_to_master,
+        category_class_dict,
+        critical_conditions=critical_conditions,
+        destination_dir=destination,
+        recreate_profiles=True,
+    )
+    et = time.time()
+    elapsed_time = et - st
diff --git a/tests/integration/test_time_point_selection.py b/tests/integration/test_time_point_selection.py
new file mode 100644
index 00000000..8b3fc78f
--- /dev/null
+++ b/tests/integration/test_time_point_selection.py
@@ -0,0 +1,156 @@
+import fileinput
+import math
+import shutil
+import subprocess
+from pathlib import Path
+
+import opendssdirect as dss
+import pytest
+
+from disco.preprocess.select_timepoints2 import (
+    CriticalCondition,
+    DemandCategory,
+    GenerationCategory,
+    InvalidParameter,
+    main,
+    get_profile,
+)
+
+
+MASTER_FILE = (
+    Path("tests")
+    / "data"
+    / "generic-models"
+    / "p1uhs23_1247"
+    / "p1udt21301"
+    / "PVDeployments"
+    / "p1uhs23_1247__p1udt21301__random__2__15.dss"
+)
+
+
+def test_time_point_selection(tmp_path):
+    categories = {
+        "demand": [DemandCategory.LOAD],
+        "generation": [GenerationCategory.PV_SYSTEM],
+    }
+    critical_conditions = [CriticalCondition.MAX_DEMAND, CriticalCondition.MAX_NET_GENERATION]
+    (
+        bus_data,
+        profile_data,
+        ag_series,
+        head_time_indices,
+        critical_time_indices,
+        compression_rate,
+    ) = main(
+        MASTER_FILE,
+        categories=categories,
+        critical_conditions=critical_conditions,
+        destination_dir=tmp_path,
+    )
+    dss.Text.Command("Clear")
+    dss.Text.Command(f"Redirect {MASTER_FILE}")
+    connected_profiles = get_connected_load_shape_profiles()
+    before = get_pmult_data(connected_profiles)
+
+    new_master = tmp_path / "new_model" / "Master.dss"
+    assert new_master.exists()
+    dss.Text.Command("Clear")
+    dss.Text.Command(f"Redirect {new_master}")
+    after = get_pmult_data(connected_profiles)
+    assert sorted(before.keys()) == sorted(after.keys())
+
+    count = 0
+    for name in before:
+        for i, main_index in enumerate(critical_time_indices):
+            num_decimals = 2
+            val1 = round(before[name][main_index], num_decimals)
+            val2 = round(after[name][i], num_decimals)
+            assert math.isclose(
+                val1, val2
+            ), f"Mismatch for LoadShape {name} at time_point={main_index} before={val1} after={val2}"
+            count += 1
+    assert count > 0
+
+
+def test_invalid_master_file(tmp_path):
+    bad_file = MASTER_FILE.parent / (MASTER_FILE.name + ".tmp")
+    shutil.copyfile(MASTER_FILE, bad_file)
+    with fileinput.input(files=[bad_file], inplace=True) as f:
+        for line in f:
+            if "Solve" in line:
+                print("Solve mode=yearly")
+            else:
+                print(line, end="")
+
+    categories = {
+        "demand": [DemandCategory.LOAD],
+        "generation": [GenerationCategory.PV_SYSTEM],
+    }
+    critical_conditions = [CriticalCondition.MAX_DEMAND, CriticalCondition.MAX_NET_GENERATION]
+    try:
+        with pytest.raises(InvalidParameter):
+            main(
+                bad_file,
+                categories=categories,
+                critical_conditions=critical_conditions,
+                destination_dir=tmp_path,
+                fix_master_file=False,
+            )
+        main(
+            bad_file,
+            categories=categories,
+            critical_conditions=critical_conditions,
+            destination_dir=tmp_path,
+            fix_master_file=True,
+        )
+    finally:
+        if bad_file.exists():
+            bad_file.unlink()
+
+
+def get_connected_load_shape_profiles():
+    load_shapes = set()
+    for cls in (dss.Loads, dss.PVsystems, dss.Storages):
+        flag = cls.First()
+        while flag > 0:
+            profile = get_profile()
+            if profile:
+                load_shapes.add(profile)
+            flag = cls.Next()
+
+    return load_shapes
+
+
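+# Snapshot the PMult values of the profiles attached to circuit elements, keyed
+# by LoadShape name; the test above compares these before and after redirecting
+# to the down-selected model.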
+def get_pmult_data(connected_profiles):
+    flag = dss.LoadShape.First()
+    load_shapes = {}
+    while flag > 0:
+        name = dss.LoadShape.Name()
+        if name in connected_profiles:
+            assert name not in load_shapes, name
+            load_shapes[name] = dss.LoadShape.PMult()
+        flag = dss.LoadShape.Next()
+
+    return load_shapes
+
+
+def test_time_point_selection_cli(tmp_path):
+    cmd = [
+        "disco",
+        "select-time-points",
+        str(MASTER_FILE),
+        "-d",
+        "load",
+        "-g",
+        "pv_system",
+        "-c",
+        CriticalCondition.MAX_DEMAND.value,
+        "-c",
+        CriticalCondition.MIN_DEMAND.value,
+        "-c",
+        CriticalCondition.MAX_NET_GENERATION.value,
+        "-o",
+        str(tmp_path / "output"),
+    ]
+    subprocess.run(cmd, check=True)