Skip to content

Commit

Permalink
add parallel striding and cleanup memory high water mark in serial parser

Browse files Browse the repository at this point in the history
  • Loading branch information
Cleveland committed Jan 23, 2024
1 parent c7c6e64 commit 53d0aa5
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 93 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ jobs:
## flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
## # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
## flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test
- name: Threaded Test
run: |
python -m pytest --cov-report term-missing --cov=opppy tests/
- name: Serial Test
run: |
OPPPY_USE_THREADS=false python -m pytest --cov-report term-missing --cov=opppy tests/
32 changes: 19 additions & 13 deletions opppy/dump_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
import sys
import pickle
import math
from multiprocessing import Process, Manager
from multiprocessing import Process, Manager, cpu_count

from opppy.progress import progress

USE_THREADS = os.getenv("OPPPY_USE_THREADS", 'True').lower() in ('true', '1', 't')
NTHREADS = int(os.getenv("OPPPY_N_THREADS", str(min(cpu_count(),4))))

def point_value_1d(data, x_key, value_key, x_value, method='nearest'):
'''
Expand Down Expand Up @@ -508,18 +509,23 @@ def append_dumps(data, dump_files, opppy_parser, key_words=None):
if(USE_THREADS):
def thread_all(file_name, key_words, result_d):
    """Child-process worker: parse one dump file into the shared dict.

    Parses *file_name* with the enclosing scope's ``opppy_parser`` and
    stores the resulting data dictionary in the Manager dict *result_d*,
    keyed by the file's basename (directory components stripped).
    """
    base_name = file_name.split('/')[-1]
    result_d[base_name] = opppy_parser.build_data_dictionary(file_name, key_words)
with Manager() as manager:
result_d = manager.dict()
threads = []
for file_name in dump_files:
thread = Process(target=thread_all, args=(file_name, key_words, result_d,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
count += 1
progress(count,total, 'of input files read')
data.update(result_d)
print("Number of threads used for processing: ",NTHREADS)
for stride in range(math.ceil(float(total)/float(NTHREADS))):
files = dump_files[NTHREADS*stride:min(NTHREADS*(stride+1),len(dump_files))]
with Manager() as manager:
result_d = manager.dict()
threads = []
for file_name in files:
thread = Process(target=thread_all, args=(file_name, key_words, result_d,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
count += 1
progress(count,total, 'of input files read')
data.update(result_d)
del result_d
del threads
else:
for dump in dump_files:
# append new dictionary data to the pickle file
Expand Down
71 changes: 34 additions & 37 deletions opppy/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import pickle
import io
import os
import math
import numpy as np
from multiprocessing import Process, Manager
from multiprocessing import Process, Manager, cpu_count

from opppy.version import __version__
from opppy.progress import *

USE_THREADS = os.getenv("OPPPY_USE_THREADS", 'True').lower() in ('true', '1', 't')
NTHREADS = int(os.getenv("OPPPY_N_THREADS", str(min(cpu_count(),4))))

def append_cycle_data(cycle_data, data, sort_key_string):
'''
Expand Down Expand Up @@ -340,57 +342,52 @@ def append_output_dictionary(data, output_files, opppy_parser, append_date=False
time = ''
if append_date:
time = time+'.'+datetime.datetime.now().strftime ("%Y%m%d%H%M%S")
for file_name in output_files:
if 'appended_files' in data:
data['appended_files'].append(file_name.split('/')[-1]+time)
else:
data['appended_files'] = [file_name.split('/')[-1]+time]

count = 0
total = len(output_files)
print('')
print("Number of files to be read: ", total)
data_list = []
if(USE_THREADS):
def thread_all(file_name, result_d):
def thread_all(file_name, file_index, result_l):
thread_cycle_string_list = get_output_lines(file_name, opppy_parser.cycle_opening_string,
opppy_parser.cycle_closing_string, opppy_parser.file_end_string);
thread_data = []
for cycle_string in thread_cycle_string_list:
thread_data.append(extract_cycle_data(cycle_string, opppy_parser))
result_d[file_name]=thread_data
with Manager() as manager:
result_d = manager.dict()
threads = []
for file_name in output_files:
thread = Process(target=thread_all, args=(file_name, result_d,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
count += 1
progress(count,total, 'of input files read')
for file_name in output_files:
data_list += result_d[file_name]
result_l[file_index]=thread_data
print("Number of threads used for processing: ",NTHREADS)
for stride in range(math.ceil(float(total)/float(NTHREADS))):
files = output_files[NTHREADS*stride:min(NTHREADS*(stride+1),len(output_files))]
with Manager() as manager:
result_l = manager.list(range(len(files)))
threads = []
for file_index, file_name in enumerate(files):
thread = Process(target=thread_all, args=(file_name, file_index, result_l,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
count += 1
progress(count,total, 'of input files read')
for file_data in result_l:
for cycle_data in file_data:
data = append_cycle_data(cycle_data,data,opppy_parser.sort_key_string)
del result_l
del threads
else:
cycle_string_list=[]
for file_name in output_files:
cycle_string_list+=get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
cycle_string_list = get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
for cycle_string in cycle_string_list:
cycle_data = extract_cycle_data(cycle_string, opppy_parser)
data = append_cycle_data(cycle_data,data,opppy_parser.sort_key_string)
count += 1
progress(count,total, 'of input files read')

count = 0
total = len(cycle_string_list)
print('')
print("Number of cycles to be parsed: ", total)
for cycle_string in cycle_string_list:
data_list.append(extract_cycle_data(cycle_string, opppy_parser))
count += 1
progress(count,total, 'of cycles parsed')
print('')

for file_name in output_files:
if 'appended_files' in data:
data['appended_files'].append(file_name.split('/')[-1]+time)
else:
data['appended_files'] = [file_name.split('/')[-1]+time]
for cycle_data in data_list:
data = append_cycle_data(cycle_data,data,opppy_parser.sort_key_string)

print('')
print('')
print_dictionary_data(data)
Expand Down
76 changes: 34 additions & 42 deletions opppy/tally.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@
import pickle
import io
import os
import math
import numpy as np
from multiprocessing import Process, Manager
from multiprocessing import Process, Manager, cpu_count

from opppy.version import __version__
from opppy.progress import *
from opppy.output import *

USE_THREADS = os.getenv("OPPPY_USE_THREADS", 'True').lower() in ('true', '1', 't')
NTHREADS = int(os.getenv("OPPPY_N_THREADS", str(min(cpu_count(),4))))

def append_tally_data(cycle_data, data, sort_key_string):
'''
Expand Down Expand Up @@ -242,61 +244,51 @@ def append_tally_dictionary(data, output_files, opppy_parser, append_date=False)
time = ''
if append_date:
time = time+'.'+datetime.datetime.now().strftime ("%Y%m%d%H%M%S")
for file_name in output_files:
if 'appended_files' in data:
data['appended_files'].append(file_name.split('/')[-1]+time)
else:
data['appended_files'] = [file_name.split('/')[-1]+time]
count = 0
total = len(output_files)
print('')
print("Number of files to be read: ", total)
data_list = []
if(USE_THREADS):
def thread_all(file_name, result_d):
def thread_all(file_name, file_index, result_l):
thread_cycle_string_list = get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
thread_data=[]
for cycle_string in thread_cycle_string_list:
thread_data.append(extract_cycle_data(cycle_string, opppy_parser))
result_d[file_name] = thread_data

with Manager() as manager:
result_d = manager.dict()
threads = []
for file_name in output_files:
thread = Process(target=thread_all, args=(file_name, result_d,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
count += 1
progress(count,total, 'of input files read')
for file_name in output_files:
data_list += result_d[file_name]
result_l[file_index] = thread_data

print("Number of threads used for processing: ",NTHREADS)
for stride in range(math.ceil(float(total)/float(NTHREADS))):
files = output_files[NTHREADS*stride:min(NTHREADS*(stride+1),len(output_files))]
with Manager() as manager:
result_l = manager.list(range(len(files)))
threads = []
for file_index, file_name in enumerate(output_files):
thread = Process(target=thread_all, args=(file_name, file_index, result_l,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
count += 1
progress(count,total, 'of input files read')
for file_data in result_l:
for cycle_data in file_data:
data = append_tally_data(cycle_data,data,opppy_parser.sort_key_string)
del result_l
del threads
else:
cycle_string_list=[]
for file_name in output_files:
cycle_string_list+=get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
if 'appended_files' in data:
data['appended_files'].append(file_name.split('/')[-1]+time)
else:
data['appended_files'] = [file_name.split('/')[-1]+time]
cycle_string_list = get_output_lines(file_name, opppy_parser.cycle_opening_string, opppy_parser.cycle_closing_string, opppy_parser.file_end_string)
for cycle_string in cycle_string_list:
cycle_data = extract_cycle_data(cycle_string, opppy_parser)
data = append_tally_data(cycle_data,data,opppy_parser.sort_key_string)
count += 1
progress(count,total, 'of input files read')

total = len(cycle_string_list)
count = 0
print('')
print("Number of cycles to be parsed: ", total)
for cycle_string in cycle_string_list:
data_list.append(extract_cycle_data(cycle_string, opppy_parser))
count += 1
progress(count,total, 'of cycles parsed')
print('')

for file_name in output_files:
if 'appended_files' in data:
data['appended_files'].append(file_name.split('/')[-1]+time)
else:
data['appended_files'] = [file_name.split('/')[-1]+time]
for cycle_data in data_list:
data = append_tally_data(cycle_data,data,opppy_parser.sort_key_string)

print('')
print('')
print_tally_data(data)
Expand Down

0 comments on commit 53d0aa5

Please sign in to comment.