Move the tests for the stubbed SSH agent socket to the machinery tests
Marco Ricci

Marco Ricci commited on 2025-11-30 14:23:54
Zeige 3 geänderte Dateien mit 381 Einfügungen und 375 Löschungen.


Even though the stubbed SSH agent socket exists to test the
`derivepassphrase.ssh_agent` module, it is still a piece of testing
machinery at its core, and thus belongs in the same place as the other
testing machinery tests belong.  Additionally, although the "basic"
tests for the `derivepassphrase.ssh_agent` module are already a giant
mixed bag of non-heavy-duty tests that all involve functionality from
the `derivepassphrase.ssh_agent` module, the stubbed SSH agent socket
tests differ from the other tests insofar as the other tests focus on
establishing correctness of the `derivepassphrase.ssh_agent` machinery,
whereas the stubbed SSH agent socket tests focus on the stubbed SSH
agent socket.  This, too, suggests that the stubbed SSH agent socket
tests do not belong in the same test module as the
`derivepassphrase.ssh_agent` tests.
... ...
@@ -40,7 +40,7 @@ nav:
40 40
         - pytest configuration: reference/tests.conftest.md
41 41
         - Common testing data: reference/tests.data.md
42 42
         - Common testing machinery: reference/tests.machinery.md
43
-        - Miscellaneous test machinery tests: reference/tests.test_000_testing_machinery.md
43
+        - Tests for the testing machinery: reference/tests.test_000_testing_machinery.md
44 44
       - Localization machinery: reference/tests.test_l10n.md
45 45
       - derivepassphrase command-line:
46 46
         - cli module, helpers and machinery:
... ...
@@ -2,24 +2,33 @@
2 2
 #
3 3
 # SPDX-License-Identifier: Zlib
4 4
 
5
-"""Miscellaneous tests for the test suite's data and machinery.
5
+"""Tests for the test suite's data and machinery.
6 6
 
7
-At the moment, this only entails testing [the SSH test
8
-keys][data.ALL_KEYS] for internal consistency.  Tests for the [stubbed
9
-SSH agent][tests.machinery.StubbedSSHAgentSocket] (in all variations)
10
-are included in the [basic tests for the `derivepassphrase.ssh_agent`
11
-module][tests.test_derivepassphrase_ssh_agent.test_000_basic].
7
+Currently, this entails testing [the SSH test keys][data.ALL_KEYS]
8
+for internal consistency, and testing the functionality of the [stubbed
9
+SSH agent][tests.machinery.StubbedSSHAgentSocket], in all variations.
12 10
 
