Skip to content

Commit 870326c

Browse files
committed
Add the ability for candore to resume an extraction after an error
This change makes some structural changes to allow the extractor to store extraction to files that it can later resume from. Due to the async nature of the extractor, I haven't allowed this save/resume to happen within an entity, so the latest entity being extracted would lose its progress and have to start again.
1 parent 2633855 commit 870326c

10 files changed

+119
-43
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## Introduction
44

5-
`Candore` is the command line interface data integrity tool. The tool is build to verify the change made in a product has any impact on data in product.
5+
`Candore` is a command-line data integrity tool. It is built to verify whether a change made to a product has any impact on the product's data.
66

77
**The change** could be:
88
- Upgrade of the product to new version

candore/__init__.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from candore.modules.extractor import Extractor
1212
from candore.modules.finder import Finder
1313
from candore.modules.report import Reporting
14-
from candore.config import candore_settings
1514

1615

1716
class Candore:
@@ -22,7 +21,9 @@ def __init__(self, settings):
2221
def list_endpoints(self):
2322
return self.api_lister.lister_endpoints()
2423

25-
async def save_all_entities(self, mode, output_file, full, max_pages=None, skip_percent=None):
24+
async def save_all_entities(
25+
self, mode, output_file, full, max_pages=None, skip_percent=None, resume=None
26+
):
2627
"""Save all the entities to a json file
2728
2829
:param mode: Pre or Post
@@ -39,6 +40,8 @@ async def save_all_entities(self, mode, output_file, full, max_pages=None, skip_
3940
extractor.full = True
4041
extractor.max_pages = max_pages
4142
extractor.skip_percent = skip_percent
43+
if resume:
44+
extractor.load_resume_info()
4245
data = await extractor.extract_all_entities()
4346
if hasattr(self.settings, 'rpms'):
4447
data.update({'installed_rpms': await extractor.extract_all_rpms()})

candore/cli.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def candore(ctx, version, settings_file, components_file, conf_dir):
2828
settings=candore_settings(
2929
option_settings_file=settings_file,
3030
option_components_file=components_file,
31-
conf_dir=conf_dir
31+
conf_dir=conf_dir,
3232
)
3333
)
3434
ctx.__dict__["candore"] = candore_obj
@@ -49,8 +49,9 @@ def apis(ctx):
4949
@click.option("--full", is_flag=True, help="Extract data from all the pages of a component")
5050
@click.option("--max-pages", type=int, help="The maximum number of pages to extract per entity")
5151
@click.option("--skip-percent", type=int, help="The percentage of pages to skip per entity")
52+
@click.option("--resume", is_flag=True, help="Resume the extraction from the last completed entity")
5253
@click.pass_context
53-
def extract(ctx, mode, output, full, max_pages, skip_percent):
54+
def extract(ctx, mode, output, full, max_pages, skip_percent, resume):
5455
loop = asyncio.get_event_loop()
5556
candore_obj = ctx.parent.candore
5657
loop.run_until_complete(
@@ -60,6 +61,7 @@ def extract(ctx, mode, output, full, max_pages, skip_percent):
6061
full=full,
6162
max_pages=max_pages,
6263
skip_percent=skip_percent,
64+
resume=resume,
6365
)
6466
)
6567

candore/modules/comparator.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ def custom_key(elem):
132132
def compare_all_pres_with_posts(self, pre_data, post_data, unique_key="", var_details=None):
133133
if unique_key:
134134
self.big_key.append(unique_key)
135-
if isinstance(pre_data, dict):
135+
if isinstance(pre_data, dict) and post_data:
136136
self._is_data_type_dict(pre_data, post_data, unique_key=unique_key)
137-
elif isinstance(pre_data, list):
137+
elif isinstance(pre_data, list) and post_data:
138138
self._is_data_type_list(pre_data, post_data, unique_key=unique_key)
139139
else:
140140
if pre_data != post_data:

candore/modules/extractor.py

+83-26
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
import asyncio # noqa: F401
2+
import json
23
import math
3-
from functools import cached_property
4-
from candore.modules.ssh import Session
54
import re
5+
from functools import cached_property
6+
from pathlib import Path
7+
68
import aiohttp
79

10+
from candore.modules.ssh import Session
11+
812
# Max observed request duration in testing was approximately 888 seconds
913
# so we set the timeout to 2000 seconds to be overly safe
1014
EXTENDED_TIMEOUT = aiohttp.ClientTimeout(total=2000, connect=60, sock_read=2000, sock_connect=60)
15+
RESUME_FILE = Path("_resume_info.json")
16+
PARTIAL_FILE = Path("_partial_extraction.json")
1117

1218

1319
class Extractor:
@@ -27,6 +33,12 @@ def __init__(self, settings, apilister=None):
2733
self.apilister = apilister
2834
self.full = False
2935
self.semaphore = asyncio.Semaphore(self.settings.candore.max_connections)
36+
self._all_data = {}
37+
self._api_endpoints = None
38+
self._completed_entities = []
39+
self._current_entity = None
40+
self._current_endpoint = None
41+
self._retry_limit = 3
3042

3143
@cached_property
3244
def dependent_components(self):
@@ -40,7 +52,9 @@ def ignore_components(self):
4052

4153
@cached_property
4254
def api_endpoints(self):
43-
return self.apilister.lister_endpoints()
55+
if not self._api_endpoints:
56+
self._api_endpoints = self.apilister.lister_endpoints()
57+
return self._api_endpoints
4458

4559
async def _start_session(self):
4660
if not self.client:
@@ -56,13 +70,37 @@ async def __aenter__(self):
5670

5771
async def __aexit__(self, exc_type, exc_val, exc_tb):
5872
await self._end_session()
73+
if exc_val:
74+
with open("_partial_extraction.json", "w") as partial_file:
75+
json.dump(self._all_data, partial_file)
76+
with open("_resume_info.json", "w") as resume_file:
77+
json.dump(self.to_resume_dict(), resume_file, indent=4)
78+
79+
async def _retry_get(self, retries=None, **get_params):
80+
if not retries:
81+
retries = self._retry_limit
82+
try:
83+
async with self.client.get(**get_params) as response:
84+
if response.status == 200:
85+
json_data = await response.json()
86+
return response.status, json_data
87+
else:
88+
return response.status, {}
89+
except aiohttp.ClientError:
90+
if retries > 0:
91+
return await self._retry_get(retries=retries - 1, **get_params)
92+
else:
93+
print(
94+
f"Failed to get data from {get_params.get('url')} "
95+
f"in {self._retry_limit} retries."
96+
)
97+
raise
5998

6099
async def paged_results(self, **get_params):
61-
async with self.client.get(**get_params, timeout=EXTENDED_TIMEOUT) as response:
62-
if response.status == 200:
63-
_paged_results = await response.json()
64-
_paged_results = _paged_results.get("results")
65-
return _paged_results
100+
status, _paged_results = await self._retry_get(**get_params, timeout=EXTENDED_TIMEOUT)
101+
if status == 200:
102+
_paged_results = _paged_results.get("results")
103+
return _paged_results
66104

67105
async def fetch_page(self, page, _request):
68106
async with self.semaphore:
@@ -95,18 +133,17 @@ async def fetch_component_entities(self, **comp_params):
95133
_request = {"url": self.base + "/" + endpoint, "params": {}}
96134
if data and dependency:
97135
_request["params"].update({f"{dependency}_id": data})
98-
async with self.client.get(**_request) as response:
99-
if response.status == 200:
100-
results = await response.json()
101-
if "results" in results:
102-
entity_data.extend(results.get("results"))
103-
else:
104-
# Return an empty directory for endpoints
105-
# like services, api etc
106-
# which does not have results
107-
return entity_data
136+
status, results = await self._retry_get(**_request)
137+
if status == 200:
138+
if "results" in results:
139+
entity_data.extend(results.get("results"))
108140
else:
141+
                    # Return an empty entity list for endpoints
142+
# like services, api etc
143+
# which does not have results
109144
return entity_data
145+
else:
146+
return entity_data
110147
total_pages = results.get("total") // results.get("per_page") + 1
111148
if total_pages > 1:
112149
print(f"Endpoint {endpoint} has {total_pages} pages.")
@@ -154,11 +191,12 @@ async def component_params(self, component_endpoint):
154191

155192
async def process_entities(self, endpoints):
156193
"""
157-
endpoints = ['katello/api/actiovationkeys']
194+
endpoints = ['katello/api/activationkeys']
158195
"""
159196
comp_data = []
160197
entities = None
161198
for endpoint in endpoints:
199+
self._current_endpoint = endpoint
162200
comp_params = await self.component_params(component_endpoint=endpoint)
163201
if comp_params:
164202
entities = []
@@ -183,21 +221,40 @@ async def extract_all_entities(self):
183221
184222
:return:
185223
"""
186-
all_data = {}
187224
for component, endpoints in self.api_endpoints.items():
188-
if endpoints:
225+
self._current_entity = component
226+
if endpoints and component not in self._completed_entities:
189227
comp_entities = await self.process_entities(endpoints=endpoints)
190-
all_data[component] = comp_entities
191-
return all_data
228+
self._all_data[component] = comp_entities
229+
self._completed_entities.append(component)
230+
return self._all_data
192231

