Make key pairs, key sets and key-comment pairs generic
Marco Ricci

Marco Ricci commited on 2025-01-16 01:05:38
Zeige 3 geänderte Dateien mit 48 Einfügungen und 16 Löschungen.


Parametrize `_types.SSHKeyCommentPair`, `_types.StoreroomKeyPair` and
`_types.StoreroomMasterKeys` over the exact `Buffer` type being used.
Also supply a `.toreadonly()` method (similar to
`memoryview.toreadonly()`) that converts all `Buffer`s to `bytes`
objects.
... ...
@@ -11,9 +11,10 @@ import enum
11 11
 import json
12 12
 import math
13 13
 import string
14
-from typing import TYPE_CHECKING
14
+from typing import TYPE_CHECKING, Generic, TypeVar
15 15
 
16 16
 from typing_extensions import (
17
+    Buffer,
17 18
     NamedTuple,
18 19
     NotRequired,
19 20
     TypedDict,
... ...
@@ -577,7 +578,10 @@ def clean_up_falsy_vault_config_values(  # noqa: C901,PLR0912
577 578
     return cleanup_completed
578 579
 
579 580
 
580
-class SSHKeyCommentPair(NamedTuple):
581
+B = TypeVar('B', bound=Buffer)
582
+
583
+
584
+class SSHKeyCommentPair(NamedTuple, Generic[B]):
581 585
     """SSH key plus comment pair.  For typing purposes.
582 586
 
583 587
     Attributes:
... ...
@@ -586,11 +590,18 @@ class SSHKeyCommentPair(NamedTuple):
586 590
 
587 591
     """
588 592
 
589
-    key: bytes | bytearray
593
+    key: B
590 594
     """"""
591
-    comment: bytes | bytearray
595
+    comment: B
592 596
     """"""
593 597
 
598
+    def toreadonly(self) -> SSHKeyCommentPair[bytes]:
599
+        """Return a copy with read-only entries."""
600
+        return SSHKeyCommentPair(
601
+            key=bytes(self.key),
602
+            comment=bytes(self.comment),
603
+        )
604
+
594 605
 
595 606
 class SSH_AGENTC(enum.Enum):  # noqa: N801
596 607
     """SSH agent protocol numbers: client requests.
... ...
@@ -662,7 +673,7 @@ class SSH_AGENT(enum.Enum):  # noqa: N801
662 673
     """"""
663 674
 
664 675
 
665
-class StoreroomKeyPair(NamedTuple):
676
+class StoreroomKeyPair(NamedTuple, Generic[B]):
666 677
     """A pair of AES256 keys, one for encryption and one for signing.
667 678
 
668 679
     Attributes:
... ...
@@ -674,13 +685,20 @@ class StoreroomKeyPair(NamedTuple):
674 685
 
675 686
     """
676 687
 
677
-    encryption_key: bytes
688
+    encryption_key: B
678 689
     """"""
679
-    signing_key: bytes
690
+    signing_key: B
680 691
     """"""
681 692
 
693
+    def toreadonly(self) -> StoreroomKeyPair[bytes]:
694
+        """Return a copy with read-only entries."""
695
+        return StoreroomKeyPair(
696
+            encryption_key=bytes(self.encryption_key),
697
+            signing_key=bytes(self.signing_key),
698
+        )
699
+
682 700
 
683
-class StoreroomMasterKeys(NamedTuple):
701
+class StoreroomMasterKeys(NamedTuple, Generic[B]):
684 702
     """A triple of AES256 keys, for encryption, signing and hashing.
685 703
 
686 704
     Attributes:
... ...
@@ -695,9 +713,17 @@ class StoreroomMasterKeys(NamedTuple):
695 713
 
696 714
     """
697 715
 
698
-    hashing_key: bytes
716
+    hashing_key: B
699 717
     """"""
700
-    encryption_key: bytes
718
+    encryption_key: B
701 719
     """"""
702
-    signing_key: bytes
720
+    signing_key: B
703 721
     """"""
722
+
723
+    def toreadonly(self) -> StoreroomMasterKeys[bytes]:
724
+        """Return a copy with read-only entries."""
725
+        return StoreroomMasterKeys(
726
+            hashing_key=bytes(self.hashing_key),
727
+            encryption_key=bytes(self.encryption_key),
728
+            signing_key=bytes(self.signing_key),
729
+        )
... ...
@@ -147,7 +147,7 @@ def derive_master_keys_keys(
147 147
     return _types.StoreroomKeyPair(
148 148
         encryption_key=encryption_key,
149 149
         signing_key=signing_key,
150
-    )
150
+    ).toreadonly()
151 151
 
152 152
 
153 153
 def decrypt_master_keys_data(
... ...
@@ -201,6 +201,7 @@ def decrypt_master_keys_data(
201 201
 
202 202
     """
203 203
     data = memoryview(data).toreadonly().cast('c')
204
+    keys = keys.toreadonly()
204 205
     ciphertext, claimed_mac = struct.unpack(
205 206
         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
206 207
     )
... ...
@@ -241,7 +242,7 @@ def decrypt_master_keys_data(
241 242
         hashing_key=hashing_key,
242 243
         encryption_key=encryption_key,
243 244
         signing_key=signing_key,
244
-    )
245
+    ).toreadonly()
245 246
 
246 247
 
247 248
 def decrypt_session_keys(
... ...
@@ -293,6 +294,7 @@ def decrypt_session_keys(
293 294
 
294 295
     """
295 296
     data = memoryview(data).toreadonly().cast('c')
297
+    master_keys = master_keys.toreadonly()
296 298
     ciphertext, claimed_mac = struct.unpack(
297 299
         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
298 300
     )
... ...
@@ -333,7 +335,7 @@ def decrypt_session_keys(
333 335
     session_keys = _types.StoreroomKeyPair(
334 336
         encryption_key=session_encryption_key,
335 337
         signing_key=session_signing_key,
336
-    )
338
+    ).toreadonly()
337 339
 
338 340
     logger.debug(
339 341
         _msg.TranslatedString(
... ...
@@ -399,6 +401,7 @@ def decrypt_contents(
399 401
 
400 402
     """
401 403
     data = memoryview(data).toreadonly().cast('c')
404
+    session_keys = session_keys.toreadonly()
402 405
     ciphertext, claimed_mac = struct.unpack(
403 406
         f'{len(data) - MAC_SIZE}s {MAC_SIZE}s', data
404 407
     )
... ...
@@ -474,6 +477,7 @@ def decrypt_bucket_item(
474 477
 
475 478
     """
476 479
     bucket_item = memoryview(bucket_item).toreadonly().cast('c')
480
+    master_keys = master_keys.toreadonly()
477 481
     logger.debug(
478 482
         _msg.TranslatedString(
479 483
             _msg.DebugMsgTemplate.DECRYPT_BUCKET_ITEM_KEY_INFO,
... ...
@@ -532,6 +536,7 @@ def decrypt_bucket_file(
532 536
         removal.
533 537
 
534 538
     """
539
+    master_keys = master_keys.toreadonly()
535 540
     with open(
536 541
         os.path.join(os.fsdecode(root_dir), filename), 'rb'
537 542
     ) as bucket_file:
... ...
@@ -559,9 +559,10 @@ class TestAgentInteraction:
559 559
         with monkeypatch.context() as monkeypatch2:
560 560
             client = ssh_agent.SSHAgentClient()
561 561
             monkeypatch2.setattr(client, 'request', request)
562
-            SSHKeyCommentPair = _types.SSHKeyCommentPair  # noqa: N806
562
+            Pair = _types.SSHKeyCommentPair  # noqa: N806
563
+            com = b'no comment'
563 564
             loaded_keys = [
564
-                SSHKeyCommentPair(v.public_key_data, b'no comment')
565
+                Pair(v.public_key_data, com).toreadonly()
565 566
                 for v in tests.SUPPORTED_KEYS.values()
566 567
             ]
567 568
             monkeypatch2.setattr(client, 'list_keys', lambda: loaded_keys)
568 569