13 11
 """
14 12
 
15 13
 from __future__ import annotations
16 14
 
17 15
 import base64
16
+import contextlib
17
+import errno
18
+import os
19
+import pathlib
20
+import re
21
+from typing import TYPE_CHECKING
18 22
 
19 23
 import pytest
20 24
 
21
-from derivepassphrase import ssh_agent
22
-from tests import data
25
+from derivepassphrase import _types, ssh_agent, vault
26
+from tests import data, machinery
27
+
28
+if TYPE_CHECKING:
29
+    from collections.abc import Iterator
30
+
31
+    from typing_extensions import Buffer
23 32
 
24 33
 OPENSSH_MAGIC = b"openssh-key-v1\x00"
25 34
 OPENSSH_HEADER = (
... ...
@@ -148,11 +157,111 @@ def minimize_openssh_keyfile_padding(
148 157
 class Parametrize:
149 158
     """Common test parametrizations."""
150 159
 
160
+    STUBBED_AGENT_ADDRESSES = pytest.mark.parametrize(
161
+        ["address", "exception", "match"],
162
+        [
163
+            pytest.param(None, KeyError, "SSH_AUTH_SOCK", id="unset"),
164
+            pytest.param("stub-ssh-agent:", None, "", id="standard"),
165
+            pytest.param(
166
+                str(pathlib.Path("~").expanduser()),
167
+                FileNotFoundError,
168
+                os.strerror(errno.ENOENT),
169
+                id="invalid-url",
170
+            ),
171
+            pytest.param(
172
+                "stub-ssh-agent:EPROTONOSUPPORT",
173
+                OSError,
174
+                os.strerror(errno.EPROTONOSUPPORT),
175
+                id="protocol-not-supported",
176
+            ),
177
+            pytest.param(
178
+                "stub-ssh-agent:ABCDEFGHIJKLMNOPQRSTUVWXYZ",
179
+                OSError,
180
+                os.strerror(errno.EINVAL),
181
+                id="invalid-error-code",
182
+            ),
183
+        ],
184
+    )
151 185
     TEST_KEYS = pytest.mark.parametrize(
152 186
         ["keyname", "key"],
153 187
         data.ALL_KEYS.items(),
154 188
         ids=data.ALL_KEYS.keys(),
155 189
     )
190
+    INVALID_SSH_AGENT_MESSAGES = pytest.mark.parametrize(
191
+        "message",
192
+        [
193
+            pytest.param(b"\x00\x00\x00\x00", id="empty-message"),
194
+            pytest.param(b"\x00\x00\x00\x0f\x0d", id="truncated-message"),
195
+            pytest.param(
196
+                b"\x00\x00\x00\x06\x1b\x00\x00\x00\x01\xff",
197
+                id="invalid-extension-name",
198
+            ),
199
+            pytest.param(
200
+                b"\x00\x00\x00\x11\x0d\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
201
+                id="sign-with-trailing-data",
202
+            ),
203
+        ],
204
+    )
205
+    UNSUPPORTED_SSH_AGENT_MESSAGES = pytest.mark.parametrize(
206
+        "message",
207
+        [
208
+            pytest.param(
209
+                ssh_agent.SSHAgentClient.string(
210
+                    b"".join([
211
+                        b"\x0d",
212
+                        ssh_agent.SSHAgentClient.string(
213
+                            data.ALL_KEYS["rsa"].public_key_data
214
+                        ),
215
+                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
216
+                        b"\x00\x00\x00\x02",
217
+                    ])
218
+                ),
219
+                id="sign-with-flags",
220
+            ),
221
+            pytest.param(
222
+                ssh_agent.SSHAgentClient.string(
223
+                    b"".join([
224
+                        b"\x0d",
225
+                        ssh_agent.SSHAgentClient.string(
226
+                            data.ALL_KEYS["ed25519"].public_key_data
227
+                        ),
228
+                        b"\x00\x00\x00\x08\x00\x01\x02\x03\x04\x05\x06\x07",
229
+                        b"\x00\x00\x00\x00",
230
+                    ])
231
+                ),
232
+                id="sign-with-nonstandard-passphrase",
233
+            ),
234
+            pytest.param(
235
+                ssh_agent.SSHAgentClient.string(
236
+                    b"".join([
237
+                        b"\x0d",
238
+                        ssh_agent.SSHAgentClient.string(
239
+                            data.ALL_KEYS["dsa1024"].public_key_data
240
+                        ),
241
+                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
242
+                        b"\x00\x00\x00\x00",
243
+                    ])
244
+                ),
245
+                id="sign-key-no-expected-signature",
246
+            ),
247
+            pytest.param(
248
+                ssh_agent.SSHAgentClient.string(
249
+                    b"".join([
250
+                        b"\x0d",
251
+                        b"\x00\x00\x00\x00",
252
+                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
253
+                        b"\x00\x00\x00\x00",
254
+                    ])
255
+                ),
256
+                id="sign-key-unregistered-test-key",
257
+            ),
258
+        ],
259
+    )
260
+    SUPPORTED_SSH_TEST_KEYS = pytest.mark.parametrize(
261
+        ["ssh_test_key_type", "ssh_test_key"],
262
+        list(data.SUPPORTED_KEYS.items()),
263
+        ids=data.SUPPORTED_KEYS.keys(),
264
+    )
156 265
 
157 266
 
158 267
 class TestTestKeys:
... ...
@@ -251,3 +360,265 @@ class TestTestKeys:
251 360
                 checkint=checkint,
252 361
             )
253 362
         )
363
+
364
+
365
+class TestStubbedSSHAgentSocketRequests:
366
+    """Test the stubbed SSH agent socket: normal requests."""
367
+
368
+    @contextlib.contextmanager
369
+    def _get_addressed_agent(
370
+        self, *, extended_agent: bool = False
371
+    ) -> Iterator[machinery.StubbedSSHAgentSocketWithAddress]:
372
+        agent_class: type[machinery.StubbedSSHAgentSocketWithAddress] = (
373
+            machinery.StubbedSSHAgentSocketWithAddressAndDeterministicDSA
374
+            if extended_agent
375
+            else machinery.StubbedSSHAgentSocketWithAddress
376
+        )
377
+        with contextlib.ExitStack() as stack:
378
+            monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
379
+            monkeypatch.setenv("SSH_AUTH_SOCK", agent_class.ADDRESS)
380
+            agent = stack.enter_context(agent_class())
381
+            yield agent
382
+
383
+    def test_query_extensions_base(self) -> None:
384
+        """The base agent implements no extensions."""
385
+        with self._get_addressed_agent(extended_agent=False) as agent:
386
+            assert "query" not in agent.enabled_extensions
387
+            query_request = (
388
+                # SSH string header
389
+                b"\x00\x00\x00\x0a"
390
+                # request code: SSH_AGENTC_EXTENSION
391
+                b"\x1b"
392
+                # payload: SSH string "query"
393
+                b"\x00\x00\x00\x05query"
394
+            )
395
+            query_response = (
396
+                # SSH string header
397
+                b"\x00\x00\x00\x01"
398
+                # response code: SSH_AGENT_FAILURE
399
+                b"\x05"
400
+            )
401
+            agent.sendall(query_request)
402
+            assert agent.recv(1000) == query_response
403
+
404
+    def test_query_extensions_extended(self) -> None:
405
+        """The extended agent implements a known list of extensions."""
406
+        with self._get_addressed_agent(extended_agent=True) as agent:
407
+            assert "query" in agent.enabled_extensions
408
+            query_request = (
409
+                # SSH string header
410
+                b"\x00\x00\x00\x0a"
411
+                # request code: SSH_AGENTC_EXTENSION
412
+                b"\x1b"
413
+                # payload: SSH string "query"
414
+                b"\x00\x00\x00\x05query"
415
+            )
416
+            query_response = (
417
+                # SSH string header
418
+                b"\x00\x00\x00\x40"
419
+                # response code: SSH_AGENT_EXTENSION_RESPONSE
420
+                b"\x1d"
421
+                # extension response: extension type ("query")
422
+                b"\x00\x00\x00\x05query"
423
+                # supported extension #1: query
424
+                b"\x00\x00\x00\x05query"
425
+                # supported extension #2:
426
+                # list-extended@putty.projects.tartarus.org
427
+                b"\x00\x00\x00\x29list-extended@putty.projects.tartarus.org"
428
+            )
429
+            agent.sendall(query_request)
430
+            assert agent.recv(1000) == query_response
431
+
432
+    def test_request_identities(self) -> None:
433
+        """The agent implements a known list of identities."""
434
+        unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix
435
+        with machinery.StubbedSSHAgentSocket() as agent:
436
+            query_request = (
437
+                # SSH string header
438
+                b"\x00\x00\x00\x01"
439
+                # request code: SSH_AGENTC_REQUEST_IDENTITIES
440
+                b"\x0b"
441
+            )
442
+            agent.sendall(query_request)
443
+            message_length = int.from_bytes(agent.recv(4), "big")
444
+            orig_message: bytes | bytearray = bytearray(
445
+                agent.recv(message_length)
446
+            )
447
+            assert (
448
+                _types.SSH_AGENT(orig_message[0])
449
+                == _types.SSH_AGENT.IDENTITIES_ANSWER
450
+            )
451
+            identity_count = int.from_bytes(orig_message[1:5], "big")
452
+            message = bytes(orig_message[5:])
453
+            for _ in range(identity_count):
454
+                key, message = unstring_prefix(message)
455
+                _comment, message = unstring_prefix(message)
456
+                assert key
457
+                assert key in {
458
+                    k.public_key_data for k in data.ALL_KEYS.values()
459
+                }
460
+            assert not message
461
+
462
+    def test_request_identities_extended(self) -> None:
463
+        """The extended agent implements PuTTY's `list-extended` extension."""
464
+        unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix
465
+        with self._get_addressed_agent(extended_agent=True) as agent:
466
+            extension_request = (
467
+                # SSH string header
468
+                b"\x00\x00\x00\x2e"
469
+                # request code: SSH_AGENTC_REQUEST_IDENTITIES
470
+                b"\x1b"
471
+                # extension type: list-extended@putty.projects.tartarus.org
472
+                b"\x00\x00\x00\x29list-extended@putty.projects.tartarus.org"
473
+                # (no payload)
474
+            )
475
+            agent.sendall(extension_request)
476
+            message_length = int.from_bytes(agent.recv(4), "big")
477
+            orig_message: bytes | bytearray = bytearray(
478
+                agent.recv(message_length)
479
+            )
480
+            assert (
481
+                _types.SSH_AGENT(orig_message[0]) == _types.SSH_AGENT.SUCCESS
482
+            )
483
+            identity_count = int.from_bytes(orig_message[1:5], "big")
484
+            message = bytes(orig_message[5:])
485
+            for _ in range(identity_count):
486
+                key, message = unstring_prefix(message)
487
+                _comment, message = unstring_prefix(message)
488
+                flags, message = unstring_prefix(message)
489
+                assert flags == b"\x00\x00\x00\x00"
490
+                assert key
491
+                assert key in {
492
+                    k.public_key_data for k in data.ALL_KEYS.values()
493
+                }
494
+            assert not message
495
+
496
+    @Parametrize.SUPPORTED_SSH_TEST_KEYS
497
+    def test_sign(
498
+        self,
499
+        ssh_test_key_type: str,
500
+        ssh_test_key: data.SSHTestKey,
501
+    ) -> None:
502
+        """The agent signs known key/message pairs."""
503
+        del ssh_test_key_type
504
+        spec = data.SSHTestKeyDeterministicSignatureClass.SPEC
505
+        assert ssh_test_key.expected_signatures[spec].signature is not None
506
+        string = ssh_agent.SSHAgentClient.string
507
+        query_request = string(
508
+            # request code: SSH_AGENTC_SIGN_REQUEST
509
+            b"\x0d"
510
+            # key: SSH string of the public key
511
+            + string(ssh_test_key.public_key_data)
512
+            # payload: SSH string of the vault UUID
513
+            + string(vault.Vault.UUID)
514
+            # signing flags (uint32, empty)
515
+            + b"\x00\x00\x00\x00"
516
+        )
517
+        query_response = string(
518
+            # response code: SSH_AGENT_SIGN_RESPONSE
519
+            b"\x0e"
520
+            # expected payload: the binary signature as recorded in the test key data structure
521
+            + string(ssh_test_key.expected_signatures[spec].signature)
522
+        )
523
+        with machinery.StubbedSSHAgentSocket() as agent:
524
+            agent.sendall(query_request)
525
+            assert agent.recv(1000) == query_response
526
+
527
+
528
+class TestStubbedSSHAgentSocketProperOperations:
529
+    """Test the stubbed SSH agent socket: proper use and misuse."""
530
+
531
+    def test_close_multiple(self) -> None:
532
+        """The agent can be closed repeatedly."""
533
+        with machinery.StubbedSSHAgentSocket() as agent:
534
+            pass
535
+        with machinery.StubbedSSHAgentSocket() as agent:
536
+            pass
537
+        del agent
538
+
539
+    def test_closed_agents_cannot_be_interacted_with(self) -> None:
540
+        """The agent can be closed repeatedly."""
541
+        with machinery.StubbedSSHAgentSocket() as agent:
542
+            pass
543
+        query_request = (
544
+            # SSH string header
545
+            b"\x00\x00\x00\x0a"
546
+            # request code: SSH_AGENTC_EXTENSION
547
+            b"\x1b"
548
+            # payload: SSH string "query"
549
+            b"\x00\x00\x00\x05query"
550
+        )
551
+        query_response = b""
552
+        with pytest.raises(
553
+            ValueError,
554
+            match=re.escape(machinery.StubbedSSHAgentSocket._SOCKET_IS_CLOSED),
555
+        ):
556
+            agent.sendall(query_request)
557
+        assert agent.recv(100) == query_response
558
+
559
+    def test_no_recv_without_sendall(self) -> None:
560
+        """The agent requires a message before sending a response."""
561
+        with machinery.StubbedSSHAgentSocket() as agent:  # noqa: SIM117
562
+            with pytest.raises(
563
+                AssertionError,
564
+                match=re.escape(
565
+                    machinery.StubbedSSHAgentSocket._PROTOCOL_VIOLATION
566
+                ),
567
+            ):
568
+                agent.recv(100)
569
+
570
+    @Parametrize.INVALID_SSH_AGENT_MESSAGES
571
+    def test_invalid_ssh_agent_messages(
572
+        self,
573
+        message: Buffer,
574
+    ) -> None:
575
+        """The agent responds with errors on invalid messages."""
576
+        query_response = (
577
+            # SSH string header
578
+            b"\x00\x00\x00\x01"
579
+            # response code: SSH_AGENT_FAILURE
580
+            b"\x05"
581
+        )
582
+        with machinery.StubbedSSHAgentSocket() as agent:
583
+            agent.sendall(message)
584
+            assert agent.recv(100) == query_response
585
+
586
+
587
+class TestStubbedSSHAgentSocketSupportedAndUnsupportedFeatures:
588
+    """Test the stubbed SSH agent socket: supported/unsupported features."""
589
+
590
+    @Parametrize.UNSUPPORTED_SSH_AGENT_MESSAGES
591
+    def test_unsupported_ssh_agent_messages(
592
+        self,
593
+        message: Buffer,
594
+    ) -> None:
595
+        """The agent responds with errors on unsupported messages."""
596
+        query_response = (
597
+            # SSH string header
598
+            b"\x00\x00\x00\x01"
599
+            # response code: SSH_AGENT_FAILURE
600
+            b"\x05"
601
+        )
602
+        with machinery.StubbedSSHAgentSocket() as agent:
603
+            agent.sendall(message)
604
+            assert agent.recv(100) == query_response
605
+
606
+    @Parametrize.STUBBED_AGENT_ADDRESSES
607
+    def test_addresses(
608
+        self,
609
+        address: str | None,
610
+        exception: type[Exception] | None,
611
+        match: str,
612
+    ) -> None:
613
+        """The agent accepts addresses."""
614
+        with contextlib.ExitStack() as stack:
615
+            monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
616
+            if address:
617
+                monkeypatch.setenv("SSH_AUTH_SOCK", address)
618
+            else:
619
+                monkeypatch.delenv("SSH_AUTH_SOCK", raising=False)
620
+            if exception:
621
+                stack.enter_context(
622
+                    pytest.raises(exception, match=re.escape(match))
623
+                )
624
+            machinery.StubbedSSHAgentSocketWithAddress()
... ...
@@ -8,11 +8,8 @@ from __future__ import annotations
8 8
 
