Add prototype for "vault v0...
Marco Ricci authored 2 months ago
|
50)
51) logger = logging.getLogger(__name__)
52)
53)
54) def _h(bs: bytes | bytearray) -> str:
55) return 'bytes.fromhex({!r})'.format(bs.hex(' '))
56)
57)
58) class Reader(abc.ABC):
59) def __init__(
60) self, contents: bytes | bytearray, password: str | bytes | bytearray
61) ) -> None:
62) if not password:
63) msg = 'No password given; check VAULT_KEY environment variable'
64) raise ValueError(msg)
65) self.contents = contents
66) self.password = password
67) self.iv_size = 0
68) self.mac_size = 0
69) self.encryption_key = b''
70) self.encryption_key_size = 0
71) self.signing_key = b''
72) self.signing_key_size = 0
73)
74) def run(self) -> Any:
75) self._parse_contents()
76) self._derive_keys()
77) self._check_signature()
78) self._decrypt_payload()
79) return self._data
80)
81) @staticmethod
82) def pbkdf2(
83) password: str | bytes | bytearray, key_size: int, iterations: int
84) ) -> bytes:
85) if isinstance(password, str):
86) password = password.encode('utf-8')
87) raw_key = pbkdf2.PBKDF2HMAC(
88) algorithm=hashes.SHA1(), # noqa: S303
89) length=key_size // 2,
90) salt=vault.Vault._UUID, # noqa: SLF001
91) iterations=iterations,
92) ).derive(password)
93) logger.debug(
94) 'binary = pbkdf2(%s, %s, %s, %s, %s) = %s -> %s',
95) repr(password),
96) repr(vault.Vault._UUID), # noqa: SLF001
97) iterations,
98) key_size // 2,
99) repr('sha1'),
100) _h(raw_key),
101) _h(raw_key.hex().lower().encode('ASCII')),
102) )
103) return raw_key.hex().lower().encode('ASCII')
104)
105) def _parse_contents(self) -> None:
106) logger.info('Parsing IV, payload and signature from the file contents')
107)
108) if len(self.contents) < self.iv_size + 16 + self.mac_size:
109) msg = 'File contents are too small to parse'
110) raise ValueError(msg)
111)
112) cutpos1 = self.iv_size
113) cutpos2 = len(self.contents) - self.mac_size
114)
115) self.message = self.contents[:cutpos2]
116) self.message_tag = self.contents[cutpos2:]
117) self.iv = self.message[:cutpos1]
118) self.payload = self.message[cutpos1:]
119)
120) logger.debug(
121) 'buffer %s = [[%s, %s], %s]',
122) _h(self.contents),
123) _h(self.iv),
124) _h(self.payload),
125) _h(self.message_tag),
126) )
127)
128) def _derive_keys(self) -> None:
129) logger.info('Deriving an encryption and signing key')
130) self._generate_keys()
131) assert (
132) len(self.encryption_key) == self.encryption_key_size
133) ), 'Derived encryption key is not valid'
134) assert (
135) len(self.signing_key) == self.signing_key_size
136) ), 'Derived signing key is not valid'
137)
138) @abc.abstractmethod
139) def _generate_keys(self) -> None:
140) raise AssertionError
141)
142) def _check_signature(self) -> None:
143) logger.info('Checking HMAC signature')
144) mac = hmac.HMAC(self.signing_key, hashes.SHA256())
145) mac_input = self._hmac_input()
146) logger.debug(
147) 'mac_input = %s, expected_tag = %s',
148) _h(mac_input),
149) _h(self.message_tag),
150) )
151) mac.update(mac_input)
152) try:
153) mac.verify(self.message_tag)
154) except crypt_exceptions.InvalidSignature:
155) msg = 'File does not contain a valid HMAC-SHA256 signature'
156) raise ValueError(msg) from None
157)
158) @abc.abstractmethod
159) def _hmac_input(self) -> bytes:
160) raise AssertionError
161)
162) def _decrypt_payload(self) -> None:
163) decryptor = self._make_decryptor()
164) padded_plaintext = bytearray()
165) padded_plaintext.extend(decryptor.update(self.payload))
166) padded_plaintext.extend(decryptor.finalize())
167) logger.debug('padded plaintext = %s', _h(padded_plaintext))
168) unpadder = padding.PKCS7(self.iv_size * 8).unpadder()
169) plaintext = bytearray()
170) plaintext.extend(unpadder.update(padded_plaintext))
171) plaintext.extend(unpadder.finalize())
172) logger.debug('plaintext = %s', _h(plaintext))
173) self._data = json.loads(plaintext)
174)
175) @abc.abstractmethod
176) def _make_decryptor(self) -> ciphers.CipherContext:
177) raise AssertionError
178)
179)
180) class V03Reader(Reader):
181) KEY_SIZE = 32
182)
183) def __init__(self, *args: Any, **kwargs: Any) -> None:
184) super().__init__(*args, **kwargs)
185) self.iv_size = 16
186) self.mac_size = 32
187)
188) def run(self) -> Any:
189) logger.info('Attempting to parse as v0.3 configuration')
190) return super().run()
191)
192) def _generate_keys(self) -> None:
193) self.encryption_key = self.pbkdf2(self.password, self.KEY_SIZE, 100)
194) self.signing_key = self.pbkdf2(self.password, self.KEY_SIZE, 200)
195) self.encryption_key_size = self.signing_key_size = self.KEY_SIZE
196)
197) def _hmac_input(self) -> bytes:
198) return self.message.hex().lower().encode('ASCII')
199)
200) def _make_decryptor(self) -> ciphers.CipherContext:
201) return ciphers.Cipher(
202) algorithms.AES256(self.encryption_key), modes.CBC(self.iv)
203) ).decryptor()
204)
205)
206) class V02Reader(Reader):
207) def __init__(self, *args: Any, **kwargs: Any) -> None:
208) super().__init__(*args, **kwargs)
209) self.iv_size = 16
210) self.mac_size = 64
211)
212) def run(self) -> Any:
213) logger.info('Attempting to parse as v0.2 configuration')
214) return super().run()
215)
216) def _parse_contents(self) -> None:
217) super()._parse_contents()
218) logger.debug('Decoding payload (base64) and message tag (hex)')
219) self.payload = base64.standard_b64decode(self.payload)
220) self.message_tag = bytes.fromhex(self.message_tag.decode('ASCII'))
221)
222) def _generate_keys(self) -> None:
223) self.encryption_key = self.pbkdf2(self.password, 8, 16)
224) self.signing_key = self.pbkdf2(self.password, 16, 16)
225) self.encryption_key_size = 8
226) self.signing_key_size = 16
227)
228) def _hmac_input(self) -> bytes:
229) return base64.standard_b64encode(self.message)
230)
231) def _make_decryptor(self) -> ciphers.CipherContext:
232) def evp_bytestokey_md5_one_iteration(
233) data: bytes, salt: bytes | None, key_size: int, iv_size: int
234) ) -> tuple[bytes, bytes]:
235) total_size = key_size + iv_size
236) buffer = bytearray()
237) last_block = b''
238) if salt is None:
239) salt = b''
240) logging.debug(
241) (
242) 'data = %s, salt = %s, key_size = %s, iv_size = %s, '
243) 'buffer length = %s, buffer = %s'
244) ),
245) _h(data),
246) _h(salt),
247) key_size,
248) iv_size,
249) len(buffer),
250) _h(buffer),
251) )
252) while len(buffer) < total_size:
253) with warnings.catch_warnings():
254) warnings.simplefilter(
255) 'ignore', crypt_utils.CryptographyDeprecationWarning
256) )
257) block = hashes.Hash(hashes.MD5()) # noqa: S303
258) block.update(last_block)
259) block.update(data)
260) block.update(salt)
261) last_block = block.finalize()
262) buffer.extend(last_block)
263) logging.debug(
264) 'buffer length = %s, buffer = %s', len(buffer), _h(buffer)
265) )
266) logging.debug(
267) 'encryption_key = %s, iv = %s',
268) _h(buffer[:key_size]),
269) _h(buffer[key_size:total_size]),
270) )
271) return bytes(buffer[:key_size]), bytes(buffer[key_size:total_size])
272)
273) data = base64.standard_b64encode(self.iv + self.encryption_key)
274) encryption_key, iv = evp_bytestokey_md5_one_iteration(
275) data, salt=None, key_size=32, iv_size=16
276) )
277) return ciphers.Cipher(
278) algorithms.AES256(encryption_key), modes.CBC(iv)
279) ).decryptor()
280)
281)
282) if __name__ == '__main__':
283) import os
284)
285) logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING'))
|