Cytoscape_table_from_hhblits.py
#!/usr/bin/env python3
# coding: utf-8
# authors: Jakub Barylski, Sophia Bałdysz
"""
Generate a Cytoscape-importable tabular file that represents the network
of HMM-HMM similarities produced by HHblits.
Requires a directory of HHblits results generated with a command similar to:
hhblits -i ${query_hmm} -d ${hhstyle_hmm_database} -o ${hhr_output} -blasttab ${btb_output} > ${redirected_log}
"""
import json
from pathlib import Path
from typing import List, Dict, Any

import pandas as pd


def read_hhout(input_path: Path,
               dont_filter: bool = True,
               ) -> List[Dict[str, Any]]:
    """
    Parse an hhblits output file line by line and return a list of alignments
    with their associated statistics.
    :param input_path: hhblits hhr-formatted output (set by the '-o' flag) or redirected standard output
    :param dont_filter: keep self-alignments of the HMM (True) or drop them (False); default True
    :return: [{'query': 'begg_2ZCQV', 'Prob': 100.0, 'E-value': 2e-98, (...)},
              {'query': 'PFAM_PF12671.8', (...)},
              (...)]
    """
    expected_header = ['No', 'Hit', 'Prob', 'E-value', 'P-value', 'Score',
                       'SS', 'Cols', 'Query_alignment', 'Template_alignment', 'Template_length']
    obsolete_pfams = ('PF19127',  # PFAM families no longer present in PFAM version 33.0
                      'PF19085',
                      'PF18885',
                      'PF18896',
                      'PF19087',
                      'PF18979',
                      'PF18994')
    records = []
    with input_path.open() as hh_out:
        header_found = False
        footer_found = False
        # derive the query HMM name from the file name,
        # e.g. 'PFAM__PF12671.8.log' -> 'PFAM:PF12671.8'
        query = input_path.stem.replace('__', '_').replace('_', ':', 1)
        for line in hh_out:
            # strip the newline and trailing ')' and blank the '(',
            # so that bracketed fields like 'Template HMM (130)' split cleanly
            line = line.rstrip('\n)').replace('(', ' ')
            split_line = line.split()
            if split_line[:7] == expected_header[:7]:
                if header_found:
                    raise ValueError('Redundant header')
                header_found = True
            elif header_found:
                if not line.strip():
                    # a blank line closes the hit table
                    footer_found = True
                    break
                record = {'query': query}
                parsed_line = {k: v for k, v in zip(expected_header, split_line)}
                del parsed_line['SS'], parsed_line['No']
                parsed_line['Hit'] = parsed_line['Hit'].replace('_', ':', 1)
                if parsed_line['Hit'].startswith('PFAM:PF'):
                    # translate PFAM accessions to model names
                    # (pfam_translator is the dict loaded at module level in '__main__')
                    translation_key = parsed_line['Hit'].split(':')[1].split('.')[0]
                    if translation_key in obsolete_pfams:
                        print(f'Skipping obsolete PFAM record - {translation_key}')
                        continue
                    translation = pfam_translator[translation_key]
                    parsed_line['Hit'] = f'PFAM:{translation}'
                if parsed_line['Hit'] != query or dont_filter:
                    parsed_line['Prob'] = float(parsed_line['Prob'])
                    parsed_line['E-value'] = float(parsed_line['E-value'])
                    parsed_line['P-value'] = float(parsed_line['P-value'])
                    parsed_line['Score'] = float(parsed_line['Score'])
                    parsed_line['Cols'] = int(parsed_line['Cols'])
                    parsed_line['Template_length'] = int(parsed_line['Template_length'])
                    parsed_line['Template_coverage'] = parsed_line['Cols'] / parsed_line['Template_length']
                    record.update(parsed_line)
                    records.append(record)
    assert header_found and footer_found, f'Incomplete hhblits output: {input_path}'
    return records
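
# A minimal usage sketch (hypothetical path, assuming the file-naming convention above):
# records = read_hhout(Path(' ... /hhblits_result/PFAM__PF12671.8.log'))
# print(records[0]['Hit'], records[0]['E-value'])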


if __name__ == '__main__':
    # similarity thresholds
    e_value_threshold = 1e-5
    cov_threshold = 0.75
    # input and output files
    hhblits_dir = Path(' ... /hhblits_result')
    input_files = [f for f in hhblits_dir.iterdir() if f.suffix == '.log']
    pfam_translator_file = Path(' ... /PFAM_translator.json')  # file with a dictionary matching PFAM accessions to model names (e.g. {"PF08863.10": "YolD", (...)})
    with pfam_translator_file.open() as pt:
        pfam_translator = json.load(pt)
    # index the translator by version-less accessions (e.g. 'PF08863')
    pfam_translator = {k.split('.')[0]: v for k, v in pfam_translator.items()}
    output_dir = Path(' ... /hhblits_network')
    output_dir.mkdir(exist_ok=True)
    # parsing and filtering alignments
    all_records = []
    log = []
    for f in input_files:
        filtered_records = [r for r in read_hhout(f) if r['E-value'] <= e_value_threshold]
        if cov_threshold:
            filtered_records = [r for r in filtered_records if r['Template_coverage'] >= cov_threshold]
        log.append(f'{f.name}\t{len(filtered_records)}')
        all_records.extend(filtered_records)
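    # persist the per-file hit counts collected in 'log' next to the network tables
    # (a sketch; 'filtering_log.tsv' is a hypothetical file name)
    output_dir.joinpath('filtering_log.tsv').write_text('\n'.join(log) + '\n')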
    all_nodes = {r['query'] for r in all_records}
    all_nodes.update(r['Hit'] for r in all_records)
    print(f'found {len(all_nodes)} nodes and {len(all_records)} edges')
    # writing output files
    result_frame = pd.DataFrame.from_records(all_records)
    out_xlsx = output_dir.joinpath(f'{hhblits_dir.name}.xlsx')
    out_tsv = output_dir.joinpath(f'{hhblits_dir.name}.tsv')
    result_frame.to_excel(out_xlsx.as_posix(), engine='xlsxwriter', index=False)
    result_frame.to_csv(out_tsv.as_posix(), sep='\t', index=False)
    print(f'RESULTS AT:\t{out_tsv.as_posix()}')
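
# To load the result in Cytoscape (3.x): File -> Import -> Network from File...,
# select the .tsv and map 'query' and 'Hit' to the source and target node columns;
# the remaining columns become edge attributes (menu path may vary between versions).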