|
1 | | -import base64 |
2 | 1 | import logging |
3 | | -import quopri |
4 | 2 | import random |
5 | 3 | import re |
6 | 4 | from datetime import timezone |
7 | | -from enum import Enum |
8 | 5 | from typing import Any, Callable, Dict, Optional |
9 | 6 |
|
10 | 7 | import rstr |
11 | 8 | from faker import Faker |
12 | 9 |
|
13 | 10 | from jsf.schema_types.base import BaseSchema, ProviderNotSetException |
| 11 | +from jsf.schema_types.string_utils import content_encoding, content_type |
| 12 | +from jsf.schema_types.string_utils.content_type.text__plain import random_fixed_length_sentence |
14 | 13 |
|
15 | 14 | logger = logging.getLogger() |
16 | 15 | faker = Faker() |
|
19 | 18 | URI_PATTERN = f"https?://{{hostname}}(?:{FRAGMENT})+" |
20 | 19 | PARAM_PATTERN = "(?:\\?([a-z]{1,7}(=\\w{1,5})?&){0,3})?" |
21 | 20 |
|
22 | | -LOREM = """Lorem ipsum dolor sit amet consectetur adipisicing elit. |
23 | | -Hic molestias, esse veniam placeat officiis nobis architecto modi |
24 | | -possimus reiciendis accusantium exercitationem quas illum libero odit magnam, |
25 | | -reprehenderit ipsum, repellendus culpa!""".split() |
26 | | - |
27 | 21 |
|
28 | 22 | def temporal_duration( |
29 | 23 | positive: bool = True, |
@@ -123,100 +117,35 @@ def fake_duration(): |
123 | 117 | } |
124 | 118 |
|
125 | 119 |
|
126 | | -def random_fixed_length_sentence(_min: int, _max: int) -> str: |
127 | | - output = "" |
128 | | - while len(output) < _max: |
129 | | - remaining = _max - len(output) |
130 | | - valid_words = list(filter(lambda s: len(s) < remaining, LOREM)) |
131 | | - if len(valid_words) == 0: |
132 | | - break |
133 | | - output += random.choice(valid_words) + " " |
134 | | - if len(output) > _min and random.uniform(0, 1) > 0.9: |
135 | | - break |
136 | | - return output.strip() |
137 | | - |
138 | | - |
139 | | -class ContentEncoding(str, Enum): |
140 | | - SEVEN_BIT = "7-bit" |
141 | | - EIGHT_BIT = "8-bit" |
142 | | - BINARY = "binary" |
143 | | - QUOTED_PRINTABLE = "quoted-printable" |
144 | | - BASE16 = "base-16" |
145 | | - BASE32 = "base-32" |
146 | | - BASE64 = "base-64" |
147 | | - |
148 | | - |
149 | | -def binary_encoder(string: str) -> str: |
150 | | - return "".join(format(x, "b") for x in bytearray(string, "utf-8")) |
151 | | - |
152 | | - |
153 | | -def bytes_str_repr(b: bytes) -> str: |
154 | | - return repr(b)[2:-1] |
155 | | - |
156 | | - |
157 | | -def seven_bit_encoder(string: str) -> str: |
158 | | - return bytes_str_repr(string.encode("utf-7")) |
159 | | - |
160 | | - |
161 | | -def eight_bit_encoder(string: str) -> str: |
162 | | - return bytes_str_repr(string.encode("utf-8")) |
163 | | - |
164 | | - |
165 | | -def quoted_printable_encoder(string: str) -> str: |
166 | | - return bytes_str_repr(quopri.encodestring(string.encode("utf-8"))) |
167 | | - |
168 | | - |
169 | | -def b16_encoder(string: str) -> str: |
170 | | - return bytes_str_repr(base64.b16encode(string.encode("utf-8"))) |
171 | | - |
172 | | - |
173 | | -def b32_encoder(string: str) -> str: |
174 | | - return bytes_str_repr(base64.b32encode(string.encode("utf-8"))) |
175 | | - |
176 | | - |
177 | | -def b64_encoder(string: str) -> str: |
178 | | - return bytes_str_repr(base64.b64encode(string.encode("utf-8"))) |
179 | | - |
180 | | - |
181 | | -Encoder = { |
182 | | - ContentEncoding.SEVEN_BIT: seven_bit_encoder, |
183 | | - ContentEncoding.EIGHT_BIT: eight_bit_encoder, |
184 | | - ContentEncoding.BINARY: binary_encoder, |
185 | | - ContentEncoding.QUOTED_PRINTABLE: quoted_printable_encoder, |
186 | | - ContentEncoding.BASE16: b16_encoder, |
187 | | - ContentEncoding.BASE32: b32_encoder, |
188 | | - ContentEncoding.BASE64: b64_encoder, |
189 | | -} |
190 | | - |
191 | | - |
192 | | -def encode(string: str, encoding: Optional[ContentEncoding]) -> str: |
193 | | - return Encoder.get(encoding, lambda s: s)(string) |
194 | | - |
195 | | - |
196 | 120 | class String(BaseSchema): |
197 | 121 | minLength: Optional[float] = 0 |
198 | 122 | maxLength: Optional[float] = 50 |
199 | 123 | pattern: Optional[str] = None |
200 | 124 | format: Optional[str] = None |
201 | 125 | # enum: Optional[List[Union[str, int, float]]] = None # NOTE: Not used - enums go to enum class |
202 | | - # contentMediaType: Optional[str] = None # TODO: Long list, need to document which ones will be supported and how to extend |
203 | | - contentEncoding: Optional[ContentEncoding] |
204 | | - # contentSchema # No docs detailing this yet... |
| 126 | + contentMediaType: Optional[str] = None |
| 127 | + contentEncoding: Optional[content_encoding.ContentEncoding] |
| 128 | + # contentSchema # Doesn't help with generation |
205 | 129 |
|
206 | 130 | def generate(self, context: Dict[str, Any]) -> Optional[str]: |
207 | 131 | try: |
208 | 132 | s = super().generate(context) |
209 | | - return str(encode(s, self.contentEncoding)) if s else s |
| 133 | + return str(content_encoding.encode(s, self.contentEncoding)) if s else s |
210 | 134 | except ProviderNotSetException: |
211 | 135 | format_map["regex"] = lambda: rstr.xeger(self.pattern) |
212 | 136 | format_map["relative-json-pointer"] = lambda: random.choice( |
213 | 137 | context["state"]["__all_json_paths__"] |
214 | 138 | ) |
215 | 139 | if format_map.get(self.format) is not None: |
216 | | - return encode(format_map[self.format](), self.contentEncoding) |
| 140 | + return content_encoding.encode(format_map[self.format](), self.contentEncoding) |
217 | 141 | if self.pattern is not None: |
218 | | - return encode(rstr.xeger(self.pattern), self.contentEncoding) |
219 | | - return encode( |
| 142 | + return content_encoding.encode(rstr.xeger(self.pattern), self.contentEncoding) |
| 143 | + if self.contentMediaType is not None: |
| 144 | + return content_encoding.encode( |
| 145 | + content_type.generate(self.contentMediaType, self.minLength, self.maxLength), |
| 146 | + self.contentEncoding, |
| 147 | + ) |
| 148 | + return content_encoding.encode( |
220 | 149 | random_fixed_length_sentence(self.minLength, self.maxLength), self.contentEncoding |
221 | 150 | ) |
222 | 151 |
|
|
0 commit comments