Marco Ricci commited on 2025-01-19 21:10:38
Zeige 2 geänderte Dateien mit 211 Einfügungen und 211 Löschungen.
For both the `storeroom` and the `vault_native` modules, move the exporter function to the top of the module.
| ... | ... |
@@ -86,6 +86,174 @@ __all__ = ('export_storeroom_data',)
|
| 86 | 86 |
logger = logging.getLogger(__name__) |
| 87 | 87 |
|
| 88 | 88 |
|
| 89 |
+@exporter.register_export_vault_config_data_handler('storeroom')
|
|
| 90 |
+def export_storeroom_data( # noqa: C901,D417,PLR0912,PLR0914,PLR0915 |
|
| 91 |
+ path: str | bytes | os.PathLike | None = None, |
|
| 92 |
+ key: str | Buffer | None = None, |
|
| 93 |
+ *, |
|
| 94 |
+ format: str = 'storeroom', # noqa: A002 |
|
| 95 |
+) -> dict[str, Any]: |
|
| 96 |
+ """Export the full configuration stored in the storeroom. |
|
| 97 |
+ |
|
| 98 |
+ See [`exporter.ExportVaultConfigDataFunction`][] for an explanation |
|
| 99 |
+ of the call signature, and the exceptions to expect. |
|
| 100 |
+ |
|
| 101 |
+ Other Args: |
|
| 102 |
+ format: |
|
| 103 |
+ The only supported format is `storeroom`. |
|
| 104 |
+ |
|
| 105 |
+ """ # noqa: DOC201,DOC501 |
|
| 106 |
+ # Trigger import errors if necessary. |
|
| 107 |
+ importlib.import_module('cryptography')
|
|
| 108 |
+ if path is None: |
|
| 109 |
+ path = exporter.get_vault_path() |
|
| 110 |
+ if key is None: |
|
| 111 |
+ key = exporter.get_vault_key() |
|
| 112 |
+ if format != 'storeroom': # pragma: no cover |
|
| 113 |
+ msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( |
|
| 114 |
+ fmt=format |
|
| 115 |
+ ) |
|
| 116 |
+ raise ValueError(msg) |
|
| 117 |
+ try: |
|
| 118 |
+ master_keys_file = open( # noqa: SIM115 |
|
| 119 |
+ os.path.join(os.fsdecode(path), '.keys'), |
|
| 120 |
+ encoding='utf-8', |
|
| 121 |
+ ) |
|
| 122 |
+ except FileNotFoundError as exc: |
|
| 123 |
+ raise exporter.NotAVaultConfigError( |
|
| 124 |
+ os.fsdecode(path), |
|
| 125 |
+ format='storeroom', |
|
| 126 |
+ ) from exc |
|
| 127 |
+ with master_keys_file: |
|
| 128 |
+ header = json.loads(master_keys_file.readline()) |
|
| 129 |
+ if header != {'version': 1}:
|
|
| 130 |
+ msg = 'bad or unsupported keys version header' |
|
| 131 |
+ raise RuntimeError(msg) |
|
| 132 |
+ raw_keys_data = base64.standard_b64decode(master_keys_file.readline()) |
|
| 133 |
+ encrypted_keys_params, encrypted_keys = struct.unpack( |
|
| 134 |
+ f'B {len(raw_keys_data) - 1}s', raw_keys_data
|
|
| 135 |
+ ) |
|
| 136 |
+ if master_keys_file.read(): |
|
| 137 |
+ msg = 'trailing data; cannot make sense of .keys file' |
|
| 138 |
+ raise RuntimeError(msg) |
|
| 139 |
+ encrypted_keys_version = encrypted_keys_params >> 4 |
|
| 140 |
+ if encrypted_keys_version != 1: |
|
| 141 |
+ msg = f'cannot handle version {encrypted_keys_version} encrypted keys'
|
|
| 142 |
+ raise RuntimeError(msg) |
|
| 143 |
+ logger.info( |
|
| 144 |
+ _msg.TranslatedString(_msg.InfoMsgTemplate.PARSING_MASTER_KEYS_DATA) |
|
| 145 |
+ ) |
|
| 146 |
+ encrypted_keys_iterations = 2 ** (10 + (encrypted_keys_params & 0x0F)) |
|
| 147 |
+ master_keys_keys = _derive_master_keys_keys(key, encrypted_keys_iterations) |
|
| 148 |
+ master_keys = _decrypt_master_keys_data(encrypted_keys, master_keys_keys) |
|
| 149 |
+ |
|
| 150 |
+ config_structure: dict[str, Any] = {}
|
|
| 151 |
+ json_contents: dict[str, bytes] = {}
|
|
| 152 |
+ # Use glob.glob(..., root_dir=...) here once Python 3.9 becomes |
|
| 153 |
+ # unsupported. |
|
| 154 |
+ storeroom_path_str = os.fsdecode(path) |
|
| 155 |
+ valid_hashdirs = [ |
|
| 156 |
+ hashdir_name |
|
| 157 |
+ for hashdir_name in os.listdir(storeroom_path_str) |
|
| 158 |
+ if fnmatch.fnmatch(hashdir_name, '[01][0-9a-f]') |
|
| 159 |
+ ] |
|
| 160 |
+ for file in valid_hashdirs: |
|
| 161 |
+ logger.info( |
|
| 162 |
+ _msg.TranslatedString( |
|
| 163 |
+ _msg.InfoMsgTemplate.DECRYPTING_BUCKET, |
|
| 164 |
+ bucket_number=file, |
|
| 165 |
+ ) |
|
| 166 |
+ ) |
|
| 167 |
+ bucket_contents = [ |
|
| 168 |
+ bytes(item) |
|
| 169 |
+ for item in _decrypt_bucket_file(file, master_keys, root_dir=path) |
|
| 170 |
+ ] |
|
| 171 |
+ bucket_index = json.loads(bucket_contents.pop(0)) |
|
| 172 |
+ for pos, item in enumerate(bucket_index): |
|
| 173 |
+ json_contents[item] = bucket_contents[pos] |
|
| 174 |
+ logger.debug( |
|
| 175 |
+ _msg.TranslatedString( |
|
| 176 |
+ _msg.DebugMsgTemplate.BUCKET_ITEM_FOUND, |
|
| 177 |
+ path=item, |
|
| 178 |
+ value=bucket_contents[pos], |
|
| 179 |
+ ) |
|
| 180 |
+ ) |
|
| 181 |
+ dirs_to_check: dict[str, list[str]] = {}
|
|
| 182 |
+ json_payload: Any |
|
| 183 |
+ logger.info( |
|
| 184 |
+ _msg.TranslatedString(_msg.InfoMsgTemplate.ASSEMBLING_CONFIG_STRUCTURE) |
|
| 185 |
+ ) |
|
| 186 |
+ for item_path, json_content in sorted(json_contents.items()): |
|
| 187 |
+ if item_path.endswith('/'):
|
|
| 188 |
+ logger.debug( |
|
| 189 |
+ _msg.TranslatedString( |
|
| 190 |
+ _msg.DebugMsgTemplate.POSTPONING_DIRECTORY_CONTENTS_CHECK, |
|
| 191 |
+ path=item_path, |
|
| 192 |
+ contents=json_content.decode('utf-8'),
|
|
| 193 |
+ ) |
|
| 194 |
+ ) |
|
| 195 |
+ json_payload = json.loads(json_content) |
|
| 196 |
+ if not isinstance(json_payload, list) or any( |
|
| 197 |
+ not isinstance(x, str) for x in json_payload |
|
| 198 |
+ ): |
|
| 199 |
+ msg = ( |
|
| 200 |
+ f'Directory index is not actually an index: ' |
|
| 201 |
+ f'{json_content!r}'
|
|
| 202 |
+ ) |
|
| 203 |
+ raise RuntimeError(msg) |
|
| 204 |
+ dirs_to_check[item_path] = json_payload |
|
| 205 |
+ logger.debug( |
|
| 206 |
+ _msg.TranslatedString( |
|
| 207 |
+ _msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS_EMPTY_DIRECTORY, |
|
| 208 |
+ path=item_path, |
|
| 209 |
+ ), |
|
| 210 |
+ ) |
|
| 211 |
+ _store(config_structure, item_path, b'{}')
|
|
| 212 |
+ else: |
|
| 213 |
+ logger.debug( |
|
| 214 |
+ _msg.TranslatedString( |
|
| 215 |
+ _msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS, |
|
| 216 |
+ path=item_path, |
|
| 217 |
+ value=json_content.decode('utf-8'),
|
|
| 218 |
+ ), |
|
| 219 |
+ ) |
|
| 220 |
+ _store(config_structure, item_path, json_content) |
|
| 221 |
+ logger.info( |
|
| 222 |
+ _msg.TranslatedString( |
|
| 223 |
+ _msg.InfoMsgTemplate.CHECKING_CONFIG_STRUCTURE_CONSISTENCY, |
|
| 224 |
+ ) |
|
| 225 |
+ ) |
|
| 226 |
+ # Sorted order is important; see `maybe_obj` below. |
|
| 227 |
+ for dir_, namelist_ in sorted(dirs_to_check.items()): |
|
| 228 |
+ namelist = [x.rstrip('/') for x in namelist_]
|
|
| 229 |
+ obj: dict[Any, Any] = config_structure |
|
| 230 |
+ for part in dir_.split('/'):
|
|
| 231 |
+ if part: |
|
| 232 |
+ # Because we iterate paths in sorted order, parent |
|
| 233 |
+ # directories are encountered before child directories. |
|
| 234 |
+ # So parent directories always exist (lest we would have |
|
| 235 |
+ # aborted earlier). |
|
| 236 |
+ # |
|
| 237 |
+ # Of course, the type checker doesn't necessarily know |
|
| 238 |
+ # this, so we need to use assertions anyway. |
|
| 239 |
+ maybe_obj = obj.get(part) |
|
| 240 |
+ assert isinstance(maybe_obj, dict), ( |
|
| 241 |
+ f'Cannot traverse storage path {dir_!r}'
|
|
| 242 |
+ ) |
|
| 243 |
+ obj = maybe_obj |
|
| 244 |
+ if set(obj.keys()) != set(namelist): |
|
| 245 |
+ msg = f'Object key mismatch for path {dir_!r}'
|
|
| 246 |
+ raise RuntimeError(msg) |
|
| 247 |
+ logger.debug( |
|
| 248 |
+ _msg.TranslatedString( |
|
| 249 |
+ _msg.DebugMsgTemplate.DIRECTORY_CONTENTS_CHECK_OK, |
|
| 250 |
+ path=dir_, |
|
| 251 |
+ contents=json.dumps(namelist_), |
|
| 252 |
+ ) |
|
| 253 |
+ ) |
|
| 254 |
+ return config_structure |
|
| 255 |
+ |
|
| 256 |
+ |
|
| 89 | 257 |
def _h(bs: Buffer) -> str: |
| 90 | 258 |
return '<{}>'.format(memoryview(bs).hex(' '))
|
| 91 | 259 |
|
| ... | ... |
@@ -585,174 +753,6 @@ def _store(config: dict[str, Any], path: str, json_contents: bytes) -> None: |
| 585 | 753 |
config[path_parts[-1]] = contents |
| 586 | 754 |
|
| 587 | 755 |
|
| 588 |
-@exporter.register_export_vault_config_data_handler('storeroom')
|
|
| 589 |
-def export_storeroom_data( # noqa: C901,D417,PLR0912,PLR0914,PLR0915 |
|
| 590 |
- path: str | bytes | os.PathLike | None = None, |
|
| 591 |
- key: str | Buffer | None = None, |
|
| 592 |
- *, |
|
| 593 |
- format: str = 'storeroom', # noqa: A002 |
|
| 594 |
-) -> dict[str, Any]: |
|
| 595 |
- """Export the full configuration stored in the storeroom. |
|
| 596 |
- |
|
| 597 |
- See [`exporter.ExportVaultConfigDataFunction`][] for an explanation |
|
| 598 |
- of the call signature, and the exceptions to expect. |
|
| 599 |
- |
|
| 600 |
- Other Args: |
|
| 601 |
- format: |
|
| 602 |
- The only supported format is `storeroom`. |
|
| 603 |
- |
|
| 604 |
- """ # noqa: DOC201,DOC501 |
|
| 605 |
- # Trigger import errors if necessary. |
|
| 606 |
- importlib.import_module('cryptography')
|
|
| 607 |
- if path is None: |
|
| 608 |
- path = exporter.get_vault_path() |
|
| 609 |
- if key is None: |
|
| 610 |
- key = exporter.get_vault_key() |
|
| 611 |
- if format != 'storeroom': # pragma: no cover |
|
| 612 |
- msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( |
|
| 613 |
- fmt=format |
|
| 614 |
- ) |
|
| 615 |
- raise ValueError(msg) |
|
| 616 |
- try: |
|
| 617 |
- master_keys_file = open( # noqa: SIM115 |
|
| 618 |
- os.path.join(os.fsdecode(path), '.keys'), |
|
| 619 |
- encoding='utf-8', |
|
| 620 |
- ) |
|
| 621 |
- except FileNotFoundError as exc: |
|
| 622 |
- raise exporter.NotAVaultConfigError( |
|
| 623 |
- os.fsdecode(path), |
|
| 624 |
- format='storeroom', |
|
| 625 |
- ) from exc |
|
| 626 |
- with master_keys_file: |
|
| 627 |
- header = json.loads(master_keys_file.readline()) |
|
| 628 |
- if header != {'version': 1}:
|
|
| 629 |
- msg = 'bad or unsupported keys version header' |
|
| 630 |
- raise RuntimeError(msg) |
|
| 631 |
- raw_keys_data = base64.standard_b64decode(master_keys_file.readline()) |
|
| 632 |
- encrypted_keys_params, encrypted_keys = struct.unpack( |
|
| 633 |
- f'B {len(raw_keys_data) - 1}s', raw_keys_data
|
|
| 634 |
- ) |
|
| 635 |
- if master_keys_file.read(): |
|
| 636 |
- msg = 'trailing data; cannot make sense of .keys file' |
|
| 637 |
- raise RuntimeError(msg) |
|
| 638 |
- encrypted_keys_version = encrypted_keys_params >> 4 |
|
| 639 |
- if encrypted_keys_version != 1: |
|
| 640 |
- msg = f'cannot handle version {encrypted_keys_version} encrypted keys'
|
|
| 641 |
- raise RuntimeError(msg) |
|
| 642 |
- logger.info( |
|
| 643 |
- _msg.TranslatedString(_msg.InfoMsgTemplate.PARSING_MASTER_KEYS_DATA) |
|
| 644 |
- ) |
|
| 645 |
- encrypted_keys_iterations = 2 ** (10 + (encrypted_keys_params & 0x0F)) |
|
| 646 |
- master_keys_keys = derive_master_keys_keys(key, encrypted_keys_iterations) |
|
| 647 |
- master_keys = decrypt_master_keys_data(encrypted_keys, master_keys_keys) |
|
| 648 |
- |
|
| 649 |
- config_structure: dict[str, Any] = {}
|
|
| 650 |
- json_contents: dict[str, bytes] = {}
|
|
| 651 |
- # Use glob.glob(..., root_dir=...) here once Python 3.9 becomes |
|
| 652 |
- # unsupported. |
|
| 653 |
- storeroom_path_str = os.fsdecode(path) |
|
| 654 |
- valid_hashdirs = [ |
|
| 655 |
- hashdir_name |
|
| 656 |
- for hashdir_name in os.listdir(storeroom_path_str) |
|
| 657 |
- if fnmatch.fnmatch(hashdir_name, '[01][0-9a-f]') |
|
| 658 |
- ] |
|
| 659 |
- for file in valid_hashdirs: |
|
| 660 |
- logger.info( |
|
| 661 |
- _msg.TranslatedString( |
|
| 662 |
- _msg.InfoMsgTemplate.DECRYPTING_BUCKET, |
|
| 663 |
- bucket_number=file, |
|
| 664 |
- ) |
|
| 665 |
- ) |
|
| 666 |
- bucket_contents = [ |
|
| 667 |
- bytes(item) |
|
| 668 |
- for item in decrypt_bucket_file(file, master_keys, root_dir=path) |
|
| 669 |
- ] |
|
| 670 |
- bucket_index = json.loads(bucket_contents.pop(0)) |
|
| 671 |
- for pos, item in enumerate(bucket_index): |
|
| 672 |
- json_contents[item] = bucket_contents[pos] |
|
| 673 |
- logger.debug( |
|
| 674 |
- _msg.TranslatedString( |
|
| 675 |
- _msg.DebugMsgTemplate.BUCKET_ITEM_FOUND, |
|
| 676 |
- path=item, |
|
| 677 |
- value=bucket_contents[pos], |
|
| 678 |
- ) |
|
| 679 |
- ) |
|
| 680 |
- dirs_to_check: dict[str, list[str]] = {}
|
|
| 681 |
- json_payload: Any |
|
| 682 |
- logger.info( |
|
| 683 |
- _msg.TranslatedString(_msg.InfoMsgTemplate.ASSEMBLING_CONFIG_STRUCTURE) |
|
| 684 |
- ) |
|
| 685 |
- for item_path, json_content in sorted(json_contents.items()): |
|
| 686 |
- if item_path.endswith('/'):
|
|
| 687 |
- logger.debug( |
|
| 688 |
- _msg.TranslatedString( |
|
| 689 |
- _msg.DebugMsgTemplate.POSTPONING_DIRECTORY_CONTENTS_CHECK, |
|
| 690 |
- path=item_path, |
|
| 691 |
- contents=json_content.decode('utf-8'),
|
|
| 692 |
- ) |
|
| 693 |
- ) |
|
| 694 |
- json_payload = json.loads(json_content) |
|
| 695 |
- if not isinstance(json_payload, list) or any( |
|
| 696 |
- not isinstance(x, str) for x in json_payload |
|
| 697 |
- ): |
|
| 698 |
- msg = ( |
|
| 699 |
- f'Directory index is not actually an index: ' |
|
| 700 |
- f'{json_content!r}'
|
|
| 701 |
- ) |
|
| 702 |
- raise RuntimeError(msg) |
|
| 703 |
- dirs_to_check[item_path] = json_payload |
|
| 704 |
- logger.debug( |
|
| 705 |
- _msg.TranslatedString( |
|
| 706 |
- _msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS_EMPTY_DIRECTORY, |
|
| 707 |
- path=item_path, |
|
| 708 |
- ), |
|
| 709 |
- ) |
|
| 710 |
- _store(config_structure, item_path, b'{}')
|
|
| 711 |
- else: |
|
| 712 |
- logger.debug( |
|
| 713 |
- _msg.TranslatedString( |
|
| 714 |
- _msg.DebugMsgTemplate.SETTING_CONFIG_STRUCTURE_CONTENTS, |
|
| 715 |
- path=item_path, |
|
| 716 |
- value=json_content.decode('utf-8'),
|
|
| 717 |
- ), |
|
| 718 |
- ) |
|
| 719 |
- _store(config_structure, item_path, json_content) |
|
| 720 |
- logger.info( |
|
| 721 |
- _msg.TranslatedString( |
|
| 722 |
- _msg.InfoMsgTemplate.CHECKING_CONFIG_STRUCTURE_CONSISTENCY, |
|
| 723 |
- ) |
|
| 724 |
- ) |
|
| 725 |
- # Sorted order is important; see `maybe_obj` below. |
|
| 726 |
- for dir_, namelist_ in sorted(dirs_to_check.items()): |
|
| 727 |
- namelist = [x.rstrip('/') for x in namelist_]
|
|
| 728 |
- obj: dict[Any, Any] = config_structure |
|
| 729 |
- for part in dir_.split('/'):
|
|
| 730 |
- if part: |
|
| 731 |
- # Because we iterate paths in sorted order, parent |
|
| 732 |
- # directories are encountered before child directories. |
|
| 733 |
- # So parent directories always exist (lest we would have |
|
| 734 |
- # aborted earlier). |
|
| 735 |
- # |
|
| 736 |
- # Of course, the type checker doesn't necessarily know |
|
| 737 |
- # this, so we need to use assertions anyway. |
|
| 738 |
- maybe_obj = obj.get(part) |
|
| 739 |
- assert isinstance(maybe_obj, dict), ( |
|
| 740 |
- f'Cannot traverse storage path {dir_!r}'
|
|
| 741 |
- ) |
|
| 742 |
- obj = maybe_obj |
|
| 743 |
- if set(obj.keys()) != set(namelist): |
|
| 744 |
- msg = f'Object key mismatch for path {dir_!r}'
|
|
| 745 |
- raise RuntimeError(msg) |
|
| 746 |
- logger.debug( |
|
| 747 |
- _msg.TranslatedString( |
|
| 748 |
- _msg.DebugMsgTemplate.DIRECTORY_CONTENTS_CHECK_OK, |
|
| 749 |
- path=dir_, |
|
| 750 |
- contents=json.dumps(namelist_), |
|
| 751 |
- ) |
|
| 752 |
- ) |
|
| 753 |
- return config_structure |
|
| 754 |
- |
|
| 755 |
- |
|
| 756 | 756 |
if __name__ == '__main__': |
| 757 | 757 |
logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
|
| 758 | 758 |
config_structure = export_storeroom_data(format='storeroom') |
| ... | ... |
@@ -86,6 +86,49 @@ __all__ = ('export_vault_native_data',)
|
| 86 | 86 |
logger = logging.getLogger(__name__) |
| 87 | 87 |
|
| 88 | 88 |
|
| 89 |
+@exporter.register_export_vault_config_data_handler('v0.2', 'v0.3')
|
|
| 90 |
+def export_vault_native_data( # noqa: D417 |
|
| 91 |
+ path: str | bytes | os.PathLike | None = None, |
|
| 92 |
+ key: str | Buffer | None = None, |
|
| 93 |
+ *, |
|
| 94 |
+ format: str, # noqa: A002 |
|
| 95 |
+) -> Any: # noqa: ANN401 |
|
| 96 |
+ """Export the full configuration stored in vault native format. |
|
| 97 |
+ |
|
| 98 |
+ See [`exporter.ExportVaultConfigDataFunction`][] for an explanation |
|
| 99 |
+ of the call signature, and the exceptions to expect. |
|
| 100 |
+ |
|
| 101 |
+ Other Args: |
|
| 102 |
+ format: |
|
| 103 |
+ The only supported formats are `v0.2` and `v0.3`. |
|
| 104 |
+ |
|
| 105 |
+ """ # noqa: DOC201,DOC501 |
|
| 106 |
+ # Trigger import errors if necessary. |
|
| 107 |
+ importlib.import_module('cryptography')
|
|
| 108 |
+ if path is None: |
|
| 109 |
+ path = exporter.get_vault_path() |
|
| 110 |
+ with open(path, 'rb') as infile: |
|
| 111 |
+ contents = base64.standard_b64decode(infile.read()) |
|
| 112 |
+ if key is None: |
|
| 113 |
+ key = exporter.get_vault_key() |
|
| 114 |
+ parser_class: type[VaultNativeConfigParser] | None = {
|
|
| 115 |
+ 'v0.2': VaultNativeV02ConfigParser, |
|
| 116 |
+ 'v0.3': VaultNativeV03ConfigParser, |
|
| 117 |
+ }.get(format) |
|
| 118 |
+ if parser_class is None: # pragma: no cover |
|
| 119 |
+ msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( |
|
| 120 |
+ fmt=format |
|
| 121 |
+ ) |
|
| 122 |
+ raise ValueError(msg) |
|
| 123 |
+ try: |
|
| 124 |
+ return parser_class(contents, key)() |
|
| 125 |
+ except ValueError as exc: |
|
| 126 |
+ raise exporter.NotAVaultConfigError( |
|
| 127 |
+ os.fsdecode(path), |
|
| 128 |
+ format=format, |
|
| 129 |
+ ) from exc |
|
| 130 |
+ |
|
| 131 |
+ |
|
| 89 | 132 |
def _h(bs: Buffer) -> str: |
| 90 | 133 |
return '<{}>'.format(memoryview(bs).hex(' '))
|
| 91 | 134 |
|
| ... | ... |
@@ -681,49 +724,6 @@ class VaultNativeV02ConfigParser(VaultNativeConfigParser): |
| 681 | 724 |
).decryptor() |
| 682 | 725 |
|
| 683 | 726 |
|
| 684 |
-@exporter.register_export_vault_config_data_handler('v0.2', 'v0.3')
|
|
| 685 |
-def export_vault_native_data( # noqa: D417 |
|
| 686 |
- path: str | bytes | os.PathLike | None = None, |
|
| 687 |
- key: str | Buffer | None = None, |
|
| 688 |
- *, |
|
| 689 |
- format: str, # noqa: A002 |
|
| 690 |
-) -> Any: # noqa: ANN401 |
|
| 691 |
- """Export the full configuration stored in vault native format. |
|
| 692 |
- |
|
| 693 |
- See [`exporter.ExportVaultConfigDataFunction`][] for an explanation |
|
| 694 |
- of the call signature, and the exceptions to expect. |
|
| 695 |
- |
|
| 696 |
- Other Args: |
|
| 697 |
- format: |
|
| 698 |
- The only supported formats are `v0.2` and `v0.3`. |
|
| 699 |
- |
|
| 700 |
- """ # noqa: DOC201,DOC501 |
|
| 701 |
- # Trigger import errors if necessary. |
|
| 702 |
- importlib.import_module('cryptography')
|
|
| 703 |
- if path is None: |
|
| 704 |
- path = exporter.get_vault_path() |
|
| 705 |
- with open(path, 'rb') as infile: |
|
| 706 |
- contents = base64.standard_b64decode(infile.read()) |
|
| 707 |
- if key is None: |
|
| 708 |
- key = exporter.get_vault_key() |
|
| 709 |
- parser_class: type[VaultNativeConfigParser] | None = {
|
|
| 710 |
- 'v0.2': VaultNativeV02ConfigParser, |
|
| 711 |
- 'v0.3': VaultNativeV03ConfigParser, |
|
| 712 |
- }.get(format) |
|
| 713 |
- if parser_class is None: # pragma: no cover |
|
| 714 |
- msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( |
|
| 715 |
- fmt=format |
|
| 716 |
- ) |
|
| 717 |
- raise ValueError(msg) |
|
| 718 |
- try: |
|
| 719 |
- return parser_class(contents, key)() |
|
| 720 |
- except ValueError as exc: |
|
| 721 |
- raise exporter.NotAVaultConfigError( |
|
| 722 |
- os.fsdecode(path), |
|
| 723 |
- format=format, |
|
| 724 |
- ) from exc |
|
| 725 |
- |
|
| 726 |
- |
|
| 727 | 727 |
if __name__ == '__main__': |
| 728 | 728 |
import os |
| 729 | 729 |
|
| 730 | 730 |