d525f4302bd1f00edffe1159f78b8da745f06d24
Marco Ricci Add prototype for "vault v0...

Marco Ricci authored 1 month ago

1) #!/usr/bin/python3
2) 
3) from __future__ import annotations
4) 
5) import abc
6) import base64
7) import json
8) import logging
9) import warnings
10) from typing import Any
11) 
12) from cryptography import exceptions as crypt_exceptions
13) from cryptography import utils as crypt_utils
14) from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding
15) from cryptography.hazmat.primitives.ciphers import algorithms, modes
16) from cryptography.hazmat.primitives.kdf import pbkdf2
17) 
Marco Ricci Move vault key and path det...

Marco Ricci authored 1 month ago

18) from derivepassphrase import exporter, vault
Marco Ricci Add prototype for "vault v0...

Marco Ricci authored 1 month ago

19) 
20) logger = logging.getLogger(__name__)
21) 
22) 
23) def _h(bs: bytes | bytearray) -> str:
24)     return 'bytes.fromhex({!r})'.format(bs.hex(' '))
25) 
26) 
27) class Reader(abc.ABC):
28)     def __init__(
29)         self, contents: bytes | bytearray, password: str | bytes | bytearray
30)     ) -> None:
31)         if not password:
32)             msg = 'No password given; check VAULT_KEY environment variable'
33)             raise ValueError(msg)
34)         self.contents = contents
35)         self.password = password
36)         self.iv_size = 0
37)         self.mac_size = 0
38)         self.encryption_key = b''
39)         self.encryption_key_size = 0
40)         self.signing_key = b''
41)         self.signing_key_size = 0
42) 
43)     def run(self) -> Any:
44)         self._parse_contents()
45)         self._derive_keys()
46)         self._check_signature()
47)         self._decrypt_payload()
48)         return self._data
49) 
50)     @staticmethod
51)     def pbkdf2(
52)         password: str | bytes | bytearray, key_size: int, iterations: int
53)     ) -> bytes:
54)         if isinstance(password, str):
55)             password = password.encode('utf-8')
56)         raw_key = pbkdf2.PBKDF2HMAC(
57)             algorithm=hashes.SHA1(),  # noqa: S303
58)             length=key_size // 2,
59)             salt=vault.Vault._UUID,  # noqa: SLF001
60)             iterations=iterations,
61)         ).derive(password)
62)         logger.debug(
63)             'binary = pbkdf2(%s, %s, %s, %s, %s) = %s -> %s',
64)             repr(password),
65)             repr(vault.Vault._UUID),  # noqa: SLF001
66)             iterations,
67)             key_size // 2,
68)             repr('sha1'),
69)             _h(raw_key),
70)             _h(raw_key.hex().lower().encode('ASCII')),
71)         )
72)         return raw_key.hex().lower().encode('ASCII')
73) 
74)     def _parse_contents(self) -> None:
75)         logger.info('Parsing IV, payload and signature from the file contents')
76) 
77)         if len(self.contents) < self.iv_size + 16 + self.mac_size:
78)             msg = 'File contents are too small to parse'
79)             raise ValueError(msg)
80) 
81)         cutpos1 = self.iv_size
82)         cutpos2 = len(self.contents) - self.mac_size
83) 
84)         self.message = self.contents[:cutpos2]
85)         self.message_tag = self.contents[cutpos2:]
86)         self.iv = self.message[:cutpos1]
87)         self.payload = self.message[cutpos1:]
88) 
89)         logger.debug(
90)             'buffer %s = [[%s, %s], %s]',
91)             _h(self.contents),
92)             _h(self.iv),
93)             _h(self.payload),
94)             _h(self.message_tag),
95)         )
96) 
97)     def _derive_keys(self) -> None:
98)         logger.info('Deriving an encryption and signing key')
99)         self._generate_keys()
100)         assert (
101)             len(self.encryption_key) == self.encryption_key_size
102)         ), 'Derived encryption key is not valid'
103)         assert (
104)             len(self.signing_key) == self.signing_key_size
105)         ), 'Derived signing key is not valid'
106) 
107)     @abc.abstractmethod
108)     def _generate_keys(self) -> None:
109)         raise AssertionError
110) 
111)     def _check_signature(self) -> None:
112)         logger.info('Checking HMAC signature')
113)         mac = hmac.HMAC(self.signing_key, hashes.SHA256())
114)         mac_input = self._hmac_input()
115)         logger.debug(
116)             'mac_input = %s, expected_tag = %s',
117)             _h(mac_input),
118)             _h(self.message_tag),
119)         )
120)         mac.update(mac_input)
121)         try:
122)             mac.verify(self.message_tag)
123)         except crypt_exceptions.InvalidSignature:
124)             msg = 'File does not contain a valid HMAC-SHA256 signature'
125)             raise ValueError(msg) from None
126) 
127)     @abc.abstractmethod
128)     def _hmac_input(self) -> bytes:
129)         raise AssertionError
130) 
131)     def _decrypt_payload(self) -> None:
132)         decryptor = self._make_decryptor()
133)         padded_plaintext = bytearray()
134)         padded_plaintext.extend(decryptor.update(self.payload))
135)         padded_plaintext.extend(decryptor.finalize())
136)         logger.debug('padded plaintext = %s', _h(padded_plaintext))
137)         unpadder = padding.PKCS7(self.iv_size * 8).unpadder()
138)         plaintext = bytearray()
139)         plaintext.extend(unpadder.update(padded_plaintext))
140)         plaintext.extend(unpadder.finalize())
141)         logger.debug('plaintext = %s', _h(plaintext))
142)         self._data = json.loads(plaintext)
143) 
144)     @abc.abstractmethod
145)     def _make_decryptor(self) -> ciphers.CipherContext:
146)         raise AssertionError
147) 
148) 
149) class V03Reader(Reader):
150)     KEY_SIZE = 32
151) 
152)     def __init__(self, *args: Any, **kwargs: Any) -> None:
153)         super().__init__(*args, **kwargs)
154)         self.iv_size = 16
155)         self.mac_size = 32
156) 
157)     def run(self) -> Any:
158)         logger.info('Attempting to parse as v0.3 configuration')
159)         return super().run()
160) 
161)     def _generate_keys(self) -> None:
162)         self.encryption_key = self.pbkdf2(self.password, self.KEY_SIZE, 100)
163)         self.signing_key = self.pbkdf2(self.password, self.KEY_SIZE, 200)
164)         self.encryption_key_size = self.signing_key_size = self.KEY_SIZE
165) 
166)     def _hmac_input(self) -> bytes:
167)         return self.message.hex().lower().encode('ASCII')
168) 
169)     def _make_decryptor(self) -> ciphers.CipherContext:
170)         return ciphers.Cipher(
171)             algorithms.AES256(self.encryption_key), modes.CBC(self.iv)
172)         ).decryptor()
173) 
174) 
175) class V02Reader(Reader):
176)     def __init__(self, *args: Any, **kwargs: Any) -> None:
177)         super().__init__(*args, **kwargs)
178)         self.iv_size = 16
179)         self.mac_size = 64
180) 
181)     def run(self) -> Any:
182)         logger.info('Attempting to parse as v0.2 configuration')
183)         return super().run()
184) 
185)     def _parse_contents(self) -> None:
186)         super()._parse_contents()
187)         logger.debug('Decoding payload (base64) and message tag (hex)')
188)         self.payload = base64.standard_b64decode(self.payload)
189)         self.message_tag = bytes.fromhex(self.message_tag.decode('ASCII'))
190) 
191)     def _generate_keys(self) -> None:
192)         self.encryption_key = self.pbkdf2(self.password, 8, 16)
193)         self.signing_key = self.pbkdf2(self.password, 16, 16)
194)         self.encryption_key_size = 8
195)         self.signing_key_size = 16
196) 
197)     def _hmac_input(self) -> bytes:
198)         return base64.standard_b64encode(self.message)
199) 
200)     def _make_decryptor(self) -> ciphers.CipherContext:
201)         def evp_bytestokey_md5_one_iteration(
202)             data: bytes, salt: bytes | None, key_size: int, iv_size: int
203)         ) -> tuple[bytes, bytes]:
204)             total_size = key_size + iv_size
205)             buffer = bytearray()
206)             last_block = b''
207)             if salt is None:
208)                 salt = b''
209)             logging.debug(
210)                 (
211)                     'data = %s, salt = %s, key_size = %s, iv_size = %s, '
212)                     'buffer length = %s, buffer = %s'
213)                 ),
214)                 _h(data),
215)                 _h(salt),
216)                 key_size,
217)                 iv_size,
218)                 len(buffer),
219)                 _h(buffer),
220)             )
221)             while len(buffer) < total_size:
222)                 with warnings.catch_warnings():
223)                     warnings.simplefilter(
224)                         'ignore', crypt_utils.CryptographyDeprecationWarning
225)                     )
226)                     block = hashes.Hash(hashes.MD5())  # noqa: S303
227)                 block.update(last_block)
228)                 block.update(data)
229)                 block.update(salt)
230)                 last_block = block.finalize()
231)                 buffer.extend(last_block)
232)                 logging.debug(
233)                     'buffer length = %s, buffer = %s', len(buffer), _h(buffer)
234)                 )
235)             logging.debug(
236)                 'encryption_key = %s, iv = %s',
237)                 _h(buffer[:key_size]),
238)                 _h(buffer[key_size:total_size]),
239)             )
240)             return bytes(buffer[:key_size]), bytes(buffer[key_size:total_size])
241) 
242)         data = base64.standard_b64encode(self.iv + self.encryption_key)
243)         encryption_key, iv = evp_bytestokey_md5_one_iteration(
244)             data, salt=None, key_size=32, iv_size=16
245)         )
246)         return ciphers.Cipher(
247)             algorithms.AES256(encryption_key), modes.CBC(iv)
248)         ).decryptor()
249) 
250) 
251) if __name__ == '__main__':
252)     import os
253) 
254)     logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
Marco Ricci Move vault key and path det...

Marco Ricci authored 1 month ago

255)     with open(exporter.get_vault_path(), 'rb') as infile:
Marco Ricci Add prototype for "vault v0...

Marco Ricci authored 1 month ago

256)         contents = base64.standard_b64decode(infile.read())
Marco Ricci Move vault key and path det...

Marco Ricci authored 1 month ago

257)     password = exporter.get_vault_key()