Marco Ricci commited on 2025-01-21 21:02:31
Zeige 1 geänderte Dateien mit 183 Einfügungen und 188 Löschungen.
Introduce a new workhorse class that provides the validity checking and the falsy value cleanup functionality. The existing top-level functions are now mere facades. The primary reason is to keep the complexity and the nesting level of these functions low-ish: they still trigger linting errors that need to be silenced, but the nesting is tamer, and some common functionality (e.g., tree traversal) can be usefully extracted.
... | ... |
@@ -6,13 +6,12 @@ |
6 | 6 |
|
7 | 7 |
from __future__ import annotations |
8 | 8 |
|
9 |
-import collections |
|
10 | 9 |
import enum |
11 | 10 |
import json |
12 | 11 |
import math |
13 | 12 |
import string |
14 | 13 |
import warnings |
15 |
-from typing import TYPE_CHECKING, Generic, TypeVar |
|
14 |
+from typing import TYPE_CHECKING, Generic, TypeVar, cast |
|
16 | 15 |
|
17 | 16 |
from typing_extensions import ( |
18 | 17 |
Buffer, |
... | ... |
@@ -25,7 +24,7 @@ from typing_extensions import ( |
25 | 24 |
) |
26 | 25 |
|
27 | 26 |
if TYPE_CHECKING: |
28 |
- from collections.abc import MutableSequence, Sequence |
|
27 |
+ from collections.abc import Iterator, Sequence |
|
29 | 28 |
from typing import Literal |
30 | 29 |
|
31 | 30 |
from typing_extensions import ( |
... | ... |
@@ -226,6 +225,180 @@ def json_path(path: Sequence[str | int], /) -> str: |
226 | 225 |
return ''.join(chunks) |
227 | 226 |
|
228 | 227 |
|
228 |
+class _VaultConfigValidator: |
|
229 |
+ INVALID_CONFIG_ERROR = 'vault config is invalid' |
|
230 |
+ |
|
231 |
+ def __init__(self, maybe_config: Any) -> None: # noqa: ANN401 |
|
232 |
+ self.maybe_config = maybe_config |
|
233 |
+ |
|
234 |
+ def traverse_path(self, path: tuple[str, ...]) -> Any: # noqa: ANN401 |
|
235 |
+ obj = self.maybe_config |
|
236 |
+ for key in path: |
|
237 |
+ obj = obj[key] |
|
238 |
+ return obj |
|
239 |
+ |
|
240 |
+ def walk_subconfigs( |
|
241 |
+ self, |
|
242 |
+ ) -> Iterator[tuple[tuple[str] | tuple[str, str], str, Any]]: |
|
243 |
+ obj = cast('dict[str, dict[str, Any]]', self.maybe_config) |
|
244 |
+ if isinstance(obj.get('global', False), dict): |
|
245 |
+ for k, v in list(obj['global'].items()): |
|
246 |
+ yield ('global',), k, v |
|
247 |
+ for sv_name, sv_obj in list(obj['services'].items()): |
|
248 |
+ for k, v in list(sv_obj.items()): |
|
249 |
+ yield ('services', sv_name), k, v |
|
250 |
+ |
|
251 |
+ def validate( # noqa: C901,PLR0912 |
|
252 |
+ self, |
|
253 |
+ *, |
|
254 |
+ allow_unknown_settings: bool = False, |
|
255 |
+ ) -> None: |
|
256 |
+ err_obj_not_a_dict = 'vault config is not a dict' |
|
257 |
+ err_non_str_service_name = ( |
|
258 |
+ 'vault config contains non-string service name {sv_name!r}' |
|
259 |
+ ) |
|
260 |
+ err_not_a_dict = 'vault config entry {json_path_str} is not a dict' |
|
261 |
+ err_not_a_string = 'vault config entry {json_path_str} is not a string' |
|
262 |
+ err_not_an_int = 'vault config entry {json_path_str} is not an integer' |
|
263 |
+ err_unknown_setting = ( |
|
264 |
+ 'vault config entry {json_path_str} uses unknown setting {key!r}' |
|
265 |
+ ) |
|
266 |
+ err_bad_number0 = 'vault config entry {json_path_str} is negative' |
|
267 |
+ err_bad_number1 = 'vault config entry {json_path_str} is not positive' |
|
268 |
+ |
|
269 |
+ kwargs: dict[str, Any] = { |
|
270 |
+ 'allow_unknown_settings': allow_unknown_settings, |
|
271 |
+ } |
|
272 |
+ if not isinstance(self.maybe_config, dict): |
|
273 |
+ raise TypeError(err_obj_not_a_dict.format(**kwargs)) |
|
274 |
+ if 'global' in self.maybe_config: |
|
275 |
+ o_global = self.maybe_config['global'] |
|
276 |
+ if not isinstance(o_global, dict): |
|
277 |
+ kwargs['json_path_str'] = json_path(['global']) |
|
278 |
+ raise TypeError(err_not_a_dict.format(**kwargs)) |
|
279 |
+ if not isinstance(self.maybe_config.get('services'), dict): |
|
280 |
+ kwargs['json_path_str'] = json_path(['services']) |
|
281 |
+ raise TypeError(err_not_a_dict.format(**kwargs)) |
|
282 |
+ for sv_name, service in self.maybe_config['services'].items(): |
|
283 |
+ if not isinstance(sv_name, str): |
|
284 |
+ kwargs['sv_name'] = sv_name |
|
285 |
+ raise TypeError(err_non_str_service_name.format(**kwargs)) |
|
286 |
+ if not isinstance(service, dict): |
|
287 |
+ kwargs['json_path_str'] = json_path(['services', sv_name]) |
|
288 |
+ raise TypeError(err_not_a_dict.format(**kwargs)) |
|
289 |
+ for path, key, value in self.walk_subconfigs(): |
|
290 |
+ kwargs['path'] = path |
|
291 |
+ kwargs['key'] = key |
|
292 |
+ kwargs['value'] = value |
|
293 |
+ kwargs['json_path_str'] = json_path([*path, key]) |
|
294 |
+ # Use match/case here once Python 3.9 becomes unsupported. |
|
295 |
+ if key in {'key', 'phrase'}: |
|
296 |
+ if not isinstance(value, str): |
|
297 |
+ raise TypeError(err_not_a_string.format(**kwargs)) |
|
298 |
+ elif key == 'unicode_normalization_form' and path == ( |
|
299 |
+ 'global', |
|
300 |
+ ): |
|
301 |
+ if not isinstance(value, str): |
|
302 |
+ raise TypeError(err_not_a_string.format(**kwargs)) |
|
303 |
+ if not allow_unknown_settings: |
|
304 |
+ raise ValueError(err_unknown_setting.format(**kwargs)) |
|
305 |
+ elif key == 'notes' and path != ('global',): |
|
306 |
+ if not isinstance(value, str): |
|
307 |
+ raise TypeError(err_not_a_string.format(**kwargs)) |
|
308 |
+ elif key in { |
|
309 |
+ 'length', |
|
310 |
+ 'repeat', |
|
311 |
+ 'lower', |
|
312 |
+ 'upper', |
|
313 |
+ 'number', |
|
314 |
+ 'space', |
|
315 |
+ 'dash', |
|
316 |
+ 'symbol', |
|
317 |
+ }: |
|
318 |
+ if not isinstance(value, int): |
|
319 |
+ raise TypeError(err_not_an_int.format(**kwargs)) |
|
320 |
+ if key == 'length' and value < 1: |
|
321 |
+ raise ValueError(err_bad_number1.format(**kwargs)) |
|
322 |
+ if key != 'length' and value < 0: |
|
323 |
+ raise ValueError(err_bad_number0.format(**kwargs)) |
|
324 |
+ elif not allow_unknown_settings: |
|
325 |
+ raise ValueError(err_unknown_setting.format(**kwargs)) |
|
326 |
+ |
|
327 |
+ def clean_up_falsy_values(self) -> Iterator[CleanupStep]: # noqa: C901 |
|
328 |
+ obj = self.maybe_config |
|
329 |
+ if ( |
|
330 |
+ not isinstance(obj, dict) |
|
331 |
+ or 'services' not in obj |
|
332 |
+ or not isinstance(obj['services'], dict) |
|
333 |
+ ): |
|
334 |
+ raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover |
|
335 |
+ if 'global' in obj and not isinstance(obj['global'], dict): |
|
336 |
+ raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover |
|
337 |
+ if not all( |
|
338 |
+ isinstance(service_obj, dict) |
|
339 |
+ for service_obj in obj['services'].values() |
|
340 |
+ ): |
|
341 |
+ raise ValueError(self.INVALID_CONFIG_ERROR) # pragma: no cover |
|
342 |
+ |
|
343 |
+ def falsy(value: Any) -> bool: # noqa: ANN401 |
|
344 |
+ return not js_truthiness(value) |
|
345 |
+ |
|
346 |
+ def falsy_but_not_zero(value: Any) -> bool: # noqa: ANN401 |
|
347 |
+ return not js_truthiness(value) and not ( |
|
348 |
+ isinstance(value, int) and value == 0 |
|
349 |
+ ) |
|
350 |
+ |
|
351 |
+ def falsy_but_not_string(value: Any) -> bool: # noqa: ANN401 |
|
352 |
+ return not js_truthiness(value) and value != '' # noqa: PLC1901 |
|
353 |
+ |
|
354 |
+ for path, key, value in self.walk_subconfigs(): |
|
355 |
+ service_obj = self.traverse_path(path) |
|
356 |
+ # Use match/case here once Python 3.9 becomes unsupported. |
|
357 |
+ if key == 'phrase' and falsy_but_not_string(value): |
|
358 |
+ yield CleanupStep( |
|
359 |
+ (*path, key), service_obj[key], 'replace', '' |
|
360 |
+ ) |
|
361 |
+ service_obj[key] = '' |
|
362 |
+ elif key == 'notes' and falsy(value): |
|
363 |
+ yield CleanupStep( |
|
364 |
+ (*path, key), service_obj[key], 'remove', None |
|
365 |
+ ) |
|
366 |
+ service_obj.pop(key) |
|
367 |
+ elif key == 'key' and falsy(value): |
|
368 |
+ if path == ('global',): |
|
369 |
+ yield CleanupStep( |
|
370 |
+ (*path, key), service_obj[key], 'remove', None |
|
371 |
+ ) |
|
372 |
+ service_obj.pop(key) |
|
373 |
+ else: |
|
374 |
+ yield CleanupStep( |
|
375 |
+ (*path, key), service_obj[key], 'replace', '' |
|
376 |
+ ) |
|
377 |
+ service_obj[key] = '' |
|
378 |
+ elif key == 'length' and falsy(value): |
|
379 |
+ yield CleanupStep( |
|
380 |
+ (*path, key), service_obj[key], 'replace', 20 |
|
381 |
+ ) |
|
382 |
+ service_obj[key] = 20 |
|
383 |
+ elif key == 'repeat' and falsy_but_not_zero(value): |
|
384 |
+ yield CleanupStep( |
|
385 |
+ (*path, key), service_obj[key], 'replace', 0 |
|
386 |
+ ) |
|
387 |
+ service_obj[key] = 0 |
|
388 |
+ elif key in { |
|
389 |
+ 'lower', |
|
390 |
+ 'upper', |
|
391 |
+ 'number', |
|
392 |
+ 'space', |
|
393 |
+ 'dash', |
|
394 |
+ 'symbol', |
|
395 |
+ } and falsy_but_not_zero(value): |
|
396 |
+ yield CleanupStep( |
|
397 |
+ (*path, key), service_obj[key], 'remove', None |
|
398 |
+ ) |
|
399 |
+ service_obj.pop(key) |
|
400 |
+ |
|
401 |
+ |
|
229 | 402 |
@overload |
230 | 403 |
@deprecated( |
231 | 404 |
'allow_derivepassphrase_extensions argument is deprecated since v0.4.0, ' |
... | ... |
@@ -249,7 +422,7 @@ def validate_vault_config( |
249 | 422 |
) -> None: ... |
250 | 423 |
|
251 | 424 |
|
252 |
-def validate_vault_config( # noqa: C901,PLR0912 |
|
425 |
+def validate_vault_config( |
|
253 | 426 |
obj: Any, |
254 | 427 |
/, |
255 | 428 |
*, |
... | ... |
@@ -291,96 +464,10 @@ def validate_vault_config( # noqa: C901,PLR0912 |
291 | 464 |
DeprecationWarning, |
292 | 465 |
stacklevel=2, |
293 | 466 |
) |
294 |
- err_obj_not_a_dict = 'vault config is not a dict' |
|
295 |
- err_non_str_service_name = ( |
|
296 |
- 'vault config contains non-string service name {!r}' |
|
297 |
- ) |
|
298 |
- |
|
299 |
- def err_not_a_dict(path: Sequence[str], /) -> str: |
|
300 |
- json_path_str = json_path(path) |
|
301 |
- return f'vault config entry {json_path_str} is not a dict' |
|
302 |
- |
|
303 |
- def err_not_a_string(path: Sequence[str], /) -> str: |
|
304 |
- json_path_str = json_path(path) |
|
305 |
- return f'vault config entry {json_path_str} is not a string' |
|
306 | 467 |
|
307 |
- def err_not_an_int(path: Sequence[str], /) -> str: |
|
308 |
- json_path_str = json_path(path) |
|
309 |
- return f'vault config entry {json_path_str} is not an integer' |
|
310 |
- |
|
311 |
- def err_unknown_setting(key: str, path: Sequence[str], /) -> str: |
|
312 |
- json_path_str = json_path(path) |
|
313 |
- return ( |
|
314 |
- f'vault config entry {json_path_str} uses unknown setting {key!r}' |
|
315 |
- ) |
|
316 |
- |
|
317 |
- def err_bad_number( |
|
318 |
- key: str, |
|
319 |
- path: Sequence[str], |
|
320 |
- /, |
|
321 |
- *, |
|
322 |
- strictly_positive: bool = False, |
|
323 |
- ) -> str: |
|
324 |
- json_path_str = json_path((*path, key)) |
|
325 |
- return f'vault config entry {json_path_str} is ' + ( |
|
326 |
- 'not positive' if strictly_positive else 'negative' |
|
327 |
- ) |
|
328 |
- |
|
329 |
- if not isinstance(obj, dict): |
|
330 |
- raise TypeError(err_obj_not_a_dict) |
|
331 |
- queue_to_check: list[tuple[dict[str, Any], tuple[str, ...]]] = [] |
|
332 |
- if 'global' in obj: |
|
333 |
- o_global = obj['global'] |
|
334 |
- if not isinstance(o_global, dict): |
|
335 |
- raise TypeError(err_not_a_dict(['global'])) |
|
336 |
- queue_to_check.append((o_global, ('global',))) |
|
337 |
- if not isinstance(obj.get('services'), dict): |
|
338 |
- raise TypeError(err_not_a_dict(['services'])) |
|
339 |
- for sv_name, service in obj['services'].items(): |
|
340 |
- if not isinstance(sv_name, str): |
|
341 |
- raise TypeError(err_non_str_service_name.format(sv_name)) |
|
342 |
- if not isinstance(service, dict): |
|
343 |
- raise TypeError(err_not_a_dict(['services', sv_name])) |
|
344 |
- queue_to_check.append((service, ('services', sv_name))) |
|
345 |
- for settings, path in queue_to_check: |
|
346 |
- for key, value in settings.items(): |
|
347 |
- # Use match/case here once Python 3.9 becomes unsupported. |
|
348 |
- if key in {'key', 'phrase'}: |
|
349 |
- if not isinstance(value, str): |
|
350 |
- raise TypeError(err_not_a_string((*path, key))) |
|
351 |
- elif key == 'unicode_normalization_form' and path == ('global',): |
|
352 |
- if not isinstance(value, str): |
|
353 |
- raise TypeError(err_not_a_string((*path, key))) |
|
354 |
- if ( |
|
355 |
- not allow_derivepassphrase_extensions |
|
356 |
- and not allow_unknown_settings |
|
357 |
- ): |
|
358 |
- raise ValueError(err_unknown_setting(key, path)) |
|
359 |
- elif key == 'notes' and path != ('global',): |
|
360 |
- if not isinstance(value, str): |
|
361 |
- raise TypeError(err_not_a_string((*path, key))) |
|
362 |
- elif key in { |
|
363 |
- 'length', |
|
364 |
- 'repeat', |
|
365 |
- 'lower', |
|
366 |
- 'upper', |
|
367 |
- 'number', |
|
368 |
- 'space', |
|
369 |
- 'dash', |
|
370 |
- 'symbol', |
|
371 |
- }: |
|
372 |
- if not isinstance(value, int): |
|
373 |
- raise TypeError(err_not_an_int((*path, key))) |
|
374 |
- if key == 'length' and value < 1: |
|
375 |
- raise ValueError( |
|
376 |
- err_bad_number(key, path, strictly_positive=True) |
|
377 |
- ) |
|
378 |
- if key != 'length' and value < 0: |
|
379 |
- raise ValueError( |
|
380 |
- err_bad_number(key, path, strictly_positive=False) |
|
468 |
+ return _VaultConfigValidator(obj).validate( |
|
469 |
+ allow_unknown_settings=allow_unknown_settings |
|
381 | 470 |
) |
382 |
- elif not allow_unknown_settings: |
|
383 |
- raise ValueError(err_unknown_setting(key, path)) |
|
384 | 471 |
|
385 | 472 |
|
386 | 473 |
def is_vault_config(obj: Any) -> TypeIs[VaultConfig]: # noqa: ANN401 |
... | ... |
@@ -470,7 +557,7 @@ class CleanupStep(NamedTuple): |
470 | 557 |
"""""" |
471 | 558 |
|
472 | 559 |
|
473 |
-def clean_up_falsy_vault_config_values( # noqa: C901,PLR0912 |
|
560 |
+def clean_up_falsy_vault_config_values( |
|
474 | 561 |
obj: Any, # noqa: ANN401 |
475 | 562 |
) -> Sequence[CleanupStep] | None: |
476 | 563 |
"""Convert falsy values in a vault config to correct types, in-place. |
... | ... |
@@ -500,102 +587,10 @@ def clean_up_falsy_vault_config_values( # noqa: C901,PLR0912 |
500 | 587 |
vault configuration, then `None` is returned, directly. |
501 | 588 |
|
502 | 589 |
""" |
503 |
- if ( # pragma: no cover |
|
504 |
- not isinstance(obj, dict) |
|
505 |
- or 'services' not in obj |
|
506 |
- or not isinstance(obj['services'], dict) |
|
507 |
- ): |
|
508 |
- # config is invalid |
|
509 |
- return None |
|
510 |
- service_objects: MutableSequence[ |
|
511 |
- tuple[Sequence[str | int], dict[str, Any]] |
|
512 |
- ] = collections.deque() |
|
513 |
- if 'global' in obj: |
|
514 |
- if isinstance(obj['global'], dict): |
|
515 |
- service_objects.append((['global'], obj['global'])) |
|
516 |
- else: # pragma: no cover |
|
517 |
- # config is invalid |
|
518 |
- return None |
|
519 |
- service_objects.extend( |
|
520 |
- (['services', sv], val) for sv, val in obj['services'].items() |
|
521 |
- ) |
|
522 |
- if not all( # pragma: no cover |
|
523 |
- isinstance(service_obj, dict) for _, service_obj in service_objects |
|
524 |
- ): |
|
525 |
- # config is invalid |
|
590 |
+ try: |
|
591 |
+ return list(_VaultConfigValidator(obj).clean_up_falsy_values()) |
|
592 |
+ except ValueError: |
|
526 | 593 |
return None |
527 |
- cleanup_completed: MutableSequence[CleanupStep] = collections.deque() |
|
528 |
- for path, service_obj in service_objects: |
|
529 |
- for key, value in list(service_obj.items()): |
|
530 |
- # Use match/case here once Python 3.9 becomes unsupported. |
|
531 |
- if key == 'phrase': |
|
532 |
- if not js_truthiness(value) and value != '': # noqa: PLC1901 |
|
533 |
- cleanup_completed.append( |
|
534 |
- CleanupStep( |
|
535 |
- (*path, key), service_obj[key], 'replace', '' |
|
536 |
- ) |
|
537 |
- ) |
|
538 |
- service_obj[key] = '' |
|
539 |
- elif key == 'notes': |
|
540 |
- if not js_truthiness(value): |
|
541 |
- cleanup_completed.append( |
|
542 |
- CleanupStep( |
|
543 |
- (*path, key), service_obj[key], 'remove', None |
|
544 |
- ) |
|
545 |
- ) |
|
546 |
- service_obj.pop(key) |
|
547 |
- elif key == 'key': |
|
548 |
- if not js_truthiness(value): |
|
549 |
- if path == ['global']: |
|
550 |
- cleanup_completed.append( |
|
551 |
- CleanupStep( |
|
552 |
- (*path, key), service_obj[key], 'remove', None |
|
553 |
- ) |
|
554 |
- ) |
|
555 |
- service_obj.pop(key) |
|
556 |
- else: |
|
557 |
- cleanup_completed.append( |
|
558 |
- CleanupStep( |
|
559 |
- (*path, key), service_obj[key], 'replace', '' |
|
560 |
- ) |
|
561 |
- ) |
|
562 |
- service_obj[key] = '' |
|
563 |
- elif key == 'length': |
|
564 |
- if not js_truthiness(value): |
|
565 |
- cleanup_completed.append( |
|
566 |
- CleanupStep( |
|
567 |
- (*path, key), service_obj[key], 'replace', 20 |
|
568 |
- ) |
|
569 |
- ) |
|
570 |
- service_obj[key] = 20 |
|
571 |
- elif key == 'repeat': |
|
572 |
- if not js_truthiness(value) and not ( |
|
573 |
- isinstance(value, int) and value == 0 |
|
574 |
- ): |
|
575 |
- cleanup_completed.append( |
|
576 |
- CleanupStep( |
|
577 |
- (*path, key), service_obj[key], 'replace', 0 |
|
578 |
- ) |
|
579 |
- ) |
|
580 |
- service_obj[key] = 0 |
|
581 |
- elif key in { # noqa: SIM102 |
|
582 |
- 'lower', |
|
583 |
- 'upper', |
|
584 |
- 'number', |
|
585 |
- 'space', |
|
586 |
- 'dash', |
|
587 |
- 'symbol', |
|
588 |
- }: |
|
589 |
- if not js_truthiness(value) and not ( |
|
590 |
- isinstance(value, int) and value == 0 |
|
591 |
- ): |
|
592 |
- cleanup_completed.append( |
|
593 |
- CleanupStep( |
|
594 |
- (*path, key), service_obj[key], 'remove', None |
|
595 |
- ) |
|
596 |
- ) |
|
597 |
- service_obj.pop(key) |
|
598 |
- return cleanup_completed |
|
599 | 594 |
|
600 | 595 |
|
601 | 596 |
T_Buffer = TypeVar('T_Buffer', bound=Buffer) |
602 | 597 |