Skip to content

Commit 7df00ad

Browse files
authored
Merge branch 'AllDotPy:master' into master
2 parents 850e5d7 + d8b1d45 commit 7df00ad

File tree

3 files changed

+494
-1
lines changed

3 files changed

+494
-1
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,4 @@ __marimo__/
211211
docs/*
212212
rules/*
213213
tests/*
214-
valkyrie/*
214+
# valkyrie/*

valkyrie/core/scanner.py

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
"""
2+
Valkyrie Security Scanner - Core Architecture
3+
"""
4+
5+
from pathlib import Path
6+
from typing import Dict, List, Optional
7+
import asyncio
8+
import logging
9+
from datetime import datetime
10+
11+
12+
from valkyrie.core.types import (
13+
RuleRepository, ScannerPlugin, ScanRule,
14+
ScanConfig, SecurityFinding, ScanResult,
15+
ScanStatus
16+
)
17+
18+
19+
####
20+
## VALKYRIE SCANNER ENGINE
21+
#####
22+
class ValkyrieScanner:
    """
    Main scanner engine that orchestrates security scanning.

    The engine merges the rules loaded from a rule repository with the rules
    contributed by registered plugins, selects the files described by a
    ``ScanConfig`` and runs every applicable rule against each file, with
    the number of files processed at once bounded by
    ``config.parallel_workers``.
    """

    def __init__(
        self,
        rule_repository: RuleRepository,
        logger: Optional[logging.Logger] = None
    ):
        """
        Create a scanner.

        Args:
            rule_repository: Object whose ``load_rules()`` coroutine returns
                the base set of rules.
            logger: Logger to report progress on; defaults to this module's
                logger when omitted.
        """
        self.rule_repository = rule_repository
        self.logger = logger or logging.getLogger(__name__)
        # Registered plugins, keyed by plugin name.
        self.plugins: Dict[str, ScannerPlugin] = {}
        # Merged repository + plugin rules; None means "needs reloading".
        self._rules_cache: Optional[List[ScanRule]] = None

    async def register_plugin(self, plugin: ScannerPlugin) -> None:
        """
        Initialize a plugin and add it to the scanner registry.

        The plugin is initialized with an empty configuration dict. A plugin
        registered under an already-used name replaces the previous entry.

        Args:
            plugin: Plugin to register.
        """
        # Initialize first so a plugin that fails to start is never registered
        await plugin.initialize({})

        self.plugins[plugin.name] = plugin
        self._rules_cache = None  # Plugin rules changed: invalidate cache
        self.logger.info(f"Registered plugin: {plugin.name} v{plugin.version}")

    async def unregister_plugin(self, plugin_name: str) -> None:
        """
        Clean up and remove a previously registered plugin.

        Unknown plugin names are ignored.

        Args:
            plugin_name: Name the plugin was registered under.
        """
        plugin = self.plugins.get(plugin_name)
        if plugin is None:
            return

        # Let the plugin release its resources before dropping it
        await plugin.cleanup()

        # pop() tolerates the entry having been removed while awaiting cleanup
        self.plugins.pop(plugin_name, None)
        self._rules_cache = None  # Plugin rules changed: invalidate cache
        self.logger.info(f"Unregistered plugin: {plugin_name}")

    async def _load_all_rules(self) -> List[ScanRule]:
        """
        Load rules from the repository and from every registered plugin.

        The merged list is cached until a plugin is registered or
        unregistered.

        Returns:
            Repository rules followed by each plugin's rules.
        """
        if self._rules_cache is not None:
            return self._rules_cache

        # Copy the repository's list: extending it in place would leak plugin
        # rules into a repository that hands out its own internal list.
        rules = list(await self.rule_repository.load_rules())

        for plugin in self.plugins.values():
            rules.extend(await plugin.get_rules())

        self._rules_cache = rules
        return rules

    def _get_scannable_files(self, config: ScanConfig) -> List[Path]:
        """
        Get the list of files to scan based on configuration.

        A path is selected when it matches at least one include pattern
        (globbed under ``config.target_path``), is a regular file, is not
        larger than ``config.max_file_size`` and matches no exclude pattern.
        Each path is returned once, in the order it was first encountered,
        even when several include patterns match it.

        Args:
            config: Scan configuration.

        Returns:
            The selected file paths.
        """
        files: List[Path] = []
        seen = set()

        for pattern in config.include_patterns:
            for file_path in config.target_path.glob(pattern):
                # Overlapping include patterns must not yield duplicates,
                # otherwise the file is scanned (and reported) several times.
                if file_path in seen:
                    continue
                seen.add(file_path)

                try:
                    if not file_path.is_file():
                        continue
                    file_size = file_path.stat().st_size
                except OSError as e:
                    # One unreadable entry should not abort the whole scan
                    self.logger.warning(
                        f"Skipping unreadable file: {file_path}: {e}"
                    )
                    continue

                # Check file size
                if file_size > config.max_file_size:
                    self.logger.warning(f"Skipping large file: {file_path}")
                    continue

                # Check exclude patterns
                should_exclude = any(
                    file_path.match(exclude_pattern)
                    for exclude_pattern in config.exclude_patterns
                )

                if not should_exclude:
                    files.append(file_path)

        return files

    async def _scan_file(
        self,
        file_path: Path,
        rules: List[ScanRule],
        config: ScanConfig
    ) -> List[SecurityFinding]:
        """
        Scan a single file with the applicable rules.

        Failures are logged rather than raised: a rule that raises is skipped
        and the remaining rules still run; a file that cannot be read yields
        no findings.

        Args:
            file_path: File to scan.
            rules: Candidate rules; disabled, filtered-out, below-threshold
                and non-applicable rules are skipped.
            config: Scan configuration providing ``rule_filters`` and
                ``severity_threshold``.

        Returns:
            Findings produced by all rules that ran successfully.
        """
        findings = []

        try:
            # Read off the event loop so a slow disk does not stall the
            # other concurrently running file scans.
            loop = asyncio.get_running_loop()
            content = await loop.run_in_executor(
                None,
                lambda: file_path.read_text(encoding='utf-8', errors='ignore')
            )

            for rule in rules:
                # Ignore disabled rules
                if not rule.metadata.enabled:
                    continue

                # Rule is not in rule filters (an empty filter allows all)
                if config.rule_filters and rule.metadata.id not in config.rule_filters:
                    continue

                # Rule doesn't reach the minimum severity.
                # NOTE(review): relies on severity ``.value`` being orderable
                # with higher meaning more severe - confirm against the enum.
                if rule.metadata.severity.value < config.severity_threshold.value:
                    continue

                if not rule.is_applicable(file_path):
                    continue

                try:
                    rule_findings = await rule.scan(file_path, content)
                    findings.extend(rule_findings)
                except Exception as e:
                    self.logger.error(f"Rule {rule.metadata.id} failed on {file_path}: {e}")

        except Exception as e:
            self.logger.error(f"Failed to scan file {file_path}: {e}")

        return findings

    async def scan(self, config: ScanConfig) -> ScanResult:
        """
        Execute security scan based on configuration

        Args:
            config: Scan configuration

        Returns:
            Complete scan results. When the scan itself raises, a result
            with status ``ScanStatus.FAILED`` carrying the error message is
            returned instead of propagating the exception.
        """
        # One timestamp for both, so the id and the duration agree
        start_time = datetime.now()
        scan_id = f"scan_{start_time.isoformat()}"

        self.logger.info(f"Starting security scan: {scan_id}")

        try:
            # Load all rules
            rules = await self._load_all_rules()
            self.logger.info(f"Loaded {len(rules)} security rules")

            # Get files to scan
            files_to_scan = self._get_scannable_files(config)
            self.logger.info(f"Scanning {len(files_to_scan)} files")

            # A semaphore of 0 would block every task forever, so always
            # allow at least one worker.
            semaphore = asyncio.Semaphore(max(1, config.parallel_workers))

            async def scan_with_semaphore(file_path: Path) -> List[SecurityFinding]:
                async with semaphore:
                    return await self._scan_file(file_path, rules, config)

            # Execute scans concurrently
            scan_tasks = [scan_with_semaphore(file_path) for file_path in files_to_scan]
            results = await asyncio.gather(*scan_tasks, return_exceptions=True)

            # Collect findings and errors
            all_findings = []
            errors = []

            for result in results:
                # gather(return_exceptions=True) may also hand back
                # BaseException instances (e.g. CancelledError), which are
                # not iterable findings.
                if isinstance(result, BaseException):
                    errors.append(str(result))
                else:
                    all_findings.extend(result)

            # Calculate duration
            scan_duration = (datetime.now() - start_time).total_seconds()

            scan_result = ScanResult(
                scan_id=scan_id,
                status=ScanStatus.COMPLETED,
                findings=all_findings,
                scan_duration=scan_duration,
                scanned_files=set(files_to_scan),
                errors=errors
            )

            self.logger.info(
                f"Scan completed: {len(all_findings)} findings, "
                f"{scan_result.critical_count} critical, "
                f"{scan_result.high_count} high severity"
            )

            return scan_result

        except Exception as e:
            self.logger.error(f"Scan failed: {e}")
            return ScanResult(
                scan_id=scan_id,
                status=ScanStatus.FAILED,
                errors=[str(e)],
                scan_duration=(datetime.now() - start_time).total_seconds()
            )

0 commit comments

Comments
 (0)