git.schokokeks.org
Repositories
Help
Report an Issue
derivepassphrase.git
Code
Commits
Branches
Tags
Suche
Strukturansicht:
12a294f
Branches
Tags
documentation-tree
master
0.1.0
0.1.1
0.1.2
0.1.3
0.2.0
0.3.0
0.3.1
0.3.2
derivepassphrase.git
src
derivepassphrase
exporter
storeroom.py
Add prototype for "storeroom"-type data export
Marco Ricci
committed
12a294f
at 2024-08-03 23:09:08
storeroom.py
Blame
History
Raw
#!/usr/bin/python3
"""Prototype exporter for "storeroom"-type data.

When run as a script, this reads the `.keys` file in the current
directory, derives the keys protecting it from a password taken from
the environment (`VAULT_KEY`, `LOGNAME`, `USER` or `USERNAME`, in that
order), and decrypts every bucket file matching `[01][0-9a-f]` in the
current directory into a sibling file with a `.decrypted` suffix.

Setting the `DEBUG` environment variable enables debug logging, which
includes key material and plaintexts.

"""

import base64
import glob
import json
import logging
import os
import os.path
import struct
from typing import TypedDict

from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding
from cryptography.hazmat.primitives.ciphers import algorithms, modes
from cryptography.hazmat.primitives.kdf import pbkdf2

STOREROOM_MASTER_KEYS_UUID = b'35b7c7ed-f71e-4adf-9051-02fb0f1e0e17'
VAULT_CIPHER_UUID = b'73e69e8a-cb05-4b50-9f42-59d76a511299'
IV_SIZE = 16
KEY_SIZE = MAC_SIZE = 32
ENCRYPTED_KEYPAIR_SIZE = 128
VERSION_SIZE = 1
# Password protecting the master keys; `None` if none of the environment
# variables is set (or all are empty).
MASTER_KEYS_KEY = (
    os.getenv('VAULT_KEY')
    or os.getenv('LOGNAME')
    or os.getenv('USER')
    or os.getenv('USERNAME')
)

logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
logger = logging.getLogger('derivepassphrase.exporter.vault_storeroom')


class KeyPair(TypedDict):
    """A pair of keys: one for AES256-CBC, one for HMAC-SHA256."""

    # Key passed to AES256 when decrypting.
    encryption_key: bytes
    # Key passed to HMAC-SHA256 when checking the MAC.
    signing_key: bytes


class MasterKeys(TypedDict):
    """The three master keys stored (encrypted) in the `.keys` file."""

    # First of the three stored keys; not used by the code in this file.
    hashing_key: bytes
    # Key passed to AES256 when decrypting session keys.
    encryption_key: bytes
    # Key passed to HMAC-SHA256 when checking the session keys' MAC.
    signing_key: bytes


def _verify_mac(data: bytes, signing_key: bytes, log_template: str) -> bytes:
    """Split off and verify the trailing HMAC-SHA256 tag of `data`.

    Args:
        data:
            The authenticated message: some content followed by a
            `MAC_SIZE`-byte MAC over that content.
        signing_key:
            The HMAC key.
        log_template:
            A %-style logging template with four `%s` placeholders,
            filled (in order) with the `repr` of the hex dumps of the
            signing key, the content, the claimed MAC and the actual
            MAC.

    Returns:
        The content, without the MAC.

    Raises:
        struct.error:
            `data` is shorter than `MAC_SIZE` bytes.
        cryptography.exceptions.InvalidSignature:
            The claimed MAC does not match the computed one.

    """
    ciphertext, claimed_mac = struct.unpack(
        f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
    )
    actual_mac = hmac.HMAC(signing_key, hashes.SHA256())
    actual_mac.update(ciphertext)
    # Log before verifying, so that the trace is available even if
    # verification fails.  Finalize a copy, because a finalized HMAC
    # context cannot be used for `verify` afterwards.
    logger.debug(
        log_template,
        repr(signing_key.hex(' ')),
        repr(ciphertext.hex(' ')),
        repr(claimed_mac.hex(' ')),
        repr(actual_mac.copy().finalize().hex(' ')),
    )
    actual_mac.verify(claimed_mac)
    return ciphertext


