-
Notifications
You must be signed in to change notification settings - Fork 1
/
unit_test.py
150 lines (106 loc) · 4.53 KB
/
unit_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python
# PyLCG - Linear Congruential Generator for IP Sharding - Developed by acidvegas in Python (https://github.com/acidvegas/pylcg)
# unit_test.py
import ipaddress
import time
import unittest
from pylcg import IPRange, ip_stream, LCG
class Colors:
    """ANSI escape sequences used to colorize this script's console output.

    Each attribute is a raw escape string meant to be interpolated into a
    printed message; ``ENDC`` is emitted after colored text to end the
    coloring.
    """

    BLUE = '\033[94m'    # section banners (print_header)
    GREEN = '\033[92m'   # success lines (print_success)
    YELLOW = '\033[93m'  # warning lines (print_warning)
    CYAN = '\033[96m'    # informational lines (print_info) and the start-up banner
    RED = '\033[91m'     # defined for completeness; not referenced in this file's visible code
    ENDC = '\033[0m'     # terminator appended after every colored message
def print_header(message: str) -> None:
    """Print a blue banner announcing a test section.

    Output is two blank lines, an 80-character ``=`` rule, a
    ``TEST: <message>`` line and a closing rule, all in ``Colors.BLUE``;
    the color is ended with ``Colors.ENDC`` and one blank line follows.

    Args:
        message: Title of the section being started.
    """
    print(f'\n\n{Colors.BLUE}{"="*80}')
    print(f'TEST: {message}')
    print(f'{"="*80}{Colors.ENDC}\n')
def print_success(message: str) -> None:
    """Print *message* in green, prefixed with a check mark.

    Args:
        message: Text describing what succeeded.
    """
    print(f'{Colors.GREEN}✓ {message}{Colors.ENDC}')
def print_info(message: str) -> None:
    """Print *message* in cyan, prefixed with an information symbol.

    Args:
        message: Informational text (timings, counts, statistics).
    """
    print(f"{Colors.CYAN}ℹ {message}{Colors.ENDC}")
def print_warning(message: str) -> None:
    """Print *message* in yellow, prefixed with an exclamation mark.

    Args:
        message: Text describing a non-fatal anomaly.
    """
    print(f"{Colors.YELLOW}! {message}{Colors.ENDC}")
