-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstability_collector.py
144 lines (129 loc) · 5.86 KB
/
stability_collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/python
'''
A script that parses stability session logs
'''
import logging
import os
import re
import time

import diamond.collector
from diamond.error import DiamondException
from diamond.metric import Metric
class StabilityCollector(diamond.collector.Collector):
    '''
    Diamond collector that parses scanner stability session logs
    (Stability_*.txt), publishes each table cell as a timestamped Graphite
    metric, and moves processed logs into an 'Ingested' subdirectory.
    '''

    def __init__(self, config=None, handlers=None, name=None, configfile=None):
        # None instead of a shared mutable [] default; normalise before
        # handing the list to the parent collector.
        if handlers is None:
            handlers = []
        diamond.collector.Collector.__init__(self, config=config,
                                             handlers=handlers, name=name,
                                             configfile=configfile)
        # Processed log files are moved into this subdirectory of the
        # scanner directory.
        self.ingest_dir = 'Ingested'
        self.scanner_location = 'Harvard/Northwest/Bay1'
        self.base_dir = '/ncf/dicom-backups/_Scanner'
        # Files pending processing; filled by new_logfiles().
        self.logfiles = []
        # Mirror this collector's log output into a per-scanner logfile.
        self.log.setLevel(logging.INFO)
        fh = logging.FileHandler(os.path.join(self.base_dir,
                                              self.scanner_location,
                                              'graphite.log'))
        fh.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        fh.setFormatter(formatter)
        self.log.addHandler(fh)

    def dotlocation(self):
        '''Return scanner_location with '/' replaced by '.' for metric paths.'''
        return self.scanner_location.replace('/', '.')

    def new_logfiles(self, logdir):
        '''
        Update self.logfiles with the stability logs found in logdir.

        Returns True if any Stability_* files exist there, else False.
        '''
        newfiles = [os.path.join(logdir, f) for f in os.listdir(logdir)
                    if f.startswith('Stability_')]
        if not newfiles:
            return False
        self.log.debug(self.logfiles)
        self.logfiles = newfiles
        self.log.debug('after newfiles:')
        self.log.debug(self.logfiles)
        return True

    def collect(self):
        '''
        Parse every pending stability log, publish its result tables as
        metrics timestamped with the session time, then move each file
        into the ingest directory.
        '''
        if not self.new_logfiles(os.path.join(self.base_dir,
                                              self.scanner_location)):
            self.log.info('no new files found')
            return
        # Compile once, outside the per-file loop.
        header_re = re.compile(
            r'Stability configuration: 16 slices, 500 measurements, '
            r'([0-9]{2}) channels\n')
        section_re = re.compile(
            r'Stability (\w+) results:\n\nslice#(.*)\n 1(.*)\n 2(.*)\n'
            r' 3(.*)\n 4(.*)\n 5(.*)\n 6(.*)\n 7(.*)\n 8(.*)\n 9(.*)\n'
            r'10(.*)\n11(.*)\n12(.*)\n13(.*)\n14(.*)\n15(.*)\n16(.*)\n',
            re.MULTILINE)
        for f in self.logfiles:
            epoch = self.parse_epoch(f)
            with open(f, 'r') as infile:  # was 'input': shadowed a builtin
                first_line = infile.readline()
                try:
                    channel_no = header_re.match(first_line).group(1)
                    coil = self._resolve_channels(channel_no)
                except (AttributeError, KeyError) as e:
                    # AttributeError: header didn't match (None.group);
                    # KeyError: unknown channel count. Skip this file.
                    self.log.info('error \'{}\' for file \'{}\', line {}'
                                  .format(e, f, first_line))
                    continue
                lines = infile.read()
                # divide document into sections
                sections = section_re.findall(lines)
                # parse each section
                for section in sections:
                    section = list(section)
                    section_type = section.pop(0)
                    header = section.pop(0)
                    # to conform to graphite metric name standards
                    header_list = [c.replace('[%]', 'pct')
                                   for c in header.split()]
                    header_list = [re.sub(r'\W+', '', s) for s in header_list]
                    section = [r.split() for r in section]
                    # location.coil.tableType.columnName.rowNum  value
                    metricnames = [
                        ('{}.{}.{}.{}.{}'.format(self.dotlocation(), coil,
                                                 section_type, header_list[i],
                                                 s + 1), v)
                        for s, r in enumerate(section)
                        for i, v in enumerate(r)]
                    for metricname, value in metricnames:
                        # Log a dry-run sample, then publish for real.
                        self.publish(metricname, value, timestamp=epoch,
                                     dry_run=True)
                        self.publish(metricname, value, timestamp=epoch)
            # mark file as ingested by moving it aside
            head, tail = os.path.split(f)
            ingest_path = os.path.join(head, self.ingest_dir)
            if not os.path.exists(ingest_path):
                os.mkdir(ingest_path)
            os.rename(f, os.path.join(ingest_path, tail))
            self.log.info('processed {} with coil {}'.format(f, coil))

    def _resolve_channels(self, channel):
        '''
        Map a reported channel count (string) to the coil name used in
        metric paths; 48- and 64-channel sessions both report as '64'.
        Raises KeyError for an unknown channel count (handled in collect()).
        '''
        channelmap = {
            '32': '32',
            '48': '64',
            '64': '64'
        }
        return channelmap[channel]

    def parse_epoch(self, s):
        '''
        Extract YYYYMMDDTHHMMSS from a Stability_*.txt path and return it
        as an integer local-time Unix epoch.
        '''
        # \.txt (was .txt): an unescaped dot matched any character.
        date_time = re.search(r'Stability_([0-9]{8}T[0-9]{6})\.txt',
                              s).group(1)
        pattern = '%Y%m%dT%H%M%S'
        epoch = int(time.mktime(time.strptime(date_time, pattern)))
        if not epoch:
            # assert would be stripped under -O; fail loudly instead.
            raise ValueError('could not parse epoch from {}'.format(s))
        return epoch

    def publish(self, name, value, raw_value=None, precision=2,
                metric_type='GAUGE', instance=None, timestamp=None,
                dry_run=False):
        '''
        Publish a metric with the given name (monkey patch of the base
        Collector.publish so the Metric carries an explicit timestamp).
        With dry_run=True the metric is only logged, not sent.
        '''
        # Check whitelist/blacklist
        if self.config['metrics_whitelist']:
            if not self.config['metrics_whitelist'].match(name):
                return
        elif self.config['metrics_blacklist']:
            if self.config['metrics_blacklist'].match(name):
                return
        # Get metric Path
        path = self.get_metric_path(name, instance=instance)
        # Get metric TTL
        ttl = float(self.config['interval']) * float(
            self.config['ttl_multiplier'])
        # Create Metric
        try:
            metric = Metric(path, value, raw_value=raw_value,
                            timestamp=timestamp, precision=precision,
                            host=self.get_hostname(),
                            metric_type=metric_type, ttl=ttl)
        except DiamondException:
            # DiamondException is now imported at module level; previously
            # this name was unresolved and the handler itself raised
            # NameError.
            self.log.error(('Error when creating new Metric: path=%r, '
                            'value=%r'), path, value)
            raise
        # Publish Metric
        if dry_run:
            self.log.info('dry run sample: {}'.format(metric))
        else:
            self.publish_metric(metric)
if __name__ == '__main__':
    # Run one collection pass when invoked directly as a script.
    collector = StabilityCollector()
    collector.collect()