def _decrypt_aes256_cbc_and_unpad(
    ciphertext: bytes, encryption_key: bytes
) -> tuple[bytes, bytes, bytes]:
    """Decrypt an IV-prefixed AES256-CBC message and strip PKCS7 padding.

    Args:
        ciphertext:
            An `IV_SIZE`-byte IV followed by the encrypted payload.
        encryption_key:
            The AES256 key.

    Returns:
        A 3-tuple `(iv, payload, plaintext)`: the IV and the encrypted
        payload as split from `ciphertext` (returned so that callers can
        log them), and the decrypted, unpadded plaintext.

    """
    iv, payload = struct.unpack(
        f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
    )
    decryptor = ciphers.Cipher(
        algorithms.AES256(encryption_key), modes.CBC(iv)
    ).decryptor()
    padded_plaintext = decryptor.update(payload) + decryptor.finalize()
    # The PKCS7 block size is given in bits; the AES block size equals
    # the IV size.
    unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
    plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()
    return iv, payload, plaintext


def derive_master_keys_keys(password: str | bytes, iterations: int) -> KeyPair:
    """Derive the key pair protecting the master keys from a password.

    The keys are derived with PBKDF2-HMAC-SHA1, salted with
    `STOREROOM_MASTER_KEYS_UUID`.  The first `KEY_SIZE` bytes of the
    output are the encryption key, the next `KEY_SIZE` bytes the signing
    key.

    Args:
        password:
            The password.  A `str` is encoded as ASCII.
        iterations:
            The PBKDF2 iteration count.

    Returns:
        The derived encryption and signing keys.

    Raises:
        UnicodeEncodeError:
            `password` is a `str` containing non-ASCII characters.

    """
    if isinstance(password, str):
        password = password.encode('ASCII')
    length = 2 * KEY_SIZE
    master_keys_keys_blob = pbkdf2.PBKDF2HMAC(
        algorithm=hashes.SHA1(),  # noqa: S303
        length=length,
        salt=STOREROOM_MASTER_KEYS_UUID,
        iterations=iterations,
    ).derive(password)
    encryption_key, signing_key = struct.unpack(
        f'{KEY_SIZE}s {KEY_SIZE}s', master_keys_keys_blob
    )
    logger.debug(
        (
            'derived master_keys_keys bytes.fromhex(%s) (encryption) '
            'and bytes.fromhex(%s) (signing) '
            'from password bytes.fromhex(%s), '
            'using call '
            'pbkdf2(algorithm=%s, length=%d, salt=%s, iterations=%d)'
        ),
        repr(encryption_key.hex(' ')),
        repr(signing_key.hex(' ')),
        repr(password.hex(' ')),
        # Must name the hash actually passed to PBKDF2HMAC above.
        repr('SHA1'),
        length,
        repr(STOREROOM_MASTER_KEYS_UUID),
        iterations,
    )
    return {
        'encryption_key': encryption_key,
        'signing_key': signing_key,
    }


def decrypt_master_keys_data(data: bytes, keys: KeyPair) -> MasterKeys:
    """Authenticate and decrypt the master keys.

    Args:
        data:
            The encrypted master keys: IV, AES256-CBC payload, and
            trailing HMAC-SHA256 tag.
        keys:
            The key pair protecting the master keys, e.g. as returned by
            `derive_master_keys_keys`.

    Returns:
        The hashing, encryption and signing master keys.

    Raises:
        cryptography.exceptions.InvalidSignature:
            The MAC does not match.
        RuntimeError:
            The decrypted data is not exactly `3 * KEY_SIZE` bytes long.

    """
    ciphertext = _verify_mac(
        data,
        keys['signing_key'],
        (
            'master_keys_data mac_key = bytes.fromhex(%s), '
            'hashed_content = bytes.fromhex(%s), '
            'claimed_mac = bytes.fromhex(%s), '
            'actual_mac = bytes.fromhex(%s)'
        ),
    )
    _iv, _payload, plaintext = _decrypt_aes256_cbc_and_unpad(
        ciphertext, keys['encryption_key']
    )
    if len(plaintext) != 3 * KEY_SIZE:
        msg = (
            f'Expecting 3 encrypted keys at {3 * KEY_SIZE} bytes total, '
            f'but found {len(plaintext)} instead'
        )
        raise RuntimeError(msg)
    hashing_key, encryption_key, signing_key = struct.unpack(
        f'{KEY_SIZE}s {KEY_SIZE}s {KEY_SIZE}s', plaintext
    )
    return {
        'hashing_key': hashing_key,
        'encryption_key': encryption_key,
        'signing_key': signing_key,
    }


