fa1083e98907a6a7dc48e8decfa7146708510341
Marco Ricci Add prototype for "vault v0...

Marco Ricci authored 1 month ago

1) #!/usr/bin/python3
2) 
3) from __future__ import annotations
4) 
5) import abc
6) import base64
7) import json
8) import logging
9) import warnings
Marco Ricci Add preliminary tests for t...

Marco Ricci authored 3 weeks ago

10) from typing import TYPE_CHECKING, Any
11) 
12) if TYPE_CHECKING:
13)     from cryptography import exceptions as crypt_exceptions
14)     from cryptography import utils as crypt_utils
15)     from cryptography.hazmat.primitives import ciphers, hashes, hmac, padding
16)     from cryptography.hazmat.primitives.ciphers import algorithms, modes
17)     from cryptography.hazmat.primitives.kdf import pbkdf2
18) else:
19)     try:
20)         from cryptography import exceptions as crypt_exceptions
21)         from cryptography import utils as crypt_utils
22)         from cryptography.hazmat.primitives import (
23)             ciphers,
24)             hashes,
25)             hmac,
26)             padding,
27)         )
28)         from cryptography.hazmat.primitives.ciphers import algorithms, modes
29)         from cryptography.hazmat.primitives.kdf import pbkdf2
30)     except ModuleNotFoundError as exc:
31) 
32)         class DummyModule:
33)             def __init__(self, exc: type[Exception]) -> None:
34)                 self.exc = exc
35) 
36)             def __getattr__(self, name: str) -> Any:
37)                 def func(*args: Any, **kwargs: Any) -> Any:  # noqa: ARG001
38)                     raise self.exc
39) 
40)                 return func
41) 
42)         crypt_exceptions = crypt_utils = DummyModule(exc)
43)         ciphers = hashes = hmac = padding = DummyModule(exc)
44)         algorithms = modes = pbkdf2 = DummyModule(exc)
45)         STUBBED = True
46)     else:
47)         STUBBED = False
Marco Ricci Add prototype for "vault v0...

Marco Ricci authored 1 month ago

48) 
Marco Ricci Move vault key and path det...

Marco Ricci authored 1 month ago

49) from derivepassphrase import exporter, vault
Marco Ricci Add prototype for "vault v0...

Marco Ricci authored 1 month ago

