From 61db39a1eae00b2fc3d9ae97921ccc38eddbe82b Mon Sep 17 00:00:00 2001
From: Cleveland
Date: Fri, 5 Jan 2024 08:11:52 -0700
Subject: [PATCH] Add parallel file parsing (can be turned off with
 OPPPY_USE_THREADS=False)

---
 opppy/dump_utils.py           | 35 +++++++++++++---
 opppy/output.py               | 61 +++++++++++++++++++++-------
 opppy/plot_dump_dictionary.py | 10 ++++-
 opppy/tally.py                | 70 ++++++++++++++++++++++---------
 4 files changed, 137 insertions(+), 39 deletions(-)

diff --git a/opppy/dump_utils.py b/opppy/dump_utils.py
index 0062a47..5aca34b 100644
--- a/opppy/dump_utils.py
+++ b/opppy/dump_utils.py
@@ -26,11 +26,17 @@
 '''

 from numpy import *
+import os
 import sys
 import pickle

 from opppy.progress import progress

+USE_THREADS = not os.getenv("OPPPY_USE_THREADS", 'True').lower() in ('false', '0', 'f')
+if(USE_THREADS):
+    from multiprocessing import Process, Manager
+
+
 def point_value_1d(data, x_key, value_key, x_value, method='nearest'):
     '''
     Grid data function. This function takes a 1D data structure from dictionary
@@ -477,13 +483,32 @@ def append_dumps(data, dump_files, opppy_parser, key_words=None):
       append_date bool to specify if the data should be appended to the file
       name for tracking purposes
     '''
+    total = len(dump_files)
     count = 0
-    for dump in dump_files:
-        # append new dictionary data to the pickle file
-        data[dump.split('/')[-1]] = opppy_parser.build_data_dictionary(dump,key_words)
-        count += 1
-        progress(count,total, 'of dump files read')
+    print('')
+    print("Number of files to be read: ", total)
+    if(USE_THREADS):
+        def thread_all(file_name, key_words, result_d):
+            result_d[file_name.split('/')[-1]] = opppy_parser.build_data_dictionary(file_name,key_words)
+        with Manager() as manager:
+            result_d = manager.dict()
+            threads = []
+            for file_name in dump_files:
+                thread = Process(target=thread_all, args=(file_name, key_words, result_d,))
+                thread.start()
+                threads.append(thread)
+            for thread in threads:
+                thread.join()
+                count += 1
+                progress(count,total, 'of dump files read')
+            data.update(result_d)
+    else:
+        for dump in dump_files:
+            # append new dictionary data to the pickle file
+            data[dump.split('/')[-1]] = opppy_parser.build_data_dictionary(dump,key_words)
+            count += 1
+            progress(count,total, 'of dump files read')
     print('')
     print('')

diff --git a/opppy/output.py b/opppy/output.py
index 8c4918d..9ff0634 100644
--- a/opppy/output.py
+++ b/opppy/output.py
@@ -26,11 +26,17 @@
 import sys
 import pickle
 import io
+import os

 import numpy as np

 from opppy.version import __version__
 from opppy.progress import *

+USE_THREADS = not os.getenv("OPPPY_USE_THREADS", 'True').lower() in ('false', '0', 'f')
+if(USE_THREADS):
+    from multiprocessing import Process, Manager
+
+
 def append_cycle_data(cycle_data, data, sort_key_string):
     '''
@@ -330,7 +336,6 @@ def append_output_dictionary(data, output_files, opppy_parser, append_date=False
         print("This data dictionary has no version")
         print("This version of OPPPY is ", __version__)
         sys.exit(0)
-
     time = ''
     if append_date:
         time = time+'.'+datetime.datetime.now().strftime ("%Y%m%d%H%M%S")
@@ -338,25 +343,53 @@ def append_output_dictionary(data, output_files, opppy_parser, append_date=False
     total = len(output_files)
     print('')
     print("Number of files to be read: ", total)
-    cycle_string_list=[]
+    data_list = []
+    if(USE_THREADS):
+        def thread_all(file_name, result_d):
+            thread_cycle_string_list = get_output_lines(file_name, opppy_parser.cycle_opening_string,
+                    opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
+            thread_data = []
+            for cycle_string in thread_cycle_string_list:
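+                # each worker process parses every cycle block from its own file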
+                thread_data.append(extract_cycle_data(cycle_string, opppy_parser))
+            result_d[file_name]=thread_data
+
+        with Manager() as manager:
+            result_d = manager.dict()
+            threads = []
+            for file_name in output_files:
+                thread = Process(target=thread_all, args=(file_name, result_d,))
+                thread.start()
+                threads.append(thread)
+            for thread in threads:
+                thread.join()
+                count += 1
+                progress(count,total, 'of input files read')
+            for file_name in output_files:
+                data_list += result_d[file_name]
+    else:
+        cycle_string_list=[]
+        for file_name in output_files:
+            cycle_string_list+=get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
+            count += 1
+            progress(count,total, 'of input files read')
+
+        count = 0
+        total = len(cycle_string_list)
+        print('')
+        print("Number of cycles to be parsed: ", total)
+        for cycle_string in cycle_string_list:
+            data_list.append(extract_cycle_data(cycle_string, opppy_parser))
+            count += 1
+            progress(count,total, 'of cycles parsed')
+    print('')
+
     for file_name in output_files:
-        cycle_string_list+=get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
         if 'appended_files' in data:
             data['appended_files'].append(file_name.split('/')[-1]+time)
         else:
             data['appended_files'] = [file_name.split('/')[-1]+time]
-        count += 1
-        progress(count,total, 'of input files read')
-
-    total = len(cycle_string_list)
-    count = 0
-    print('')
-    print("Number of cycles to be parsed: ", total)
-    for cycle_string in cycle_string_list:
-        cycle_data = extract_cycle_data(cycle_string, opppy_parser)
+    for cycle_data in data_list:
         data = append_cycle_data(cycle_data,data,opppy_parser.sort_key_string)
-        count += 1
-        progress(count,total, 'of cycles parsed')
     print('')
     print('')

diff --git a/opppy/plot_dump_dictionary.py b/opppy/plot_dump_dictionary.py
index def737b..c0b9ea1 100644
--- a/opppy/plot_dump_dictionary.py
+++ b/opppy/plot_dump_dictionary.py
@@ -1000,8 +1000,14 @@ def init_contour():
     xmax=(np.array(series_data[0][xname])*args.scale_x).max()
     ymin=(np.array(series_data[0][yname])*args.scale_y).min()
     ymax=(np.array(series_data[0][yname])*args.scale_y).max()
+    bias = 0.0
     for data, index_value in zip(series_data, series_pair.index[index_key]):
         v = np.array(data[dname])*args.scale_value
+        if(args.log_scale):
+            bias = v.min()
+            bias = 0.0 if bias>0.0 else abs(bias)
+            v = np.array([ [math.log10(val+bias) if (val+bias)>0.0 else 0.0
+                for val in vals] for vals in v])
         x = np.array(data[xname])*args.scale_x
         y = np.array(data[yname])*args.scale_y
         vmin = min(v.min(),vmin)
@@ -1059,8 +1065,8 @@ def init_contour():
         PyPloter.legend(loc='best')

     if(args.data_bounds):
-        vmin = args.data_bounds[0]
-        vmax = args.data_bounds[1]
+        vmin = args.data_bounds[0] if not args.log_scale else math.log10(args.data_bounds[0]+bias)
+        vmax = args.data_bounds[1] if not args.log_scale else math.log10(args.data_bounds[1]+bias)
     imshow = PyPloter.imshow(series_pair.grid[0][dname], extent=(xmin,xmax,ymin,ymax), vmin=vmin, vmax=vmax, origin='lower', animated=True, cmap='jet')
     PyPloter.colorbar()

diff --git a/opppy/tally.py b/opppy/tally.py
index 0418d58..20a15ef 100644
--- a/opppy/tally.py
+++ b/opppy/tally.py
@@ -25,12 +25,19 @@
 import sys
 import pickle
 import io
+import os

 import numpy as np

 from opppy.version import __version__
 from opppy.progress import *
 from opppy.output import *

+USE_THREADS = not os.getenv("OPPPY_USE_THREADS", 'True').lower() in ('false', '0', 'f')
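+# set OPPPY_USE_THREADS to 'false', '0', or 'f' (case-insensitive) to disable parallel parsing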
+if(USE_THREADS):
+    from multiprocessing import Process, Manager
+
+
 def append_tally_data(cycle_data, data, sort_key_string):
     '''
@@ -239,25 +245,52 @@ def append_tally_dictionary(data, output_files, opppy_parser, append_date=False)
     total = len(output_files)
     print('')
     print("Number of files to be read: ", total)
-    cycle_string_list=[]
-    for file_name in output_files:
-        cycle_string_list+=get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
-        if 'appended_files' in data:
-            data['appended_files'].append(file_name.split('/')[-1]+time)
-        else:
-            data['appended_files'] = [file_name.split('/')[-1]+time]
-        count += 1
-        progress(count,total, 'of input files read')
+    data_list = []
+    if(USE_THREADS):
+        def thread_all(file_name, result_d):
+            thread_cycle_string_list = get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
+            thread_data=[]
+            for cycle_string in thread_cycle_string_list:
+                thread_data.append(extract_cycle_data(cycle_string, opppy_parser))
+            result_d[file_name] = thread_data

-    total = len(cycle_string_list)
-    count = 0
-    print('')
-    print("Number of cycles to be parsed: ", total)
-    for cycle_string in cycle_string_list:
-        cycle_data = extract_cycle_data(cycle_string, opppy_parser)
-        data = append_tally_data(cycle_data,data,opppy_parser.sort_key_string)
-        count += 1
-        progress(count,total, 'of cycles parsed')
+        with Manager() as manager:
+            result_d = manager.dict()
+            threads = []
+            for file_name in output_files:
+                thread = Process(target=thread_all, args=(file_name, result_d,))
+                thread.start()
+                threads.append(thread)
+            for thread in threads:
+                thread.join()
+                count += 1
+                progress(count,total, 'of input files read')
+            for file_name in output_files:
+                data_list += result_d[file_name]
+    else:
+        cycle_string_list=[]
+        for file_name in output_files:
+            cycle_string_list+=get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
+            count += 1
+            progress(count,total, 'of input files read')
+
+        total = len(cycle_string_list)
+        count = 0
+        print('')
+        print("Number of cycles to be parsed: ", total)
+        for cycle_string in cycle_string_list:
+            data_list.append(extract_cycle_data(cycle_string, opppy_parser))
+            count += 1
+            progress(count,total, 'of cycles parsed')
+        print('')
+
+    for file_name in output_files:
+        if 'appended_files' in data:
+            data['appended_files'].append(file_name.split('/')[-1]+time)
+        else:
+            data['appended_files'] = [file_name.split('/')[-1]+time]
+    for cycle_data in data_list:
+        data = append_tally_data(cycle_data,data,opppy_parser.sort_key_string)
     print('')
     print('')
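
Usage note: USE_THREADS is evaluated once, at import time, so the
OPPPY_USE_THREADS environment variable must be set before the opppy
modules are imported. A minimal sketch (the analysis script name is
hypothetical):

    OPPPY_USE_THREADS=False python my_analysis.py

or, equivalently, from Python:

    import os
    os.environ["OPPPY_USE_THREADS"] = "False"  # must be set before importing opppy
    from opppy.dump_utils import append_dumps  # serial code path from here on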