Move storeroom helper types into `_types` module, as named tuples
Marco Ricci

Marco Ricci commited on 2025-01-15 21:07:32
Zeige 8 geänderte Dateien mit 116 Einfügungen und 118 Löschungen.


Move `exporter.storeroom.KeyPair` and `exporter.storeroom.MasterKeys`
into the `_types` module, as `StoreroomKeyPair` and
`StoreroomMasterKeys`, respectively.  Also convert them from `TypedDict`
to `NamedTuple`.

Also rename `_types.KeyCommentPair` to `_types.SSHKeyCommentPair` for
consistency.
... ...
@@ -34,7 +34,7 @@ if TYPE_CHECKING:
34 34
 __all__ = (
35 35
     'SSH_AGENT',
36 36
     'SSH_AGENTC',
37
-    'KeyCommentPair',
37
+    'SSHKeyCommentPair',
38 38
     'VaultConfig',
39 39
     'is_vault_config',
40 40
 )
... ...
@@ -577,7 +577,7 @@ def clean_up_falsy_vault_config_values(  # noqa: C901,PLR0912
577 577
     return cleanup_completed
578 578
 
579 579
 
580
-class KeyCommentPair(NamedTuple):
580
+class SSHKeyCommentPair(NamedTuple):
581 581
     """SSH key plus comment pair.  For typing purposes.
582 582
 
583 583
     Attributes:
... ...
@@ -660,3 +660,44 @@ class SSH_AGENT(enum.Enum):  # noqa: N801
660 660
     """"""
661 661
     EXTENSION_RESPONSE: int = 29
662 662
     """"""
663
+
664
+
665
+class StoreroomKeyPair(NamedTuple):
666
+    """A pair of AES256 keys, one for encryption and one for signing.
667
+
668
+    Attributes:
669
+        encryption_key:
670
+            AES256 key, used for encryption with AES256-CBC (with PKCS#7
671
+            padding).
672
+        signing_key:
673
+            AES256 key, used for signing with HMAC-SHA256.
674
+
675
+    """
676
+
677
+    encryption_key: bytes
678
+    """"""
679
+    signing_key: bytes
680
+    """"""
681
+
682
+
683
+class StoreroomMasterKeys(NamedTuple):
684
+    """A triple of AES256 keys, for encryption, signing and hashing.
685
+
686
+    Attributes:
687
+        hashing_key:
688
+            AES256 key, used for hashing with HMAC-SHA256 to derive
689
+            a hash table slot for an item.
690
+        encryption_key:
691
+            AES256 key, used for encryption with AES256-CBC (with PKCS#7
692
+            padding).
693
+        signing_key:
694
+            AES256 key, used for signing with HMAC-SHA256.
695
+
696
+    """
697
+
698
+    hashing_key: bytes
699
+    """"""
700
+    encryption_key: bytes
701
+    """"""
702
+    signing_key: bytes
703
+    """"""
... ...
@@ -1794,7 +1794,7 @@ def _load_user_config() -> dict[str, Any]:
1794 1794
 
1795 1795
 def _get_suitable_ssh_keys(
1796 1796
     conn: ssh_agent.SSHAgentClient | socket.socket | None = None, /
1797
-) -> Iterator[_types.KeyCommentPair]:
1797
+) -> Iterator[_types.SSHKeyCommentPair]:
1798 1798
     """Yield all SSH keys suitable for passphrase derivation.
1799 1799
 
1800 1800
     Suitable SSH keys are queried from the running SSH agent (see
... ...
@@ -32,10 +32,10 @@ import logging
32 32
 import os
33 33
 import os.path
34 34
 import struct
35
-from typing import TYPE_CHECKING, Any, TypedDict
35
+from typing import TYPE_CHECKING, Any
36 36
 
37 37
 from derivepassphrase import _cli_msg as _msg
38
-from derivepassphrase import exporter
38
+from derivepassphrase import _types, exporter
39 39
 
40 40
 if TYPE_CHECKING:
41 41
     from collections.abc import Iterator
... ...
@@ -90,51 +90,10 @@ def _h(bs: Buffer) -> str:
90 90
     return '<{}>'.format(memoryview(bs).hex(' '))
91 91
 
92 92
 
93
-class KeyPair(TypedDict):
94
-    """A pair of AES256 keys, one for encryption and one for signing.
95
-
96
-    Attributes:
97
-        encryption_key:
98
-            AES256 key, used for encryption with AES256-CBC (with PKCS#7
99
-            padding).
100
-        signing_key:
101
-            AES256 key, used for signing with HMAC-SHA256.
102
-
103
-    """
104
-
105
-    encryption_key: bytes
106
-    """"""
107
-    signing_key: bytes
108
-    """"""
109
-
110
-
111
-class MasterKeys(TypedDict):
112
-    """A triple of AES256 keys, for encryption, signing and hashing.
113
-
114
-    Attributes:
115
-        hashing_key:
116
-            AES256 key, used for hashing with HMAC-SHA256 to derive
117
-            a hash table slot for an item.
118
-        encryption_key:
119
-            AES256 key, used for encryption with AES256-CBC (with PKCS#7
120
-            padding).
121
-        signing_key:
122
-            AES256 key, used for signing with HMAC-SHA256.
123
-
124
-    """
125
-
126
-    hashing_key: bytes
127
-    """"""
128
-    encryption_key: bytes
129
-    """"""
130
-    signing_key: bytes
131
-    """"""
132
-
133
-
134 93
 def derive_master_keys_keys(
135 94
     password: str | Buffer,
136 95
     iterations: int,
137
-) -> KeyPair:
96
+) -> _types.StoreroomKeyPair:
138 97
     """Derive encryption and signing keys for the master keys data.
139 98
 
140 99
     The master password is run through a key derivation function to
... ...
@@ -185,16 +144,16 @@ def derive_master_keys_keys(
185 144
             iterations=iterations,
186 145
         ),
187 146
     )
188
-    return {
189
-        'encryption_key': encryption_key,
190
-        'signing_key': signing_key,
191
-    }
147
+    return _types.StoreroomKeyPair(
148
+        encryption_key=encryption_key,
149
+        signing_key=signing_key,
150
+    )
192 151
 
193 152
 
194 153
 def decrypt_master_keys_data(
195 154
     data: Buffer,
196
-    keys: KeyPair,
197
-) -> MasterKeys:
155
+    keys: _types.StoreroomKeyPair,
156
+) -> _types.StoreroomMasterKeys:
198 157
     r"""Decrypt the master keys data.
199 158
 
200 159
     The master keys data contains:
... ...
@@ -245,12 +204,12 @@ def decrypt_master_keys_data(
245 204
     ciphertext, claimed_mac = struct.unpack(
246 205
         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
247 206
     )
248
-    actual_mac = hmac.HMAC(keys['signing_key'], hashes.SHA256())
207
+    actual_mac = hmac.HMAC(keys.signing_key, hashes.SHA256())
249 208
     actual_mac.update(ciphertext)
250 209
     logger.debug(
251 210
         _msg.TranslatedString(
252 211
             _msg.DebugMsgTemplate.MASTER_KEYS_DATA_MAC_INFO,
253
-            sign_key=_h(keys['signing_key']),
212
+            sign_key=_h(keys.signing_key),
254 213
             ciphertext=_h(ciphertext),
255 214
             claimed_mac=_h(claimed_mac),
256 215
             actual_mac=_h(actual_mac.copy().finalize()),
... ...
@@ -263,7 +222,7 @@ def decrypt_master_keys_data(
263 222
             f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
264 223
         )
265 224
         decryptor = ciphers.Cipher(
266
-            algorithms.AES256(keys['encryption_key']), modes.CBC(iv)
225
+            algorithms.AES256(keys.encryption_key), modes.CBC(iv)
267 226
         ).decryptor()
268 227
         padded_plaintext = bytearray()
269 228
         padded_plaintext.extend(decryptor.update(payload))
... ...
@@ -278,17 +237,17 @@ def decrypt_master_keys_data(
278 237
     except (ValueError, struct.error) as exc:
279 238
         msg = 'Invalid encrypted master keys payload'
280 239
         raise ValueError(msg) from exc
281
-    return {
282
-        'hashing_key': hashing_key,
283
-        'encryption_key': encryption_key,
284
-        'signing_key': signing_key,
285
-    }
240
+    return _types.StoreroomMasterKeys(
241
+        hashing_key=hashing_key,
242
+        encryption_key=encryption_key,
243
+        signing_key=signing_key,
244
+    )
286 245
 
287 246
 
288 247
 def decrypt_session_keys(
289 248
     data: Buffer,
290
-    master_keys: MasterKeys,
291
-) -> KeyPair:
249
+    master_keys: _types.StoreroomMasterKeys,
250
+) -> _types.StoreroomKeyPair:
292 251
     r"""Decrypt the bucket item's session keys.
293 252
 
294 253
     The bucket item's session keys are single-use keys for encrypting
... ...
@@ -337,12 +296,12 @@ def decrypt_session_keys(
337 296
     ciphertext, claimed_mac = struct.unpack(
338 297
         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
339 298
     )
340
-    actual_mac = hmac.HMAC(master_keys['signing_key'], hashes.SHA256())
299
+    actual_mac = hmac.HMAC(master_keys.signing_key, hashes.SHA256())
341 300
     actual_mac.update(ciphertext)
342 301
     logger.debug(
343 302
         _msg.TranslatedString(
344 303
             _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_SESSION_KEYS_MAC_INFO,
345
-            sign_key=_h(master_keys['signing_key']),
304
+            sign_key=_h(master_keys.signing_key),
346 305
             ciphertext=_h(ciphertext),
347 306
             claimed_mac=_h(claimed_mac),
348 307
             actual_mac=_h(actual_mac.copy().finalize()),
... ...
@@ -355,7 +314,7 @@ def decrypt_session_keys(
355 314
             f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
356 315
         )
357 316
         decryptor = ciphers.Cipher(
358
-            algorithms.AES256(master_keys['encryption_key']), modes.CBC(iv)
317
+            algorithms.AES256(master_keys.encryption_key), modes.CBC(iv)
359 318
         ).decryptor()
360 319
         padded_plaintext = bytearray()
361 320
         padded_plaintext.extend(decryptor.update(payload))
... ...
@@ -371,23 +330,23 @@ def decrypt_session_keys(
371 330
         msg = 'Invalid encrypted session keys payload'
372 331
         raise ValueError(msg) from exc
373 332
 
374
-    session_keys: KeyPair = {
375
-        'encryption_key': session_encryption_key,
376
-        'signing_key': session_signing_key,
377
-    }
333
+    session_keys = _types.StoreroomKeyPair(
334
+        encryption_key=session_encryption_key,
335
+        signing_key=session_signing_key,
336
+    )
378 337
 
379 338
     logger.debug(
380 339
         _msg.TranslatedString(
381 340
             _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_SESSION_KEYS_INFO,
382
-            enc_key=_h(master_keys['encryption_key']),
341
+            enc_key=_h(master_keys.encryption_key),
383 342
             iv=_h(iv),
384 343
             ciphertext=_h(payload),
385 344
             plaintext=_h(plaintext),
386 345
             code=_msg.TranslatedString(
387
-                '{{"encryption_key": bytes.fromhex({enc_key!r}), '
388
-                '"signing_key": bytes.fromhex({sign_key!r})}}',
389
-                enc_key=session_keys['encryption_key'].hex(' '),
390
-                sign_key=session_keys['signing_key'].hex(' '),
346
+                'StoreroomKeyPair(encryption_key=bytes.fromhex({enc_key!r}), '
347
+                'signing_key=bytes.fromhex({sign_key!r}))',
348
+                enc_key=session_keys.encryption_key.hex(' '),
349
+                sign_key=session_keys.signing_key.hex(' '),
391 350
             ),
392 351
         ),
393 352
     )
... ...
@@ -397,7 +356,7 @@ def decrypt_session_keys(
397 356
 
398 357
 def decrypt_contents(
399 358
     data: Buffer,
400
-    session_keys: KeyPair,
359
+    session_keys: _types.StoreroomKeyPair,
401 360
 ) -> Buffer:
402 361
     """Decrypt the bucket item's contents.
403 362
 
... ...
@@ -443,12 +402,12 @@ def decrypt_contents(
443 402
     ciphertext, claimed_mac = struct.unpack(
444 403
         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
445 404
     )
446
-    actual_mac = hmac.HMAC(session_keys['signing_key'], hashes.SHA256())
405
+    actual_mac = hmac.HMAC(session_keys.signing_key, hashes.SHA256())
447 406
     actual_mac.update(ciphertext)
448 407
     logger.debug(
449 408
         _msg.TranslatedString(
450 409
             _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_MAC_INFO,
451
-            sign_key=_h(session_keys['signing_key']),
410
+            sign_key=_h(session_keys.signing_key),
452 411
             ciphertext=_h(ciphertext),
453 412
             claimed_mac=_h(claimed_mac),
454 413
             actual_mac=_h(actual_mac.copy().finalize()),
... ...
@@ -460,7 +419,7 @@ def decrypt_contents(
460 419
         f'{IV_SIZE}s {len(ciphertext) - IV_SIZE}s', ciphertext
461 420
     )
462 421
     decryptor = ciphers.Cipher(
463
-        algorithms.AES256(session_keys['encryption_key']), modes.CBC(iv)
422
+        algorithms.AES256(session_keys.encryption_key), modes.CBC(iv)
464 423
     ).decryptor()
465 424
     padded_plaintext = bytearray()
466 425
     padded_plaintext.extend(decryptor.update(payload))
... ...
@@ -473,7 +432,7 @@ def decrypt_contents(
473 432
     logger.debug(
474 433
         _msg.TranslatedString(
475 434
             _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_INFO,
476
-            enc_key=_h(session_keys['encryption_key']),
435
+            enc_key=_h(session_keys.encryption_key),
477 436
             iv=_h(iv),
478 437
             ciphertext=_h(payload),
479 438
             plaintext=_h(plaintext),
... ...
@@ -485,7 +444,7 @@ def decrypt_contents(
485 444
 
486 445
 def decrypt_bucket_item(
487 446
     bucket_item: Buffer,
488
-    master_keys: MasterKeys,
447
+    master_keys: _types.StoreroomMasterKeys,
489 448
 ) -> Buffer:
490 449
     """Decrypt a bucket item.
491 450
 
... ...
@@ -519,8 +478,8 @@ def decrypt_bucket_item(
519 478
         _msg.TranslatedString(
520 479
             _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_KEY_INFO,
521 480
             plaintext=_h(bucket_item),
522
-            enc_key=_h(master_keys['encryption_key']),
523
-            sign_key=_h(master_keys['signing_key']),
481
+            enc_key=_h(master_keys.encryption_key),
482
+            sign_key=_h(master_keys.signing_key),
524 483
         ),
525 484
     )
526 485
     data_version, encrypted_session_keys, data_contents = struct.unpack(
... ...
@@ -539,7 +498,7 @@ def decrypt_bucket_item(
539 498
 
540 499
 def decrypt_bucket_file(
541 500
     filename: str,
542
-    master_keys: MasterKeys,
501
+    master_keys: _types.StoreroomMasterKeys,
543 502
     *,
544 503
     root_dir: str | bytes | os.PathLike = '.',
545 504
 ) -> Iterator[Buffer]:
... ...
@@ -484,7 +484,7 @@ class SSHAgentClient:
484 484
             raise SSHAgentFailedError(response[0], response[1:])
485 485
         return response[1:]
486 486
 
487
-    def list_keys(self) -> Sequence[_types.KeyCommentPair]:
487
+    def list_keys(self) -> Sequence[_types.SSHKeyCommentPair]:
488 488
         """Request a list of keys known to the SSH agent.
489 489
 
490 490
         Returns:
... ...
@@ -521,7 +521,7 @@ class SSHAgentClient:
521 521
             return bytes(buf)
522 522
 
523 523
         key_count = int.from_bytes(shift(4), 'big')
524
-        keys: collections.deque[_types.KeyCommentPair]
524
+        keys: collections.deque[_types.SSHKeyCommentPair]
525 525
         keys = collections.deque()
526 526
         for _ in range(key_count):
527 527
             key_size = int.from_bytes(shift(4), 'big')
... ...
@@ -529,7 +529,7 @@ class SSHAgentClient:
529 529
             comment_size = int.from_bytes(shift(4), 'big')
530 530
             comment = shift(comment_size)
531 531
             # Both `key` and `comment` are not wrapped as SSH strings.
532
-            keys.append(_types.KeyCommentPair(key, comment))
532
+            keys.append(_types.SSHKeyCommentPair(key, comment))
533 533
         if response_stream:
534 534
             raise TrailingDataError
535 535
         return keys
... ...
@@ -1364,9 +1364,9 @@ def hypothesis_settings_coverage_compatible_with_caplog(
1364 1364
     return settings if f is None else settings(f)
1365 1365
 
1366 1366
 
1367
-def list_keys(self: Any = None) -> list[_types.KeyCommentPair]:
1367
+def list_keys(self: Any = None) -> list[_types.SSHKeyCommentPair]:
1368 1368
     del self  # Unused.
1369
-    Pair = _types.KeyCommentPair  # noqa: N806
1369
+    Pair = _types.SSHKeyCommentPair  # noqa: N806
1370 1370
     list1 = [
1371 1371
         Pair(value['public_key_data'], f'{key} test key'.encode('ASCII'))
1372 1372
         for key, value in SUPPORTED_KEYS.items()
... ...
@@ -1390,9 +1390,9 @@ def sign(
1390 1390
     raise AssertionError
1391 1391
 
1392 1392
 
1393
-def list_keys_singleton(self: Any = None) -> list[_types.KeyCommentPair]:
1393
+def list_keys_singleton(self: Any = None) -> list[_types.SSHKeyCommentPair]:
1394 1394
     del self  # Unused.
1395
-    Pair = _types.KeyCommentPair  # noqa: N806
1395
+    Pair = _types.SSHKeyCommentPair  # noqa: N806
1396 1396
     list1 = [
1397 1397
         Pair(value['public_key_data'], f'{key} test key'.encode('ASCII'))
1398 1398
         for key, value in SUPPORTED_KEYS.items()
... ...
@@ -1400,9 +1400,9 @@ def list_keys_singleton(self: Any = None) -> list[_types.KeyCommentPair]:
1400 1400
     return list1[:1]
1401 1401
 
1402 1402
 
1403
-def suitable_ssh_keys(conn: Any) -> Iterator[_types.KeyCommentPair]:
1403
+def suitable_ssh_keys(conn: Any) -> Iterator[_types.SSHKeyCommentPair]:
1404 1404
     del conn  # Unused.
1405
-    Pair = _types.KeyCommentPair  # noqa: N806
1405
+    Pair = _types.SSHKeyCommentPair  # noqa: N806
1406 1406
     yield from [
1407 1407
         Pair(DUMMY_KEY1, b'no comment'),
1408 1408
         Pair(DUMMY_KEY2, b'a comment'),
... ...
@@ -1563,7 +1563,7 @@ contents go here
1563 1563
             def func(
1564 1564
                 *_args: Any,
1565 1565
                 **_kwargs: Any,
1566
-            ) -> list[_types.KeyCommentPair]:
1566
+            ) -> list[_types.SSHKeyCommentPair]:
1567 1567
                 return []
1568 1568
 
1569 1569
             monkeypatch.setattr(ssh_agent.SSHAgentClient, 'list_keys', func)
... ...
@@ -297,11 +297,11 @@ class TestStoreroom:
297 297
         bucket_item = (
298 298
             b'\xff' + bytes(storeroom.ENCRYPTED_KEYPAIR_SIZE) + bytes(3)
299 299
         )
300
-        master_keys: storeroom.MasterKeys = {
301
-            'encryption_key': bytes(storeroom.KEY_SIZE),
302
-            'signing_key': bytes(storeroom.KEY_SIZE),
303
-            'hashing_key': bytes(storeroom.KEY_SIZE),
304
-        }
300
+        master_keys = _types.StoreroomMasterKeys(
301
+            encryption_key=bytes(storeroom.KEY_SIZE),
302
+            signing_key=bytes(storeroom.KEY_SIZE),
303
+            hashing_key=bytes(storeroom.KEY_SIZE),
304
+        )
305 305
         with pytest.raises(ValueError, match='Cannot handle version 255'):
306 306
             storeroom.decrypt_bucket_item(bucket_item, master_keys)
307 307
 
... ...
@@ -312,11 +312,11 @@ class TestStoreroom:
312 312
         config: str,
313 313
     ) -> None:
314 314
         runner = click.testing.CliRunner(mix_stderr=False)
315
-        master_keys: storeroom.MasterKeys = {
316
-            'encryption_key': bytes(storeroom.KEY_SIZE),
317
-            'signing_key': bytes(storeroom.KEY_SIZE),
318
-            'hashing_key': bytes(storeroom.KEY_SIZE),
319
-        }
315
+        master_keys = _types.StoreroomMasterKeys(
316
+            encryption_key=bytes(storeroom.KEY_SIZE),
317
+            signing_key=bytes(storeroom.KEY_SIZE),
318
+            hashing_key=bytes(storeroom.KEY_SIZE),
319
+        )
320 320
         with tests.isolated_vault_exporter_config(
321 321
             monkeypatch=monkeypatch,
322 322
             runner=runner,
... ...
@@ -443,7 +443,8 @@ class TestStoreroom:
443 443
             match=r'Invalid encrypted master keys payload',
444 444
         ):
445 445
             storeroom.decrypt_master_keys_data(
446
-                data, {'encryption_key': key, 'signing_key': key}
446
+                data,
447
+                _types.StoreroomKeyPair(encryption_key=key, signing_key=key),
447 448
             )
448 449
         with pytest.raises(
449 450
             ValueError,
... ...
@@ -451,11 +452,9 @@ class TestStoreroom:
451 452
         ):
452 453
             storeroom.decrypt_session_keys(
453 454
                 data,
454
-                {
455
-                    'hashing_key': key,
456
-                    'encryption_key': key,
457
-                    'signing_key': key,
458
-                },
455
+                _types.StoreroomMasterKeys(
456
+                    hashing_key=key, encryption_key=key, signing_key=key
457
+                ),
459 458
             )
460 459
 
461 460
     @tests.hypothesis_settings_coverage_compatible
... ...
@@ -472,16 +471,15 @@ class TestStoreroom:
472 471
         # such random sampling is astronomically tiny.
473 472
         with pytest.raises(cryptography.exceptions.InvalidSignature):
474 473
             storeroom.decrypt_master_keys_data(
475
-                data, {'encryption_key': key, 'signing_key': key}
474
+                data,
475
+                _types.StoreroomKeyPair(encryption_key=key, signing_key=key),
476 476
             )
477 477
         with pytest.raises(cryptography.exceptions.InvalidSignature):
478 478
             storeroom.decrypt_session_keys(
479 479
                 data,
480
-                {
481
-                    'hashing_key': key,
482
-                    'encryption_key': key,
483
-                    'signing_key': key,
484
-                },
480
+                _types.StoreroomMasterKeys(
481
+                    hashing_key=key, encryption_key=key, signing_key=key
482
+                ),
485 483
             )
486 484
 
487 485
 
... ...
@@ -560,9 +560,9 @@ class TestAgentInteraction:
560 560
         with monkeypatch.context() as monkeypatch2:
561 561
             client = ssh_agent.SSHAgentClient()
562 562
             monkeypatch2.setattr(client, 'request', request)
563
-            KeyCommentPair = _types.KeyCommentPair  # noqa: N806
563
+            SSHKeyCommentPair = _types.SSHKeyCommentPair  # noqa: N806
564 564
             loaded_keys = [
565
-                KeyCommentPair(v['public_key_data'], b'no comment')
565
+                SSHKeyCommentPair(v['public_key_data'], b'no comment')
566 566
                 for v in tests.SUPPORTED_KEYS.values()
567 567
             ]
568 568
             monkeypatch2.setattr(client, 'list_keys', lambda: loaded_keys)
569 569