Marco Ricci commited on 2025-01-11 19:04:57
Zeige 6 geänderte Dateien mit 495 Einfügungen und 167 Löschungen.
Harmonize the interface for the `storeroom` and the `vault_native` config export handlers. Specifically, introduce a common interface `exporter.ExportVaultConfigDataFunction` which all such handlers must adhere to, as well as a registry of handlers and a decorator for registering handlers immediately upon definition. A new top-level function `exporter.export_vault_config_data` adhering to this interface will dispatch to the correct implementation, based on the handler registry; its use is recommended over calling the handlers directly. Though it is not encoded in the programmatic interface definition, the intent is to use common exceptions and exception messages for common failure modes. (The intended meanings of the exceptions are listed in the docstring of `exporter.export_vault_config_data`.) Specifically, introduce a new error message constant for when a given handler name is not registered in the registry, and an exception class `exporter.NotAVaultConfigError` for when the target path cannot be decrypted and decoded in the requested vault config encoding format with the given or inferred storage master key. Also update some outdated or wrong aspects of the handler docstrings. Additionally, the registry is empty upon definition to avoild circular imports. A separate call to the `exporter.find_vault_config_data_handlers` function is necessary to populate the standard set of handlers. `exporter.export_vault_config_data` will call this automatically. Because of this updated interface, the `vault_native.export_vault_native_data` handler no longer supports specifying multiple formats to try. The caller in `derivepassphrase.cli` already handles trying multiple formats natively, but additionally no longer dispatches to the handler functions itself, relying on `exporter.export_vault_config_data` for this task instead. Finally, update the tests to include the handler registry, the decorator, the change of parameter names and the change in handling attempting multiple formats.
... | ... |
@@ -13,7 +13,6 @@ import collections |
13 | 13 |
import copy |
14 | 14 |
import enum |
15 | 15 |
import functools |
16 |
-import importlib |
|
17 | 16 |
import inspect |
18 | 17 |
import json |
19 | 18 |
import logging |
... | ... |
@@ -38,7 +37,6 @@ from typing_extensions import ( |
38 | 37 |
Any, |
39 | 38 |
ParamSpec, |
40 | 39 |
Self, |
41 |
- assert_never, |
|
42 | 40 |
override, |
43 | 41 |
) |
44 | 42 |
|
... | ... |
@@ -1473,45 +1471,6 @@ def derivepassphrase_export(ctx: click.Context, /) -> None: |
1473 | 1471 |
return None |
1474 | 1472 |
|
1475 | 1473 |
|
1476 |
-def _load_data( |
|
1477 |
- fmt: Literal['v0.2', 'v0.3', 'storeroom'], |
|
1478 |
- path: str | bytes | os.PathLike[str], |
|
1479 |
- key: bytes, |
|
1480 |
-) -> Any: # noqa: ANN401 |
|
1481 |
- contents: bytes |
|
1482 |
- module: types.ModuleType |
|
1483 |
- # Use match/case here once Python 3.9 becomes unsupported. |
|
1484 |
- if fmt == 'v0.2': |
|
1485 |
- module = importlib.import_module( |
|
1486 |
- 'derivepassphrase.exporter.vault_native' |
|
1487 |
- ) |
|
1488 |
- if module.STUBBED: |
|
1489 |
- raise ModuleNotFoundError |
|
1490 |
- with open(path, 'rb') as infile: |
|
1491 |
- contents = base64.standard_b64decode(infile.read()) |
|
1492 |
- return module.export_vault_native_data( |
|
1493 |
- contents, key, try_formats=['v0.2'] |
|
1494 |
- ) |
|
1495 |
- elif fmt == 'v0.3': # noqa: RET505 |
|
1496 |
- module = importlib.import_module( |
|
1497 |
- 'derivepassphrase.exporter.vault_native' |
|
1498 |
- ) |
|
1499 |
- if module.STUBBED: |
|
1500 |
- raise ModuleNotFoundError |
|
1501 |
- with open(path, 'rb') as infile: |
|
1502 |
- contents = base64.standard_b64decode(infile.read()) |
|
1503 |
- return module.export_vault_native_data( |
|
1504 |
- contents, key, try_formats=['v0.3'] |
|
1505 |
- ) |
|
1506 |
- elif fmt == 'storeroom': |
|
1507 |
- module = importlib.import_module('derivepassphrase.exporter.storeroom') |
|
1508 |
- if module.STUBBED: |
|
1509 |
- raise ModuleNotFoundError |
|
1510 |
- return module.export_storeroom_data(path, key) |
|
1511 |
- else: # pragma: no cover |
|
1512 |
- assert_never(fmt) |
|
1513 |
- |
|
1514 |
- |
|
1515 | 1474 |
@derivepassphrase_export.command( |
1516 | 1475 |
'vault', |
1517 | 1476 |
context_settings={'help_option_names': ['-h', '--help']}, |
... | ... |
@@ -1578,7 +1537,7 @@ def derivepassphrase_export_vault( |
1578 | 1537 |
ctx: click.Context, |
1579 | 1538 |
/, |
1580 | 1539 |
*, |
1581 |
- path: str | bytes | os.PathLike[str], |
|
1540 |
+ path: str | bytes | os.PathLike[str] | None, |
|
1582 | 1541 |
formats: Sequence[Literal['v0.2', 'v0.3', 'storeroom']] = (), |
1583 | 1542 |
key: str | bytes | None = None, |
1584 | 1543 |
) -> None: |
... | ... |
@@ -1595,24 +1554,22 @@ def derivepassphrase_export_vault( |
1595 | 1554 |
""" |
1596 | 1555 |
logger = logging.getLogger(PROG_NAME) |
1597 | 1556 |
if path in {'VAULT_PATH', b'VAULT_PATH'}: |
1598 |
- path = exporter.get_vault_path() |
|
1599 |
- if key is None: |
|
1600 |
- key = exporter.get_vault_key() |
|
1601 |
- elif isinstance(key, str): # pragma: no branch |
|
1557 |
+ path = None |
|
1558 |
+ if isinstance(key, str): # pragma: no branch |
|
1602 | 1559 |
key = key.encode('utf-8') |
1603 | 1560 |
for fmt in formats: |
1604 | 1561 |
try: |
1605 |
- config = _load_data(fmt, path, key) |
|
1562 |
+ config = exporter.export_vault_config_data(path, key, format=fmt) |
|
1606 | 1563 |
except ( |
1607 | 1564 |
IsADirectoryError, |
1608 | 1565 |
NotADirectoryError, |
1609 |
- ValueError, |
|
1566 |
+ exporter.NotAVaultConfigError, |
|
1610 | 1567 |
RuntimeError, |
1611 | 1568 |
): |
1612 | 1569 |
logger.info( |
1613 | 1570 |
_msg.TranslatedString( |
1614 | 1571 |
_msg.InfoMsgTemplate.CANNOT_LOAD_AS_VAULT_CONFIG, |
1615 |
- path=path, |
|
1572 |
+ path=path or exporter.get_vault_path(), |
|
1616 | 1573 |
fmt=fmt, |
1617 | 1574 |
), |
1618 | 1575 |
extra={'color': ctx.color}, |
... | ... |
@@ -6,16 +6,49 @@ |
6 | 6 |
|
7 | 7 |
from __future__ import annotations |
8 | 8 |
|
9 |
+import importlib |
|
9 | 10 |
import os |
11 |
+from typing import TYPE_CHECKING, Protocol |
|
10 | 12 |
|
11 | 13 |
import derivepassphrase as dpp |
12 | 14 |
|
15 |
+if TYPE_CHECKING: |
|
16 |
+ from collections.abc import Callable |
|
17 |
+ from typing import Any |
|
18 |
+ |
|
19 |
+ from typing_extensions import Buffer |
|
20 |
+ |
|
13 | 21 |
__author__ = dpp.__author__ |
14 | 22 |
__version__ = dpp.__version__ |
15 | 23 |
|
16 | 24 |
__all__ = () |
17 | 25 |
|
18 | 26 |
|
27 |
+INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT = ( |
|
28 |
+ 'Invalid vault native configuration format: {fmt!r}' |
|
29 |
+) |
|
30 |
+ |
|
31 |
+ |
|
32 |
+class NotAVaultConfigError(ValueError): |
|
33 |
+ """The `path` does not hold a `format`-type vault configuration.""" |
|
34 |
+ |
|
35 |
+ def __init__( |
|
36 |
+ self, |
|
37 |
+ path: str | bytes, |
|
38 |
+ format: str | None = None, # noqa: A002 |
|
39 |
+ ) -> None: |
|
40 |
+ self.path = path |
|
41 |
+ self.format = format |
|
42 |
+ |
|
43 |
+ def __str__(self) -> str: # pragma: no cover |
|
44 |
+ formatted_format = ( |
|
45 |
+ f'vault {self.format} configuration' |
|
46 |
+ if self.format |
|
47 |
+ else 'vault configuration' |
|
48 |
+ ) |
|
49 |
+ return f'Not a {formatted_format}: {self.path!r}' |
|
50 |
+ |
|
51 |
+ |
|
19 | 52 |
def get_vault_key() -> bytes: |
20 | 53 |
"""Automatically determine the vault(1) master key/password. |
21 | 54 |
|
... | ... |
@@ -70,3 +103,123 @@ def get_vault_path() -> str | bytes | os.PathLike: |
70 | 103 |
msg = 'Cannot determine home directory' |
71 | 104 |
raise RuntimeError(msg) |
72 | 105 |
return result |
106 |
+ |
|
107 |
+ |
|
108 |
+class ExportVaultConfigDataFunction(Protocol): # pragma: no cover |
|
109 |
+ def __call__( |
|
110 |
+ self, |
|
111 |
+ path: str | bytes | os.PathLike | None = None, |
|
112 |
+ key: str | Buffer | None = None, |
|
113 |
+ *, |
|
114 |
+ format: str, # noqa: A002 |
|
115 |
+ ) -> Any: ... # noqa: ANN401 |
|
116 |
+ |
|
117 |
+ |
|
118 |
+_export_vault_config_data_registry: dict[ |
|
119 |
+ str, |
|
120 |
+ ExportVaultConfigDataFunction, |
|
121 |
+] = {} |
|
122 |
+ |
|
123 |
+ |
|
124 |
+def register_export_vault_config_data_handler( |
|
125 |
+ *names: str, |
|
126 |
+) -> Callable[[ExportVaultConfigDataFunction], ExportVaultConfigDataFunction]: |
|
127 |
+ if not names: |
|
128 |
+ msg = 'No names given to export_data handler registry' |
|
129 |
+ raise ValueError(msg) |
|
130 |
+ if '' in names: |
|
131 |
+ msg = 'Cannot register export_data handler under an empty name' |
|
132 |
+ raise ValueError(msg) |
|
133 |
+ |
|
134 |
+ def wrapper( |
|
135 |
+ f: ExportVaultConfigDataFunction, |
|
136 |
+ ) -> ExportVaultConfigDataFunction: |
|
137 |
+ for name in names: |
|
138 |
+ if name in _export_vault_config_data_registry: |
|
139 |
+ msg = f'export_data handler already registered: {name!r}' |
|
140 |
+ raise ValueError(msg) |
|
141 |
+ _export_vault_config_data_registry[name] = f |
|
142 |
+ return f |
|
143 |
+ |
|
144 |
+ return wrapper |
|
145 |
+ |
|
146 |
+ |
|
147 |
+def find_vault_config_data_handlers() -> None: |
|
148 |
+ """Find all export handlers for vault config data. |
|
149 |
+ |
|
150 |
+ (This function is idempotent.) |
|
151 |
+ |
|
152 |
+ Raises: |
|
153 |
+ ModuleNotFoundError: |
|
154 |
+ A required module was not found. |
|
155 |
+ |
|
156 |
+ """ |
|
157 |
+ # Defer imports (and handler registrations) to avoid circular |
|
158 |
+ # imports. The modules themselves contain function definitions that |
|
159 |
+ # register themselves automatically with |
|
160 |
+ # `_export_vault_config_data_registry`. |
|
161 |
+ importlib.import_module('derivepassphrase.exporter.storeroom') |
|
162 |
+ importlib.import_module('derivepassphrase.exporter.vault_native') |
|
163 |
+ |
|
164 |
+ |
|
165 |
+def export_vault_config_data( |
|
166 |
+ path: str | bytes | os.PathLike | None = None, |
|
167 |
+ key: str | Buffer | None = None, |
|
168 |
+ *, |
|
169 |
+ format: str, # noqa: A002 |
|
170 |
+) -> Any: # noqa: ANN401 |
|
171 |
+ """Export the full vault-native configuration stored in `path`. |
|
172 |
+ |
|
173 |
+ Args: |
|
174 |
+ path: |
|
175 |
+ The path to the vault configuration file or directory. If |
|
176 |
+ not given, then query [`get_vault_path`][] for the correct |
|
177 |
+ value. |
|
178 |
+ key: |
|
179 |
+ Encryption key/password for the configuration file or |
|
180 |
+ directory, usually the username, or passed via the |
|
181 |
+ `VAULT_KEY` environment variable. If not given, then query |
|
182 |
+ [`exporter.get_vault_key`][] for the value. |
|
183 |
+ format: |
|
184 |
+ The format to attempt parsing as. Must be `v0.2`, `v0.3` or |
|
185 |
+ `storeroom`. |
|
186 |
+ |
|
187 |
+ Returns: |
|
188 |
+ The vault configuration, as recorded in the configuration file. |
|
189 |
+ |
|
190 |
+ This may or may not be a valid configuration according to |
|
191 |
+ `vault` or `derivepassphrase`. |
|
192 |
+ |
|
193 |
+ Raises: |
|
194 |
+ IsADirectoryError: |
|
195 |
+ The requested format requires a configuration file, but |
|
196 |
+ `path` points to a directory instead. |
|
197 |
+ NotADirectoryError: |
|
198 |
+ The requested format requires a configuration directory, but |
|
199 |
+ `path` points to something else instead. |
|
200 |
+ OSError: |
|
201 |
+ There was an OS error while accessing the configuration |
|
202 |
+ file/directory. |
|
203 |
+ RuntimeError: |
|
204 |
+ Something went wrong during data collection, e.g. we |
|
205 |
+ encountered unsupported or corrupted data in the |
|
206 |
+ configuration file/directory. |
|
207 |
+ json.JSONDecodeError: |
|
208 |
+ An internal JSON data structure failed to parse from disk. |
|
209 |
+ The configuration file/directory is probably corrupted. |
|
210 |
+ exporter.NotAVaultConfigError: |
|
211 |
+ The file/directory contents are not in the claimed |
|
212 |
+ configuration format. |
|
213 |
+ ValueError: |
|
214 |
+ The requested format is invalid. |
|
215 |
+ ModuleNotFoundError: |
|
216 |
+ The requested format requires support code, which failed to |
|
217 |
+ load because of missing Python libraries. |
|
218 |
+ |
|
219 |
+ """ |
|
220 |
+ find_vault_config_data_handlers() |
|
221 |
+ handler = _export_vault_config_data_registry.get(format) |
|
222 |
+ if handler is None: |
|
223 |
+ msg = INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format(fmt=format) |
|
224 |
+ raise ValueError(msg) |
|
225 |
+ return handler(path, key, format=format) |
... | ... |
@@ -20,10 +20,13 @@ should *not* be used or relied on. |
20 | 20 |
|
21 | 21 |
""" |
22 | 22 |
|
23 |
+# ruff: noqa: S303 |
|
24 |
+ |
|
23 | 25 |
from __future__ import annotations |
24 | 26 |
|
25 | 27 |
import base64 |
26 | 28 |
import fnmatch |
29 |
+import importlib |
|
27 | 30 |
import json |
28 | 31 |
import logging |
29 | 32 |
import os |
... | ... |
@@ -43,14 +46,7 @@ if TYPE_CHECKING: |
43 | 46 |
from typing_extensions import Buffer |
44 | 47 |
else: |
45 | 48 |
try: |
46 |
- from cryptography.hazmat.primitives import ( |
|
47 |
- ciphers, |
|
48 |
- hashes, |
|
49 |
- hmac, |
|
50 |
- padding, |
|
51 |
- ) |
|
52 |
- from cryptography.hazmat.primitives.ciphers import algorithms, modes |
|
53 |
- from cryptography.hazmat.primitives.kdf import pbkdf2 |
|
49 |
+ importlib.import_module('cryptography') |
|
54 | 50 |
except ModuleNotFoundError as exc: |
55 | 51 |
|
56 | 52 |
class _DummyModule: # pragma: no cover |
... | ... |
@@ -67,6 +63,15 @@ else: |
67 | 63 |
algorithms = modes = pbkdf2 = _DummyModule(exc) |
68 | 64 |
STUBBED = True |
69 | 65 |
else: |
66 |
+ from cryptography.hazmat.primitives import ( |
|
67 |
+ ciphers, |
|
68 |
+ hashes, |
|
69 |
+ hmac, |
|
70 |
+ padding, |
|
71 |
+ ) |
|
72 |
+ from cryptography.hazmat.primitives.ciphers import algorithms, modes |
|
73 |
+ from cryptography.hazmat.primitives.kdf import pbkdf2 |
|
74 |
+ |
|
70 | 75 |
STUBBED = False |
71 | 76 |
|
72 | 77 |
STOREROOM_MASTER_KEYS_UUID = b'35b7c7ed-f71e-4adf-9051-02fb0f1e0e17' |
... | ... |
@@ -616,27 +621,35 @@ def _store(config: dict[str, Any], path: str, json_contents: bytes) -> None: |
616 | 621 |
config[path_parts[-1]] = contents |
617 | 622 |
|
618 | 623 |
|
624 |
+@exporter.register_export_vault_config_data_handler('storeroom') |
|
619 | 625 |
def export_storeroom_data( # noqa: C901,PLR0912,PLR0914,PLR0915 |
620 |
- storeroom_path: str | bytes | os.PathLike | None = None, |
|
621 |
- master_keys_key: str | Buffer | None = None, |
|
626 |
+ path: str | bytes | os.PathLike | None = None, |
|
627 |
+ key: str | Buffer | None = None, |
|
628 |
+ *, |
|
629 |
+ format: str = 'storeroom', # noqa: A002 |
|
622 | 630 |
) -> dict[str, Any]: |
623 | 631 |
"""Export the full configuration stored in the storeroom. |
624 | 632 |
|
625 | 633 |
Args: |
626 |
- storeroom_path: |
|
627 |
- Path to the storeroom; usually `~/.vault`. If not given, |
|
628 |
- then query [`exporter.get_vault_path`][] for the value. |
|
629 |
- master_keys_key: |
|
630 |
- Encryption key/password for the master keys, usually the |
|
631 |
- username, or passed via the `VAULT_KEY` environment |
|
632 |
- variable. If not given, then query |
|
633 |
- [`exporter.get_vault_key`][] for the value. |
|
634 |
+ path: |
|
635 |
+ The path to the vault configuration directory. If not |
|
636 |
+ given, then query [`exporter.get_vault_path`][] for the |
|
637 |
+ correct value. |
|
638 |
+ key: |
|
639 |
+ Encryption key/password for the (master keys file in the) |
|
640 |
+ configuration directory, usually the username, or passed via |
|
641 |
+ the `VAULT_KEY` environment variable. If not given, then |
|
642 |
+ query [`exporter.get_vault_key`][] for the value. |
|
643 |
+ format: |
|
644 |
+ The format to attempt parsing as. If specified, must be |
|
645 |
+ `storeroom`. |
|
634 | 646 |
|
635 | 647 |
Returns: |
636 |
- The full configuration, as stored in the storeroom. |
|
648 |
+ The vault configuration, as recorded in the configuration |
|
649 |
+ directory. |
|
637 | 650 |
|
638 |
- This may or may not be a valid configuration according to vault |
|
639 |
- or derivepassphrase. |
|
651 |
+ This may or may not be a valid configuration according to |
|
652 |
+ `vault` or `derivepassphrase`. |
|
640 | 653 |
|
641 | 654 |
Raises: |
642 | 655 |
RuntimeError: |
... | ... |
@@ -645,17 +658,34 @@ def export_storeroom_data( # noqa: C901,PLR0912,PLR0914,PLR0915 |
645 | 658 |
json.JSONDecodeError: |
646 | 659 |
An internal JSON data structure failed to parse from disk. |
647 | 660 |
The storeroom is probably corrupted. |
661 |
+ exporter.NotAVaultConfigError: |
|
662 |
+ The directory does contain not a storeroom. |
|
663 |
+ ValueError: |
|
664 |
+ The requested format is invalid. |
|
648 | 665 |
|
649 | 666 |
""" |
650 |
- if storeroom_path is None: |
|
651 |
- storeroom_path = exporter.get_vault_path() |
|
652 |
- if master_keys_key is None: |
|
653 |
- master_keys_key = exporter.get_vault_key() |
|
654 |
- elif not isinstance(master_keys_key, str): |
|
655 |
- master_keys_key = memoryview(master_keys_key).toreadonly().cast('c') |
|
656 |
- with open( |
|
657 |
- os.path.join(os.fsdecode(storeroom_path), '.keys'), encoding='utf-8' |
|
658 |
- ) as master_keys_file: |
|
667 |
+ # Trigger import errors if necessary. |
|
668 |
+ importlib.import_module('cryptography') |
|
669 |
+ if path is None: |
|
670 |
+ path = exporter.get_vault_path() |
|
671 |
+ if key is None: |
|
672 |
+ key = exporter.get_vault_key() |
|
673 |
+ if format != 'storeroom': # pragma: no cover |
|
674 |
+ msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( |
|
675 |
+ fmt=format |
|
676 |
+ ) |
|
677 |
+ raise ValueError(msg) |
|
678 |
+ try: |
|
679 |
+ master_keys_file = open( # noqa: SIM115 |
|
680 |
+ os.path.join(os.fsdecode(path), '.keys'), |
|
681 |
+ encoding='utf-8', |
|
682 |
+ ) |
|
683 |
+ except FileNotFoundError as exc: |
|
684 |
+ raise exporter.NotAVaultConfigError( |
|
685 |
+ os.fsdecode(path), |
|
686 |
+ format='storeroom', |
|
687 |
+ ) from exc |
|
688 |
+ with master_keys_file: |
|
659 | 689 |
header = json.loads(master_keys_file.readline()) |
660 | 690 |
if header != {'version': 1}: |
661 | 691 |
msg = 'bad or unsupported keys version header' |
... | ... |
@@ -675,16 +705,14 @@ def export_storeroom_data( # noqa: C901,PLR0912,PLR0914,PLR0915 |
675 | 705 |
_msg.TranslatedString(_msg.InfoMsgTemplate.PARSING_MASTER_KEYS_DATA) |
676 | 706 |
) |
677 | 707 |
encrypted_keys_iterations = 2 ** (10 + (encrypted_keys_params & 0x0F)) |
678 |
- master_keys_keys = derive_master_keys_keys( |
|
679 |
- master_keys_key, encrypted_keys_iterations |
|
680 |
- ) |
|
708 |
+ master_keys_keys = derive_master_keys_keys(key, encrypted_keys_iterations) |
|
681 | 709 |
master_keys = decrypt_master_keys_data(encrypted_keys, master_keys_keys) |
682 | 710 |
|
683 | 711 |
config_structure: dict[str, Any] = {} |
684 | 712 |
json_contents: dict[str, bytes] = {} |
685 | 713 |
# Use glob.glob(..., root_dir=...) here once Python 3.9 becomes |
686 | 714 |
# unsupported. |
687 |
- storeroom_path_str = os.fsdecode(storeroom_path) |
|
715 |
+ storeroom_path_str = os.fsdecode(path) |
|
688 | 716 |
valid_hashdirs = [ |
689 | 717 |
hashdir_name |
690 | 718 |
for hashdir_name in os.listdir(storeroom_path_str) |
... | ... |
@@ -699,7 +727,7 @@ def export_storeroom_data( # noqa: C901,PLR0912,PLR0914,PLR0915 |
699 | 727 |
) |
700 | 728 |
bucket_contents = [ |
701 | 729 |
bytes(item) |
702 |
- for item in decrypt_bucket_file(file, master_keys, root_dir=storeroom_path) |
|
730 |
+ for item in decrypt_bucket_file(file, master_keys, root_dir=path) |
|
703 | 731 |
] |
704 | 732 |
bucket_index = json.loads(bucket_contents.pop(0)) |
705 | 733 |
for pos, item in enumerate(bucket_index): |
... | ... |
@@ -716,12 +744,12 @@ def export_storeroom_data( # noqa: C901,PLR0912,PLR0914,PLR0915 |
716 | 744 |
logger.info( |
717 | 745 |
_msg.TranslatedString(_msg.InfoMsgTemplate.ASSEMBLING_CONFIG_STRUCTURE) |
718 | 746 |
) |
719 |
- for path, json_content in sorted(json_contents.items()): |
|
720 |
- if path.endswith('/'): |
|
747 |
+ for item_path, json_content in sorted(json_contents.items()): |
|
748 |
+ if item_path.endswith('/'): |
|
721 | 749 |
logger.debug( |
722 | 750 |
_msg.TranslatedString( |
723 | 751 |
_msg.DebugMsgTemplate.POSTPONING_DIRECTORY_CONTENTS_CHECK, |
724 |
- path=path, |
|
752 |
+ path=item_path, |
|
725 | 753 |
contents=json_content.decode('utf-8'), |
726 | 754 |
) |
727 | 755 |
) |
... | ... |
@@ -734,33 +762,33 @@ def export_storeroom_data( # noqa: C901,PLR0912,PLR0914,PLR0915 |
734 | 762 |
f'{json_content!r}' |
735 | 763 |
) |
736 | 764 |
raise RuntimeError(msg) |
737 |
- dirs_to_check[path] = json_payload |
|
765 |
+ dirs_to_check[item_path] = json_payload |
|
738 | 766 |
logger.debug( |
739 | 767 |
_msg.TranslatedString( |
740 | 768 |
_msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS_EMPTY_DIRECTORY, |
741 |
- path=path, |
|
769 |
+ path=item_path, |
|
742 | 770 |
), |
743 | 771 |
) |
744 |
- _store(config_structure, path, b'{}') |
|
772 |
+ _store(config_structure, item_path, b'{}') |
|
745 | 773 |
else: |
746 | 774 |
logger.debug( |
747 | 775 |
_msg.TranslatedString( |
748 | 776 |
_msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS, |
749 |
- path=path, |
|
777 |
+ path=item_path, |
|
750 | 778 |
value=json_content.decode('utf-8'), |
751 | 779 |
), |
752 | 780 |
) |
753 |
- _store(config_structure, path, json_content) |
|
781 |
+ _store(config_structure, item_path, json_content) |
|
754 | 782 |
logger.info( |
755 | 783 |
_msg.TranslatedString( |
756 | 784 |
_msg.InfoMsgTemplate.CHECKING_CONFIG_STRUCTURE_CONSISTENCY, |
757 | 785 |
) |
758 | 786 |
) |
759 | 787 |
# Sorted order is important; see `maybe_obj` below. |
760 |
- for _dir, namelist in sorted(dirs_to_check.items()): |
|
761 |
- namelist = [x.rstrip('/') for x in namelist] # noqa: PLW2901 |
|
788 |
+ for dir_, namelist_ in sorted(dirs_to_check.items()): |
|
789 |
+ namelist = [x.rstrip('/') for x in namelist_] |
|
762 | 790 |
obj: dict[Any, Any] = config_structure |
763 |
- for part in _dir.split('/'): |
|
791 |
+ for part in dir_.split('/'): |
|
764 | 792 |
if part: |
765 | 793 |
# Because we iterate paths in sorted order, parent |
766 | 794 |
# directories are encountered before child directories. |
... | ... |
@@ -771,17 +799,17 @@ def export_storeroom_data( # noqa: C901,PLR0912,PLR0914,PLR0915 |
771 | 799 |
# this, so we need to use assertions anyway. |
772 | 800 |
maybe_obj = obj.get(part) |
773 | 801 |
assert isinstance(maybe_obj, dict), ( |
774 |
- f'Cannot traverse storage path {_dir!r}' |
|
802 |
+ f'Cannot traverse storage path {dir_!r}' |
|
775 | 803 |
) |
776 | 804 |
obj = maybe_obj |
777 | 805 |
if set(obj.keys()) != set(namelist): |
778 |
- msg = f'Object key mismatch for path {_dir!r}' |
|
806 |
+ msg = f'Object key mismatch for path {dir_!r}' |
|
779 | 807 |
raise RuntimeError(msg) |
780 | 808 |
logger.debug( |
781 | 809 |
_msg.TranslatedString( |
782 | 810 |
_msg.DebugMsgTemplate.DIRECTORY_CONTENTS_CHECK_OK, |
783 |
- path=_dir, |
|
784 |
- contents=json.dumps(namelist), |
|
811 |
+ path=dir_, |
|
812 |
+ contents=json.dumps(namelist_), |
|
785 | 813 |
) |
786 | 814 |
) |
787 | 815 |
return config_structure |
... | ... |
@@ -789,5 +817,5 @@ def export_storeroom_data( # noqa: C901,PLR0912,PLR0914,PLR0915 |
789 | 817 |
|
790 | 818 |
if __name__ == '__main__': |
791 | 819 |
logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING')) |
792 |
- config_structure = export_storeroom_data() |
|
820 |
+ config_structure = export_storeroom_data(format='storeroom') |
|
793 | 821 |
print(json.dumps(config_structure, indent=2, sort_keys=True)) # noqa: T201 |
... | ... |
@@ -21,12 +21,16 @@ should *not* be used or relied on. |
21 | 21 |
|
22 | 22 |
""" |
23 | 23 |
|
24 |
+# ruff: noqa: S303 |
|
25 |
+ |
|
24 | 26 |
from __future__ import annotations |
25 | 27 |
|
26 | 28 |
import abc |
27 | 29 |
import base64 |
30 |
+import importlib |
|
28 | 31 |
import json |
29 | 32 |
import logging |
33 |
+import os |
|
30 | 34 |
import warnings |
31 | 35 |
from typing import TYPE_CHECKING |
32 | 36 |
|
... | ... |
@@ -34,7 +38,6 @@ from derivepassphrase import _cli_msg as _msg |
34 | 38 |
from derivepassphrase import exporter, vault |
35 | 39 |
|
36 | 40 |
if TYPE_CHECKING: |
37 |
- from collections.abc import Sequence |
|
38 | 41 |
from typing import Any |
39 | 42 |
|
40 | 43 |
from typing_extensions import Buffer |
... | ... |
@@ -47,16 +50,7 @@ if TYPE_CHECKING: |
47 | 50 |
from cryptography.hazmat.primitives.kdf import pbkdf2 |
48 | 51 |
else: |
49 | 52 |
try: |
50 |
- from cryptography import exceptions as crypt_exceptions |
|
51 |
- from cryptography import utils as crypt_utils |
|
52 |
- from cryptography.hazmat.primitives import ( |
|
53 |
- ciphers, |
|
54 |
- hashes, |
|
55 |
- hmac, |
|
56 |
- padding, |
|
57 |
- ) |
|
58 |
- from cryptography.hazmat.primitives.ciphers import algorithms, modes |
|
59 |
- from cryptography.hazmat.primitives.kdf import pbkdf2 |
|
53 |
+ importlib.import_module('cryptography') |
|
60 | 54 |
except ModuleNotFoundError as exc: |
61 | 55 |
|
62 | 56 |
class _DummyModule: # pragma: no cover |
... | ... |
@@ -74,6 +68,17 @@ else: |
74 | 68 |
algorithms = modes = pbkdf2 = _DummyModule(exc) |
75 | 69 |
STUBBED = True |
76 | 70 |
else: |
71 |
+ from cryptography import exceptions as crypt_exceptions |
|
72 |
+ from cryptography import utils as crypt_utils |
|
73 |
+ from cryptography.hazmat.primitives import ( |
|
74 |
+ ciphers, |
|
75 |
+ hashes, |
|
76 |
+ hmac, |
|
77 |
+ padding, |
|
78 |
+ ) |
|
79 |
+ from cryptography.hazmat.primitives.ciphers import algorithms, modes |
|
80 |
+ from cryptography.hazmat.primitives.kdf import pbkdf2 |
|
81 |
+ |
|
77 | 82 |
STUBBED = False |
78 | 83 |
|
79 | 84 |
__all__ = ('export_vault_native_data',) |
... | ... |
@@ -427,80 +432,69 @@ class VaultNativeV02ConfigParser(VaultNativeConfigParser): |
427 | 432 |
).decryptor() |
428 | 433 |
|
429 | 434 |
|
435 |
+@exporter.register_export_vault_config_data_handler('v0.2', 'v0.3') |
|
430 | 436 |
def export_vault_native_data( |
431 |
- contents: Buffer | None = None, |
|
437 |
+ path: str | bytes | os.PathLike | None = None, |
|
432 | 438 |
key: str | Buffer | None = None, |
433 | 439 |
*, |
434 |
- try_formats: Sequence[str] = ('v0.3', 'v0.2'), |
|
440 |
+ format: str, # noqa: A002 |
|
435 | 441 |
) -> Any: # noqa: ANN401 |
436 | 442 |
"""Export the full configuration stored in vault native format. |
437 | 443 |
|
438 | 444 |
Args: |
439 |
- contents: |
|
440 |
- The binary encrypted contents of the vault configuration |
|
441 |
- file. If not given, then query |
|
442 |
- [`exporter.get_vault_path`][] for the correct filename and |
|
443 |
- read the contents from there. |
|
444 |
- |
|
445 |
- Note: On disk, these are usually stored in base64-encoded |
|
446 |
- form, not in the "raw" form as needed here. |
|
445 |
+ path: |
|
446 |
+ The path to the vault configuration file. If not given, |
|
447 |
+ then query [`exporter.get_vault_path`][] for the correct |
|
448 |
+ value. |
|
447 | 449 |
key: |
448 | 450 |
Encryption key/password for the configuration file, usually |
449 | 451 |
the username, or passed via the `VAULT_KEY` environment |
450 | 452 |
variable. If not given, then query |
451 | 453 |
[`exporter.get_vault_key`][] for the value. |
452 |
- try_formats: |
|
453 |
- A sequence of formats to try out, in order. Each key must |
|
454 |
- be one of `v0.2` or `v0.3`. |
|
454 |
+ format: |
|
455 |
+ The format to attempt parsing as. Must be `v0.2` or `v0.3`. |
|
455 | 456 |
|
456 | 457 |
Returns: |
457 | 458 |
The vault configuration, as recorded in the configuration file. |
458 | 459 |
|
459 |
- This may or may not be a valid configuration according to vault |
|
460 |
- or derivepassphrase. |
|
460 |
+ This may or may not be a valid configuration according to |
|
461 |
+ `vault` or `derivepassphrase`. |
|
461 | 462 |
|
462 | 463 |
Raises: |
463 |
- RuntimeError: |
|
464 |
- Something went wrong during data collection, e.g. we |
|
465 |
- encountered unsupported or corrupted data in the storeroom. |
|
466 | 464 |
json.JSONDecodeError: |
467 | 465 |
An internal JSON data structure failed to parse from disk. |
468 |
- The storeroom is probably corrupted. |
|
466 |
+ The encrypted configuration is probably corrupted. |
|
467 |
+ exporter.NotAVaultConfigError: |
|
468 |
+ The (encrypted) contents are not in the claimed |
|
469 |
+ configuration format. |
|
469 | 470 |
ValueError: |
470 |
- The requested formats to try out are invalid, or the |
|
471 |
- encrypted contents aren't in any of the attempted |
|
472 |
- configuration formats. |
|
471 |
+ The requested format is invalid. |
|
473 | 472 |
|
474 | 473 |
""" |
475 |
- if contents is None: |
|
476 |
- with open(exporter.get_vault_path(), 'rb') as infile: |
|
474 |
+ # Trigger import errors if necessary. |
|
475 |
+ importlib.import_module('cryptography') |
|
476 |
+ if path is None: |
|
477 |
+ path = exporter.get_vault_path() |
|
478 |
+ with open(path, 'rb') as infile: |
|
477 | 479 |
contents = base64.standard_b64decode(infile.read()) |
478 | 480 |
if key is None: |
479 | 481 |
key = exporter.get_vault_key() |
480 |
- stored_exception: Exception | None = None |
|
481 |
- for config_format in try_formats: |
|
482 |
- # Use match/case here once Python 3.9 becomes unsupported. |
|
483 |
- if config_format == 'v0.2': |
|
484 |
- try: |
|
485 |
- return VaultNativeV02ConfigParser(contents, key)() |
|
486 |
- except ValueError as exc: |
|
487 |
- exc.__context__ = stored_exception |
|
488 |
- stored_exception = exc |
|
489 |
- elif config_format == 'v0.3': |
|
490 |
- try: |
|
491 |
- return VaultNativeV03ConfigParser(contents, key)() |
|
492 |
- except ValueError as exc: |
|
493 |
- exc.__context__ = stored_exception |
|
494 |
- stored_exception = exc |
|
495 |
- else: # pragma: no cover |
|
496 |
- msg = ( |
|
497 |
- f'Invalid vault native configuration format: {config_format!r}' |
|
482 |
+ parser_class: type[VaultNativeConfigParser] | None = { |
|
483 |
+ 'v0.2': VaultNativeV02ConfigParser, |
|
484 |
+ 'v0.3': VaultNativeV03ConfigParser, |
|
485 |
+ }.get(format) |
|
486 |
+ if parser_class is None: # pragma: no cover |
|
487 |
+ msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( |
|
488 |
+ fmt=format |
|
498 | 489 |
) |
499 | 490 |
raise ValueError(msg) |
500 |
- msg = ( |
|
501 |
- f'Not a valid vault native configuration. (We tried: {try_formats!r}.)' |
|
502 |
- ) |
|
503 |
- raise stored_exception or ValueError(msg) |
|
491 |
+ try: |
|
492 |
+ return parser_class(contents, key)() |
|
493 |
+ except ValueError as exc: |
|
494 |
+ raise exporter.NotAVaultConfigError( |
|
495 |
+ os.fsdecode(path), |
|
496 |
+ format=format, |
|
497 |
+ ) from exc |
|
504 | 498 |
|
505 | 499 |
|
506 | 500 |
if __name__ == '__main__': |
... | ... |
@@ -7,6 +7,7 @@ from __future__ import annotations |
7 | 7 |
import base64 |
8 | 8 |
import contextlib |
9 | 9 |
import json |
10 |
+import os |
|
10 | 11 |
from typing import TYPE_CHECKING |
11 | 12 |
|
12 | 13 |
import click.testing |
... | ... |
@@ -15,7 +16,7 @@ import pytest |
15 | 16 |
from hypothesis import strategies |
16 | 17 |
|
17 | 18 |
import tests |
18 |
-from derivepassphrase import cli |
|
19 |
+from derivepassphrase import _types, cli, exporter |
|
19 | 20 |
from derivepassphrase.exporter import storeroom, vault_native |
20 | 21 |
|
21 | 22 |
cryptography = pytest.importorskip('cryptography', minversion='38.0') |
... | ... |
@@ -35,6 +36,8 @@ if TYPE_CHECKING: |
35 | 36 |
from collections.abc import Callable |
36 | 37 |
from typing import Any |
37 | 38 |
|
39 |
+ from typing_extensions import Buffer, Literal |
|
40 |
+ |
|
38 | 41 |
|
39 | 42 |
class TestCLI: |
40 | 43 |
def test_200_path_parameter(self, monkeypatch: pytest.MonkeyPatch) -> None: |
... | ... |
@@ -165,6 +168,31 @@ class TestCLI: |
165 | 168 |
), 'expected error exit and known error message' |
166 | 169 |
assert tests.CANNOT_LOAD_CRYPTOGRAPHY not in result.stderr |
167 | 170 |
|
171 |
+ def test_302a_vault_config_invalid_just_a_directory( |
|
172 |
+ self, |
|
173 |
+ monkeypatch: pytest.MonkeyPatch, |
|
174 |
+ caplog: pytest.LogCaptureFixture, |
|
175 |
+ ) -> None: |
|
176 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
177 |
+ with tests.isolated_vault_exporter_config( |
|
178 |
+ monkeypatch=monkeypatch, |
|
179 |
+ runner=runner, |
|
180 |
+ vault_config='', |
|
181 |
+ vault_key=tests.VAULT_MASTER_KEY, |
|
182 |
+ ): |
|
183 |
+ os.remove('.vault') |
|
184 |
+ os.mkdir('.vault') |
|
185 |
+ result_ = runner.invoke( |
|
186 |
+ cli.derivepassphrase_export_vault, |
|
187 |
+ ['.vault'], |
|
188 |
+ ) |
|
189 |
+ result = tests.ReadableResult.parse(result_) |
|
190 |
+ assert result.error_exit( |
|
191 |
+ error="Cannot parse '.vault' as a valid vault-native config", |
|
192 |
+ record_tuples=caplog.record_tuples, |
|
193 |
+ ), 'expected error exit and known error message' |
|
194 |
+ assert tests.CANNOT_LOAD_CRYPTOGRAPHY not in result.stderr |
|
195 |
+ |
|
168 | 196 |
def test_403_invalid_vault_config_bad_signature( |
169 | 197 |
self, |
170 | 198 |
monkeypatch: pytest.MonkeyPatch, |
... | ... |
@@ -201,10 +229,14 @@ class TestCLI: |
201 | 229 |
vault_key=tests.VAULT_MASTER_KEY, |
202 | 230 |
): |
203 | 231 |
|
204 |
- def _load_data(*_args: Any, **_kwargs: Any) -> None: |
|
232 |
+ def export_vault_config_data(*_args: Any, **_kwargs: Any) -> None: |
|
205 | 233 |
return None |
206 | 234 |
|
207 |
- monkeypatch.setattr(cli, '_load_data', _load_data) |
|
235 |
+ monkeypatch.setattr( |
|
236 |
+ exporter, |
|
237 |
+ 'export_vault_config_data', |
|
238 |
+ export_vault_config_data, |
|
239 |
+ ) |
|
208 | 240 |
result_ = runner.invoke( |
209 | 241 |
cli.derivepassphrase_export_vault, |
210 | 242 |
['.vault'], |
... | ... |
@@ -218,20 +250,28 @@ class TestCLI: |
218 | 250 |
|
219 | 251 |
|
220 | 252 |
class TestStoreroom: |
253 |
+ @pytest.mark.parametrize('path', ['.vault', None]) |
|
221 | 254 |
@pytest.mark.parametrize( |
222 |
- ['path', 'key'], |
|
255 |
+ 'key', |
|
223 | 256 |
[ |
224 |
- ('.vault', tests.VAULT_MASTER_KEY), |
|
225 |
- ('.vault', None), |
|
226 |
- (None, tests.VAULT_MASTER_KEY), |
|
227 |
- (None, None), |
|
257 |
+ None, |
|
258 |
+ pytest.param(tests.VAULT_MASTER_KEY, id='str'), |
|
259 |
+ pytest.param(tests.VAULT_MASTER_KEY.encode('ascii'), id='bytes'), |
|
260 |
+ pytest.param( |
|
261 |
+ bytearray(tests.VAULT_MASTER_KEY.encode('ascii')), |
|
262 |
+ id='bytearray', |
|
263 |
+ ), |
|
264 |
+ pytest.param( |
|
265 |
+ memoryview(tests.VAULT_MASTER_KEY.encode('ascii')), |
|
266 |
+ id='memoryview', |
|
267 |
+ ), |
|
228 | 268 |
], |
229 | 269 |
) |
230 | 270 |
def test_200_export_data_path_and_keys_type( |
231 | 271 |
self, |
232 | 272 |
monkeypatch: pytest.MonkeyPatch, |
233 | 273 |
path: str | None, |
234 |
- key: str | None, |
|
274 |
+ key: str | Buffer | None, |
|
235 | 275 |
) -> None: |
236 | 276 |
runner = click.testing.CliRunner(mix_stderr=False) |
237 | 277 |
with tests.isolated_vault_exporter_config( |
... | ... |
@@ -437,8 +477,80 @@ class TestVaultNativeConfig: |
437 | 477 |
== result |
438 | 478 |
) |
439 | 479 |
|
480 |
+ @pytest.mark.parametrize( |
|
481 |
+ ['config', 'format', 'result'], |
|
482 |
+ [ |
|
483 |
+ pytest.param( |
|
484 |
+ tests.VAULT_V02_CONFIG, |
|
485 |
+ 'v0.2', |
|
486 |
+ tests.VAULT_V02_CONFIG_DATA, |
|
487 |
+ id='V02_CONFIG-v0.2', |
|
488 |
+ ), |
|
489 |
+ pytest.param( |
|
490 |
+ tests.VAULT_V02_CONFIG, |
|
491 |
+ 'v0.3', |
|
492 |
+ exporter.NotAVaultConfigError, |
|
493 |
+ id='V02_CONFIG-v0.3', |
|
494 |
+ ), |
|
495 |
+ pytest.param( |
|
496 |
+ tests.VAULT_V03_CONFIG, |
|
497 |
+ 'v0.2', |
|
498 |
+ exporter.NotAVaultConfigError, |
|
499 |
+ id='V03_CONFIG-v0.2', |
|
500 |
+ ), |
|
501 |
+ pytest.param( |
|
502 |
+ tests.VAULT_V03_CONFIG, |
|
503 |
+ 'v0.3', |
|
504 |
+ tests.VAULT_V03_CONFIG_DATA, |
|
505 |
+ id='V03_CONFIG-v0.3', |
|
506 |
+ ), |
|
507 |
+ ], |
|
508 |
+ ) |
|
440 | 509 |
def test_201_export_vault_native_data_no_arguments( |
441 |
- self, monkeypatch: pytest.MonkeyPatch |
|
510 |
+ self, |
|
511 |
+ monkeypatch: pytest.MonkeyPatch, |
|
512 |
+ config: str, |
|
513 |
+ format: Literal['v0.2', 'v0.3'], |
|
514 |
+ result: _types.VaultConfig | type[Exception], |
|
515 |
+ ) -> None: |
|
516 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
517 |
+ with tests.isolated_vault_exporter_config( |
|
518 |
+ monkeypatch=monkeypatch, |
|
519 |
+ runner=runner, |
|
520 |
+ vault_config=config, |
|
521 |
+ vault_key=tests.VAULT_MASTER_KEY, |
|
522 |
+ ): |
|
523 |
+ if isinstance(result, type): |
|
524 |
+ with pytest.raises(result): |
|
525 |
+ vault_native.export_vault_native_data(None, format=format) |
|
526 |
+ else: |
|
527 |
+ parsed_config = vault_native.export_vault_native_data( |
|
528 |
+ None, format=format |
|
529 |
+ ) |
|
530 |
+ assert parsed_config == result |
|
531 |
+ |
|
532 |
+ @pytest.mark.parametrize('path', ['.vault', None]) |
|
533 |
+ @pytest.mark.parametrize( |
|
534 |
+ 'key', |
|
535 |
+ [ |
|
536 |
+ None, |
|
537 |
+ pytest.param(tests.VAULT_MASTER_KEY, id='str'), |
|
538 |
+ pytest.param(tests.VAULT_MASTER_KEY.encode('ascii'), id='bytes'), |
|
539 |
+ pytest.param( |
|
540 |
+ bytearray(tests.VAULT_MASTER_KEY.encode('ascii')), |
|
541 |
+ id='bytearray', |
|
542 |
+ ), |
|
543 |
+ pytest.param( |
|
544 |
+ memoryview(tests.VAULT_MASTER_KEY.encode('ascii')), |
|
545 |
+ id='memoryview', |
|
546 |
+ ), |
|
547 |
+ ], |
|
548 |
+ ) |
|
549 |
+ def test_202_export_data_path_and_keys_type( |
|
550 |
+ self, |
|
551 |
+ monkeypatch: pytest.MonkeyPatch, |
|
552 |
+ path: str | None, |
|
553 |
+ key: str | Buffer | None, |
|
442 | 554 |
) -> None: |
443 | 555 |
runner = click.testing.CliRunner(mix_stderr=False) |
444 | 556 |
with tests.isolated_vault_exporter_config( |
... | ... |
@@ -5,6 +5,7 @@ |
5 | 5 |
from __future__ import annotations |
6 | 6 |
|
7 | 7 |
import os |
8 |
+from typing import TYPE_CHECKING, Any |
|
8 | 9 |
|
9 | 10 |
import click.testing |
10 | 11 |
import pytest |
... | ... |
@@ -12,6 +13,9 @@ import pytest |
12 | 13 |
import tests |
13 | 14 |
from derivepassphrase import cli, exporter |
14 | 15 |
|
16 |
+if TYPE_CHECKING: |
|
17 |
+ from typing_extensions import Buffer |
|
18 |
+ |
|
15 | 19 |
|
16 | 20 |
class Test001ExporterUtils: |
17 | 21 |
@pytest.mark.parametrize( |
... | ... |
@@ -82,6 +86,33 @@ class Test001ExporterUtils: |
82 | 86 |
os.path.realpath(exporter.get_vault_path()) |
83 | 87 |
) == os.path.realpath(os.path.expanduser(expected)) |
84 | 88 |
|
89 |
+ def test_220_register_export_vault_config_data_handler( |
|
90 |
+ self, monkeypatch: pytest.MonkeyPatch |
|
91 |
+ ) -> None: |
|
92 |
+ def handler( # pragma: no cover |
|
93 |
+ path: str | bytes | os.PathLike | None = None, |
|
94 |
+ key: str | Buffer | None = None, |
|
95 |
+ *, |
|
96 |
+ format: str, |
|
97 |
+ ) -> Any: |
|
98 |
+ del path, key |
|
99 |
+ raise ValueError(format) |
|
100 |
+ |
|
101 |
+ registry = {'dummy': handler} |
|
102 |
+ monkeypatch.setattr( |
|
103 |
+ exporter, '_export_vault_config_data_registry', registry |
|
104 |
+ ) |
|
105 |
+ dec = exporter.register_export_vault_config_data_handler( |
|
106 |
+ 'name1', |
|
107 |
+ 'name2', |
|
108 |
+ ) |
|
109 |
+ assert dec(handler) == handler |
|
110 |
+ assert registry == { |
|
111 |
+ 'dummy': handler, |
|
112 |
+ 'name1': handler, |
|
113 |
+ 'name2': handler, |
|
114 |
+ } |
|
115 |
+ |
|
85 | 116 |
def test_300_get_vault_key_without_envs( |
86 | 117 |
self, monkeypatch: pytest.MonkeyPatch |
87 | 118 |
) -> None: |
... | ... |
@@ -101,6 +132,59 @@ class Test001ExporterUtils: |
101 | 132 |
): |
102 | 133 |
exporter.get_vault_path() |
103 | 134 |
|
135 |
+ @pytest.mark.parametrize( |
|
136 |
+ ['namelist', 'err_pat'], |
|
137 |
+ [ |
|
138 |
+ pytest.param((), '[Nn]o names given', id='empty'), |
|
139 |
+ pytest.param( |
|
140 |
+ ('name1', '', 'name2'), |
|
141 |
+ '[Uu]nder an empty name', |
|
142 |
+ id='empty-string', |
|
143 |
+ ), |
|
144 |
+ pytest.param( |
|
145 |
+ ('dummy', 'name1', 'name2'), |
|
146 |
+ '[Aa]lready registered', |
|
147 |
+ id='existing', |
|
148 |
+ ), |
|
149 |
+ ], |
|
150 |
+ ) |
|
151 |
+ def test_320_register_export_vault_config_data_handler_errors( |
|
152 |
+ self, |
|
153 |
+ monkeypatch: pytest.MonkeyPatch, |
|
154 |
+ namelist: tuple[str, ...], |
|
155 |
+ err_pat: str, |
|
156 |
+ ) -> None: |
|
157 |
+ def handler( # pragma: no cover |
|
158 |
+ path: str | bytes | os.PathLike | None = None, |
|
159 |
+ key: str | Buffer | None = None, |
|
160 |
+ *, |
|
161 |
+ format: str, |
|
162 |
+ ) -> Any: |
|
163 |
+ del path, key |
|
164 |
+ raise ValueError(format) |
|
165 |
+ |
|
166 |
+ registry = {'dummy': handler} |
|
167 |
+ monkeypatch.setattr( |
|
168 |
+ exporter, '_export_vault_config_data_registry', registry |
|
169 |
+ ) |
|
170 |
+ with pytest.raises(ValueError, match=err_pat): |
|
171 |
+ exporter.register_export_vault_config_data_handler(*namelist)( |
|
172 |
+ handler |
|
173 |
+ ) |
|
174 |
+ |
|
175 |
+ def test_321_export_vault_config_data_bad_handler( |
|
176 |
+ self, monkeypatch: pytest.MonkeyPatch |
|
177 |
+ ) -> None: |
|
178 |
+ monkeypatch.setattr(exporter, '_export_vault_config_data_registry', {}) |
|
179 |
+ monkeypatch.setattr( |
|
180 |
+ exporter, 'find_vault_config_data_handlers', lambda: None |
|
181 |
+ ) |
|
182 |
+ with pytest.raises( |
|
183 |
+ ValueError, |
|
184 |
+ match=r'Invalid vault native configuration format', |
|
185 |
+ ): |
|
186 |
+ exporter.export_vault_config_data(format='v0.3') |
|
187 |
+ |
|
104 | 188 |
|
105 | 189 |
class Test002CLI: |
106 | 190 |
def test_300_invalid_format( |
107 | 191 |