def decrypt_session_keys(data: bytes, keys: MasterKeys) -> KeyPair:
    """Authenticate and decrypt the session keys of a bucket line.

    Args:
        data:
            The encrypted session keys: IV, AES256-CBC payload, and
            trailing HMAC-SHA256 tag.
        keys:
            The master keys.

    Returns:
        The session encryption and signing keys.  Any decrypted bytes
        beyond the two keys are ignored (and reported at debug level).

    Raises:
        cryptography.exceptions.InvalidSignature:
            The MAC does not match.

    """
    ciphertext = _verify_mac(
        data,
        keys['signing_key'],
        (
            'decrypt_bucket_line (session_keys): '
            'mac_key = bytes.fromhex(%s) (master), '
            'hashed_content = bytes.fromhex(%s), '
            'claimed_mac = bytes.fromhex(%s), '
            'actual_mac = bytes.fromhex(%s)'
        ),
    )
    iv, payload, plaintext = _decrypt_aes256_cbc_and_unpad(
        ciphertext, keys['encryption_key']
    )
    session_encryption_key, session_signing_key, inner_payload = struct.unpack(
        f'{KEY_SIZE}s {KEY_SIZE}s {len(plaintext) - 2 * KEY_SIZE}s',
        plaintext,
    )
    session_keys: KeyPair = {
        'encryption_key': session_encryption_key,
        'signing_key': session_signing_key,
    }
    logger.debug(
        (
            'decrypt_bucket_line (session_keys): '
            'decrypt_aes256_cbc_and_unpad(key=bytes.fromhex(%s), '
            'iv=bytes.fromhex(%s))(bytes.fromhex(%s)) '
            '= bytes.fromhex(%s) '
            '= {"encryption_key": bytes.fromhex(%s), '
            '"signing_key": bytes.fromhex(%s)}'
        ),
        repr(keys['encryption_key'].hex(' ')),
        repr(iv.hex(' ')),
        repr(payload.hex(' ')),
        repr(plaintext.hex(' ')),
        repr(session_keys['encryption_key'].hex(' ')),
        repr(session_keys['signing_key'].hex(' ')),
    )
    if inner_payload:
        logger.debug(
            'ignoring misplaced inner payload bytes.fromhex(%s)',
            repr(inner_payload.hex(' ')),
        )
    return session_keys


def decrypt_contents(data: bytes, keys: KeyPair) -> bytes:
    """Authenticate and decrypt the contents of a bucket line.

    Args:
        data:
            The encrypted contents: IV, AES256-CBC payload, and trailing
            HMAC-SHA256 tag.
        keys:
            The session keys for this bucket line.

    Returns:
        The decrypted, unpadded contents.

    Raises:
        cryptography.exceptions.InvalidSignature:
            The MAC does not match.

    """
    ciphertext = _verify_mac(
        data,
        keys['signing_key'],
        (
            'decrypt_bucket_line (contents): '
            'mac_key = bytes.fromhex(%s), '
            'hashed_content = bytes.fromhex(%s), '
            'claimed_mac = bytes.fromhex(%s), '
            'actual_mac = bytes.fromhex(%s)'
        ),
    )
    iv, payload, plaintext = _decrypt_aes256_cbc_and_unpad(
        ciphertext, keys['encryption_key']
    )
    logger.debug(
        (
            'decrypt_bucket_line (contents): '
            'decrypt_aes256_cbc_and_unpad(key=bytes.fromhex(%s), '
            'iv=bytes.fromhex(%s))(bytes.fromhex(%s)) '
            '= bytes.fromhex(%s)'
        ),
        repr(keys['encryption_key'].hex(' ')),
        repr(iv.hex(' ')),
        repr(payload.hex(' ')),
        repr(plaintext.hex(' ')),
    )
    return plaintext