193232
async def extract_all_rpms(self):
194233
"""Extracts all installed RPMs from server"""
195234
with Session(settings=self.settings) as ssh_client:
196235
rpms = ssh_client.execute('rpm -qa').stdout
197236
rpms = rpms.splitlines()
198237
name_version_pattern = rf'{self.settings.rpms.regex_pattern}'
199-
rpms_matches = [
200-
re.compile(name_version_pattern).match(rpm) for rpm in rpms
201-
]
238+
rpms_matches = [re.compile(name_version_pattern).match(rpm) for rpm in rpms]
202239
rpms_list = [rpm_match.groups()[:-1] for rpm_match in rpms_matches if rpm_match]
203240
return dict(rpms_list)
241+
242+
def to_resume_dict(self):
243+
"""Exports our latest extraction progress information to a dictionary"""
244+
return {
245+
"api_endpoints": self._api_endpoints,
246+
"completed_entities": self._completed_entities,
247+
"current_entity": self._current_entity,
248+
"current_endpoint": self._current_endpoint,
249+
}
250+
251+
def load_resume_info(self):
252+
"""Resumes our extraction from the last known state"""
253+
        resume_info = json.loads(RESUME_FILE.read_text())
254+
self._api_endpoints = resume_info["api_endpoints"]
255+
self._completed_entities = resume_info["completed_entities"]
256+
self._current_entity = resume_info["current_entity"]
257+
self._current_endpoint = resume_info["current_endpoint"]
258+
self._all_data = json.loads(PARTIAL_FILE.read_text())
259+
RESUME_FILE.unlink()
260+
PARTIAL_FILE.unlink()

