Add prototype for "storeroom"-type data export
Marco Ricci

Marco Ricci commited on 2024-08-03 23:09:08
Zeige 2 geänderte Dateien mit 326 Einfügungen und 0 Löschungen.

... ...
@@ -0,0 +1,326 @@
1
+#!/usr/bin/python3
2
+
3
+import base64
4
+import glob
5
+import json
6
+import logging
7
+import os
8
+import os.path
9
+import struct
10
+from typing import TypedDict
11
+
12
+from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding
13
+from cryptography.hazmat.primitives.ciphers import algorithms, modes
14
+from cryptography.hazmat.primitives.kdf import pbkdf2
15
+
16
+STOREROOM_MASTER_KEYS_UUID = b'35b7c7ed-f71e-4adf-9051-02fb0f1e0e17'
17
+VAULT_CIPHER_UUID = b'73e69e8a-cb05-4b50-9f42-59d76a511299'
18
+IV_SIZE = 16
19
+KEY_SIZE = MAC_SIZE = 32
20
+ENCRYPTED_KEYPAIR_SIZE = 128
21
+VERSION_SIZE = 1
22
+MASTER_KEYS_KEY = (
23
+    os.getenv('VAULT_KEY')
24
+    or os.getenv('LOGNAME')
25
+    or os.getenv('USER')
26
+    or os.getenv('USERNAME')
27
+)
28
+
29
+logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
30
+logger = logging.getLogger('derivepassphrase.exporter.vault_storeroom')
31
+
32
+
33
+class KeyPair(TypedDict):
34
+    encryption_key: bytes
35
+    signing_key: bytes
36
+
37
+
38
+class MasterKeys(TypedDict):
39
+    hashing_key: bytes
40
+    encryption_key: bytes
41
+    signing_key: bytes
42
+
43
+
44
+def derive_master_keys_keys(password: str | bytes, iterations: int) -> KeyPair:
45
+    if isinstance(password, str):
46
+        password = password.encode('ASCII')
47
+    master_keys_keys_blob = pbkdf2.PBKDF2HMAC(
48
+        algorithm=hashes.SHA1(),  # noqa: S303
49
+        length=64,
50
+        salt=STOREROOM_MASTER_KEYS_UUID,
51
+        iterations=iterations,
52
+    ).derive(password)
53
+    encryption_key, signing_key = struct.unpack(
54
+        f'{KEY_SIZE}s {KEY_SIZE}s', master_keys_keys_blob
55
+    )
56
+    logger.debug(
57
+        (
58
+            'derived master_keys_keys bytes.fromhex(%s) (encryption) '
59
+            'and bytes.fromhex(%s) (signing) '
60
+            'from password bytes.fromhex(%s), '
61
+            'using call '
62
+            'pbkdf2(algorithm=%s, length=%d, salt=%s, iterations=%d)'
63
+        ),
64
+        repr(encryption_key.hex(' ')),
65
+        repr(signing_key.hex(' ')),
66
+        repr(password.hex(' ')),
67
+        repr('SHA256'),
68
+        64,
69
+        repr(STOREROOM_MASTER_KEYS_UUID),
70
+        iterations,
71
+    )
72
+    return {
73
+        'encryption_key': encryption_key,
74
+        'signing_key': signing_key,
75
+    }
76
+
77
+
78
+def decrypt_master_keys_data(data: bytes, keys: KeyPair) -> MasterKeys:
79
+    ciphertext, claimed_mac = struct.unpack(
80
+        f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
81
+    )
82
+    actual_mac = hmac.HMAC(keys['signing_key'], hashes.SHA256())
83
+    actual_mac.update(ciphertext)
84
+    logger.debug(
85
+        (
86
+            'master_keys_data mac_key = bytes.fromhex(%s), '
87
+            'hashed_content = bytes.fromhex(%s), '
88
+            'claimed_mac = bytes.fromhex(%s), '
89
+            'actual_mac = bytes.fromhex(%s)'
90
+        ),
91
+        repr(keys['signing_key'].hex(' ')),
92
+        repr(ciphertext.hex(' ')),
93
+        repr(claimed_mac.hex(' ')),
94
+        repr(actual_mac.copy().finalize().hex(' ')),
95
+    )
96
+    actual_mac.verify(claimed_mac)
97
+
98
+    iv, payload = struct.unpack(
99
+        f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
100
+    )
101
+    decryptor = ciphers.Cipher(
102
+        algorithms.AES256(keys['encryption_key']), modes.CBC(iv)
103
+    ).decryptor()
104
+    padded_plaintext = bytearray()
105
+    padded_plaintext.extend(decryptor.update(payload))
106
+    padded_plaintext.extend(decryptor.finalize())
107
+    unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
108
+    plaintext = bytearray()
109
+    plaintext.extend(unpadder.update(padded_plaintext))
110
+    plaintext.extend(unpadder.finalize())
111
+    if len(plaintext) != 3 * KEY_SIZE:
112
+        msg = (
113
+            f'Expecting 3 encrypted keys at {3 * KEY_SIZE} bytes total, '
114
+            f'but found {len(plaintext)} instead'
115
+        )
116
+        raise RuntimeError(msg)
117
+    hashing_key, encryption_key, signing_key = struct.unpack(
118
+        f'{KEY_SIZE}s {KEY_SIZE}s {KEY_SIZE}s', plaintext
119
+    )
120
+    return {
121
+        'hashing_key': hashing_key,
122
+        'encryption_key': encryption_key,
123
+        'signing_key': signing_key,
124
+    }
125
+
126
+
127
+def decrypt_session_keys(data: bytes, keys: MasterKeys) -> KeyPair:
128
+    ciphertext, claimed_mac = struct.unpack(
129
+        f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
130
+    )
131
+    actual_mac = hmac.HMAC(keys['signing_key'], hashes.SHA256())
132
+    actual_mac.update(ciphertext)
133
+    logger.debug(
134
+        (
135
+            'decrypt_bucket_line (session_keys): '
136
+            'mac_key = bytes.fromhex(%s) (master), '
137
+            'hashed_content = bytes.fromhex(%s), '
138
+            'claimed_mac = bytes.fromhex(%s), '
139
+            'actual_mac = bytes.fromhex(%s)'
140
+        ),
141
+        repr(keys['signing_key'].hex(' ')),
142
+        repr(ciphertext.hex(' ')),
143
+        repr(claimed_mac.hex(' ')),
144
+        repr(actual_mac.copy().finalize().hex(' ')),
145
+    )
146
+    actual_mac.verify(claimed_mac)
147
+
148
+    iv, payload = struct.unpack(
149
+        f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
150
+    )
151
+    decryptor = ciphers.Cipher(
152
+        algorithms.AES256(keys['encryption_key']), modes.CBC(iv)
153
+    ).decryptor()
154
+    padded_plaintext = bytearray()
155
+    padded_plaintext.extend(decryptor.update(payload))
156
+    padded_plaintext.extend(decryptor.finalize())
157
+    unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
158
+    plaintext = bytearray()
159
+    plaintext.extend(unpadder.update(padded_plaintext))
160
+    plaintext.extend(unpadder.finalize())
161
+
162
+    session_encryption_key, session_signing_key, inner_payload = struct.unpack(
163
+        f'{KEY_SIZE}s {KEY_SIZE}s {len(plaintext) - 2 * KEY_SIZE}s',
164
+        plaintext,
165
+    )
166
+    session_keys: KeyPair = {
167
+        'encryption_key': session_encryption_key,
168
+        'signing_key': session_signing_key,
169
+    }
170
+
171
+    logger.debug(
172
+        (
173
+            'decrypt_bucket_line (session_keys): '
174
+            'decrypt_aes256_cbc_and_unpad(key=bytes.fromhex(%s), '
175
+            'iv=bytes.fromhex(%s))(bytes.fromhex(%s)) '
176
+            '= bytes.fromhex(%s) '
177
+            '= {"encryption_key": bytes.fromhex(%s), '
178
+            '"signing_key": bytes.fromhex(%s)}'
179
+        ),
180
+        repr(keys['encryption_key'].hex(' ')),
181
+        repr(iv.hex(' ')),
182
+        repr(payload.hex(' ')),
183
+        repr(plaintext.hex(' ')),
184
+        repr(session_keys['encryption_key'].hex(' ')),
185
+        repr(session_keys['signing_key'].hex(' ')),
186
+    )
187
+
188
+    if inner_payload:
189
+        logger.debug(
190
+            'ignoring misplaced inner payload bytes.fromhex(%s)',
191
+            repr(inner_payload.hex(' ')),
192
+        )
193
+
194
+    return session_keys
195
+
196
+
197
+def decrypt_contents(data: bytes, keys: KeyPair) -> bytes:
198
+    ciphertext, claimed_mac = struct.unpack(
199
+        f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
200
+    )
201
+    actual_mac = hmac.HMAC(keys['signing_key'], hashes.SHA256())
202
+    actual_mac.update(ciphertext)
203
+    logger.debug(
204
+        (
205
+            'decrypt_bucket_line (contents): '
206
+            'mac_key = bytes.fromhex(%s), '
207
+            'hashed_content = bytes.fromhex(%s), '
208
+            'claimed_mac = bytes.fromhex(%s), '
209
+            'actual_mac = bytes.fromhex(%s)'
210
+        ),
211
+        repr(keys['signing_key'].hex(' ')),
212
+        repr(ciphertext.hex(' ')),
213
+        repr(claimed_mac.hex(' ')),
214
+        repr(actual_mac.copy().finalize().hex(' ')),
215
+    )
216
+    actual_mac.verify(claimed_mac)
217
+
218
+    iv, payload = struct.unpack(
219
+        f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
220
+    )
221
+    decryptor = ciphers.Cipher(
222
+        algorithms.AES256(keys['encryption_key']), modes.CBC(iv)
223
+    ).decryptor()
224
+    padded_plaintext = bytearray()
225
+    padded_plaintext.extend(decryptor.update(payload))
226
+    padded_plaintext.extend(decryptor.finalize())
227
+    unpadder = padding.PKCS7(IV_SIZE * 8).unpadder()
228
+    plaintext = bytearray()
229
+    plaintext.extend(unpadder.update(padded_plaintext))
230
+    plaintext.extend(unpadder.finalize())
231
+
232
+    logger.debug(
233
+        (
234
+            'decrypt_bucket_line (contents): '
235
+            'decrypt_aes256_cbc_and_unpad(key=bytes.fromhex(%s), '
236
+            'iv=bytes.fromhex(%s))(bytes.fromhex(%s)) '
237
+            '= bytes.fromhex(%s)'
238
+        ),
239
+        repr(keys['encryption_key'].hex(' ')),
240
+        repr(iv.hex(' ')),
241
+        repr(payload.hex(' ')),
242
+        repr(plaintext.hex(' ')),
243
+    )
244
+
245
+    return plaintext
246
+
247
+
248
+def decrypt_bucket_line(bucket_line: bytes, master_keys: MasterKeys) -> bytes:
249
+    logger.debug(
250
+        (
251
+            'decrypt_bucket_line: data = bytes.fromhex(%s), '
252
+            'encryption_key = bytes.fromhex(%s), '
253
+            'signing_key = bytes.fromhex(%s)'
254
+        ),
255
+        repr(bucket_line.hex(' ')),
256
+        repr(master_keys['encryption_key'].hex(' ')),
257
+        repr(master_keys['signing_key'].hex(' ')),
258
+    )
259
+    data_version, encrypted_session_keys, data_contents = struct.unpack(
260
+        (
261
+            f'B {ENCRYPTED_KEYPAIR_SIZE}s '
262
+            f'{len(bucket_line) - 1 - ENCRYPTED_KEYPAIR_SIZE}s'
263
+        ),
264
+        bucket_line,
265
+    )
266
+    if data_version != 1:
267
+        msg = f'Cannot handle version {data_version} encrypted data'
268
+        raise RuntimeError(msg)
269
+    session_keys = decrypt_session_keys(encrypted_session_keys, master_keys)
270
+    return decrypt_contents(data_contents, session_keys)
271
+
272
+
273
+def decrypt_bucket_file(filename: str, master_keys: MasterKeys) -> None:
274
+    with (
275
+        open(filename, 'rb') as bucket_file,
276
+        open(filename + '.decrypted', 'wb') as decrypted_file,
277
+    ):
278
+        header_line = bucket_file.readline()
279
+        try:
280
+            header = json.loads(header_line)
281
+        except ValueError as exc:
282
+            msg = f'Invalid bucket file: {filename}'
283
+            raise RuntimeError(msg) from exc
284
+        if header != {'version': 1}:
285
+            msg = f'Invalid bucket file: {filename}'
286
+            raise RuntimeError(msg) from None
287
+        decrypted_file.write(header_line)
288
+        for line in bucket_file:
289
+            decrypted_contents = (
290
+                decrypt_bucket_line(
291
+                    base64.standard_b64decode(line), master_keys
292
+                ).removesuffix(b'\n')
293
+                + b'\n'
294
+            )
295
+            decrypted_file.write(decrypted_contents)
296
+
297
+
298
+def main() -> None:
299
+    with open('.keys', encoding='utf-8') as master_keys_file:
300
+        header = json.loads(master_keys_file.readline())
301
+        if header != {'version': 1}:
302
+            msg = 'bad or unsupported keys version header'
303
+            raise RuntimeError(msg)
304
+        raw_keys_data = base64.standard_b64decode(master_keys_file.readline())
305
+        encrypted_keys_params, encrypted_keys = struct.unpack(
306
+            f'B {len(raw_keys_data) - 1}s', raw_keys_data
307
+        )
308
+        if master_keys_file.read():
309
+            msg = 'trailing data; cannot make sense of .keys file'
310
+            raise RuntimeError(msg)
311
+    encrypted_keys_version = encrypted_keys_params >> 4
312
+    if encrypted_keys_version != 1:
313
+        msg = f'cannot handle version {encrypted_keys_version} encrypted keys'
314
+        raise RuntimeError(msg)
315
+    encrypted_keys_iterations = 2 ** (10 + (encrypted_keys_params & 0x0F))
316
+    master_keys_keys = derive_master_keys_keys(
317
+        MASTER_KEYS_KEY, encrypted_keys_iterations
318
+    )
319
+    master_keys = decrypt_master_keys_data(encrypted_keys, master_keys_keys)
320
+
321
+    for file in glob.glob('[01][0-9a-f]'):
322
+        decrypt_bucket_file(file, master_keys)
323
+
324
+
325
+if __name__ == '__main__':
326
+    main()
0 327