9 9
 import base64
10 10
 import contextlib
11
-import errno
12 11
 import importlib.metadata
13 12
 import io
14
-import os
15
-import pathlib
16 13
 import re
17 14
 import socket
18 15
 import sys
... ...
@@ -35,7 +32,7 @@ from tests.machinery import pytest as pytest_machinery
35 32
 if TYPE_CHECKING:
36 33
     from collections.abc import Iterable, Iterator, Mapping
37 34
 
38
-    from typing_extensions import Any, Buffer, Literal
35
+    from typing_extensions import Any, Literal
39 36
 
40 37
 if sys.version_info < (3, 11):
41 38
     from exceptiongroup import ExceptionGroup
... ...
@@ -112,31 +109,6 @@ class Parametrize(types.SimpleNamespace):
112 109
             ),
113 110
         ],
114 111
     )
115
-    STUBBED_AGENT_ADDRESSES = pytest.mark.parametrize(
116
-        ["address", "exception", "match"],
117
-        [
118
-            pytest.param(None, KeyError, "SSH_AUTH_SOCK", id="unset"),
119
-            pytest.param("stub-ssh-agent:", None, "", id="standard"),
120
-            pytest.param(
121
-                str(pathlib.Path("~").expanduser()),
122
-                FileNotFoundError,
123
-                os.strerror(errno.ENOENT),
124
-                id="invalid-url",
125
-            ),
126
-            pytest.param(
127
-                "stub-ssh-agent:EPROTONOSUPPORT",
128
-                OSError,
129
-                os.strerror(errno.EPROTONOSUPPORT),
130
-                id="protocol-not-supported",
131
-            ),
132
-            pytest.param(
133
-                "stub-ssh-agent:ABCDEFGHIJKLMNOPQRSTUVWXYZ",
134
-                OSError,
135
-                os.strerror(errno.EINVAL),
136
-                id="invalid-error-code",
137
-            ),
138
-        ],
139
-    )
140 112
     EXISTING_REGISTRY_ENTRIES = pytest.mark.parametrize(
141 113
         "existing", ["posix", "the_annoying_os"]
142 114
     )