50) 
51) logger = logging.getLogger(__name__)
52) 
53) 
54) def _h(bs: bytes | bytearray) -> str:
55)     return 'bytes.fromhex({!r})'.format(bs.hex(' '))
56) 
57) 
58) class Reader(abc.ABC):
59)     def __init__(
60)         self, contents: bytes | bytearray, password: str | bytes | bytearray
61)     ) -> None:
62)         if not password:
63)             msg = 'No password given; check VAULT_KEY environment variable'
64)             raise ValueError(msg)
65)         self.contents = contents
66)         self.password = password
67)         self.iv_size = 0
68)         self.mac_size = 0
69)         self.encryption_key = b''
70)         self.encryption_key_size = 0
71)         self.signing_key = b''
72)         self.signing_key_size = 0
73) 
74)     def run(self) -> Any:
75)         self._parse_contents()
76)         self._derive_keys()
77)         self._check_signature()
78)         self._decrypt_payload()
79)         return self._data
80) 
81)     @staticmethod
82)     def pbkdf2(
83)         password: str | bytes | bytearray, key_size: int, iterations: int
84)     ) -> bytes:
85)         if isinstance(password, str):
86)             password = password.encode('utf-8')
87)         raw_key = pbkdf2.PBKDF2HMAC(
88)             algorithm=hashes.SHA1(),  # noqa: S303
89)             length=key_size // 2,
90)             salt=vault.Vault._UUID,  # noqa: SLF001
91)             iterations=iterations,
92)         ).derive(password)
93)         logger.debug(
94)             'binary = pbkdf2(%s, %s, %s, %s, %s) = %s -> %s',
95)             repr(password),
96)             repr(vault.Vault._UUID),  # noqa: SLF001
97)             iterations,
98)             key_size // 2,
99)             repr('sha1'),
100)             _h(raw_key),
101)             _h(raw_key.hex().lower().encode('ASCII')),
102)         )
103)         return raw_key.hex().lower().encode('ASCII')
104) 
105)     def _parse_contents(self) -> None:
106)         logger.info('Parsing IV, payload and signature from the file contents')
107) 
108)         if len(self.contents) < self.iv_size + 16 + self.mac_size:
109)             msg = 'File contents are too small to parse'
110)             raise ValueError(msg)
111) 
112)         cutpos1 = self.iv_size
113)         cutpos2 = len(self.contents) - self.mac_size
114) 
115)         self.message = self.contents[:cutpos2]
116)         self.message_tag = self.contents[cutpos2:]
117)         self.iv = self.message[:cutpos1]
118)         self.payload = self.message[cutpos1:]
119) 
120)         logger.debug(
121)             'buffer %s = [[%s, %s], %s]',
122)             _h(self.contents),
123)             _h(self.iv),
124)             _h(self.payload),
125)             _h(self.message_tag),
126)         )
127) 
128)     def _derive_keys(self) -> None:
129)         logger.info('Deriving an encryption and signing key')
130)         self._generate_keys()
131)         assert (
132)             len(self.encryption_key) == self.encryption_key_size
133)         ), 'Derived encryption key is not valid'
134)         assert (
135)             len(self.signing_key) == self.signing_key_size
136)         ), 'Derived signing key is not valid'
137) 
138)     @abc.abstractmethod
139)     def _generate_keys(self) -> None:
140)         raise AssertionError
141) 
142)     def _check_signature(self) -> None:
143)         logger.info('Checking HMAC signature')
144)         mac = hmac.HMAC(self.signing_key, hashes.SHA256())
145)         mac_input = self._hmac_input()
146)         logger.debug(
147)             'mac_input = %s, expected_tag = %s',
148)             _h(mac_input),
149)             _h(self.message_tag),
150)         )
151)         mac.update(mac_input)
152)         try:
153)             mac.verify(self.message_tag)
154)         except crypt_exceptions.InvalidSignature:
155)             msg = 'File does not contain a valid HMAC-SHA256 signature'
156)             raise ValueError(msg) from None
157) 
158)     @abc.abstractmethod
159)     def _hmac_input(self) -> bytes:
160)         raise AssertionError
161) 
162)     def _decrypt_payload(self) -> None:
163)         decryptor = self._make_decryptor()
164)         padded_plaintext = bytearray()
165)         padded_plaintext.extend(decryptor.update(self.payload))
166)         padded_plaintext.extend(decryptor.finalize())
167)         logger.debug('padded plaintext = %s', _h(padded_plaintext))
168)         unpadder = padding.PKCS7(self.iv_size * 8).unpadder()
169)         plaintext = bytearray()
170)         plaintext.extend(unpadder.update(padded_plaintext))
171)         plaintext.extend(unpadder.finalize())
172)         logger.debug('plaintext = %s', _h(plaintext))
173)         self._data = json.loads(plaintext)
174) 
175)     @abc.abstractmethod
176)     def _make_decryptor(self) -> ciphers.CipherContext:
177)         raise AssertionError
178) 
179) 
180) class V03Reader(Reader):
181)     KEY_SIZE = 32
182) 
183)     def __init__(self, *args: Any, **kwargs: Any) -> None:
184)         super().__init__(*args, **kwargs)
185)         self.iv_size = 16
186)         self.mac_size = 32
187) 
188)     def run(self) -> Any:
189)         logger.info('Attempting to parse as v0.3 configuration')
190)         return super().run()
191) 
192)     def _generate_keys(self) -> None:
193)         self.encryption_key = self.pbkdf2(self.password, self.KEY_SIZE, 100)
194)         self.signing_key = self.pbkdf2(self.password, self.KEY_SIZE, 200)
195)         self.encryption_key_size = self.signing_key_size = self.KEY_SIZE
196) 
197)     def _hmac_input(self) -> bytes:
198)         return self.message.hex().lower().encode('ASCII')
199) 
200)     def _make_decryptor(self) -> ciphers.CipherContext:
201)         return ciphers.Cipher(
202)             algorithms.AES256(self.encryption_key), modes.CBC(self.iv)
203)         ).decryptor()
204) 
205) 
206) class V02Reader(Reader):
207)     def __init__(self, *args: Any, **kwargs: Any) -> None:
208)         super().__init__(*args, **kwargs)
209)         self.iv_size = 16
210)         self.mac_size = 64
211) 
212)     def run(self) -> Any:
213)         logger.info('Attempting to parse as v0.2 configuration')
214)         return super().run()
215) 
216)     def _parse_contents(self) -> None:
217)         super()._parse_contents()
218)         logger.debug('Decoding payload (base64) and message tag (hex)')
219)         self.payload = base64.standard_b64decode(self.payload)
220)         self.message_tag = bytes.fromhex(self.message_tag.decode('ASCII'))
221) 
222)     def _generate_keys(self) -> None:
223)         self.encryption_key = self.pbkdf2(self.password, 8, 16)
224)         self.signing_key = self.pbkdf2(self.password, 16, 16)
225)         self.encryption_key_size = 8
226)         self.signing_key_size = 16
227) 
228)     def _hmac_input(self) -> bytes:
229)         return base64.standard_b64encode(self.message)
230) 
231)     def _make_decryptor(self) -> ciphers.CipherContext:
232)         def evp_bytestokey_md5_one_iteration(
233)             data: bytes, salt: bytes | None, key_size: int, iv_size: int
234)         ) -> tuple[bytes, bytes]:
235)             total_size = key_size + iv_size
236)             buffer = bytearray()
237)             last_block = b''
238)             if salt is None:
239)                 salt = b''
240)             logging.debug(
241)                 (
242)                     'data = %s, salt = %s, key_size = %s, iv_size = %s, '
243)                     'buffer length = %s, buffer = %s'
244)                 ),
245)                 _h(data),
246)                 _h(salt),
247)                 key_size,
248)                 iv_size,
249)                 len(buffer),
250)                 _h(buffer),
251)             )
252)             while len(buffer) < total_size:
253)                 with warnings.catch_warnings():
254)                     warnings.simplefilter(
255)                         'ignore', crypt_utils.CryptographyDeprecationWarning
256)                     )
257)                     block = hashes.Hash(hashes.MD5())  # noqa: S303
258)                 block.update(last_block)
259)                 block.update(data)
260)                 block.update(salt)
261)                 last_block = block.finalize()
262)                 buffer.extend(last_block)
263)                 logging.debug(
264)                     'buffer length = %s, buffer = %s', len(buffer), _h(buffer)
265)                 )
266)             logging.debug(
267)                 'encryption_key = %s, iv = %s',
268)                 _h(buffer[:key_size]),
269)                 _h(buffer[key_size:total_size]),
270)             )
271)             return bytes(buffer[:key_size]), bytes(buffer[key_size:total_size])
272) 
273)         data = base64.standard_b64encode(self.iv + self.encryption_key)
274)         encryption_key, iv = evp_bytestokey_md5_one_iteration(
275)             data, salt=None, key_size=32, iv_size=16
276)         )
277)         return ciphers.Cipher(
278)             algorithms.AES256(encryption_key), modes.CBC(iv)
279)         ).decryptor()
280) 
281) 
282) if __name__ == '__main__':
283)     import os
284) 
285)     logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
Marco Ricci Move vault key and path det...

Marco Ricci authored 1 month ago

286)     with open(exporter.get_vault_path(), 'rb') as infile:
Marco Ricci Add prototype for "vault v0...

Marco Ricci authored 1 month ago

287)         contents = base64.standard_b64decode(infile.read())
Marco Ricci Move vault key and path det...

Marco Ricci authored 1 month ago

288)     password = exporter.get_vault_key()