Add prototype for "vault v0...
Marco Ricci authored 2 months ago
|
19)
20) logger = logging.getLogger(__name__)
21)
22)
23) def _h(bs: bytes | bytearray) -> str:
24) return 'bytes.fromhex({!r})'.format(bs.hex(' '))
25)
26)
27) class Reader(abc.ABC):
28) def __init__(
29) self, contents: bytes | bytearray, password: str | bytes | bytearray
30) ) -> None:
31) if not password:
32) msg = 'No password given; check VAULT_KEY environment variable'
33) raise ValueError(msg)
34) self.contents = contents
35) self.password = password
36) self.iv_size = 0
37) self.mac_size = 0
38) self.encryption_key = b''
39) self.encryption_key_size = 0
40) self.signing_key = b''
41) self.signing_key_size = 0
42)
43) def run(self) -> Any:
44) self._parse_contents()
45) self._derive_keys()
46) self._check_signature()
47) self._decrypt_payload()
48) return self._data
49)
50) @staticmethod
51) def pbkdf2(
52) password: str | bytes | bytearray, key_size: int, iterations: int
53) ) -> bytes:
54) if isinstance(password, str):
55) password = password.encode('utf-8')
56) raw_key = pbkdf2.PBKDF2HMAC(
57) algorithm=hashes.SHA1(), # noqa: S303
58) length=key_size // 2,
59) salt=vault.Vault._UUID, # noqa: SLF001
60) iterations=iterations,
61) ).derive(password)
62) logger.debug(
63) 'binary = pbkdf2(%s, %s, %s, %s, %s) = %s -> %s',
64) repr(password),
65) repr(vault.Vault._UUID), # noqa: SLF001
66) iterations,
67) key_size // 2,
68) repr('sha1'),
69) _h(raw_key),
70) _h(raw_key.hex().lower().encode('ASCII')),
71) )
72) return raw_key.hex().lower().encode('ASCII')
73)
74) def _parse_contents(self) -> None:
75) logger.info('Parsing IV, payload and signature from the file contents')
76)
77) if len(self.contents) < self.iv_size + 16 + self.mac_size:
78) msg = 'File contents are too small to parse'
79) raise ValueError(msg)
80)
81) cutpos1 = self.iv_size
82) cutpos2 = len(self.contents) - self.mac_size
83)
84) self.message = self.contents[:cutpos2]
85) self.message_tag = self.contents[cutpos2:]
86) self.iv = self.message[:cutpos1]
87) self.payload = self.message[cutpos1:]
88)
89) logger.debug(
90) 'buffer %s = [[%s, %s], %s]',
91) _h(self.contents),
92) _h(self.iv),
93) _h(self.payload),
94) _h(self.message_tag),
95) )
96)
97) def _derive_keys(self) -> None:
98) logger.info('Deriving an encryption and signing key')
99) self._generate_keys()
100) assert (
101) len(self.encryption_key) == self.encryption_key_size
102) ), 'Derived encryption key is not valid'
103) assert (
104) len(self.signing_key) == self.signing_key_size
105) ), 'Derived signing key is not valid'
106)
107) @abc.abstractmethod
108) def _generate_keys(self) -> None:
109) raise AssertionError
110)
111) def _check_signature(self) -> None:
112) logger.info('Checking HMAC signature')
113) mac = hmac.HMAC(self.signing_key, hashes.SHA256())
114) mac_input = self._hmac_input()
115) logger.debug(
116) 'mac_input = %s, expected_tag = %s',
117) _h(mac_input),
118) _h(self.message_tag),
119) )
120) mac.update(mac_input)
121) try:
122) mac.verify(self.message_tag)
123) except crypt_exceptions.InvalidSignature:
124) msg = 'File does not contain a valid HMAC-SHA256 signature'
125) raise ValueError(msg) from None
126)
127) @abc.abstractmethod
128) def _hmac_input(self) -> bytes:
129) raise AssertionError
130)
131) def _decrypt_payload(self) -> None:
132) decryptor = self._make_decryptor()
133) padded_plaintext = bytearray()
134) padded_plaintext.extend(decryptor.update(self.payload))
135) padded_plaintext.extend(decryptor.finalize())
136) logger.debug('padded plaintext = %s', _h(padded_plaintext))
137) unpadder = padding.PKCS7(self.iv_size * 8).unpadder()
138) plaintext = bytearray()
139) plaintext.extend(unpadder.update(padded_plaintext))
140) plaintext.extend(unpadder.finalize())
141) logger.debug('plaintext = %s', _h(plaintext))
142) self._data = json.loads(plaintext)
143)
144) @abc.abstractmethod
145) def _make_decryptor(self) -> ciphers.CipherContext:
146) raise AssertionError
147)
148)
149) class V03Reader(Reader):
150) KEY_SIZE = 32
151)
152) def __init__(self, *args: Any, **kwargs: Any) -> None:
153) super().__init__(*args, **kwargs)
154) self.iv_size = 16
155) self.mac_size = 32
156)
157) def run(self) -> Any:
158) logger.info('Attempting to parse as v0.3 configuration')
159) return super().run()
160)
161) def _generate_keys(self) -> None:
162) self.encryption_key = self.pbkdf2(self.password, self.KEY_SIZE, 100)
163) self.signing_key = self.pbkdf2(self.password, self.KEY_SIZE, 200)
164) self.encryption_key_size = self.signing_key_size = self.KEY_SIZE
165)
166) def _hmac_input(self) -> bytes:
167) return self.message.hex().lower().encode('ASCII')
168)
169) def _make_decryptor(self) -> ciphers.CipherContext:
170) return ciphers.Cipher(
171) algorithms.AES256(self.encryption_key), modes.CBC(self.iv)
172) ).decryptor()
173)
174)
175) class V02Reader(Reader):
176) def __init__(self, *args: Any, **kwargs: Any) -> None:
177) super().__init__(*args, **kwargs)
178) self.iv_size = 16
179) self.mac_size = 64
180)
181) def run(self) -> Any:
182) logger.info('Attempting to parse as v0.2 configuration')
183) return super().run()
184)
185) def _parse_contents(self) -> None:
186) super()._parse_contents()
187) logger.debug('Decoding payload (base64) and message tag (hex)')
188) self.payload = base64.standard_b64decode(self.payload)
189) self.message_tag = bytes.fromhex(self.message_tag.decode('ASCII'))
190)
191) def _generate_keys(self) -> None:
192) self.encryption_key = self.pbkdf2(self.password, 8, 16)
193) self.signing_key = self.pbkdf2(self.password, 16, 16)
194) self.encryption_key_size = 8
195) self.signing_key_size = 16
196)
197) def _hmac_input(self) -> bytes:
198) return base64.standard_b64encode(self.message)
199)
200) def _make_decryptor(self) -> ciphers.CipherContext:
201) def evp_bytestokey_md5_one_iteration(
202) data: bytes, salt: bytes | None, key_size: int, iv_size: int
203) ) -> tuple[bytes, bytes]:
204) total_size = key_size + iv_size
205) buffer = bytearray()
206) last_block = b''
207) if salt is None:
208) salt = b''
209) logging.debug(
210) (
211) 'data = %s, salt = %s, key_size = %s, iv_size = %s, '
212) 'buffer length = %s, buffer = %s'
213) ),
214) _h(data),
215) _h(salt),
216) key_size,
217) iv_size,
218) len(buffer),
219) _h(buffer),
220) )
221) while len(buffer) < total_size:
222) with warnings.catch_warnings():
223) warnings.simplefilter(
224) 'ignore', crypt_utils.CryptographyDeprecationWarning
225) )
226) block = hashes.Hash(hashes.MD5()) # noqa: S303
227) block.update(last_block)
228) block.update(data)
229) block.update(salt)
230) last_block = block.finalize()
231) buffer.extend(last_block)
232) logging.debug(
233) 'buffer length = %s, buffer = %s', len(buffer), _h(buffer)
234) )
235) logging.debug(
236) 'encryption_key = %s, iv = %s',
237) _h(buffer[:key_size]),
238) _h(buffer[key_size:total_size]),
239) )
240) return bytes(buffer[:key_size]), bytes(buffer[key_size:total_size])
241)
242) data = base64.standard_b64encode(self.iv + self.encryption_key)
243) encryption_key, iv = evp_bytestokey_md5_one_iteration(
244) data, salt=None, key_size=32, iv_size=16
245) )
246) return ciphers.Cipher(
247) algorithms.AES256(encryption_key), modes.CBC(iv)
248) ).decryptor()
249)
250)
251) if __name__ == '__main__':
252) import os
253)
254) logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
|