def decrypt_bucket_line(bucket_line: bytes, master_keys: MasterKeys) -> bytes:
    """Decrypt one (already base64-decoded) line of a bucket file.

    The line consists of a one-byte version, `ENCRYPTED_KEYPAIR_SIZE`
    bytes of encrypted session keys, and the encrypted contents.

    Args:
        bucket_line:
            The raw bucket line.
        master_keys:
            The master keys, used to decrypt the session keys.

    Returns:
        The decrypted contents.

    Raises:
        RuntimeError:
            The version byte is not 1.
        cryptography.exceptions.InvalidSignature:
            A MAC (session keys or contents) does not match.

    """
    logger.debug(
        (
            'decrypt_bucket_line: data = bytes.fromhex(%s), '
            'encryption_key = bytes.fromhex(%s), '
            'signing_key = bytes.fromhex(%s)'
        ),
        repr(bucket_line.hex(' ')),
        repr(master_keys['encryption_key'].hex(' ')),
        repr(master_keys['signing_key'].hex(' ')),
    )
    data_version, encrypted_session_keys, data_contents = struct.unpack(
        (
            f'B {ENCRYPTED_KEYPAIR_SIZE}s '
            f'{len(bucket_line) - 1 - ENCRYPTED_KEYPAIR_SIZE}s'
        ),
        bucket_line,
    )
    if data_version != 1:
        msg = f'Cannot handle version {data_version} encrypted data'
        raise RuntimeError(msg)
    session_keys = decrypt_session_keys(encrypted_session_keys, master_keys)
    return decrypt_contents(data_contents, session_keys)


def decrypt_bucket_file(filename: str, master_keys: MasterKeys) -> None:
    """Decrypt a bucket file into `<filename>.decrypted`.

    The first line of the bucket file must be the JSON header
    `{"version": 1}`; it is copied verbatim.  Every further line is
    base64-decoded, decrypted, and written out terminated by exactly one
    newline.

    Args:
        filename:
            The bucket file to decrypt.
        master_keys:
            The master keys.

    Raises:
        RuntimeError:
            The header line is not valid JSON, or is not the expected
            version header.  No output file is created in this case.

    """
    with open(filename, 'rb') as bucket_file:
        header_line = bucket_file.readline()
        try:
            header = json.loads(header_line)
        except ValueError as exc:
            msg = f'Invalid bucket file: {filename}'
            raise RuntimeError(msg) from exc
        if header != {'version': 1}:
            msg = f'Invalid bucket file: {filename}'
            raise RuntimeError(msg) from None
        # Only create the output file once the header has been validated,
        # so that invalid bucket files leave no empty output behind.
        with open(filename + '.decrypted', 'wb') as decrypted_file:
            decrypted_file.write(header_line)
            for line in bucket_file:
                decrypted_contents = (
                    decrypt_bucket_line(
                        base64.standard_b64decode(line), master_keys
                    ).removesuffix(b'\n')
                    + b'\n'
                )
                decrypted_file.write(decrypted_contents)


def main() -> None:
    """Decrypt all bucket files in the current directory.

    Reads `.keys` (a `{"version": 1}` JSON header line followed by one
    base64 line), decrypts the master keys with keys derived from
    `MASTER_KEYS_KEY`, then decrypts every file matching `[01][0-9a-f]`.

    The first decoded byte of the keys line holds the encrypted keys
    version in its high nibble and the PBKDF2 iteration exponent in its
    low nibble (iterations = 2 ** (10 + exponent)).

    Raises:
        RuntimeError:
            The `.keys` file has a bad header, trailing data, or an
            unsupported encrypted keys version; or no master keys
            password could be determined from the environment.

    """
    with open('.keys', encoding='utf-8') as master_keys_file:
        header = json.loads(master_keys_file.readline())
        if header != {'version': 1}:
            msg = 'bad or unsupported keys version header'
            raise RuntimeError(msg)
        raw_keys_data = base64.standard_b64decode(master_keys_file.readline())
        encrypted_keys_params, encrypted_keys = struct.unpack(
            f'B {len(raw_keys_data) - 1}s', raw_keys_data
        )
        if master_keys_file.read():
            msg = 'trailing data; cannot make sense of .keys file'
            raise RuntimeError(msg)
    encrypted_keys_version = encrypted_keys_params >> 4
    if encrypted_keys_version != 1:
        msg = f'cannot handle version {encrypted_keys_version} encrypted keys'
        raise RuntimeError(msg)
    encrypted_keys_iterations = 2 ** (10 + (encrypted_keys_params & 0x0F))
    if not MASTER_KEYS_KEY:
        # Fail with a clear message instead of a type error deep inside
        # the key derivation.
        msg = (
            'cannot determine the master keys password: none of the '
            'environment variables VAULT_KEY, LOGNAME, USER or USERNAME '
            'is set'
        )
        raise RuntimeError(msg)
    master_keys_keys = derive_master_keys_keys(
        MASTER_KEYS_KEY, encrypted_keys_iterations
    )
    master_keys = decrypt_master_keys_data(encrypted_keys, master_keys_keys)

    for file in glob.glob('[01][0-9a-f]'):
        decrypt_bucket_file(file, master_keys)


if __name__ == '__main__':
    main()