forked from elastic/detection-rules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathversion_lock.py
339 lines (263 loc) · 15.7 KB
/
version_lock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Helper utilities to manage the version lock."""
from copy import deepcopy
from dataclasses import dataclass
from pathlib import Path
from typing import ClassVar, Dict, List, Optional, Union
import click
from semver import Version
from .config import parse_rules_config
from .mixins import LockDataclassMixin, MarshmallowDataclassMixin
from .rule_loader import RuleCollection
from .schemas import definitions
from .utils import cached
# Rules configuration returned by parse_rules_config(); this module reads the version lock and deprecated
# rules file paths from it.
RULES_CONFIG = parse_rules_config()
# This was the original version the lock was created under. This constant has been replaced by
# schemas.get_min_supported_stack_version, which dynamically determines the minimum supported stack version.
# MIN_LOCK_VERSION_DEFAULT = Version("7.13.0")
@dataclass(frozen=True)
class BaseEntry:
    """Fields common to every locked rule entry, whether current or previous."""

    # name of the rule
    rule_name: definitions.RuleName
    # sha256 hash recorded for the locked rule
    sha256: definitions.Sha256
    # rule type (for example 'query')
    type: definitions.RuleType
    # locked rule version
    version: definitions.PositiveInteger
@dataclass(frozen=True)
class PreviousEntry(BaseEntry):
    """Lock entry kept under `previous` for an older stack version of a rule."""

    # Optional only so that already tagged branches lacking this field still load; because of that,
    # strict validation of the value has to happen elsewhere.
    max_allowable_version: Optional[int]
@dataclass(frozen=True)
class VersionLockFileEntry(MarshmallowDataclassMixin, BaseEntry):
    """Schema describing a single rule's entry in the version lock."""

    # major.minor stack version of the current (top-level) lock entry, when one is set
    min_stack_version: Optional[definitions.SemVerMinorOnly]
    # earlier lock entries, keyed by the major.minor stack version they were locked under
    previous: Optional[Dict[definitions.SemVerMinorOnly, PreviousEntry]]
@dataclass(frozen=True)
class VersionLockFile(LockDataclassMixin):
    """Schema describing the complete version lock file, keyed by rule id."""

    data: Dict[Union[definitions.UUIDString, definitions.KNOWN_BAD_RULE_IDS], VersionLockFileEntry]
    file_path: ClassVar[Path] = RULES_CONFIG.version_lock_file

    def __contains__(self, rule_id: str):
        """Report whether the lock holds an entry for the given rule id."""
        return rule_id in self.data

    def __getitem__(self, item) -> VersionLockFileEntry:
        """Look up the entry for a rule id, raising KeyError when there is none."""
        entries = self.data
        if item in entries:
            return entries[item]
        raise KeyError(item)
@dataclass(frozen=True)
class DeprecatedRulesEntry(MarshmallowDataclassMixin):
    """Schema describing a single rule's entry in the deprecated rules file."""

    # date of deprecation; a set of known-bad historical values is also accepted
    deprecation_date: Union[definitions.Date, definitions.KNOWN_BAD_DEPRECATED_DATES]
    # name of the deprecated rule
    rule_name: definitions.RuleName
    # stack version recorded with the deprecation
    stack_version: definitions.SemVer
@dataclass(frozen=True)
class DeprecatedRulesFile(LockDataclassMixin):
    """Schema describing the complete deprecated rules file, keyed by rule id."""

    data: Dict[Union[definitions.UUIDString, definitions.KNOWN_BAD_RULE_IDS], DeprecatedRulesEntry]
    file_path: ClassVar[Path] = RULES_CONFIG.deprecated_rules_file

    def __contains__(self, rule_id: str):
        """Report whether the file holds an entry for the given rule id."""
        return rule_id in self.data

    def __getitem__(self, item) -> DeprecatedRulesEntry:
        """Look up the entry for a rule id, raising KeyError when there is none."""
        entries = self.data
        if item in entries:
            return entries[item]
        raise KeyError(item)
@cached
def load_versions() -> dict:
    """Load the default version.lock file through its schema and return the contents as a dict."""
    return VersionLockFile.load_from_file().to_dict()
