2c2295fea6d47278f346c32dd4a0daf4c2428571
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

1) #!/usr/bin/python3
2) 
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

3) from __future__ import annotations
4) 
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

5) import base64
6) import glob
7) import json
8) import logging
9) import os
10) import os.path
11) import struct
12) from typing import TypedDict
13) 
14) from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding
15) from cryptography.hazmat.primitives.ciphers import algorithms, modes
16) from cryptography.hazmat.primitives.kdf import pbkdf2
17) 
18) STOREROOM_MASTER_KEYS_UUID = b'35b7c7ed-f71e-4adf-9051-02fb0f1e0e17'
19) VAULT_CIPHER_UUID = b'73e69e8a-cb05-4b50-9f42-59d76a511299'
20) IV_SIZE = 16
21) KEY_SIZE = MAC_SIZE = 32
22) ENCRYPTED_KEYPAIR_SIZE = 128
23) VERSION_SIZE = 1
24) MASTER_KEYS_KEY = (
25)     os.getenv('VAULT_KEY')
26)     or os.getenv('LOGNAME')
27)     or os.getenv('USER')
28)     or os.getenv('USERNAME')
29) )
30) 
31) logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
32) logger = logging.getLogger('derivepassphrase.exporter.vault_storeroom')
33) 
34) 
35) class KeyPair(TypedDict):
36)     encryption_key: bytes
37)     signing_key: bytes
38) 
39) 
40) class MasterKeys(TypedDict):
41)     hashing_key: bytes
42)     encryption_key: bytes
43)     signing_key: bytes
44) 
45) 
46) def derive_master_keys_keys(password: str | bytes, iterations: int) -> KeyPair:
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

47)     """Derive encryption and signing keys for the master keys data.
48) 
49)     The master password is run through a key derivation function to
50)     obtain a 64-byte string, which is then split to yield two 32-byte
51)     keys.  The key derivation function is PBKDF2, using HMAC-SHA1 and
52)     salted with the storeroom master keys UUID.
53) 
54)     Args:
55)         password:
56)             A master password for the storeroom instance.  Usually read
57)             from the `VAULT_KEY` environment variable, otherwise
58)             defaults to the username.
59)         iterations:
60)             A count of rounds for the underlying key derivation
61)             function.  Usually stored as a setting next to the encrypted
62)             master keys data.
63) 
64)     Returns:
65)         A 2-tuple of keys, the encryption key and the signing key, to
66)         decrypt and verify the master keys data with.
67) 
68)     """
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

69)     if isinstance(password, str):
70)         password = password.encode('ASCII')
71)     master_keys_keys_blob = pbkdf2.PBKDF2HMAC(
72)         algorithm=hashes.SHA1(),  # noqa: S303
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

73)         length=2 * KEY_SIZE,
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

74)         salt=STOREROOM_MASTER_KEYS_UUID,
75)         iterations=iterations,
76)     ).derive(password)
77)     encryption_key, signing_key = struct.unpack(
78)         f'{KEY_SIZE}s {KEY_SIZE}s', master_keys_keys_blob
79)     )
80)     logger.debug(
81)         (
82)             'derived master_keys_keys bytes.fromhex(%s) (encryption) '
83)             'and bytes.fromhex(%s) (signing) '
84)             'from password bytes.fromhex(%s), '
85)             'using call '
86)             'pbkdf2(algorithm=%s, length=%d, salt=%s, iterations=%d)'
87)         ),
88)         repr(encryption_key.hex(' ')),
89)         repr(signing_key.hex(' ')),
90)         repr(password.hex(' ')),
91)         repr('SHA256'),
92)         64,
93)         repr(STOREROOM_MASTER_KEYS_UUID),
94)         iterations,
95)     )
96)     return {
97)         'encryption_key': encryption_key,
98)         'signing_key': signing_key,
99)     }
100) 
101) 
102) def decrypt_master_keys_data(data: bytes, keys: KeyPair) -> MasterKeys:
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

