-
Notifications
You must be signed in to change notification settings - Fork 3
/
snap.py
155 lines (122 loc) · 3.95 KB
/
snap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import hashlib
from pathlib import Path
from typing import Optional, Iterable, NamedTuple, TextIO, Set, Union
import sys, json
# --
# # Snap
#
# Snap is a simple Python module that takes a snapshot of one directory and allows comparing
# the result later on. It is a simplified reimplementation of the key functionality
# provided in [Sink](https://github.com/sebastien/sink).
class Stat(NamedTuple):
    """File metadata kept in a snapshot, a subset of an `os.stat` result.

    `file_meta` fills each field from the `st_`-prefixed attribute of the
    same name. The first four fields are integers and the last two are
    floats, which is the layout `parse` relies on when reading a snapshot.
    """

    # Taken from `st_mode`
    mode: int
    # Taken from `st_uid`
    uid: int
    # Taken from `st_gid`
    gid: int
    # Taken from `st_size`
    size: int
    # Taken from `st_ctime`
    ctime: float
    # Taken from `st_mtime`; used by `older` to order two elements
    mtime: float
class PathElement(NamedTuple):
    """One snapshot entry: a path with its optional metadata and signature."""

    # Path of the entry, as produced by `walk`
    path: str
    # Metadata of the file, or None when `walk` did not find a regular file
    meta: Optional[Stat] = None
    # Hexadecimal digest returned by `file_sig` (a string, not a `Stat`),
    # or None when no signature was computed
    sig: Optional[str] = None
def file_meta(path: str) -> Stat:
    """Returns the `Stat` record of `path`, as reported by `os.stat`."""
    info = os.stat(path)
    # Positional arguments follow the field order of `Stat`:
    # mode, uid, gid, size, ctime, mtime.
    return Stat(
        info.st_mode,
        info.st_uid,
        info.st_gid,
        info.st_size,
        info.st_ctime,
        info.st_mtime,
    )
def file_sig(path: str) -> Optional[str]:
    """Returns the hexadecimal `sha512_256` digest of the content of `path`.

    The file is read in binary mode and hashed chunk by chunk, so that a
    large file is never loaded into memory in one piece.
    """
    h = hashlib.new("sha512_256")
    with open(path, "rb") as f:
        # `read` returns an empty bytes object at end of file, which ends
        # the loop.
        while chunk := f.read(65536):
            h.update(chunk)
    return h.hexdigest()
def walk(path: str) -> Iterable[PathElement]:
    """Yields one `PathElement` per file name found under `path`.

    The tree is traversed with `os.walk`, without following symbolic links
    to directories. Regular files are yielded with their metadata and
    content signature; any other listed name is yielded with `meta` and
    `sig` set to None.
    """
    for dirpath, _, filenames in os.walk(path, followlinks=False):
        for name in filenames:
            # A dedicated variable keeps the `path` argument intact, and
            # `os.path.join` does not double the separator when `dirpath`
            # already ends with one.
            filepath = os.path.join(dirpath, name)
            # `isfile` is already False for a path that does not exist, so
            # no separate existence check is needed.
            if os.path.isfile(filepath):
                meta = file_meta(filepath)
                sig = file_sig(filepath)
                yield PathElement(filepath, meta, sig)
            else:
                yield PathElement(filepath, None, None)
def index(elements: Iterable[PathElement]) -> dict[str, PathElement]:
    """Returns a dictionary mapping each element's path to the element.

    When several elements share the same path, the last one is kept.
    """
    return {element.path: element for element in elements}
def same(a: PathElement, b: PathElement) -> bool:
    """Tells whether `a` and `b` have equal signatures and equal metadata.

    The paths of the two elements are not compared.
    """
    if a.sig != b.sig:
        return False
    return a.meta == b.meta
def older(a: PathElement, b: PathElement) -> bool:
    """Tells whether `a` is considered older than `b`.

    An element without metadata is older than any other element; when only
    `b` lacks metadata, `a` is not older. Otherwise the modification times
    are compared, and `a` is older when its `mtime` is strictly smaller.
    """
    if a.meta is None:
        return True
    if b.meta is None:
        return False
    # `Stat` has no `time` field: the modification time is `mtime` on both
    # sides of the comparison.
    return a.meta.mtime < b.meta.mtime