class TestIPSharder(unittest.TestCase):
    """Timing and sanity tests for pylcg's ``IPRange``, ``LCG`` and ``ip_stream``.

    All tests share one fixture built in ``setUpClass``: a CIDR block, a seed,
    a shard count, and the set of every address in the block (as strings).
    Progress, timings and statistics are printed with the colored helpers
    defined in this file.
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared fixture used by every test in the class."""
        print_header('Setting up test environment')
        cls.test_cidr = '192.0.0.0/16' # 65,536 IPs
        cls.test_seed = 12345
        cls.total_shards = 4
        # Enumerate the whole network once; its size is the expected IP count
        network = ipaddress.ip_network(cls.test_cidr)
        cls.all_ips = {str(ip) for ip in network}
        print_success(f"Initialized test environment with {len(cls.all_ips):,} IPs")

    def test_ip_range_initialization(self):
        """Check that ``IPRange.total`` matches the number of addresses in the CIDR."""
        print_header('Testing IPRange initialization')
        start_time = time.perf_counter()
        ip_range = IPRange(self.test_cidr)
        # Derived from the fixture (not hard-coded) so the check follows test_cidr
        self.assertEqual(ip_range.total, len(self.all_ips))
        # Fetched for the report below and to exercise both ends of the index space
        first_ip = ip_range.get_ip_at_index(0)
        last_ip = ip_range.get_ip_at_index(ip_range.total - 1)
        elapsed = time.perf_counter() - start_time
        print_success(f'IP range initialization completed in {elapsed:.6f}s')
        print_info(f'IP range spans from {first_ip} to {last_ip}')
        print_info(f'Total IPs in range: {ip_range.total:,}')

    def test_lcg_sequence(self):
        """Time ``LCG.next()`` and verify two generators with one seed agree."""
        print_header('Testing LCG sequence generation')
        # Test sequence generation speed
        lcg = LCG(seed=self.test_seed)
        iterations = 1_000_000
        start_time = time.perf_counter()
        for _ in range(iterations):
            lcg.next()
        elapsed = time.perf_counter() - start_time
        print_success(f'Generated {iterations:,} random numbers in {elapsed:.6f}s')
        print_info(f'Average time per number: {(elapsed/iterations)*1000000:.2f} microseconds')
        # Test deterministic behavior: same seed must give the same first 1000 values
        lcg1 = LCG(seed=self.test_seed)
        lcg2 = LCG(seed=self.test_seed)
        start_time = time.perf_counter()
        for _ in range(1000):
            self.assertEqual(lcg1.next(), lcg2.next())
        elapsed = time.perf_counter() - start_time
        print_success(f'Verified LCG determinism in {elapsed:.6f}s')

    def test_shard_distribution(self):
        """Drain every shard of ``ip_stream`` and report distribution statistics.

        Shards are numbered 1..``total_shards``. The test fails if no IP is
        produced at all, or if any of the first 1000 distinct IPs produced
        lies outside the test CIDR. Duplicates across shards and per-shard
        deviation are reported, not asserted.
        """
        print_header('Testing shard distribution and randomness')
        # Test distribution across shards
        sample_size = len(self.all_ips) # Full size of the test CIDR
        order_sample_limit = 1000
        shard_counts = {i: 0 for i in range(1, self.total_shards + 1)} # 1-based sharding
        unique_ips = set()
        # First distinct IPs in the order they were produced. A set has no
        # meaningful order, so slicing it would give an arbitrary sample.
        first_generated = []
        duplicate_count = 0
        start_time = time.perf_counter()
        # Collect IPs from each shard
        for shard in range(1, self.total_shards + 1): # 1-based sharding
            ip_gen = ip_stream(self.test_cidr, shard, self.total_shards, self.test_seed)
            shard_unique = set()
            # Get all IPs from this shard
            for ip in ip_gen:
                if ip in unique_ips:
                    duplicate_count += 1
                else:
                    unique_ips.add(ip)
                    shard_unique.add(ip)
                    if len(first_generated) < order_sample_limit:
                        first_generated.append(ip)
            shard_counts[shard] = len(shard_unique)
        elapsed = time.perf_counter() - start_time
        # Fail with a clear message instead of a ZeroDivisionError further down
        self.assertTrue(unique_ips, 'ip_stream produced no IPs for any shard')
        # Print distribution statistics
        print_success(f'Generated {len(unique_ips):,} IPs in {elapsed:.6f}s')
        print_info(f'Average time per IP: {(elapsed/len(unique_ips))*1000000:.2f} microseconds')
        print_info(f'Unique IPs generated: {len(unique_ips):,}')
        if duplicate_count > 0:
            print_warning(f'Duplicates found: {duplicate_count:,} ({(duplicate_count/len(unique_ips))*100:.2f}%)')
        expected_per_shard = sample_size // self.total_shards
        for shard, count in shard_counts.items():
            deviation = abs(count - expected_per_shard) / expected_per_shard * 100
            print_info(f'Shard {shard}: {count:,} unique IPs ({deviation:.2f}% deviation from expected)')
        # Test randomness by checking sequential patterns
        ips_list = sorted(int(ipaddress.ip_address(ip)) for ip in first_generated)
        # Sanity check on the sample: every sampled address must lie inside the CIDR
        network = ipaddress.ip_network(self.test_cidr)
        self.assertGreaterEqual(ips_list[0], int(network[0]), 'generated IP below the test CIDR')
        self.assertLessEqual(ips_list[-1], int(network[-1]), 'generated IP above the test CIDR')
        pair_total = len(ips_list) - 1
        if pair_total > 0:
            sequential_count = sum(1 for low, high in zip(ips_list, ips_list[1:]) if low + 1 == high)
            sequential_percentage = (sequential_count / pair_total) * 100
            print_info(f'Sequential IP pairs in first 1000: {sequential_percentage:.2f}% (lower is more random)')
        else:
            # A single IP has no neighbours to compare against
            print_warning('Too few IPs generated to measure sequential pairs')
if __name__ == '__main__':
    # Script entry point: print a cyan start-up banner, then hand control to
    # unittest's command-line runner with verbose per-test output.
    print(f"\n{Colors.CYAN}{'='*80}")
    # Plain string: there is nothing to interpolate on this line
    print("Starting IP Sharder Tests - Testing with 65,536 IPs (/16 network)")
    print(f"{'='*80}{Colors.ENDC}\n")
    unittest.main(verbosity=2)