103)     """Decrypt the master keys data.
104) 
105)     The master keys data contains:
106) 
107)     - a 16-byte IV,
108)     - a 96-byte AES256-CBC-encrypted payload (using PKCS7 padding on the
109)       inside), and
110)     - a 32-byte MAC of the preceding 112 bytes.
111) 
112)     The decrypted payload itself consists of three 32-byte keys: the
113)     hashing, encryption and signing keys, in that order.
114) 
115)     The encrypted payload is encrypted with the encryption key, and the
116)     MAC is created based on the signing key.  As per standard
117)     cryptographic procedure, the MAC can be verified before attempting
118)     to decrypt the payload.
119) 
120)     Because the payload size is both fixed and a multiple of the
121)     cipher blocksize, in this case, the PKCS7 padding is a no-op.
122) 
123)     Args:
124)         data:
125)             The encrypted master keys data.
126)         keys:
127)             The encryption and signing keys for the master keys data.
128)             These should have previously been derived via the
129)             [`derivepassphrase.exporter.storeroom.derive_master_keys_keys`][]
130)             function.
131) 
132)     Returns:
133)         The master encryption, signing and hashing keys.
134) 
135)     """
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

136)     ciphertext, claimed_mac = struct.unpack(
137)         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
138)     )
139)     actual_mac = hmac.HMAC(keys['signing_key'], hashes.SHA256())
140)     actual_mac.update(ciphertext)
141)     logger.debug(
142)         (
143)             'master_keys_data mac_key = bytes.fromhex(%s), '
144)             'hashed_content = bytes.fromhex(%s), '
145)             'claimed_mac = bytes.fromhex(%s), '
146)             'actual_mac = bytes.fromhex(%s)'
147)         ),
148)         repr(keys['signing_key'].hex(' ')),
149)         repr(ciphertext.hex(' ')),
150)         repr(claimed_mac.hex(' ')),
151)         repr(actual_mac.copy().finalize().hex(' ')),
152)     )
153)     actual_mac.verify(claimed_mac)
154) 
155)     iv, payload = struct.unpack(
156)         f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
157)     )
158)     decryptor = ciphers.Cipher(
159)         algorithms.AES256(keys['encryption_key']), modes.CBC(iv)
160)     ).decryptor()
161)     padded_plaintext = bytearray()
162)     padded_plaintext.extend(decryptor.update(payload))
163)     padded_plaintext.extend(decryptor.finalize())
164)     unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
165)     plaintext = bytearray()
166)     plaintext.extend(unpadder.update(padded_plaintext))
167)     plaintext.extend(unpadder.finalize())
168)     if len(plaintext) != 3 * KEY_SIZE:
169)         msg = (
170)             f'Expecting 3 encrypted keys at {3 * KEY_SIZE} bytes total, '
171)             f'but found {len(plaintext)} instead'
172)         )
173)         raise RuntimeError(msg)
174)     hashing_key, encryption_key, signing_key = struct.unpack(
175)         f'{KEY_SIZE}s {KEY_SIZE}s {KEY_SIZE}s', plaintext
176)     )
177)     return {
178)         'hashing_key': hashing_key,
179)         'encryption_key': encryption_key,
180)         'signing_key': signing_key,
181)     }
182) 
183) 
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

184) def decrypt_session_keys(data: bytes, master_keys: MasterKeys) -> KeyPair:
185)     """Decrypt the bucket item's session keys.
186) 
187)     The bucket item's session keys are single-use keys for encrypting
188)     and signing a single item in the storage bucket.  The encrypted
189)     session key data consists of:
190) 
191)     - a 16-byte IV,
192)     - a 64-byte AES256-CBC-encrypted payload (using PKCS7 padding on the
193)       inside), and
194)     - a 32-byte MAC of the preceding 80 bytes.
195) 
196)     The encrypted payload is encrypted with the master encryption key,
197)     and the MAC is created with the master signing key.  As per standard
198)     cryptographic procedure, the MAC can be verified before attempting
199)     to decrypt the payload.
200) 
201)     Because the payload size is both fixed and a multiple of the
202)     cipher blocksize, in this case, the PKCS7 padding is a no-op.
203) 
204)     Args:
205)         data:
206)             The encrypted bucket item session key data.
207)         master_keys:
208)             The master keys.  Presumably these have previously been
209)             obtained via the
210)             [`derivepassphrase.exporter.storeroom.decrypt_master_keys_data`][]
211)             function.
212) 
213)     Returns:
214)         The bucket item's encryption and signing keys.
215) 
216)     """
217) 
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