... ...
@@ -345,76 +317,6 @@ class Parametrize(types.SimpleNamespace):
345 317
             ),
346 318
         ],
347 319
     )
348
-    INVALID_SSH_AGENT_MESSAGES = pytest.mark.parametrize(
349
-        "message",
350
-        [
351
-            pytest.param(b"\x00\x00\x00\x00", id="empty-message"),
352
-            pytest.param(b"\x00\x00\x00\x0f\x0d", id="truncated-message"),
353
-            pytest.param(
354
-                b"\x00\x00\x00\x06\x1b\x00\x00\x00\x01\xff",
355
-                id="invalid-extension-name",
356
-            ),
357
-            pytest.param(
358
-                b"\x00\x00\x00\x11\x0d\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
359
-                id="sign-with-trailing-data",
360
-            ),
361
-        ],
362
-    )
363
-    UNSUPPORTED_SSH_AGENT_MESSAGES = pytest.mark.parametrize(
364
-        "message",
365
-        [
366
-            pytest.param(
367
-                ssh_agent.SSHAgentClient.string(
368
-                    b"".join([
369
-                        b"\x0d",
370
-                        ssh_agent.SSHAgentClient.string(
371
-                            data.ALL_KEYS["rsa"].public_key_data
372
-                        ),
373
-                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
374
-                        b"\x00\x00\x00\x02",
375
-                    ])
376
-                ),
377
-                id="sign-with-flags",
378
-            ),
379
-            pytest.param(
380
-                ssh_agent.SSHAgentClient.string(
381
-                    b"".join([
382
-                        b"\x0d",
383
-                        ssh_agent.SSHAgentClient.string(
384
-                            data.ALL_KEYS["ed25519"].public_key_data
385
-                        ),
386
-                        b"\x00\x00\x00\x08\x00\x01\x02\x03\x04\x05\x06\x07",
387
-                        b"\x00\x00\x00\x00",
388
-                    ])
389
-                ),
390
-                id="sign-with-nonstandard-passphrase",
391
-            ),
392
-            pytest.param(
393
-                ssh_agent.SSHAgentClient.string(
394
-                    b"".join([
395
-                        b"\x0d",
396
-                        ssh_agent.SSHAgentClient.string(
397
-                            data.ALL_KEYS["dsa1024"].public_key_data
398
-                        ),
399
-                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
400
-                        b"\x00\x00\x00\x00",
401
-                    ])
402
-                ),
403
-                id="sign-key-no-expected-signature",
404
-            ),
405
-            pytest.param(
406
-                ssh_agent.SSHAgentClient.string(
407
-                    b"".join([
408
-                        b"\x0d",
409
-                        b"\x00\x00\x00\x00",
410
-                        ssh_agent.SSHAgentClient.string(vault.Vault.UUID),
411
-                        b"\x00\x00\x00\x00",
412
-                    ])
413
-                ),
414
-                id="sign-key-unregistered-test-key",
415
-            ),
416
-        ],
417
-    )
418 320
     PUBLIC_KEY_DATA = pytest.mark.parametrize(
419 321
         "public_key_struct",
420 322
         list(data.SUPPORTED_KEYS.values()),
... ...
@@ -484,11 +386,6 @@ class Parametrize(types.SimpleNamespace):
484 386
             ),
