extract.py (forked from cory-weller/longread-report-parser)
#!/usr/bin/env python3
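"""Extract run metadata from long-read sequencing HTML run reports (Oxford Nanopore PromethION / MinKNOW).

For each report the script pulls the experiment and sample names, the run
start date, the PromethION ID, the flow cell ID, the estimated data output
(Gb), the estimated N50 (kb), and the MinKNOW version, then prints one
tab-separated row per report (preceded by a header line) to stdout.

Example usage (paths are illustrative):
    python3 extract.py --html_dir /path/to/reports > run_summary.tsv
    python3 extract.py --filelist report_paths.txt > run_summary.tsv
"""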
import argparse
import pandas as pd
from bs4 import BeautifulSoup
import glob
from dateutil import parser
import re

def read_html(path):
    # Load the HTML file
    with open(path, 'r') as file:
        html_content = file.read()
    # Create a BeautifulSoup object
    soup = BeautifulSoup(html_content, "html.parser")
    return soup

# check whether the report mentions vbz compression
def check_fast5(html_body):
    # convert to string
    body_str = str(html_body)
    # check if 'vbz_compress' is present
    if 'vbz_compress' in body_str:
        return 'vbz_compress'
    else:
        return 'off'

def get_timev2(html_body, fast5, ret='both'):
    """Return the run start and/or end date as 'month.day.year' strings."""
    if fast5 == 'off':
        # start and end timestamp data are stored in the first <script> tag
        script_text = html_body.find('script').text
        # Regex to find the getElementById calls and extract both the ID and the time data
        pattern = r"getElementById\('(.+?)'\)\.innerHTML = convertTime\('(.+?)'\);"
        matches = re.findall(pattern, script_text)
        # Create a dictionary to hold each ID with its corresponding timestamp
        timestamp_dict = {id: timestamp for id, timestamp in matches}
        # store start and end timestamps in vars
        start = timestamp_dict['start-timestamp']
        end = timestamp_dict['end-timestamp']
        # Use dateutil's parser to handle the ISO 8601 format automatically
        timestamp_start = parser.isoparse(start).date()
        timestamp_end = parser.isoparse(end).date()
        # create date strings
        startdate = f'{timestamp_start.month}.{timestamp_start.day}.{timestamp_start.year}'
        enddate = f'{timestamp_end.month}.{timestamp_end.day}.{timestamp_end.year}'
    elif fast5 == 'vbz_compress':
        # start and end timestamp data are stored in the run-report-header
        script_text = html_body.find('header', {'class': "run-report-header"}).find('div', {'class': 'run-details'}).text
        # Regex to find the start and end dates (digits, month name, digits)
        pattern = r'\d+\s*[a-zA-Z]+\s*\d+'
        matches = re.findall(pattern, script_text)
        # store start and end timestamps in vars
        start = matches[0]
        end = matches[1]
        # Use dateutil's general parser to handle the written-out date format
        timestamp_start = parser.parse(start).date()
        timestamp_end = parser.parse(end).date()
        # create date strings
        startdate = f'{timestamp_start.month}.{timestamp_start.day}.{timestamp_start.year}'
        enddate = f'{timestamp_end.month}.{timestamp_end.day}.{timestamp_end.year}'
    if ret == 'start':
        return startdate
    elif ret == 'end':
        return enddate
    else:
        return startdate, enddate

def get_promid(html_body, fast5):
    # value located in first <h1> tag in body
    promid_text = html_body.find('h1').text
    if fast5 == 'off':
        # Regex pattern
        pattern = r'PromethION\s\((\w+\d+)\)'
        # search
        match = re.search(pattern, promid_text)
        # get value from search
        promid = match.group(1)
    elif fast5 == 'vbz_compress':
        # Regex pattern
        pattern = r'PromethION\s\d+\s\((\w+\d+)\)'
        # search
        match = re.search(pattern, promid_text)
        # get value from search
        promid = match.group(1)
    return promid

def get_ex_sam_name(html_body):
    # values located in the first <div> tag with class 'run-details'
    run = html_body.find('div', {'class': 'run-details'})
    run_text = run.text
    # Regex pattern
    pattern = r'\·\s*([A-Za-z0-9_]+)'
    # search for text values (format appears to be experiment name, sample name, and an unknown value)
    run_text_list = re.findall(pattern, run_text)
    # get vars
    ex_name = run_text_list[0]  # experiment name
    sample_name = run_text_list[1]  # sample name
    return {'Experiment Name': ex_name, 'Sample Name': sample_name}

def get_runsum(html_body):
    # narrow down to data output
    tmp_do = html_body.find('section', {'id': 'run-summary'}).find('div', {'class': 'container data-output'})
    # extract estimated bases / data output (Gb) text
    do_text = tmp_do.find('div', class_='header', string='Estimated bases').find_next_sibling('div').text
    do_val = float(do_text.split(' ')[0])
    # estimated N50
    n50_text = tmp_do.find('div', class_='header', string='Estimated N50').find_next_sibling('div').text
    n50_val = float(n50_text.split(' ')[0])
    return {'Data Output': do_val, 'N50': n50_val}

def get_flowcell(html_body):
    # narrow down to run setup tag content
    tmp_set = html_body.find('section', {'id': 'run-configuration'}).find('div', {'class': 'config-section'}).find('div', {'class': 'accordion content'})
    # extract flow cell ID
    fcid_text = tmp_set.find('div', class_='title', string='Flow cell ID').find_next_sibling('div').text
    return fcid_text

def get_minknow(html_body):
    # version is in the fourth 'config-section' <div> (index 3)
    tmp = html_body.find_all('div', {'class': "config-section"})[3]
    # extract version text
    vers = tmp.find('div', class_='title', string='MinKNOW').find_next_sibling('div').text
    return vers

def create_obs_row(_soup):
    # extract body contents - everything should be in here
    _body = _soup.body
    # fast5
    fast5 = check_fast5(_body)
    # Create dictionary for row
    obs_row = {}
    # fill in with values
    ex_sam = get_ex_sam_name(_body)
    obs_row['Experiment_Name'] = ex_sam['Experiment Name']
    obs_row['Sample_Name'] = ex_sam['Sample Name']
    obs_row['Run_Date'] = get_timev2(_body, fast5, 'start')
    obs_row['PROM_ID'] = get_promid(_body, fast5)
    obs_row['Flow_Cell_ID'] = get_flowcell(_body)
    runsum = get_runsum(_body)
    obs_row['Data_output_(Gb)'] = runsum['Data Output']
    obs_row['N50_(kb)'] = runsum['N50']
    obs_row['MinKNOW Version'] = get_minknow(_body)
    return obs_row

def format_row(_row):
    # assemble the row values in a fixed column order and join them with tabs
    keys = ['Experiment_Name',
            'Sample_Name',
            'Run_Date',
            'PROM_ID',
            'Flow_Cell_ID',
            'Data_output_(Gb)',
            'N50_(kb)',
            'MinKNOW Version'
            ]
    values = list(map(_row.get, keys))
    return '\t'.join([str(x) for x in values])

# user input
inparser = argparse.ArgumentParser(description='Extract data from long read HTML report')
inparser.add_argument('--html_dir', default=None, type=str, help='path to directory containing html files, if converting whole directory')
inparser.add_argument('--filelist', default=None, type=str, help='text file containing list of all html reports to parse')
args = inparser.parse_args()

# get list of files
if args.html_dir is not None:
    files = glob.glob(f'{args.html_dir}/*.html')
elif args.filelist is not None:
    with open(args.filelist, 'r') as infile:
        files = [x.strip() for x in infile.readlines()]
else:
    quit('ERROR: No directory (--html_dir) or file list (--filelist) provided!')

# Initialize list of observation rows, one per report
_rows = []
for x in files:
    soup = read_html(x)
    row = create_obs_row(soup)
    _rows.append(row)

header = '\t'.join(['Experiment_Name', 'Sample_Name', 'Run_Date', 'PROM_ID', 'Flow_Cell_ID', 'Data_output_(Gb)', 'N50_(kb)', 'MinKNOW Version'])
if len(_rows) > 0:
    print(header)
    for row in _rows:
        print(format_row(row))

quit()