218)     ciphertext, claimed_mac = struct.unpack(
219)         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
220)     )
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

221)     actual_mac = hmac.HMAC(master_keys['signing_key'], hashes.SHA256())
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

222)     actual_mac.update(ciphertext)
223)     logger.debug(
224)         (
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

225)             'decrypt_bucket_item (session_keys): '
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

226)             'mac_key = bytes.fromhex(%s) (master), '
227)             'hashed_content = bytes.fromhex(%s), '
228)             'claimed_mac = bytes.fromhex(%s), '
229)             'actual_mac = bytes.fromhex(%s)'
230)         ),
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

231)         repr(master_keys['signing_key'].hex(' ')),
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

232)         repr(ciphertext.hex(' ')),
233)         repr(claimed_mac.hex(' ')),
234)         repr(actual_mac.copy().finalize().hex(' ')),
235)     )
236)     actual_mac.verify(claimed_mac)
237) 
238)     iv, payload = struct.unpack(
239)         f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
240)     )
241)     decryptor = ciphers.Cipher(
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

242)         algorithms.AES256(master_keys['encryption_key']), modes.CBC(iv)
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

243)     ).decryptor()
244)     padded_plaintext = bytearray()
245)     padded_plaintext.extend(decryptor.update(payload))
246)     padded_plaintext.extend(decryptor.finalize())
247)     unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
248)     plaintext = bytearray()
249)     plaintext.extend(unpadder.update(padded_plaintext))
250)     plaintext.extend(unpadder.finalize())
251) 
252)     session_encryption_key, session_signing_key, inner_payload = struct.unpack(
253)         f'{KEY_SIZE}s {KEY_SIZE}s {len(plaintext) - 2 * KEY_SIZE}s',
254)         plaintext,
255)     )
256)     session_keys: KeyPair = {
257)         'encryption_key': session_encryption_key,
258)         'signing_key': session_signing_key,
259)     }
260) 
261)     logger.debug(
262)         (
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

263)             'decrypt_bucket_item (session_keys): '
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

264)             'decrypt_aes256_cbc_and_unpad(key=bytes.fromhex(%s), '
265)             'iv=bytes.fromhex(%s))(bytes.fromhex(%s)) '
266)             '= bytes.fromhex(%s) '
267)             '= {"encryption_key": bytes.fromhex(%s), '
268)             '"signing_key": bytes.fromhex(%s)}'
269)         ),
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

270)         repr(master_keys['encryption_key'].hex(' ')),
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

271)         repr(iv.hex(' ')),
272)         repr(payload.hex(' ')),
273)         repr(plaintext.hex(' ')),
274)         repr(session_keys['encryption_key'].hex(' ')),
275)         repr(session_keys['signing_key'].hex(' ')),
276)     )
277) 
278)     if inner_payload:
279)         logger.debug(
280)             'ignoring misplaced inner payload bytes.fromhex(%s)',
281)             repr(inner_payload.hex(' ')),
282)         )
283) 
284)     return session_keys
285) 
286) 
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

287) def decrypt_contents(data: bytes, session_keys: KeyPair) -> bytes:
288)     """Decrypt the bucket item's contents.
289) 
290)     The data consists of:
291) 
292)     - a 16-byte IV,
293)     - a variable-sized AES256-CBC-encrypted payload (using PKCS7 padding
294)       on the inside), and
295)     - a 32-byte MAC of the preceding 80 bytes.
296) 
297)     The encrypted payload is encrypted with the bucket item's session
298)     encryption key, and the MAC is created with the bucket item's
299)     session signing key.  As per standard cryptographic procedure, the
300)     MAC can be verified before attempting to decrypt the payload.
301) 
302)     Args:
303)         data:
304)             The encrypted bucket item payload data.
305)         session_keys:
306)             The bucket item's session keys.  Presumably these have
307)             previously been obtained via the
308)             [`derivepassphrase.exporter.storeroom.decrypt_session_keys`][]
309)             function.
310) 
311)     Returns:
312)         The bucket item's payload.
313) 
314)     """
315) 
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

316)     ciphertext, claimed_mac = struct.unpack(
317)         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
318)     )
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

