# SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info> # # SPDX-License-Identifier: Zlib """Exporter for the vault native configuration format (v0.2 or v0.3). The vault native formats are the configuration formats used by vault v0.2 and v0.3. The configuration is stored as a single encrypted file, which is encrypted and authenticated. v0.2 and v0.3 differ in some details concerning key derivation and expected format of internal structures, so they are *not* compatible. v0.2 additionally contains cryptographic weaknesses (API misuse of a key derivation function, and a low-entropy method of generating initialization vectors for CBC block encryption mode) and should thus be avoided if possible. The public interface is the [`export_vault_native_data`][] function. Multiple *non-public* classes are additionally documented here for didactical and educational reasons, but they are not part of the module API, are subject to change without notice (including removal), and should *not* be used or relied on. """ # ruff: noqa: S303 from __future__ import annotations import abc import base64 import importlib import json import logging import os import warnings from typing import TYPE_CHECKING from derivepassphrase import _cli_msg as _msg from derivepassphrase import exporter, vault if TYPE_CHECKING: from typing import Any from typing_extensions import Buffer if TYPE_CHECKING: from cryptography import exceptions as crypt_exceptions from cryptography import utils as crypt_utils from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding from cryptography.hazmat.primitives.ciphers import algorithms, modes from cryptography.hazmat.primitives.kdf import pbkdf2 else: try: importlib.import_module('cryptography') except ModuleNotFoundError as exc: class _DummyModule: # pragma: no cover def __init__(self, exc: type[Exception]) -> None: self.exc = exc def __getattr__(self, name: str) -> Any: # noqa: ANN401 def func(*args: Any, **kwargs: Any) -> Any: # noqa: ANN401,ARG001 raise self.exc return func crypt_exceptions = crypt_utils = _DummyModule(exc) ciphers = hashes = hmac = padding = _DummyModule(exc) algorithms = modes = pbkdf2 = _DummyModule(exc) STUBBED = True else: from cryptography import exceptions as crypt_exceptions from cryptography import utils as crypt_utils from cryptography.hazmat.primitives import ( ciphers, hashes, hmac, padding, ) from cryptography.hazmat.primitives.ciphers import algorithms, modes from cryptography.hazmat.primitives.kdf import pbkdf2 STUBBED = False __all__ = ('export_vault_native_data',) logger = logging.getLogger(__name__) def _h(bs: Buffer) -> str: return '<{}>'.format(memoryview(bs).hex(' ')) class VaultNativeConfigParser(abc.ABC): """A base parser for vault's native configuration format. Certain details are specific to the respective vault versions, and are abstracted out. This class by itself is not instantiable because of this. """ def __init__(self, contents: Buffer, password: str | Buffer) -> None: """Initialize the parser. Args: contents: The binary contents of the encrypted configuration file. Note: On disk, these are usually stored in base64-encoded form, not in the "raw" form as needed here. password: The vault master key/master passphrase the file is encrypted with. Must be non-empty. See [`exporter.get_vault_key`][] for details. If this is a text string, then the UTF-8 encoding of the string is used as the binary password. Raises: ValueError: The password must not be empty. Warning: Non-public class, provided for didactical and educational purposes only. Subject to change without notice, including removal. """ if not password: msg = 'Password must not be empty' raise ValueError(msg) self._contents = bytes(contents) self._iv_size = 0 self._mac_size = 0 self._encryption_key = b'' self._encryption_key_size = 0 self._signing_key = b'' self._signing_key_size = 0 self._message = b'' self._message_tag = b'' self._iv = b'' self._payload = b'' self._password = password self._sentinel: object = object() self._data: Any = self._sentinel def __call__(self) -> Any: # noqa: ANN401 """Return the decrypted and parsed vault configuration. Raises: cryptography.exceptions.InvalidSignature: The encrypted configuration does not contain a valid signature. ValueError: The format is invalid, in a non-cryptographic way. (For example, it contains an unsupported version marker, or unexpected extra contents, or invalid padding.) """ if self._data is self._sentinel: self._parse_contents() self._derive_keys() self._check_signature() self._data = self._decrypt_payload() return self._data @staticmethod def _pbkdf2( password: str | Buffer, key_size: int, iterations: int ) -> bytes: """Generate a key from a password. Uses PBKDF2 with HMAC-SHA1, with the vault UUID as a fixed salt value. Args: password: The password from which to derive the key. key_size: The size of the output string. The effective key size (in bytes) is thus half of this output string size. iterations: The PBKDF2 iteration count. Returns: The PBKDF2-derived key, encoded as a lowercase ASCII hexadecimal string. Danger: Insecure use of cryptography This function is insecure because it uses a fixed salt value, which is not secure against rainbow tables. It is further difficult to use because the effective key size is only half as large as the "size" parameter (output string size). Finally, though the use of SHA-1 in HMAC per se is not known to be insecure, SHA-1 is known not to be collision-resistant. """ if isinstance(password, str): password = password.encode('utf-8') raw_key = pbkdf2.PBKDF2HMAC( algorithm=hashes.SHA1(), length=key_size // 2, salt=vault.Vault._UUID, # noqa: SLF001 iterations=iterations, ).derive(bytes(password)) result_key = raw_key.hex().lower().encode('ASCII') logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_PBKDF2_CALL, password=password, salt=vault.Vault._UUID, # noqa: SLF001 iterations=iterations, key_size=key_size // 2, algorithm='sha1', raw_result=raw_key, result_key=result_key.decode('ASCII'), ), ) return result_key def _parse_contents(self) -> None: """Parse the contents into IV, payload and MAC. This operates on, and sets, multiple internal attributes of the parser. Raises: ValueError: The configuration file contents are clearly truncated. """ logger.info( _msg.TranslatedString( _msg.InfoMsgTemplate.VAULT_NATIVE_PARSING_IV_PAYLOAD_MAC, ), ) if len(self._contents) < self._iv_size + 16 + self._mac_size: msg = 'Invalid vault configuration file: file is truncated' raise ValueError(msg) def cut(buffer: bytes, cutpoint: int) -> tuple[bytes, bytes]: return buffer[:cutpoint], buffer[cutpoint:] cutpos1 = len(self._contents) - self._mac_size cutpos2 = self._iv_size self._message, self._message_tag = cut(self._contents, cutpos1) self._iv, self._payload = cut(self._message, cutpos2) logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_PARSE_BUFFER, contents=_h(self._contents), iv=_h(self._iv), payload=_h(self._payload), mac=_h(self._message_tag), ), ) def _derive_keys(self) -> None: """Derive the signing and encryption keys. This is a bookkeeping method. The actual work is done in [`_generate_keys`][]. """ logger.info( _msg.TranslatedString( _msg.InfoMsgTemplate.VAULT_NATIVE_DERIVING_KEYS, ), ) self._generate_keys() assert len(self._encryption_key) == self._encryption_key_size, ( 'Derived encryption key is invalid' ) assert len(self._signing_key) == self._signing_key_size, ( 'Derived signing key is invalid' ) @abc.abstractmethod def _generate_keys(self) -> None: """Derive the signing and encryption keys, and set the key sizes. Subclasses must override this, as the derivation system is version-specific. The default implementation raises an error. Raises: AssertionError: There is no default implementation. """ raise AssertionError def _check_signature(self) -> None: """Check for a valid MAC on the encrypted vault configuration. The MAC uses HMAC-SHA1, and thus is 32 bytes long, before encoding. Raises: ValueError: The MAC is invalid. """ logger.info( _msg.TranslatedString( _msg.InfoMsgTemplate.VAULT_NATIVE_CHECKING_MAC, ), ) mac = hmac.HMAC(self._signing_key, hashes.SHA256()) mac_input = self._hmac_input() logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_CHECKING_MAC_DETAILS, mac_input=_h(mac_input), mac=_h(self._message_tag), ), ) mac.update(mac_input) try: mac.verify(self._message_tag) except crypt_exceptions.InvalidSignature: msg = 'File does not contain a valid signature' raise ValueError(msg) from None @abc.abstractmethod def _hmac_input(self) -> bytes: """Return the input the MAC is supposed to verify. Subclasses must override this, as the MAC-attested data is version-specific. The default implementation raises an error. Raises: AssertionError: There is no default implementation. """ raise AssertionError def _decrypt_payload(self) -> Any: # noqa: ANN401 """Return the decrypted vault configuration. Requires [`_parse_contents`][] and [`_derive_keys`][] to have run, and relies on [`_check_signature`][] for tampering detection. """ logger.info( _msg.TranslatedString( _msg.InfoMsgTemplate.VAULT_NATIVE_DECRYPTING_CONTENTS, ), ) decryptor = self._make_decryptor() padded_plaintext = bytearray() padded_plaintext.extend(decryptor.update(self._payload)) padded_plaintext.extend(decryptor.finalize()) logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_PADDED_PLAINTEXT, contents=_h(padded_plaintext), ), ) unpadder = padding.PKCS7(self._iv_size * 8).unpadder() plaintext = bytearray() plaintext.extend(unpadder.update(padded_plaintext)) plaintext.extend(unpadder.finalize()) logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_PLAINTEXT, contents=_h(plaintext), ), ) return json.loads(plaintext) @abc.abstractmethod def _make_decryptor(self) -> ciphers.CipherContext: """Return the cipher context object used for decryption. Subclasses must override this, as the cipher setup is version-specific. The default implementation raises an error. Raises: AssertionError: There is no default implementation. """ raise AssertionError class VaultNativeV03ConfigParser(VaultNativeConfigParser): """A parser for vault's native configuration format (v0.3). This is the modern, pre-storeroom configuration format. Warning: Non-public class, provided for didactical and educational purposes only. Subject to change without notice, including removal. """ KEY_SIZE = 32 """ Key size for both the encryption and the signing key, including the encoding as a hexadecimal string. (The effective cryptographic strength is half of this value.) """ def __init__(self, *args: Any, **kwargs: Any) -> None: # noqa: ANN401 super().__init__(*args, **kwargs) self._iv_size = 16 self._mac_size = 32 def _generate_keys(self) -> None: """Derive the signing and encryption keys, and set the key sizes. Version 0.3 vault configurations use a constant key size; see [`KEY_SIZE`][]. The encryption and signing keys differ in how many rounds of PBKDF2 they use (100 and 200, respectively). Danger: Insecure use of cryptography This function makes use of the insecure function [`VaultNativeConfigParser._pbkdf2`][], without any attempts at mitigating its insecurity. It further uses `_pbkdf2` with the low iteration count of 100 and 200 rounds, which is *drastically* insufficient to defend against password guessing attacks using GPUs or ASICs. We provide this function for the purpose of interoperability with existing vault installations. Do not rely on this system to keep your vault configuration secure against access by even moderately determined attackers! """ self._encryption_key = self._pbkdf2(self._password, self.KEY_SIZE, 100) self._signing_key = self._pbkdf2(self._password, self.KEY_SIZE, 200) self._encryption_key_size = self._signing_key_size = self.KEY_SIZE def _hmac_input(self) -> bytes: """Return the input the MAC is supposed to verify. This includes hexadecimal encoding of the message payload. """ return self._message.hex().lower().encode('ASCII') def _make_decryptor(self) -> ciphers.CipherContext: """Return the cipher context object used for decryption. This is a standard AES256-CBC cipher context using the previously derived encryption key and the IV declared in the (MAC-verified) message payload. """ return ciphers.Cipher( algorithms.AES256(self._encryption_key), modes.CBC(self._iv) ).decryptor() class VaultNativeV02ConfigParser(VaultNativeConfigParser): """A parser for vault's native configuration format (v0.2). This is the classic configuration format. Compared to v0.3, it contains an (accidental) API misuse for the generation of the master keys, a low-entropy method of generating initialization vectors for the AES-CBC encryption step, and extra layers of base64 encoding. Because of these significantly weakened confidentiality guarantees, v0.2 configurations should be upgraded to at least v0.3 as soon as possible. Warning: Non-public class, provided for didactical and educational purposes only. Subject to change without notice, including removal. """ def __init__(self, *args: Any, **kwargs: Any) -> None: # noqa: ANN401 super().__init__(*args, **kwargs) self._iv_size = 16 self._mac_size = 64 def _parse_contents(self) -> None: """Parse the contents into IV, payload and MAC. Like the base class implementation, this operates on, and sets, multiple internal attributes of the parser. In version 0.2 vault configurations, the payload is encoded in base64 and the message tag (MAC) is encoded in hexadecimal, so unlike the base class implementation, we additionally decode the payload and the MAC. Raises: ValueError: The configuration file contents are clearly truncated, or the payload or the message tag cannot be decoded properly. """ super()._parse_contents() self._payload = base64.standard_b64decode(self._payload) self._message_tag = bytes.fromhex(self._message_tag.decode('ASCII')) logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_V02_PAYLOAD_MAC_POSTPROCESSING, payload=_h(self._payload), mac=_h(self._message_tag), ), ) def _generate_keys(self) -> None: """Derive the signing and encryption keys, and set the key sizes. Version 0.2 vault configurations use 8-byte encryption keys and 16-byte signing keys, including the hexadecimal encoding. They both use 16 rounds of PBKDF2. This is due to an oversight in vault, where the author mistakenly supplied the intended iteration count as the key size, and the key size as the iteration count. Danger: Insecure use of cryptography This function makes use of the insecure function [`VaultNativeConfigParser._pbkdf2`][], without any attempts at mitigating its insecurity. It further uses `_pbkdf2` with the low iteration count of 16 rounds, which is *drastically* insufficient to defend against password guessing attacks using GPUs or ASICs, and generates the encryption key as a truncation of the signing key. We provide this function for the purpose of interoperability with existing vault installations. Do not rely on this system to keep your vault configuration secure against access by even moderately determined attackers! """ self._encryption_key = self._pbkdf2(self._password, 8, 16) self._signing_key = self._pbkdf2(self._password, 16, 16) self._encryption_key_size = 8 self._signing_key_size = 16 def _hmac_input(self) -> bytes: """Return the input the MAC is supposed to verify. This includes hexadecimal encoding of the message payload. """ return base64.standard_b64encode(self._message) @staticmethod def _evp_bytestokey_md5_one_iteration_no_salt( data: bytes, key_size: int, iv_size: int ) -> tuple[bytes, bytes]: """Reimplement OpenSSL's `EVP_BytesToKey` with fixed parameters. `EVP_BytesToKey` in general is a key derivation function, i.e., a function that derives key material from an input byte string. `EVP_BytesToKey` conceptually splits the derived key material into an encryption key and an initialization vector (IV). Note: Algorithm description `EVP_BytesToKey` takes an input byte string, two output size (encryption key size and IV size), a message digest function, a salt value and an iteration count. The derived key material is calculated in blocks, each of which is the output of (iterated application of) the message digest function. The input to the message digest function is the concatenation of the previous block (if any) with the input byte string and the salt value (if any): ~~~~ python data = block_input = b''.join([ previous_block, input_string, salt ]) for i in range(iteration_count): data = message_digest(data) block = data ~~~~ We use as many blocks as are necessary to cover the total output byte string size. The first few bytes (dictated by the encryption key size) form the encryption key, the other bytes (dictated by the IV size) form the IV. We implement exactly the subset of `EVP_BytesToKey` that the Node.js `crypto` library (v21 series and older) uses in its implementation of `crypto.createCipher("aes256", password)`. Specifically, the message digest function is fixed to MD5, the salt is always empty, and the iteration count is fixed at one. Returns: A 2-tuple containing the derived encryption key and the derived initialization vector. Danger: Insecure use of cryptography This function reimplements the OpenSSL function `EVP_BytesToKey`, which generates cryptographically weak keys, without any attempts at mitigating its insecurity. We provide this function for the purpose of interoperability with existing vault installations. Do not rely on this system to keep your vault configuration secure against access by even moderately determined attackers! """ total_size = key_size + iv_size buffer = bytearray() last_block = b'' salt = b'' logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_EVP_BYTESTOKEY_INIT, data=_h(data), salt=_h(salt), key_size=key_size, iv_size=iv_size, buffer_length=len(buffer), buffer=_h(buffer), ), ) while len(buffer) < total_size: with warnings.catch_warnings(): warnings.simplefilter( 'ignore', crypt_utils.CryptographyDeprecationWarning ) block = hashes.Hash(hashes.MD5()) block.update(last_block) block.update(data) block.update(salt) last_block = block.finalize() buffer.extend(last_block) logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_EVP_BYTESTOKEY_ROUND, buffer_length=len(buffer), buffer=_h(buffer), ), ) logger.debug( _msg.TranslatedString( _msg.DebugMsgTemplate.VAULT_NATIVE_EVP_BYTESTOKEY_RESULT, enc_key=_h(buffer[:key_size]), iv=_h(buffer[key_size:total_size]), ), ) return bytes(buffer[:key_size]), bytes(buffer[key_size:total_size]) def _make_decryptor(self) -> ciphers.CipherContext: """Return the cipher context object used for decryption. This is a standard AES256-CBC cipher context. The encryption key and the IV are derived via the OpenSSL `EVP_BytesToKey` function (using MD5, no salt, and one iteration). This is what the Node.js `crypto` library (v21 series and older) used in its implementation of `crypto.createCipher("aes256", password)`. Danger: Insecure use of cryptography This function makes use of (an implementation of) the OpenSSL function `EVP_BytesToKey`, which generates cryptographically weak keys, without any attempts at mitigating its insecurity. We provide this function for the purpose of interoperability with existing vault installations. Do not rely on this system to keep your vault configuration secure against access by even moderately determined attackers! """ data = base64.standard_b64encode(self._iv + self._encryption_key) encryption_key, iv = self._evp_bytestokey_md5_one_iteration_no_salt( data, key_size=32, iv_size=16 ) return ciphers.Cipher( algorithms.AES256(encryption_key), modes.CBC(iv) ).decryptor() @exporter.register_export_vault_config_data_handler('v0.2', 'v0.3') def export_vault_native_data( # noqa: D417 path: str | bytes | os.PathLike | None = None, key: str | Buffer | None = None, *, format: str, # noqa: A002 ) -> Any: # noqa: ANN401 """Export the full configuration stored in vault native format. See [`exporter.ExportVaultConfigDataFunction`][] for an explanation of the call signature, and the exceptions to expect. Other Args: format: The only supported formats are `v0.2` and `v0.3`. """ # noqa: DOC201,DOC501 # Trigger import errors if necessary. importlib.import_module('cryptography') if path is None: path = exporter.get_vault_path() with open(path, 'rb') as infile: contents = base64.standard_b64decode(infile.read()) if key is None: key = exporter.get_vault_key() parser_class: type[VaultNativeConfigParser] | None = { 'v0.2': VaultNativeV02ConfigParser, 'v0.3': VaultNativeV03ConfigParser, }.get(format) if parser_class is None: # pragma: no cover msg = exporter.INVALID_VAULT_NATIVE_CONFIGURATION_FORMAT.format( fmt=format ) raise ValueError(msg) try: return parser_class(contents, key)() except ValueError as exc: raise exporter.NotAVaultConfigError( os.fsdecode(path), format=format, ) from exc if __name__ == '__main__': import os logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING')) with open(exporter.get_vault_path(), 'rb') as infile: contents = base64.standard_b64decode(infile.read()) password = exporter.get_vault_key() try: config = VaultNativeV03ConfigParser(contents, password)() except ValueError: config = VaultNativeV02ConfigParser(contents, password)() print(json.dumps(config, indent=2, sort_keys=True)) # noqa: T201