-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoffline_checksum.py
149 lines (125 loc) · 4.43 KB
/
offline_checksum.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:Mod: offline_checksum
:Synopsis:
Perform checksum analysis of offline data files
:Author:
servilla
:Created:
8/23/20
"""
import logging
import hashlib
import os
from pathlib import Path
import click
import daiquiri
# Directory that holds this script; the log file is written next to it.
cwd = os.path.dirname(os.path.realpath(__file__))
logfile = f"{cwd}/offline_checksum.log"

# Send DEBUG-and-above records both to the log file and to stdout.
daiquiri.setup(
    level=logging.DEBUG,
    outputs=(
        daiquiri.output.File(logfile),
        "stdout",
    ),
)
logger = daiquiri.getLogger(__name__)
def get_files(data: Path, ext: str = ""):
    """
    Recursively collect the regular files found beneath a directory.

    :param data: Directory at which the recursive search starts
    :param ext: Optional file extension filter, given with or without a
        leading dot (e.g. "csv" or ".csv"); an empty string matches
        every file
    :return: Sorted list of Path objects, one per matching regular file
        (directories are excluded even when their name matches)
    """
    # Normalize the extension so that "csv", ".csv" and "..csv" all
    # yield the glob pattern "*.csv".
    if ext:
        ext = "." + ext.lstrip(".")
    # rglob yields entries in a filesystem-dependent order; sorting
    # keeps the resulting report reproducible between runs and hosts.
    return sorted(p for p in data.rglob(f"*{ext}") if p.is_file())
def do_report(report: str, results: dict):
    """
    Emit the checksum results as comma-separated "file,checksum" rows.

    :param report: Path of the report file to (over)write, or None to
        print the rows to stdout instead
    :param results: Mapping of file name to its checksum text
    :return: None
    """
    if report is not None:
        # Write mode truncates any existing report before the rows
        # are written, one per line.
        with open(report, "w") as out:
            for name in results:
                out.write(f"{name},{results[name]}" + "\n")
        return

    for name in results:
        print(f"{name},{results[name]}")
# Help text shown by click for each command-line option of main().
report_help = "Report file (defaults to stdout only)"
manifest_help = "Import a manifest of prior checksums for comparison"
verbose_help = "Print progress to stdout"
md5_help = "Perform MD5 checksum analysis only"
sha1_help = "Perform SHA1 checksum analysis only"
ext_help = "Data file extension (default is none)"

# Let "-h" act as an alias for click's built-in "--help" option.
CONTEXT_SETTINGS = {"help_option_names": ["-h", "--help"]}
def _file_checksum(path, hash_algorithm, chunk_size=1024 * 1024):
    """
    Return the hexadecimal digest of the file at ``path``.

    The file is read in ``chunk_size``-byte pieces so a large data file
    is never held in memory in full, and the handle is closed as soon
    as hashing finishes.

    :param path: Path of the file to hash
    :param hash_algorithm: hashlib constructor (e.g. ``hashlib.md5``)
    :param chunk_size: Number of bytes read per iteration
    :return: Hex digest string
    """
    digest = hash_algorithm()
    with open(path, "rb") as stream:
        for chunk in iter(lambda: stream.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


@click.command(context_settings=CONTEXT_SETTINGS)
@click.argument("data", nargs=1, required=True)
@click.option("-r", "--report", default=None, help=report_help)
@click.option("-m", "--manifest", default=None, help=manifest_help)
@click.option("-v", "--verbose", is_flag=True, default=False, help=verbose_help)
@click.option("--md5", is_flag=True, help=md5_help)
@click.option("--sha1", is_flag=True, help=sha1_help)
@click.option("--ext", default="", help=ext_help)
def main(
    data: str,
    report: str,
    manifest: str,
    verbose: bool,
    md5: bool,
    sha1: bool,
    ext: str
):
    """
        Perform checksum analysis of offline data files. Exactly one of
        the --md5 or --sha1 hash algorithms must be selected.

        \b
            DATA: Data directory where checksum analysis begins.
    """
    # The docstring above is rendered by click as the --help text, so
    # developer notes are kept in comments.
    #
    # Flow: validate the arguments, hash every file found under DATA,
    # optionally compare the digests against a manifest of
    # "file,checksum" lines, then hand the results to do_report().
    #
    # Raises FileNotFoundError when DATA, the report's parent directory
    # or the manifest is missing, and ValueError unless exactly one of
    # --md5 / --sha1 is given.
    d = Path(data)
    if not (d.exists() and d.is_dir()):
        msg = f"Data directory '{data}' was not found or is not a directory"
        raise FileNotFoundError(msg)

    if report is not None:
        # Only the parent directory must exist; the report file itself
        # is created (or overwritten) when the results are written.
        r = Path(report)
        if not (r.parent.exists() and r.parent.is_dir()):
            msg = f"Report '{report}' path is not a valid path"
            raise FileNotFoundError(msg)

    if manifest is not None:
        m = Path(manifest)
        if not (m.exists() and m.is_file()):
            msg = f"Manifest '{manifest}' was not found or is not a file"
            raise FileNotFoundError(msg)

    if not (md5 or sha1):
        msg = "Either an MD5 or SHA1 hash algorithm must be selected"
        raise ValueError(msg)
    elif md5 and sha1:
        msg = "Only one of MD5 or SHA1 hash algorithms should be selected"
        raise ValueError(msg)

    hash_algorithm = hashlib.md5 if md5 else hashlib.sha1

    # checksums holds the raw digest per file; results holds the text
    # that is reported ("digest" alone, or "digest,<status>" when a
    # manifest is being compared).
    checksums = dict()
    results = dict()
    for index, file in enumerate(get_files(d, ext), start=1):
        file = str(file)
        checksum = _file_checksum(file, hash_algorithm)
        checksums[file] = checksum
        # With a manifest the status column starts out empty and is
        # filled in below for every file the manifest mentions.
        results[file] = checksum if (manifest is None) else checksum + ","
        if verbose:
            print(f"{index}: {str(file)} - {checksum}")

    if manifest is not None:
        with open(manifest, "r") as manifest_file:
            for line in manifest_file:
                if not line.strip():
                    continue  # tolerate blank lines in the manifest
                # Split on the LAST comma so that file names which
                # themselves contain commas are still parsed correctly.
                m_file, sep, m_checksum = line.rstrip("\r\n").rpartition(",")
                if not sep:
                    msg = f"Manifest line `{line.strip()}` is malformed"
                    logger.warning(msg)
                    continue
                if m_file not in checksums:
                    msg = f"Manifest `{m_file}` not found in data directory"
                    logger.warning(msg)
                    continue
                # Compare against the raw digest (not the accumulated
                # result text) so a duplicated manifest entry cannot
                # append a second status to the same row.
                if m_checksum.strip() != checksums[m_file]:
                    results[m_file] = checksums[m_file] + "," + "fail"
                    msg = f"Checksum mismatch - {results[m_file]}"
                    logger.warning(msg)
                else:
                    results[m_file] = checksums[m_file] + "," + "pass"

    do_report(report, results)
    return 0
# Run the click command only when executed as a script, not on import.
if __name__ == "__main__":
    main()