def read(stream: Union[str, Path, TextIO]) -> Iterable[PathElement]:
    """Yields the `PathElement` parsed from each line of a snapshot.

    `stream` is either a path (`str` or `Path`), which is opened for
    reading and closed once exhausted, or an already opened text stream.

    Raises `RuntimeError` on the first line that `parse` rejects; the line
    number in the message is zero-based.
    """
    if isinstance(stream, (str, Path)):
        with open(stream) as f:
            yield from read(f)
        return
    # Iterating the stream directly yields lines one at a time instead of
    # loading the whole snapshot in memory first.
    for i, line in enumerate(stream):
        res = parse(line)
        if res is None:
            raise RuntimeError(f"Malformed line {i}: {repr(line)}")
        yield res
def write(elements: Iterable[PathElement], stream: TextIO):
    """Writes `elements` to `stream`, one line per element.

    Each element is formatted with `fmt`, which does not terminate its
    output, so a newline is written after every record. Without it all the
    records would end up on a single line that `read` cannot split back.
    """
    for element in elements:
        stream.write(fmt(element))
        stream.write("\n")
def parse(line: str) -> Optional[PathElement]:
    """Parses one snapshot line into a `PathElement`.

    A line holds three tab-separated fields: the path, the metadata as
    comma-separated values (four integers followed by two floats) or an
    empty string, and the signature or an empty string. Empty metadata and
    signature fields are returned as None.

    Returns None when the line is malformed, that is when it does not have
    exactly three fields or when the metadata cannot be converted to a
    `Stat`.
    """
    # Lines read from a file keep their terminator; drop it so that it does
    # not leak into the signature field. Tabs are left alone, since trailing
    # empty fields are meaningful.
    fields = line.rstrip("\r\n").split("\t")
    if len(fields) != 3:
        return None
    path, meta, sig = fields
    try:
        stat = None
        if meta:
            values = meta.split(",")
            # The first four `Stat` fields are integers, the others floats
            stat = Stat(*((int if i < 4 else float)(v) for i, v in enumerate(values)))
    except (TypeError, ValueError):
        # Wrong number of values (TypeError) or a non-numeric value
        # (ValueError): report the line as malformed instead of crashing.
        return None
    return PathElement(path, stat, sig if sig else None)
def fmt(element: PathElement) -> str:
    """Formats `element` as three tab-separated fields.

    The fields are the path, the metadata values joined with commas, and
    the signature. Missing metadata or signature is written as an empty
    field. The result carries no trailing newline.
    """
    path, meta, sig = element
    meta_field = ",".join(str(value) for value in meta) if meta else ""
    sig_field = sig or ""
    return f"{path}\t{meta_field}\t{sig_field}"
def paths(indexes: Iterable[dict[str, PathElement]]) -> Set[str]:
    """Returns the set of all the keys found in the given `indexes`."""
    collected = set()
    for mapping in indexes:
        # Iterating a dictionary yields its keys
        collected.update(mapping)
    return collected
def compare(paths: Iterable[str]) -> Iterable[tuple[str, list[str]]]:
    """Compares the snapshots stored in the files listed in `paths`.

    Every snapshot is loaded with `read` and indexed by path; the first one
    is the origin the others are compared against. For each path present in
    at least one snapshot, yields `(path, entry)` where `entry` holds one
    marker per snapshot, in the order of `paths`:

    - for the origin: "=" when it has the path, "!" otherwise;
    - for any other snapshot: "!" when it lacks the path, "+" when it has
      the path but the origin does not, "=" when both elements are the
      `same`, ">" when the origin element is `older`, and "<" otherwise.

    Paths are yielded in sorted order so that the output is reproducible.
    Nothing is yielded when `paths` is empty.
    """
    sources = [index(read(_)) for _ in paths]
    # The `paths` argument hides the module-level `paths()` helper inside
    # this function, so the union of the index keys is computed inline.
    all_paths = set().union(*sources)
    origin = sources[0] if sources else {}
    for p in sorted(all_paths):
        o = origin.get(p)
        entry = ["=" if o is not None else "!"]
        for s in sources[1:]:
            # The element to compare comes from the current snapshot, not
            # from the origin.
            d = s.get(p)
            if d is None:
                entry.append("!")
            elif o is None:
                entry.append("+")
            elif same(o, d):
                entry.append("=")
            else:
                entry.append(">" if older(o, d) else "<")
        yield (p, entry)
# SEP = "\t"
# for path, entries in compare(SOURCES):
# print(f"{SEP.join(entries)}{SEP}{path}")
if __name__ == "__main__":
    # Only take a snapshot when executed as a script, so that importing the
    # module has no side effect.
    if len(sys.argv) < 2:
        sys.exit(f"Usage: {sys.argv[0]} DIRECTORY")
    # Writes one JSON document per line on the standard output; named
    # tuples are serialized as JSON arrays.
    for element in walk(sys.argv[1]):
        json.dump(element, sys.stdout)
        sys.stdout.write("\n")
# EOF