# for tagged branches which existed before the types were added and validation enforced, we will need to manually add
# them to allow them to pass validation. These will only ever currently be loaded via the RuleCollection.load_git_tag
# method, which is primarily for generating diffs across releases, so there is no risk to versioning
def add_rule_types_to_lock(lock_contents: dict, rule_map: Dict[str, dict]):
    """Stamp every lock entry, and each of its previous entries, with a rule type.

    The type is taken from ``rule_map[rule_id]['rule']['type']`` and falls back to ``'query'`` when the
    rule or the field is absent. ``lock_contents`` is modified in place and also returned.
    """
    for rule_id, entry in lock_contents.items():
        # rules that cannot be found default to query - the value is only needed to satisfy validation
        resolved_type = rule_map.get(rule_id, {}).get('rule', {}).get('type', 'query')

        # for validation purposes the structure matters more than the exact type
        entry['type'] = resolved_type
        for previous_entry in entry.get('previous', {}).values():
            previous_entry['type'] = resolved_type

    return lock_contents
class VersionLock:
    """Version handling for rule files and collections.

    Holds the version lock and the deprecated rules lock, each loaded either from a file or from
    already-parsed contents, and provides lookups plus the logic to update and save them.
    """

    def __init__(self, version_lock_file: Optional[Path] = None, deprecated_lock_file: Optional[Path] = None,
                 version_lock: Optional[dict] = None, deprecated_lock: Optional[dict] = None,
                 name: Optional[str] = None, invalidated: Optional[bool] = False):
        """Load both locks, from file when a path is given and otherwise from the provided contents.

        Args:
            version_lock_file: path to the version lock file; takes precedence over ``version_lock``.
            deprecated_lock_file: path to the deprecated rules file; takes precedence over ``deprecated_lock``.
            version_lock: version lock contents, used when no file path is given.
            deprecated_lock: deprecated rules contents, used when no file path is given.
            name: optional label stored on the instance.
            invalidated: when truthy, construction is refused.

        Raises:
            NotImplementedError: if ``invalidated`` is truthy.
            AssertionError: if neither a file nor contents is supplied for either lock.
        """
        if invalidated:
            err_msg = "This VersionLock configuration is not valid when configued to bypass_version_lock."
            raise NotImplementedError(err_msg)

        assert (version_lock_file or version_lock), 'Must provide version lock file or contents'
        assert (deprecated_lock_file or deprecated_lock), 'Must provide deprecated lock file or contents'

        self.name = name
        self.version_lock_file = version_lock_file
        self.deprecated_lock_file = deprecated_lock_file

        if version_lock_file:
            self.version_lock = VersionLockFile.load_from_file(version_lock_file)
        else:
            self.version_lock = VersionLockFile.from_dict(dict(data=version_lock))

        if deprecated_lock_file:
            self.deprecated_lock = DeprecatedRulesFile.load_from_file(deprecated_lock_file)
        else:
            self.deprecated_lock = DeprecatedRulesFile.from_dict(dict(data=deprecated_lock))

    @staticmethod
    def save_file(path: Path, lock_file: Union[VersionLockFile, DeprecatedRulesFile]):
        """Write ``lock_file`` to ``path`` and print a confirmation; ``path`` must be set."""
        assert path, f'{path} not set'
        lock_file.save_to_file(path)
        print(f'Updated {path} file')

    def _get_stack_version_info(self, rule_id: str, min_stack_version: Optional[str] = None):
        """Return the lock entry to use for a rule, or None when the rule is not locked.

        The entry stored under ``previous[min_stack_version]`` is preferred when present; otherwise the
        latest (top-level) entry for the rule is returned.
        """
        if rule_id not in self.version_lock:
            return None

        latest_version_info = self.version_lock[rule_id]
        previous = latest_version_info.previous
        if previous and previous.get(min_stack_version):
            return previous.get(min_stack_version)
        return latest_version_info

    def get_locked_version(self, rule_id: str, min_stack_version: Optional[str] = None) -> Optional[int]:
        """Get the locked version matching the min_stack_version if present, else the latest locked version.

        Returns None when the rule is not in the version lock.
        """
        stack_version_info = self._get_stack_version_info(rule_id, min_stack_version)
        if stack_version_info is None:
            return None
        return stack_version_info.version

    def get_locked_hash(self, rule_id: str, min_stack_version: Optional[str] = None) -> Optional[str]:
        """Get the locked sha256 matching the min_stack_version if present, else the latest locked sha256.

        Returns None when the rule is not in the version lock.
        """
        stack_version_info = self._get_stack_version_info(rule_id, min_stack_version)
        if stack_version_info is None:
            return None
        existing_sha256: str = stack_version_info.sha256
        return existing_sha256

    def manage_versions(self, rules: RuleCollection,
                        exclude_version_update=False, save_changes=False,
                        verbose=True, buffer_int: int = 100) -> (List[str], List[str], List[str]):
        """Update the contents of the version.lock file and optionally save changes.

        Args:
            rules: collection of rules (including its deprecated rules) to reconcile against the lock.
            exclude_version_update: when True, lock info is requested from each rule without a version bump.
            save_changes: when True, the updated locks replace the in-memory ones and are written to file
                if they changed; when False nothing is stored or written.
            verbose: when True, progress messages are echoed.
            buffer_int: version gap applied when a new min_stack is locked for a rule, leaving room for
                further versions of the entry moved to ``previous``.

        Returns:
            Three lists of rule ids: changed rules, new rules, and newly deprecated rules.

        Raises:
            ValueError: if an update to a previous (forked) entry would exceed its max allowable version.
            AssertionError: if a rule on an older stack has no matching ``previous`` entry in the lock.
        """
        # imported here rather than at module level - presumably to avoid a circular import (unverified)
        from .packaging import current_stack_version

        version_lock_hash = self.version_lock.sha256()
        lock_file_contents = deepcopy(self.version_lock.to_dict())
        current_deprecated_lock = deepcopy(self.deprecated_lock.to_dict())

        verbose_echo = click.echo if verbose else (lambda x: None)

        already_deprecated = set(current_deprecated_lock)
        deprecated_rules = set(rules.deprecated.id_map)
        new_rules = {rule.id for rule in rules if rule.contents.saved_version is None} - deprecated_rules
        changed_rules = {rule.id for rule in rules if rule.contents.is_dirty} - deprecated_rules

        # manage deprecated rules
        newly_deprecated = deprecated_rules - already_deprecated

        if not (new_rules or changed_rules or newly_deprecated):
            return list(changed_rules), list(new_rules), list(newly_deprecated)

        verbose_echo('Rule changes detected!')
        changes = []

        def log_changes(r, route_taken, new_rule_version, *msg):
            new = [f'  {route_taken}: {r.id}, new version: {new_rule_version}']
            new.extend([f'    - {m}' for m in msg if m])
            changes.extend(new)

        # preserve buffer space to support forked version spacing. This is computed once, outside the
        # loop, so the adjustment for an excluded version bump is identical for every rule instead of
        # shrinking the buffer a little more with each rule that locks a new min_stack.
        version_buffer = buffer_int - 1 if exclude_version_update else buffer_int

        for rule in rules:
            if rule.contents.metadata.maturity == "production" or rule.id in newly_deprecated:
                # assume that older stacks are always locked first
                min_stack = Version.parse(rule.contents.get_supported_version(),
                                          optional_minor_and_patch=True)

                lock_from_rule = rule.contents.lock_info(bump=not exclude_version_update)
                lock_from_file: dict = lock_file_contents.setdefault(rule.id, {})

                # scenarios to handle, assuming older stacks are always locked first:
                # 1) no breaking changes ever made or the first time a rule is created
                # 2) on the latest, after a breaking change has been locked
                # 3) on the latest stack, locking in a breaking change
                # 4) on an old stack, after a breaking change has been made
                latest_locked_stack_version = rule.contents.convert_supported_version(
                    lock_from_file.get("min_stack_version"))

                # strip version down to only major.minor to compare against lock file versioning
                stripped_version = f"{min_stack.major}.{min_stack.minor}"

                if not lock_from_file or min_stack == latest_locked_stack_version:
                    route = 'A'
                    # 1) no breaking changes ever made or the first time a rule is created
                    # 2) on the latest, after a breaking change has been locked
                    lock_from_file.update(lock_from_rule)
                    new_version = lock_from_rule['version']

                    # add the min_stack_version to the lock if it's explicitly set
                    if rule.contents.metadata.min_stack_version is not None:
                        lock_from_file["min_stack_version"] = stripped_version
                        log_msg = f'min_stack_version added: {min_stack}'
                        log_changes(rule, route, new_version, log_msg)

                elif min_stack > latest_locked_stack_version:
                    route = 'B'
                    # 3) on the latest stack, locking in a breaking change
                    stripped_latest_locked_stack_version = f"{latest_locked_stack_version.major}." \
                                                           f"{latest_locked_stack_version.minor}"

                    lock_from_rule["version"] = lock_from_file["version"] + version_buffer

                    previous_lock_info = {
                        "max_allowable_version": lock_from_rule['version'] - 1,
                        "rule_name": lock_from_file["rule_name"],
                        "sha256": lock_from_file["sha256"],
                        "version": lock_from_file["version"],
                        "type": lock_from_file["type"]
                    }
                    lock_from_file.setdefault("previous", {})

                    # move the current locked info into the previous section
                    lock_from_file["previous"][stripped_latest_locked_stack_version] = previous_lock_info

                    # overwrite the "latest" part of the lock at the top level
                    lock_from_file.update(lock_from_rule, min_stack_version=stripped_version)
                    new_version = lock_from_rule['version']
                    # adjacent literals instead of a backslash continuation inside the string, so the
                    # source indentation never ends up in the logged message
                    log_changes(
                        rule, route, new_version,
                        f'previous {stripped_latest_locked_stack_version} saved as '
                        f'version: {previous_lock_info["version"]}',
                        f'current min_stack updated to {stripped_version}'
                    )

                elif min_stack < latest_locked_stack_version:
                    route = 'C'
                    # 4) on an old stack, after a breaking change has been made (updated fork)
                    assert stripped_version in lock_from_file.get("previous", {}), \
                        f"Expected {rule.id} @ v{stripped_version} in the rule lock"

                    # TODO: Figure out whether we support locking old versions and if we want to
                    #       "leave room" by skipping versions when breaking changes are made.
                    #       We can still inspect the version lock manually after locks are made,
                    #       since it's a good summary of everything that happens

                    previous_entry = lock_from_file["previous"][stripped_version]
                    max_allowable_version = previous_entry['max_allowable_version']

                    # if version bump collides with future bump: fail
                    # if space: change and log
                    info_from_rule = (lock_from_rule['sha256'], lock_from_rule['version'])
                    info_from_file = (previous_entry['sha256'], previous_entry['version'])

                    if lock_from_rule['version'] > max_allowable_version:
                        raise ValueError(f'Forked rule: {rule.id} - {rule.name} has changes that will force it to '
                                         f'exceed the max allowable version of {max_allowable_version}')

                    if info_from_rule != info_from_file:
                        lock_from_file["previous"][stripped_version].update(lock_from_rule)
                        new_version = lock_from_rule["version"]
                        log_changes(rule, route, 'unchanged',
                                    f'previous version {stripped_version} updated version to {new_version}')
                    continue
                else:
                    raise RuntimeError("Unreachable code")

        for rule in rules.deprecated:
            if rule.id in newly_deprecated:
                current_deprecated_lock[rule.id] = {
                    "rule_name": rule.name,
                    "stack_version": current_stack_version(),
                    "deprecation_date": rule.contents.metadata['deprecation_date']
                }

        if save_changes or verbose:
            click.echo(f' - {len(changed_rules)} changed rules')
            click.echo(f' - {len(new_rules)} new rules')
            click.echo(f' - {len(newly_deprecated)} newly deprecated rules')

        if not save_changes:
            verbose_echo(
                'run `build-release --update-version-lock` to update version.lock.json and deprecated_rules.json')
            return list(changed_rules), list(new_rules), list(newly_deprecated)

        click.echo('Detailed changes: \n' + '\n'.join(changes))

        # reset local version lock
        self.version_lock = VersionLockFile.from_dict(dict(data=lock_file_contents))
        self.deprecated_lock = DeprecatedRulesFile.from_dict(dict(data=current_deprecated_lock))

        new_hash = self.version_lock.sha256()

        # only write the version lock when its contents actually changed
        if version_lock_hash != new_hash:
            self.save_file(self.version_lock_file, self.version_lock)

        if newly_deprecated:
            self.save_file(self.deprecated_lock_file, self.deprecated_lock)

        # return lists here too, matching the earlier return paths and the annotation
        return list(changed_rules), list(new_rules), list(newly_deprecated)
# Module-level VersionLock built from the configured lock files, labelled with the version lock file path.
name = str(RULES_CONFIG.version_lock_file)
loaded_version_lock = VersionLock(
    version_lock_file=RULES_CONFIG.version_lock_file,
    deprecated_lock_file=RULES_CONFIG.deprecated_rules_file,
    name=name,
)