319)     actual_mac = hmac.HMAC(session_keys['signing_key'], hashes.SHA256())
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

320)     actual_mac.update(ciphertext)
321)     logger.debug(
322)         (
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

323)             'decrypt_bucket_item (contents): '
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

324)             'mac_key = bytes.fromhex(%s), '
325)             'hashed_content = bytes.fromhex(%s), '
326)             'claimed_mac = bytes.fromhex(%s), '
327)             'actual_mac = bytes.fromhex(%s)'
328)         ),
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

329)         repr(session_keys['signing_key'].hex(' ')),
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

330)         repr(ciphertext.hex(' ')),
331)         repr(claimed_mac.hex(' ')),
332)         repr(actual_mac.copy().finalize().hex(' ')),
333)     )
334)     actual_mac.verify(claimed_mac)
335) 
336)     iv, payload = struct.unpack(
337)         f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
338)     )
339)     decryptor = ciphers.Cipher(
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

340)         algorithms.AES256(session_keys['encryption_key']), modes.CBC(iv)
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

341)     ).decryptor()
342)     padded_plaintext = bytearray()
343)     padded_plaintext.extend(decryptor.update(payload))
344)     padded_plaintext.extend(decryptor.finalize())
345)     unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
346)     plaintext = bytearray()
347)     plaintext.extend(unpadder.update(padded_plaintext))
348)     plaintext.extend(unpadder.finalize())
349) 
350)     logger.debug(
351)         (
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

352)             'decrypt_bucket_item (contents): '
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

353)             'decrypt_aes256_cbc_and_unpad(key=bytes.fromhex(%s), '
354)             'iv=bytes.fromhex(%s))(bytes.fromhex(%s)) '
355)             '= bytes.fromhex(%s)'
356)         ),
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

357)         repr(session_keys['encryption_key'].hex(' ')),
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

358)         repr(iv.hex(' ')),
359)         repr(payload.hex(' ')),
360)         repr(plaintext.hex(' ')),
361)     )
362) 
363)     return plaintext
364) 
365) 
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

366) def decrypt_bucket_item(bucket_item: bytes, master_keys: MasterKeys) -> bytes:
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

367)     logger.debug(
368)         (
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

369)             'decrypt_bucket_item: data = bytes.fromhex(%s), '
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

370)             'encryption_key = bytes.fromhex(%s), '
371)             'signing_key = bytes.fromhex(%s)'
372)         ),
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

373)         repr(bucket_item.hex(' ')),
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

374)         repr(master_keys['encryption_key'].hex(' ')),
375)         repr(master_keys['signing_key'].hex(' ')),
376)     )
377)     data_version, encrypted_session_keys, data_contents = struct.unpack(
378)         (
379)             f'B {ENCRYPTED_KEYPAIR_SIZE}s '
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

380)             f'{len(bucket_item) - 1 - ENCRYPTED_KEYPAIR_SIZE}s'
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

381)         ),
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

382)         bucket_item,
Marco Ricci Add prototype for "storeroo...

Marco Ricci authored 1 month ago

383)     )
384)     if data_version != 1:
385)         msg = f'Cannot handle version {data_version} encrypted data'
386)         raise RuntimeError(msg)
387)     session_keys = decrypt_session_keys(encrypted_session_keys, master_keys)
388)     return decrypt_contents(data_contents, session_keys)
389) 
390) 
391) def decrypt_bucket_file(filename: str, master_keys: MasterKeys) -> None:
392)     with (
393)         open(filename, 'rb') as bucket_file,
394)         open(filename + '.decrypted', 'wb') as decrypted_file,
395)     ):
396)         header_line = bucket_file.readline()
397)         try:
398)             header = json.loads(header_line)
399)         except ValueError as exc:
400)             msg = f'Invalid bucket file: {filename}'
401)             raise RuntimeError(msg) from exc
402)         if header != {'version': 1}:
403)             msg = f'Invalid bucket file: {filename}'
404)             raise RuntimeError(msg) from None
405)         decrypted_file.write(header_line)
406)         for line in bucket_file:
407)             decrypted_contents = (
Marco Ricci Add docstrings and better v...

Marco Ricci authored 1 month ago

408)                 decrypt_bucket_item(