485 387
         ],
486 388
     )
487
-    SUPPORTED_SSH_TEST_KEYS = pytest.mark.parametrize(
488
-        ["ssh_test_key_type", "ssh_test_key"],
489
-        list(data.SUPPORTED_KEYS.items()),
490
-        ids=data.SUPPORTED_KEYS.keys(),
491
-    )
492 389
     ALL_SSH_TEST_KEYS = pytest.mark.parametrize(
493 390
         ["ssh_test_key_type", "ssh_test_key"],
494 391
         list(data.ALL_KEYS.items()),
... ...
@@ -507,268 +404,6 @@ class Parametrize(types.SimpleNamespace):
507 404
     )
508 405
 
509 406
 
510
-class TestStubbedSSHAgentSocketRequests:
511
-    """Test the stubbed SSH agent socket: normal requests."""
512
-
513
-    @contextlib.contextmanager
514
-    def _get_addressed_agent(
515
-        self, *, extended_agent: bool = False
516
-    ) -> Iterator[machinery.StubbedSSHAgentSocketWithAddress]:
517
-        agent_class: type[machinery.StubbedSSHAgentSocketWithAddress] = (
518
-            machinery.StubbedSSHAgentSocketWithAddressAndDeterministicDSA
519
-            if extended_agent
520
-            else machinery.StubbedSSHAgentSocketWithAddress
521
-        )
522
-        with contextlib.ExitStack() as stack:
523
-            monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
524
-            monkeypatch.setenv("SSH_AUTH_SOCK", agent_class.ADDRESS)
525
-            agent = stack.enter_context(agent_class())
526
-            yield agent
527
-
528
-    def test_query_extensions_base(self) -> None:
529
-        """The base agent implements no extensions."""
530
-        with self._get_addressed_agent(extended_agent=False) as agent:
531
-            assert "query" not in agent.enabled_extensions
532
-            query_request = (
533
-                # SSH string header
534
-                b"\x00\x00\x00\x0a"
535
-                # request code: SSH_AGENTC_EXTENSION
536
-                b"\x1b"
537
-                # payload: SSH string "query"
538
-                b"\x00\x00\x00\x05query"
539
-            )
540
-            query_response = (
541
-                # SSH string header
542
-                b"\x00\x00\x00\x01"
543
-                # response code: SSH_AGENT_FAILURE
544
-                b"\x05"
545
-            )
546
-            agent.sendall(query_request)
547
-            assert agent.recv(1000) == query_response
548
-
549
-    def test_query_extensions_extended(self) -> None:
550
-        """The extended agent implements a known list of extensions."""
551
-        with self._get_addressed_agent(extended_agent=True) as agent:
552
-            assert "query" in agent.enabled_extensions
553
-            query_request = (
554
-                # SSH string header
555
-                b"\x00\x00\x00\x0a"
556
-                # request code: SSH_AGENTC_EXTENSION
557
-                b"\x1b"
558
-                # payload: SSH string "query"
559
-                b"\x00\x00\x00\x05query"
560
-            )
561
-            query_response = (
562
-                # SSH string header
563
-                b"\x00\x00\x00\x40"
564
-                # response code: SSH_AGENT_EXTENSION_RESPONSE
565
-                b"\x1d"
566
-                # extension response: extension type ("query")
567
-                b"\x00\x00\x00\x05query"
568
-                # supported extension #1: query
569
-                b"\x00\x00\x00\x05query"
570
-                # supported extension #2:
571
-                # list-extended@putty.projects.tartarus.org
572
-                b"\x00\x00\x00\x29list-extended@putty.projects.tartarus.org"
573
-            )
574
-            agent.sendall(query_request)
575
-            assert agent.recv(1000) == query_response
576
-
577
-    def test_request_identities(self) -> None:
578
-        """The agent implements a known list of identities."""
579
-        unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix
580
-        with machinery.StubbedSSHAgentSocket() as agent:
581
-            query_request = (
582
-                # SSH string header
583
-                b"\x00\x00\x00\x01"
584
-                # request code: SSH_AGENTC_REQUEST_IDENTITIES
585
-                b"\x0b"
586
-            )
587
-            agent.sendall(query_request)
588
-            message_length = int.from_bytes(agent.recv(4), "big")
589
-            orig_message: bytes | bytearray = bytearray(
590
-                agent.recv(message_length)
591
-            )
592
-            assert (
593
-                _types.SSH_AGENT(orig_message[0])
594
-                == _types.SSH_AGENT.IDENTITIES_ANSWER
595
-            )
596
-            identity_count = int.from_bytes(orig_message[1:5], "big")
597
-            message = bytes(orig_message[5:])
598
-            for _ in range(identity_count):
599
-                key, message = unstring_prefix(message)
600
-                _comment, message = unstring_prefix(message)
601
-                assert key
602
-                assert key in {
603
-                    k.public_key_data for k in data.ALL_KEYS.values()
604
-                }
605
-            assert not message
606
-
607
-    def test_request_identities_extended(self) -> None:
608
-        """The extended agent implements PuTTY's `list-extended` extension."""
609
-        unstring_prefix = ssh_agent.SSHAgentClient.unstring_prefix
610
-        with self._get_addressed_agent(extended_agent=True) as agent:
611
-            extension_request = (
612
-                # SSH string header
613
-                b"\x00\x00\x00\x2e"
614
-                # request code: SSH_AGENTC_REQUEST_IDENTITIES
615
-                b"\x1b"
616
-                # extension type: list-extended@putty.projects.tartarus.org
617
-                b"\x00\x00\x00\x29list-extended@putty.projects.tartarus.org"
618
-                # (no payload)
619
-            )
620
-            agent.sendall(extension_request)
621
-            message_length = int.from_bytes(agent.recv(4), "big")
622
-            orig_message: bytes | bytearray = bytearray(
623
-                agent.recv(message_length)
624
-            )
625
-            assert (
626
-                _types.SSH_AGENT(orig_message[0]) == _types.SSH_AGENT.SUCCESS
627
-            )
628
-            identity_count = int.from_bytes(orig_message[1:5], "big")
629
-            message = bytes(orig_message[5:])
630
-            for _ in range(identity_count):
631
-                key, message = unstring_prefix(message)
632
-                _comment, message = unstring_prefix(message)
633
-                flags, message = unstring_prefix(message)
634
-                assert flags == b"\x00\x00\x00\x00"
635
-                assert key
636
-                assert key in {
637
-                    k.public_key_data for k in data.ALL_KEYS.values()
638
-                }
639
-            assert not message
640
-
641
-    @Parametrize.SUPPORTED_SSH_TEST_KEYS
642
-    def test_sign(
643
-        self,
644
-        ssh_test_key_type: str,
645
-        ssh_test_key: data.SSHTestKey,
646
-    ) -> None:
647
-        """The agent signs known key/message pairs."""
648
-        del ssh_test_key_type
649
-        spec = data.SSHTestKeyDeterministicSignatureClass.SPEC
650
-        assert ssh_test_key.expected_signatures[spec].signature is not None
651
-        string = ssh_agent.SSHAgentClient.string
652
-        query_request = string(
653
-            # request code: SSH_AGENTC_SIGN_REQUEST
654
-            b"\x0d"
655
-            # key: SSH string of the public key
656
-            + string(ssh_test_key.public_key_data)
657
-            # payload: SSH string of the vault UUID
658
-            + string(vault.Vault.UUID)
659
-            # signing flags (uint32, empty)
660
-            + b"\x00\x00\x00\x00"
661
-        )
662
-        query_response = string(
663
-            # response code: SSH_AGENT_SIGN_RESPONSE
664
-            b"\x0e"
665
-            # expected payload: the binary signature as recorded in the test key data structure
666
-            + string(ssh_test_key.expected_signatures[spec].signature)
667
-        )
668
-        with machinery.StubbedSSHAgentSocket() as agent:
669
-            agent.sendall(query_request)
670
-            assert agent.recv(1000) == query_response
671
-
672
-
673
-class TestStubbedSSHAgentSocketProperOperations:
674
-    """Test the stubbed SSH agent socket: proper use and misuse."""
675
-
676
-    def test_close_multiple(self) -> None:
677
-        """The agent can be closed repeatedly."""
678
-        with machinery.StubbedSSHAgentSocket() as agent:
679
-            pass
680
-        with machinery.StubbedSSHAgentSocket() as agent:
681
-            pass
682
-        del agent
683
-
684
-    def test_closed_agents_cannot_be_interacted_with(self) -> None:
685
-        """The agent can be closed repeatedly."""
686
-        with machinery.StubbedSSHAgentSocket() as agent:
687
-            pass
688
-        query_request = (
689
-            # SSH string header
690
-            b"\x00\x00\x00\x0a"
691
-            # request code: SSH_AGENTC_EXTENSION
692
-            b"\x1b"
693
-            # payload: SSH string "query"
694
-            b"\x00\x00\x00\x05query"
695
-        )
696
-        query_response = b""
697
-        with pytest.raises(
698
-            ValueError,
699
-            match=re.escape(machinery.StubbedSSHAgentSocket._SOCKET_IS_CLOSED),
700
-        ):
701
-            agent.sendall(query_request)
702
-        assert agent.recv(100) == query_response
703
-
704
-    def test_no_recv_without_sendall(self) -> None:
705
-        """The agent requires a message before sending a response."""
706
-        with machinery.StubbedSSHAgentSocket() as agent:  # noqa: SIM117
707
-            with pytest.raises(
708
-                AssertionError,
709
-                match=re.escape(
710
-                    machinery.StubbedSSHAgentSocket._PROTOCOL_VIOLATION
711
-                ),
712
-            ):
713
-                agent.recv(100)
714
-
715
-    @Parametrize.INVALID_SSH_AGENT_MESSAGES
716
-    def test_invalid_ssh_agent_messages(
717
-        self,
718
-        message: Buffer,
719
-    ) -> None:
720
-        """The agent responds with errors on invalid messages."""
721
-        query_response = (
722
-            # SSH string header
723
-            b"\x00\x00\x00\x01"
724
-            # response code: SSH_AGENT_FAILURE
725
-            b"\x05"
726
-        )
727
-        with machinery.StubbedSSHAgentSocket() as agent:
728
-            agent.sendall(message)
729
-            assert agent.recv(100) == query_response
730
-
731
-
732
-class TestStubbedSSHAgentSocketSupportedAndUnsupportedFeatures:
733
-    """Test the stubbed SSH agent socket: supported/unsupported features."""
734
-
735
-    @Parametrize.UNSUPPORTED_SSH_AGENT_MESSAGES
736
-    def test_unsupported_ssh_agent_messages(
737
-        self,
738
-        message: Buffer,
739
-    ) -> None:
740
-        """The agent responds with errors on unsupported messages."""
741
-        query_response = (
742
-            # SSH string header
743
-            b"\x00\x00\x00\x01"
744
-            # response code: SSH_AGENT_FAILURE
745
-            b"\x05"
746
-        )
747
-        with machinery.StubbedSSHAgentSocket() as agent:
748
-            agent.sendall(message)
749
-            assert agent.recv(100) == query_response
750
-
751
-    @Parametrize.STUBBED_AGENT_ADDRESSES
752
-    def test_addresses(
753
-        self,
754
-        address: str | None,
755
-        exception: type[Exception] | None,
756
-        match: str,
757
-    ) -> None:
758
-        """The agent accepts addresses."""
759
-        with contextlib.ExitStack() as stack:
760
-            monkeypatch = stack.enter_context(pytest.MonkeyPatch.context())
761
-            if address:
762
-                monkeypatch.setenv("SSH_AUTH_SOCK", address)
763
-            else:
764
-                monkeypatch.delenv("SSH_AUTH_SOCK", raising=False)
765
-            if exception:
766
-                stack.enter_context(
767
-                    pytest.raises(exception, match=re.escape(match))
768
-                )
769
-            machinery.StubbedSSHAgentSocketWithAddress()
770
-
771
-
772 407
 class TestStaticFunctionality:
773 408
     """Test the static functionality of the `ssh_agent` module."""
774 409
 
775 410