candore/modules/report.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,22 @@ def _generate_csv_report(self, output_file, inverse):
8080
# Convert json to csv and write to output file
8181
csv_writer = csv.writer(output_file.open("w"))
8282
# Table Column Names
83-
columns = ["Path", "Pre-Upgrade", "Post-Upgrade", "Variation?" if not inverse else 'Constant?']
83+
columns = [
84+
"Path",
85+
"Pre-Upgrade",
86+
"Post-Upgrade",
87+
"Variation?" if not inverse else 'Constant?',
88+
]
8489
csv_writer.writerow(columns)
8590
# Writing Rows
8691
for var_path, vals in self.results.items():
87-
csv_writer.writerow([
88-
var_path, vals["pre"], vals["post"],
89-
vals["variation" if not inverse else "constant"]])
92+
csv_writer.writerow(
93+
[
94+
var_path,
95+
vals["pre"],
96+
vals["post"],
97+
vals["variation" if not inverse else "constant"],
98+
]
99+
)
90100
print("Wrote CSV report to {}".format(output_file))
91101
print("CSV report contains {} results".format(len(self.results)))

candore/modules/ssh.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
from hussh import Connection
21
from functools import cached_property
3-
from candore.config import candore_settings
42
from urllib.parse import urlparse
53

4+
from hussh import Connection
5+
66

77
class Session:
8-
98
def __init__(self, settings=None):
109
self.settings = settings
1110
self.hostname = urlparse(settings.candore.base_url).hostname

candore/modules/variations.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
`conf/variations` yaml file and convert them into processable list
44
"""
55
from functools import cached_property
6-
from candore.utils import yaml_reader, get_yaml_paths
6+
7+
from candore.utils import get_yaml_paths
8+
from candore.utils import yaml_reader
79

810

911
class Variations:
@@ -20,7 +22,6 @@ def expected_variations(self):
2022
yaml_data = self.variations.get("expected_variations") if self.variations else None
2123
return get_yaml_paths(yaml_data=yaml_data)
2224

23-
2425
@cached_property
2526
def skipped_variations(self):
2627
yaml_data = self.variations.get("skipped_variations") if self.variations else None

candore/utils.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
A utility helpers module
33
"""
44
from pathlib import Path
5+
56
import yaml
67

78

@@ -40,4 +41,4 @@ def get_yaml_paths(yaml_data, prefix="", separator="/"):
4041
paths.extend(get_yaml_paths(item, prefix, separator))
4142
else:
4243
paths.append(f"{prefix}{yaml_data}")
43-
return paths
44+
return paths

scripts/gen_constants.py

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import yaml
55

66
KEEP_FIELDS = ["name", "label", "title", "url", "description", "path"]
7+
SKIP_ENTITIES = ["errata", "package_groups", "repository_sets"]
78
SKIP_DICT = {}
89
HELP_TEXT = """
910
This script processes a comparison report, in the form of a csv file, and outputs a constants file.
@@ -26,6 +27,8 @@
2627

2728
def filter_parts(parts):
2829
for check in KEEP_FIELDS:
30+
if parts[0] in SKIP_ENTITIES:
31+
return
2932
if check in parts[-1]:
3033
return True
3134

0 commit comments

Comments
 (0)