Add prototype for "vault v0.2/v0.3"-type data export
Marco Ricci

Marco Ricci commited on 2024-08-18 00:45:19
Zeige 1 geänderte Dateien mit 273 Einfügungen und 0 Löschungen.


Based on a rather direct translation of vault's "legacy" configuration
migrator, for storeroom-enabled builds of vault.
... ...
@@ -0,0 +1,273 @@
1
+#!/usr/bin/python3
2
+
3
+from __future__ import annotations
4
+
5
+import abc
6
+import base64
7
+import json
8
+import logging
9
+import warnings
10
+from typing import Any
11
+
12
+from cryptography import exceptions as crypt_exceptions
13
+from cryptography import utils as crypt_utils
14
+from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding
15
+from cryptography.hazmat.primitives.ciphers import algorithms, modes
16
+from cryptography.hazmat.primitives.kdf import pbkdf2
17
+
18
+from derivepassphrase import vault
19
+
20
+logger = logging.getLogger(__name__)
21
+
22
+
23
+def _h(bs: bytes | bytearray) -> str:
24
+    return 'bytes.fromhex({!r})'.format(bs.hex(' '))
25
+
26
+
27
+class Reader(abc.ABC):
28
+    def __init__(
29
+        self, contents: bytes | bytearray, password: str | bytes | bytearray
30
+    ) -> None:
31
+        if not password:
32
+            msg = 'No password given; check VAULT_KEY environment variable'
33
+            raise ValueError(msg)
34
+        self.contents = contents
35
+        self.password = password
36
+        self.iv_size = 0
37
+        self.mac_size = 0
38
+        self.encryption_key = b''
39
+        self.encryption_key_size = 0
40
+        self.signing_key = b''
41
+        self.signing_key_size = 0
42
+
43
+    def run(self) -> Any:
44
+        self._parse_contents()
45
+        self._derive_keys()
46
+        self._check_signature()
47
+        self._decrypt_payload()
48
+        return self._data
49
+
50
+    @staticmethod
51
+    def pbkdf2(
52
+        password: str | bytes | bytearray, key_size: int, iterations: int
53
+    ) -> bytes:
54
+        if isinstance(password, str):
55
+            password = password.encode('utf-8')
56
+        raw_key = pbkdf2.PBKDF2HMAC(
57
+            algorithm=hashes.SHA1(),  # noqa: S303
58
+            length=key_size // 2,
59
+            salt=vault.Vault._UUID,  # noqa: SLF001
60
+            iterations=iterations,
61
+        ).derive(password)
62
+        logger.debug(
63
+            'binary = pbkdf2(%s, %s, %s, %s, %s) = %s -> %s',
64
+            repr(password),
65
+            repr(vault.Vault._UUID),  # noqa: SLF001
66
+            iterations,
67
+            key_size // 2,
68
+            repr('sha1'),
69
+            _h(raw_key),
70
+            _h(raw_key.hex().lower().encode('ASCII')),
71
+        )
72
+        return raw_key.hex().lower().encode('ASCII')
73
+
74
+    def _parse_contents(self) -> None:
75
+        logger.info('Parsing IV, payload and signature from the file contents')
76
+
77
+        if len(self.contents) < self.iv_size + 16 + self.mac_size:
78
+            msg = 'File contents are too small to parse'
79
+            raise ValueError(msg)
80
+
81
+        cutpos1 = self.iv_size
82
+        cutpos2 = len(self.contents) - self.mac_size
83
+
84
+        self.message = self.contents[:cutpos2]
85
+        self.message_tag = self.contents[cutpos2:]
86
+        self.iv = self.message[:cutpos1]
87
+        self.payload = self.message[cutpos1:]
88
+
89
+        logger.debug(
90
+            'buffer %s = [[%s, %s], %s]',
91
+            _h(self.contents),
92
+            _h(self.iv),
93
+            _h(self.payload),
94
+            _h(self.message_tag),
95
+        )
96
+
97
+    def _derive_keys(self) -> None:
98
+        logger.info('Deriving an encryption and signing key')
99
+        self._generate_keys()
100
+        assert (
101
+            len(self.encryption_key) == self.encryption_key_size
102
+        ), 'Derived encryption key is not valid'
103
+        assert (
104
+            len(self.signing_key) == self.signing_key_size
105
+        ), 'Derived signing key is not valid'
106
+
107
+    @abc.abstractmethod
108
+    def _generate_keys(self) -> None:
109
+        raise AssertionError
110
+
111
+    def _check_signature(self) -> None:
112
+        logger.info('Checking HMAC signature')
113
+        mac = hmac.HMAC(self.signing_key, hashes.SHA256())
114
+        mac_input = self._hmac_input()
115
+        logger.debug(
116
+            'mac_input = %s, expected_tag = %s',
117
+            _h(mac_input),
118
+            _h(self.message_tag),
119
+        )
120
+        mac.update(mac_input)
121
+        try:
122
+            mac.verify(self.message_tag)
123
+        except crypt_exceptions.InvalidSignature:
124
+            msg = 'File does not contain a valid HMAC-SHA256 signature'
125
+            raise ValueError(msg) from None
126
+
127
+    @abc.abstractmethod
128
+    def _hmac_input(self) -> bytes:
129
+        raise AssertionError
130
+
131
+    def _decrypt_payload(self) -> None:
132
+        decryptor = self._make_decryptor()
133
+        padded_plaintext = bytearray()
134
+        padded_plaintext.extend(decryptor.update(self.payload))
135
+        padded_plaintext.extend(decryptor.finalize())
136
+        logger.debug('padded plaintext = %s', _h(padded_plaintext))
137
+        unpadder = padding.PKCS7(self.iv_size * 8).unpadder()
138
+        plaintext = bytearray()
139
+        plaintext.extend(unpadder.update(padded_plaintext))
140
+        plaintext.extend(unpadder.finalize())
141
+        logger.debug('plaintext = %s', _h(plaintext))
142
+        self._data = json.loads(plaintext)
143
+
144
+    @abc.abstractmethod
145
+    def _make_decryptor(self) -> ciphers.CipherContext:
146
+        raise AssertionError
147
+
148
+
149
+class V03Reader(Reader):
150
+    KEY_SIZE = 32
151
+
152
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
153
+        super().__init__(*args, **kwargs)
154
+        self.iv_size = 16
155
+        self.mac_size = 32
156
+
157
+    def run(self) -> Any:
158
+        logger.info('Attempting to parse as v0.3 configuration')
159
+        return super().run()
160
+
161
+    def _generate_keys(self) -> None:
162
+        self.encryption_key = self.pbkdf2(self.password, self.KEY_SIZE, 100)
163
+        self.signing_key = self.pbkdf2(self.password, self.KEY_SIZE, 200)
164
+        self.encryption_key_size = self.signing_key_size = self.KEY_SIZE
165
+
166
+    def _hmac_input(self) -> bytes:
167
+        return self.message.hex().lower().encode('ASCII')
168
+
169
+    def _make_decryptor(self) -> ciphers.CipherContext:
170
+        return ciphers.Cipher(
171
+            algorithms.AES256(self.encryption_key), modes.CBC(self.iv)
172
+        ).decryptor()
173
+
174
+
175
+class V02Reader(Reader):
176
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
177
+        super().__init__(*args, **kwargs)
178
+        self.iv_size = 16
179
+        self.mac_size = 64
180
+
181
+    def run(self) -> Any:
182
+        logger.info('Attempting to parse as v0.2 configuration')
183
+        return super().run()
184
+
185
+    def _parse_contents(self) -> None:
186
+        super()._parse_contents()
187
+        logger.debug('Decoding payload (base64) and message tag (hex)')
188
+        self.payload = base64.standard_b64decode(self.payload)
189
+        self.message_tag = bytes.fromhex(self.message_tag.decode('ASCII'))
190
+
191
+    def _generate_keys(self) -> None:
192
+        self.encryption_key = self.pbkdf2(self.password, 8, 16)
193
+        self.signing_key = self.pbkdf2(self.password, 16, 16)
194
+        self.encryption_key_size = 8
195
+        self.signing_key_size = 16
196
+
197
+    def _hmac_input(self) -> bytes:
198
+        return base64.standard_b64encode(self.message)
199
+
200
+    def _make_decryptor(self) -> ciphers.CipherContext:
201
+        def evp_bytestokey_md5_one_iteration(
202
+            data: bytes, salt: bytes | None, key_size: int, iv_size: int
203
+        ) -> tuple[bytes, bytes]:
204
+            total_size = key_size + iv_size
205
+            buffer = bytearray()
206
+            last_block = b''
207
+            if salt is None:
208
+                salt = b''
209
+            logging.debug(
210
+                (
211
+                    'data = %s, salt = %s, key_size = %s, iv_size = %s, '
212
+                    'buffer length = %s, buffer = %s'
213
+                ),
214
+                _h(data),
215
+                _h(salt),
216
+                key_size,
217
+                iv_size,
218
+                len(buffer),
219
+                _h(buffer),
220
+            )
221
+            while len(buffer) < total_size:
222
+                with warnings.catch_warnings():
223
+                    warnings.simplefilter(
224
+                        'ignore', crypt_utils.CryptographyDeprecationWarning
225
+                    )
226
+                    block = hashes.Hash(hashes.MD5())  # noqa: S303
227
+                block.update(last_block)
228
+                block.update(data)
229
+                block.update(salt)
230
+                last_block = block.finalize()
231
+                buffer.extend(last_block)
232
+                logging.debug(
233
+                    'buffer length = %s, buffer = %s', len(buffer), _h(buffer)
234
+                )
235
+            logging.debug(
236
+                'encryption_key = %s, iv = %s',
237
+                _h(buffer[:key_size]),
238
+                _h(buffer[key_size:total_size]),
239
+            )
240
+            return bytes(buffer[:key_size]), bytes(buffer[key_size:total_size])
241
+
242
+        data = base64.standard_b64encode(self.iv + self.encryption_key)
243
+        encryption_key, iv = evp_bytestokey_md5_one_iteration(
244
+            data, salt=None, key_size=32, iv_size=16
245
+        )
246
+        return ciphers.Cipher(
247
+            algorithms.AES256(encryption_key), modes.CBC(iv)
248
+        ).decryptor()
249
+
250
+
251
+if __name__ == '__main__':
252
+    import os
253
+
254
+    logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
255
+    with open(
256
+        os.path.join(
257
+            os.path.expanduser('~'), os.getenv('VAULT_PATH', '.vault')
258
+        ),
259
+        'rb',
260
+    ) as infile:
261
+        contents = base64.standard_b64decode(infile.read())
262
+    password = (
263
+        os.getenv('VAULT_KEY')
264
+        or os.getenv('LOGNAME')
265
+        or os.getenv('USER')
266
+        or os.getenv('USERNAME')
267
+    )
268
+    assert password
269
+    try:
270
+        config = V03Reader(contents, password).run()
271
+    except ValueError:
272
+        config = V02Reader(contents, password).run()
273
+    print(json.dumps(config, indent=2, sort_keys=True))  # noqa: T201
0 274