Marco Ricci commited on 2024-09-22 16:37:58
Zeige 4 geänderte Dateien mit 760 Einfügungen und 68 Löschungen.
Include pytest fixtures to spawn known SSH agents and interface with the running SSH agent, in an agent-agnostic way. Move the key loading parts from the test functions into the test fixtures. This generally makes the actual test functions somewhat cleaner and easier to read, but because the monkeypatch fixture interferes with these new fixtures, the net improvements to readibility are only moderate. The test functions do however profit directly from reduced copy-and-paste in the key loading part. Pageant is one of the supported agents, and it behaves markedly differently than OpenSSH's agent. In particular, Pageant does not support adding keys with constraints or re-submitting a key it is already holding, and *every* key type Pageant currently offers yields a deterministic signature. Furthermore, a bug concerning output buffering in Pageant 0.81 and lower currently makes it impossible to use Pageant as a subprocess properly without correctly guessing the socket address. (This has already been reported upstream.) On the other hand, Pageant supports ed448 keys, which OpenSSH doesn't. So, implementing support for Pageant was very valuable to highlight areas where the code made unreasonable assumptions about SSH agent behavior, in particular the availability and behavior of the system SSH agent service. The new fixtures live in `tests/conftest.py`, following a relevant pytest convention. The fixtures themselves are necessarily platform- and runtime-dependent, so even though they are test code that should be included in test coverage, all parts dealing with querying the system, spawning programs, error handling related to the former and ensuring a certain functionality is available (or skipping the test otherwise) are excluded from test coverage. In particular, this includes the entire fixture to ensure a running agent, and the cleanup part of the agent with loaded keys fixture; I have however tried various constellations by hand to ensure the code works if certain agents are available or unavailable.
... | ... |
@@ -6,9 +6,11 @@ from __future__ import annotations |
6 | 6 |
|
7 | 7 |
import base64 |
8 | 8 |
import contextlib |
9 |
+import enum |
|
9 | 10 |
import importlib.util |
10 | 11 |
import json |
11 | 12 |
import os |
13 |
+import shlex |
|
12 | 14 |
import stat |
13 | 15 |
import tempfile |
14 | 16 |
import zipfile |
... | ... |
@@ -17,7 +19,7 @@ from typing import TYPE_CHECKING |
17 | 19 |
import pytest |
18 | 20 |
from typing_extensions import NamedTuple, Self, assert_never |
19 | 21 |
|
20 |
-from derivepassphrase import _types, cli |
|
22 |
+from derivepassphrase import _types, cli, ssh_agent |
|
21 | 23 |
|
22 | 24 |
__all__ = () |
23 | 25 |
|
... | ... |
@@ -36,6 +38,18 @@ if TYPE_CHECKING: |
36 | 38 |
derived_passphrase: bytes | str | None |
37 | 39 |
|
38 | 40 |
|
41 |
+class KnownSSHAgent(str, enum.Enum): |
|
42 |
+ UNKNOWN: str = '(unknown)' |
|
43 |
+ Pageant: str = 'Pageant' |
|
44 |
+ OpenSSHAgent: str = 'OpenSSHAgent' |
|
45 |
+ |
|
46 |
+ |
|
47 |
+class SpawnedSSHAgentInfo(NamedTuple): |
|
48 |
+ agent_type: KnownSSHAgent |
|
49 |
+ client: ssh_agent.SSHAgentClient |
|
50 |
+ isolated: bool |
|
51 |
+ |
|
52 |
+ |
|
39 | 53 |
SUPPORTED_KEYS: Mapping[str, SSHTestKey] = { |
40 | 54 |
'ed25519': { |
41 | 55 |
'private_key': rb"""-----BEGIN OPENSSH PRIVATE KEY----- |
... | ... |
@@ -683,9 +697,6 @@ CANNOT_LOAD_CRYPTOGRAPHY = ( |
683 | 697 |
'Cannot load the required Python module "cryptography".' |
684 | 698 |
) |
685 | 699 |
|
686 |
-skip_if_no_agent = pytest.mark.skipif( |
|
687 |
- not os.environ.get('SSH_AUTH_SOCK'), reason='running SSH agent required' |
|
688 |
-) |
|
689 | 700 |
skip_if_cryptography_support = pytest.mark.skipif( |
690 | 701 |
importlib.util.find_spec('cryptography') is not None, |
691 | 702 |
reason='cryptography support available; cannot test "no support" scenario', |
... | ... |
@@ -933,3 +944,26 @@ class ReadableResult(NamedTuple): |
933 | 944 |
) |
934 | 945 |
case _: |
935 | 946 |
return isinstance(self.exception, error) |
947 |
+ |
|
948 |
+ |
|
949 |
+def parse_sh_export_line(line: str, *, env_name: str) -> str: |
|
950 |
+ line = line.rstrip('\r\n') |
|
951 |
+ shlex_parser = shlex.shlex( |
|
952 |
+ instream=line, posix=True, punctuation_chars=True |
|
953 |
+ ) |
|
954 |
+ shlex_parser.whitespace = ' \t' |
|
955 |
+ tokens = list(shlex_parser) |
|
956 |
+ orig_tokens = tokens.copy() |
|
957 |
+ if tokens[-1] == ';': |
|
958 |
+ tokens.pop() |
|
959 |
+ if tokens[-3:] == [';', 'export', env_name]: |
|
960 |
+ tokens[-3:] = [] |
|
961 |
+ tokens[:0] = ['export'] |
|
962 |
+ if not ( |
|
963 |
+ len(tokens) == 2 |
|
964 |
+ and tokens[0] == 'export' |
|
965 |
+ and tokens[1].startswith(f'{env_name}=') |
|
966 |
+ ): |
|
967 |
+ msg = f'Cannot parse sh line: {orig_tokens!r} -> {tokens!r}' |
|
968 |
+ raise ValueError(msg) |
|
969 |
+ return tokens[1].split('=', 1)[1] |
... | ... |
@@ -0,0 +1,609 @@ |
1 |
+# SPDX-FileCopyrightText: 2024 Marco Ricci <software@the13thletter.info> |
|
2 |
+# |
|
3 |
+# SPDX-License-Identifier: MIT |
|
4 |
+ |
|
5 |
+from __future__ import annotations |
|
6 |
+ |
|
7 |
+import base64 |
|
8 |
+import contextlib |
|
9 |
+import operator |
|
10 |
+import os |
|
11 |
+import shutil |
|
12 |
+import subprocess |
|
13 |
+import sys |
|
14 |
+import textwrap |
|
15 |
+from typing import TYPE_CHECKING, TypeVar |
|
16 |
+ |
|
17 |
+import packaging.version |
|
18 |
+import pytest |
|
19 |
+ |
|
20 |
+import tests |
|
21 |
+from derivepassphrase import _types, ssh_agent |
|
22 |
+ |
|
23 |
+if TYPE_CHECKING: |
|
24 |
+ from collections.abc import Iterator |
|
25 |
+ from typing import Literal |
|
26 |
+ |
|
27 |
+ |
|
28 |
+startup_ssh_auth_sock = os.environ.get('SSH_AUTH_SOCK', None) |
|
29 |
+ |
|
30 |
+ |
|
31 |
+# https://docs.pytest.org/en/stable/explanation/fixtures.html#a-note-about-fixture-cleanup |
|
32 |
+# https://github.com/pytest-dev/pytest/issues/5243#issuecomment-491522595 |
|
33 |
+@pytest.fixture(scope='session', autouse=True) |
|
34 |
+def term_handler() -> Iterator[None]: # pragma: no cover |
|
35 |
+ try: |
|
36 |
+ import signal # noqa: PLC0415 |
|
37 |
+ |
|
38 |
+ sigint_handler = signal.getsignal(signal.SIGINT) |
|
39 |
+ except (ImportError, OSError): |
|
40 |
+ return |
|
41 |
+ else: |
|
42 |
+ orig_term = signal.signal(signal.SIGTERM, sigint_handler) |
|
43 |
+ yield |
|
44 |
+ signal.signal(signal.SIGTERM, orig_term) |
|
45 |
+ |
|
46 |
+ |
|
47 |
+def _spawn_pageant( # pragma: no cover |
|
48 |
+ executable: str | None, env: dict[str, str] |
|
49 |
+) -> tuple[subprocess.Popen[str], bool] | None: |
|
50 |
+ """Spawn an isolated Pageant, if possible. |
|
51 |
+ |
|
52 |
+ We attempt to detect whether Pageant is usable, i.e. whether Pageant |
|
53 |
+ has output buffering problems when announcing its authentication |
|
54 |
+ socket. |
|
55 |
+ |
|
56 |
+ Args: |
|
57 |
+ executable: |
|
58 |
+ The path to the Pageant executable. |
|
59 |
+ env: |
|
60 |
+ The new environment for Pageant. Should typically not |
|
61 |
+ include an SSH_AUTH_SOCK variable. |
|
62 |
+ |
|
63 |
+ Returns: |
|
64 |
+ (tuple[subprocess.Popen, bool] | None): |
|
65 |
+ A 2-tuple `(proc, debug_output)`, where `proc` is the spawned |
|
66 |
+ Pageant subprocess, and `debug_output` indicates whether Pageant |
|
67 |
+ will continue to emit debug output (that needs to be actively |
|
68 |
+ read) or not. |
|
69 |
+ |
|
70 |
+ It is the caller's responsibility to clean up the spawned |
|
71 |
+ subprocess. |
|
72 |
+ |
|
73 |
+ If the executable is `None`, or if we detect that Pageant is too |
|
74 |
+ old to properly flush its output, which prevents readers from |
|
75 |
+ learning the SSH_AUTH_SOCK setting needed to connect to Pageant |
|
76 |
+ in the first place, then return `None` directly. |
|
77 |
+ |
|
78 |
+ """ |
|
79 |
+ if executable is None: # pragma: no cover |
|
80 |
+ return None |
|
81 |
+ |
|
82 |
+ pageant_features = {'flush': False, 'foreground': False} |
|
83 |
+ |
|
84 |
+ # Apparently, Pageant 0.81 and lower running in debug mode does |
|
85 |
+ # not actively flush its output. As a result, the first two |
|
86 |
+ # lines, which set the SSH_AUTH_SOCK and the SSH_AGENT_PID, only |
|
87 |
+ # print once the output buffer is flushed, whenever that is. |
|
88 |
+ # |
|
89 |
+ # This has been reported to the PuTTY developers. |
|
90 |
+ # |
|
91 |
+ # For testing purposes, I currently build a version of Pageant with |
|
92 |
+ # output flushing fixed and with a `--foreground` option. This is |
|
93 |
+ # detected here. |
|
94 |
+ help_output = subprocess.run( |
|
95 |
+ ['pageant', '--help'], |
|
96 |
+ executable=executable, |
|
97 |
+ env=env, |
|
98 |
+ capture_output=True, |
|
99 |
+ text=True, |
|
100 |
+ check=False, |
|
101 |
+ ).stdout |
|
102 |
+ help_lines = help_output.splitlines(True) |
|
103 |
+ pageant_version_string = ( |
|
104 |
+ help_lines[1].strip().removeprefix('Release ') |
|
105 |
+ if len(help_lines) >= 2 |
|
106 |
+ else '' |
|
107 |
+ ) |
|
108 |
+ v0_81 = packaging.version.Version('0.81') |
|
109 |
+ if pageant_version_string not in {'', 'Unidentified build'}: |
|
110 |
+ # TODO(the-13th-letter): Once a fixed Pageant is released, |
|
111 |
+ # remove the check for build information in the version string. |
|
112 |
+ # https://github.com/the-13th-letter/derivepassphrase/issues/14 |
|
113 |
+ pageant_version_string_numeric, local_segment_list = ( |
|
114 |
+ pageant_version_string.split('+', 1) |
|
115 |
+ if '+' in pageant_version_string |
|
116 |
+ else (pageant_version_string, '') |
|
117 |
+ ) |
|
118 |
+ local_segments = frozenset(local_segment_list.split('+')) |
|
119 |
+ pageant_version = packaging.version.Version( |
|
120 |
+ pageant_version_string_numeric |
|
121 |
+ ) |
|
122 |
+ for key in pageant_features: |
|
123 |
+ pageant_features[key] = pageant_version > v0_81 or ( |
|
124 |
+ pageant_version == v0_81 and key in local_segments |
|
125 |
+ ) |
|
126 |
+ |
|
127 |
+ if not pageant_features['flush']: # pragma: no cover |
|
128 |
+ return None |
|
129 |
+ |
|
130 |
+ # Because Pageant's debug mode prints debugging information on |
|
131 |
+ # standard output, and because we yield control to a different |
|
132 |
+ # thread of execution, we cannot read-and-discard Pageant's output |
|
133 |
+ # here. Instead, spawn a consumer process and connect it to |
|
134 |
+ # Pageant's standard output; see _spawn_data_sink. |
|
135 |
+ # |
|
136 |
+ # This will hopefully not be necessary with newer Pageants: |
|
137 |
+ # a feature request for a `--foreground` option that just avoids the |
|
138 |
+ # forking behavior has been submitted. |
|
139 |
+ |
|
140 |
+ return subprocess.Popen( |
|
141 |
+ [ |
|
142 |
+ 'pageant', |
|
143 |
+ '--foreground' if pageant_features['foreground'] else '--debug', |
|
144 |
+ '-s', |
|
145 |
+ ], |
|
146 |
+ executable=executable, |
|
147 |
+ stdin=subprocess.DEVNULL, |
|
148 |
+ stdout=subprocess.PIPE, |
|
149 |
+ shell=False, |
|
150 |
+ env=env, |
|
151 |
+ text=True, |
|
152 |
+ bufsize=1, |
|
153 |
+ ), not pageant_features['foreground'] |
|
154 |
+ |
|
155 |
+ |
|
156 |
+def _spawn_openssh_agent( # pragma: no cover |
|
157 |
+ executable: str | None, env: dict[str, str] |
|
158 |
+) -> tuple[subprocess.Popen[str], Literal[False]] | None: |
|
159 |
+ """Spawn an isolated OpenSSH agent, if possible. |
|
160 |
+ |
|
161 |
+ We attempt to detect whether Pageant is usable, i.e. whether Pageant |
|
162 |
+ has output buffering problems when announcing its authentication |
|
163 |
+ socket. |
|
164 |
+ |
|
165 |
+ Args: |
|
166 |
+ executable: |
|
167 |
+ The path to the OpenSSH agent executable. |
|
168 |
+ env: |
|
169 |
+ The new environment for the OpenSSH agent. Should typically |
|
170 |
+ not include an SSH_AUTH_SOCK variable. |
|
171 |
+ |
|
172 |
+ Returns: |
|
173 |
+ (tuple[subprocess.Popen, Literal[False]] | None): |
|
174 |
+ A 2-tuple `(proc, debug_output)`, where `proc` is the spawned |
|
175 |
+ OpenSSH agent subprocess, and `debug_output` indicates whether |
|
176 |
+ the OpenSSH agent will continue to emit debug output that needs |
|
177 |
+ to be actively read (which it doesn't, so this is always false). |
|
178 |
+ |
|
179 |
+ It is the caller's responsibility to clean up the spawned |
|
180 |
+ subprocess. |
|
181 |
+ |
|
182 |
+ If the executable is `None`, then return `None` directly. |
|
183 |
+ |
|
184 |
+ """ |
|
185 |
+ if executable is None: |
|
186 |
+ return None |
|
187 |
+ return subprocess.Popen( |
|
188 |
+ ['ssh-agent', '-D', '-s'], |
|
189 |
+ executable=executable, |
|
190 |
+ stdin=subprocess.DEVNULL, |
|
191 |
+ stdout=subprocess.PIPE, |
|
192 |
+ shell=False, |
|
193 |
+ env=env, |
|
194 |
+ text=True, |
|
195 |
+ bufsize=1, |
|
196 |
+ ), False |
|
197 |
+ |
|
198 |
+ |
|
199 |
+def _spawn_system_agent( # pragma: no cover |
|
200 |
+ executable: str | None, env: dict[str, str] |
|
201 |
+) -> None: |
|
202 |
+ """Placeholder function. Does nothing.""" |
|
203 |
+ |
|
204 |
+ |
|
205 |
+_spawn_handlers = [ |
|
206 |
+ ('pageant', _spawn_pageant, tests.KnownSSHAgent.Pageant), |
|
207 |
+ ('ssh-agent', _spawn_openssh_agent, tests.KnownSSHAgent.OpenSSHAgent), |
|
208 |
+ ('(system)', _spawn_system_agent, tests.KnownSSHAgent.UNKNOWN), |
|
209 |
+] |
|
210 |
+ |
|
211 |
+ |
|
212 |
+@pytest.fixture |
|
213 |
+def running_ssh_agent() -> Iterator[str]: # pragma: no cover |
|
214 |
+ """Ensure a running SSH agent, if possible, as a pytest fixture. |
|
215 |
+ |
|
216 |
+ Check for a running SSH agent, or spawn a new one if possible. We |
|
217 |
+ know how to spawn OpenSSH's agent and PuTTY's Pageant. If spawned |
|
218 |
+ this way, the agent does not persist beyond the test. |
|
219 |
+ |
|
220 |
+ This fixture can neither guarantee a particular running agent, nor |
|
221 |
+ can it guarantee a particular set of loaded keys. |
|
222 |
+ |
|
223 |
+ Yields: |
|
224 |
+ str: |
|
225 |
+ The value of the SSH_AUTH_SOCK environment variable, to be |
|
226 |
+ used to connect to the running agent. |
|
227 |
+ |
|
228 |
+ Raises: |
|
229 |
+ pytest.skip.Exception: |
|
230 |
+ If no agent is running or can be spawned, skip this test. |
|
231 |
+ |
|
232 |
+ """ |
|
233 |
+ exit_stack = contextlib.ExitStack() |
|
234 |
+ Popen = TypeVar('Popen', bound=subprocess.Popen) |
|
235 |
+ |
|
236 |
+ @contextlib.contextmanager |
|
237 |
+ def terminate_on_exit(proc: Popen) -> Iterator[Popen]: |
|
238 |
+ try: |
|
239 |
+ yield proc |
|
240 |
+ finally: |
|
241 |
+ proc.terminate() |
|
242 |
+ proc.wait() |
|
243 |
+ |
|
244 |
+ with pytest.MonkeyPatch.context() as monkeypatch: |
|
245 |
+ # pytest's fixture system does not seem to guarantee that |
|
246 |
+ # environment variables are set up correctly if nested and |
|
247 |
+ # parametrized fixtures are used: it is possible that "outer" |
|
248 |
+ # parametrized fixtures are torn down only after other "outer" |
|
249 |
+ # fixtures of the same parameter set have run. So set |
|
250 |
+ # SSH_AUTH_SOCK explicitly to the value saved at interpreter |
|
251 |
+ # startup. This is then verified with *a lot* of further assert |
|
252 |
+ # statements. |
|
253 |
+ if startup_ssh_auth_sock: # pragma: no cover |
|
254 |
+ monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock) |
|
255 |
+ else: # pragma: no cover |
|
256 |
+ monkeypatch.delenv('SSH_AUTH_SOCK', raising=False) |
|
257 |
+ for exec_name, spawn_func, _ in _spawn_handlers: |
|
258 |
+ match exec_name: |
|
259 |
+ case '(system)': |
|
260 |
+ assert ( |
|
261 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
262 |
+ == startup_ssh_auth_sock |
|
263 |
+ ), 'SSH_AUTH_SOCK mismatch when checking for running agent' |
|
264 |
+ try: |
|
265 |
+ with ssh_agent.SSHAgentClient() as client: |
|
266 |
+ client.list_keys() |
|
267 |
+ except (KeyError, OSError): |
|
268 |
+ continue |
|
269 |
+ yield os.environ['SSH_AUTH_SOCK'] |
|
270 |
+ assert ( |
|
271 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
272 |
+ == startup_ssh_auth_sock |
|
273 |
+ ), 'SSH_AUTH_SOCK mismatch after returning from running agent' # noqa: E501 |
|
274 |
+ case _: |
|
275 |
+ assert ( |
|
276 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
277 |
+ == startup_ssh_auth_sock |
|
278 |
+ ), f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}' # noqa: E501 |
|
279 |
+ spawn_data = spawn_func( # type: ignore[operator] |
|
280 |
+ executable=shutil.which(exec_name), env={} |
|
281 |
+ ) |
|
282 |
+ if spawn_data is None: |
|
283 |
+ continue |
|
284 |
+ proc: subprocess.Popen[str] |
|
285 |
+ emits_debug_output: bool |
|
286 |
+ proc, emits_debug_output = spawn_data |
|
287 |
+ with exit_stack: |
|
288 |
+ exit_stack.enter_context(terminate_on_exit(proc)) |
|
289 |
+ assert ( |
|
290 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
291 |
+ == startup_ssh_auth_sock |
|
292 |
+ ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}' |
|
293 |
+ assert proc.stdout is not None |
|
294 |
+ ssh_auth_sock_line = proc.stdout.readline() |
|
295 |
+ try: |
|
296 |
+ ssh_auth_sock = tests.parse_sh_export_line( |
|
297 |
+ ssh_auth_sock_line, env_name='SSH_AUTH_SOCK' |
|
298 |
+ ) |
|
299 |
+ except ValueError: # pragma: no cover |
|
300 |
+ continue |
|
301 |
+ pid_line = proc.stdout.readline() |
|
302 |
+ if ( |
|
303 |
+ 'pid' not in pid_line.lower() |
|
304 |
+ and '_pid' not in pid_line.lower() |
|
305 |
+ ): # pragma: no cover |
|
306 |
+ pytest.skip( |
|
307 |
+ f'Cannot parse agent output: {pid_line!r}' |
|
308 |
+ ) |
|
309 |
+ proc2 = _spawn_data_sink( |
|
310 |
+ emits_debug_output=emits_debug_output, proc=proc |
|
311 |
+ ) |
|
312 |
+ if proc2 is not None: # pragma: no cover |
|
313 |
+ exit_stack.enter_context(terminate_on_exit(proc2)) |
|
314 |
+ assert ( |
|
315 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
316 |
+ == startup_ssh_auth_sock |
|
317 |
+ ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name} helper' # noqa: E501 |
|
318 |
+ monkeypatch2 = exit_stack.enter_context( |
|
319 |
+ pytest.MonkeyPatch.context() |
|
320 |
+ ) |
|
321 |
+ monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock) |
|
322 |
+ yield ssh_auth_sock |
|
323 |
+ assert ( |
|
324 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
325 |
+ == startup_ssh_auth_sock |
|
326 |
+ ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}' |
|
327 |
+ return |
|
328 |
+ pytest.skip('No SSH agent running or spawnable') |
|
329 |
+ |
|
330 |
+ |
|
331 |
+def _spawn_data_sink( # pragma: no cover |
|
332 |
+ emits_debug_output: bool, *, proc: subprocess.Popen[str] |
|
333 |
+) -> subprocess.Popen[str] | None: |
|
334 |
+ """Spawn a data sink to read and discard standard input. |
|
335 |
+ |
|
336 |
+ Necessary for certain SSH agents that emit copious debugging output. |
|
337 |
+ |
|
338 |
+ On UNIX, we can use `cat`, redirected to `/dev/null`. Otherwise, |
|
339 |
+ the most robust thing to do is to spawn Python and repeatedly call |
|
340 |
+ `.read()` on `sys.stdin.buffer`. |
|
341 |
+ |
|
342 |
+ """ |
|
343 |
+ if not emits_debug_output: |
|
344 |
+ return None |
|
345 |
+ if proc.stdout is None: |
|
346 |
+ return None |
|
347 |
+ sink_script = textwrap.dedent(""" |
|
348 |
+ import sys |
|
349 |
+ while sys.stdin.buffer.read(4096): |
|
350 |
+ pass |
|
351 |
+ """) |
|
352 |
+ return subprocess.Popen( |
|
353 |
+ ( |
|
354 |
+ ['cat'] |
|
355 |
+ if os.name == 'posix' |
|
356 |
+ else [sys.executable or 'python3', '-c', sink_script] |
|
357 |
+ ), |
|
358 |
+ executable=sys.executable or None, |
|
359 |
+ stdin=proc.stdout.fileno(), |
|
360 |
+ stdout=subprocess.DEVNULL, |
|
361 |
+ shell=False, |
|
362 |
+ text=True, |
|
363 |
+ ) |
|
364 |
+ |
|
365 |
+ |
|
366 |
+@pytest.fixture(params=_spawn_handlers, ids=operator.itemgetter(0)) |
|
367 |
+def spawn_ssh_agent( |
|
368 |
+ request: pytest.FixtureRequest, |
|
369 |
+) -> Iterator[tests.SpawnedSSHAgentInfo]: |
|
370 |
+ """Spawn an isolated SSH agent, if possible, as a pytest fixture. |
|
371 |
+ |
|
372 |
+ Spawn a new SSH agent isolated from other SSH use by other |
|
373 |
+ processes, if possible. We know how to spawn OpenSSH's agent and |
|
374 |
+ PuTTY's Pageant, and the "(system)" fallback agent. |
|
375 |
+ |
|
376 |
+ Yields: |
|
377 |
+ (tests.SpawnedSSHAgentInfo): |
|
378 |
+ A [named tuple][collection.namedtuple] containing |
|
379 |
+ information about the spawned agent, e.g. the software |
|
380 |
+ product, a client connected to the agent, and whether the |
|
381 |
+ agent is isolated from other clients. |
|
382 |
+ |
|
383 |
+ Raises: |
|
384 |
+ pytest.skip.Exception: |
|
385 |
+ If the agent cannot be spawned, skip this test. |
|
386 |
+ |
|
387 |
+ """ |
|
388 |
+ agent_env = os.environ.copy() |
|
389 |
+ agent_env.pop('SSH_AUTH_SOCK', None) |
|
390 |
+ exit_stack = contextlib.ExitStack() |
|
391 |
+ Popen = TypeVar('Popen', bound=subprocess.Popen) |
|
392 |
+ |
|
393 |
+ @contextlib.contextmanager |
|
394 |
+ def terminate_on_exit(proc: Popen) -> Iterator[Popen]: |
|
395 |
+ try: |
|
396 |
+ yield proc |
|
397 |
+ finally: |
|
398 |
+ proc.terminate() |
|
399 |
+ proc.wait() |
|
400 |
+ |
|
401 |
+ with pytest.MonkeyPatch.context() as monkeypatch: |
|
402 |
+ # pytest's fixture system does not seem to guarantee that |
|
403 |
+ # environment variables are set up correctly if nested and |
|
404 |
+ # parametrized fixtures are used: it is possible that "outer" |
|
405 |
+ # parametrized fixtures are torn down only after other "outer" |
|
406 |
+ # fixtures of the same parameter set have run. So set |
|
407 |
+ # SSH_AUTH_SOCK explicitly to the value saved at interpreter |
|
408 |
+ # startup. This is then verified with *a lot* of further assert |
|
409 |
+ # statements. |
|
410 |
+ if startup_ssh_auth_sock: # pragma: no cover |
|
411 |
+ monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock) |
|
412 |
+ else: # pragma: no cover |
|
413 |
+ monkeypatch.delenv('SSH_AUTH_SOCK', raising=False) |
|
414 |
+ exec_name, spawn_func, agent_type = request.param |
|
415 |
+ match exec_name: |
|
416 |
+ case '(system)': |
|
417 |
+ assert ( |
|
418 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
419 |
+ == startup_ssh_auth_sock |
|
420 |
+ ), 'SSH_AUTH_SOCK mismatch when checking for running agent' |
|
421 |
+ try: |
|
422 |
+ client = ssh_agent.SSHAgentClient() |
|
423 |
+ client.list_keys() |
|
424 |
+ except KeyError: # pragma: no cover |
|
425 |
+ pytest.skip('SSH agent is not running') |
|
426 |
+ # except OSError as exc: # pragma: no cover |
|
427 |
+ # pytest.skip( |
|
428 |
+ # f'Cannot talk to SSH agent: ' |
|
429 |
+ # f'{exc.strerror}: {exc.filename!r}' |
|
430 |
+ # ) |
|
431 |
+ with client: |
|
432 |
+ assert ( |
|
433 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
434 |
+ == startup_ssh_auth_sock |
|
435 |
+ ), 'SSH_AUTH_SOCK mismatch before setting up for running agent' # noqa: E501 |
|
436 |
+ yield tests.SpawnedSSHAgentInfo(agent_type, client, False) |
|
437 |
+ assert ( |
|
438 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
439 |
+ == startup_ssh_auth_sock |
|
440 |
+ ), 'SSH_AUTH_SOCK mismatch after returning from running agent' |
|
441 |
+ return |
|
442 |
+ |
|
443 |
+ case _: |
|
444 |
+ assert ( |
|
445 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
446 |
+ == startup_ssh_auth_sock |
|
447 |
+ ), f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}' # noqa: E501 |
|
448 |
+ spawn_data = spawn_func( |
|
449 |
+ executable=shutil.which(exec_name), env=agent_env |
|
450 |
+ ) |
|
451 |
+ assert ( |
|
452 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
453 |
+ == startup_ssh_auth_sock |
|
454 |
+ ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}' |
|
455 |
+ if spawn_data is None: # pragma: no cover |
|
456 |
+ pytest.skip(f'Cannot spawn usable {exec_name}') |
|
457 |
+ proc, emits_debug_output = spawn_data |
|
458 |
+ with exit_stack: |
|
459 |
+ exit_stack.enter_context(terminate_on_exit(proc)) |
|
460 |
+ assert proc.stdout is not None |
|
461 |
+ ssh_auth_sock_line = proc.stdout.readline() |
|
462 |
+ try: |
|
463 |
+ ssh_auth_sock = tests.parse_sh_export_line( |
|
464 |
+ ssh_auth_sock_line, env_name='SSH_AUTH_SOCK' |
|
465 |
+ ) |
|
466 |
+ except ValueError: # pragma: no cover |
|
467 |
+ pytest.skip( |
|
468 |
+ f'Cannot parse agent output: {ssh_auth_sock_line}' |
|
469 |
+ ) |
|
470 |
+ pid_line = proc.stdout.readline() |
|
471 |
+ if ( |
|
472 |
+ 'pid' not in pid_line.lower() |
|
473 |
+ and '_pid' not in pid_line.lower() |
|
474 |
+ ): # pragma: no cover |
|
475 |
+ pytest.skip(f'Cannot parse agent output: {pid_line!r}') |
|
476 |
+ assert ( |
|
477 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
478 |
+ == startup_ssh_auth_sock |
|
479 |
+ ), f'SSH_AUTH_SOCK mismatch before spawning {exec_name} helper' # noqa: E501 |
|
480 |
+ proc2 = _spawn_data_sink( |
|
481 |
+ emits_debug_output=emits_debug_output, proc=proc |
|
482 |
+ ) |
|
483 |
+ if proc2 is not None: # pragma: no cover |
|
484 |
+ exit_stack.enter_context(terminate_on_exit(proc2)) |
|
485 |
+ assert ( |
|
486 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
487 |
+ == startup_ssh_auth_sock |
|
488 |
+ ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name} helper' # noqa: E501 |
|
489 |
+ monkeypatch2 = exit_stack.enter_context( |
|
490 |
+ pytest.MonkeyPatch.context() |
|
491 |
+ ) |
|
492 |
+ monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock) |
|
493 |
+ try: |
|
494 |
+ client = ssh_agent.SSHAgentClient() |
|
495 |
+ except OSError as exc: # pragma: no cover |
|
496 |
+ pytest.skip( |
|
497 |
+ f'Cannot talk to SSH agent: ' |
|
498 |
+ f'{exc.strerror}: {exc.filename!r}' |
|
499 |
+ ) |
|
500 |
+ exit_stack.enter_context(client) |
|
501 |
+ yield tests.SpawnedSSHAgentInfo(agent_type, client, True) |
|
502 |
+ assert ( |
|
503 |
+ os.environ.get('SSH_AUTH_SOCK', None) |
|
504 |
+ == startup_ssh_auth_sock |
|
505 |
+ ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}' |
|
506 |
+ return |
|
507 |
+ |
|
508 |
+ |
|
509 |
+@pytest.fixture |
|
510 |
+def ssh_agent_client_with_test_keys_loaded( # noqa: C901 |
|
511 |
+ spawn_ssh_agent: tests.SpawnedSSHAgentInfo, |
|
512 |
+) -> Iterator[ssh_agent.SSHAgentClient]: |
|
513 |
+ agent_type, client, isolated = spawn_ssh_agent |
|
514 |
+ all_test_keys = {**tests.SUPPORTED_KEYS, **tests.UNSUITABLE_KEYS} |
|
515 |
+ successfully_loaded_keys: set[str] = set() |
|
516 |
+ |
|
517 |
+ def prepare_payload( |
|
518 |
+ payload: bytes | bytearray, |
|
519 |
+ *, |
|
520 |
+ isolated: bool = True, |
|
521 |
+ time_to_live: int = 30, |
|
522 |
+ ) -> tuple[_types.SSH_AGENTC, bytes]: |
|
523 |
+ return_code = ( |
|
524 |
+ _types.SSH_AGENTC.ADD_IDENTITY |
|
525 |
+ if isolated |
|
526 |
+ else _types.SSH_AGENTC.ADD_ID_CONSTRAINED |
|
527 |
+ ) |
|
528 |
+ lifetime_constraint = ( |
|
529 |
+ b'' |
|
530 |
+ if isolated |
|
531 |
+ else b'\x01' + ssh_agent.SSHAgentClient.uint32(time_to_live) |
|
532 |
+ ) |
|
533 |
+ return (return_code, bytes(payload) + lifetime_constraint) |
|
534 |
+ |
|
535 |
+ try: |
|
536 |
+ for key_type, key_struct in all_test_keys.items(): |
|
537 |
+ try: |
|
538 |
+ private_key_data = key_struct['private_key_blob'] |
|
539 |
+ except KeyError: # pragma: no cover |
|
540 |
+ continue |
|
541 |
+ request_code, payload = prepare_payload( |
|
542 |
+ private_key_data, isolated=isolated, time_to_live=30 |
|
543 |
+ ) |
|
544 |
+ try: |
|
545 |
+ try: |
|
546 |
+ client.request( |
|
547 |
+ request_code, |
|
548 |
+ payload, |
|
549 |
+ response_code=_types.SSH_AGENT.SUCCESS, |
|
550 |
+ ) |
|
551 |
+ except ssh_agent.SSHAgentFailedError: # pragma: no cover |
|
552 |
+ # Pageant can fail to accept a key for two separate |
|
553 |
+ # reasons: |
|
554 |
+ # |
|
555 |
+ # - Pageant refuses to accept a key it already holds |
|
556 |
+ # in memory. Verify this by listing key |
|
557 |
+ # - Pageant does not support key constraints (see |
|
558 |
+ # references below). |
|
559 |
+ # |
|
560 |
+ # https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/pageant-timeout.html |
|
561 |
+ # https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/pageant-key-confirm.html |
|
562 |
+ current_loaded_keys = frozenset({ |
|
563 |
+ pair.key for pair in client.list_keys() |
|
564 |
+ }) |
|
565 |
+ if agent_type == tests.KnownSSHAgent.Pageant and ( |
|
566 |
+ key_struct['public_key_data'] in current_loaded_keys |
|
567 |
+ ): |
|
568 |
+ pass |
|
569 |
+ elif agent_type == tests.KnownSSHAgent.Pageant and ( |
|
570 |
+ not isolated |
|
571 |
+ ): |
|
572 |
+ request_code, payload = prepare_payload( |
|
573 |
+ private_key_data, isolated=True |
|
574 |
+ ) |
|
575 |
+ client.request( |
|
576 |
+ request_code, |
|
577 |
+ payload, |
|
578 |
+ response_code=_types.SSH_AGENT.SUCCESS, |
|
579 |
+ ) |
|
580 |
+ else: |
|
581 |
+ raise |
|
582 |
+ except ( |
|
583 |
+ EOFError, |
|
584 |
+ OSError, |
|
585 |
+ ssh_agent.SSHAgentFailedError, |
|
586 |
+ ): # pragma: no cover |
|
587 |
+ pass |
|
588 |
+ else: # pragma: no cover |
|
589 |
+ successfully_loaded_keys.add(key_type) |
|
590 |
+ yield client |
|
591 |
+ finally: |
|
592 |
+ for key_type, key_struct in all_test_keys.items(): |
|
593 |
+ if not isolated and ( |
|
594 |
+ key_type in successfully_loaded_keys |
|
595 |
+ ): # pragma: no cover |
|
596 |
+ # The public key blob is the base64-encoded part in |
|
597 |
+ # the "public key line". |
|
598 |
+ public_key = base64.standard_b64decode( |
|
599 |
+ key_struct['public_key'].split(None, 2)[1] |
|
600 |
+ ) |
|
601 |
+ request_code = _types.SSH_AGENTC.REMOVE_IDENTITY |
|
602 |
+ client.request( |
|
603 |
+ request_code, |
|
604 |
+ public_key, |
|
605 |
+ response_code=frozenset({ |
|
606 |
+ _types.SSH_AGENT.SUCCESS, |
|
607 |
+ _types.SSH_AGENT.FAILURE, |
|
608 |
+ }), |
|
609 |
+ ) |
... | ... |
@@ -355,7 +355,6 @@ class TestCLI: |
355 | 355 |
last_line.rstrip(b'\n') == DUMMY_RESULT_KEY1 |
356 | 356 |
), 'expected known output' |
357 | 357 |
|
358 |
- @tests.skip_if_no_agent |
|
359 | 358 |
@pytest.mark.parametrize( |
360 | 359 |
'config', |
361 | 360 |
[ |
... | ... |
@@ -383,6 +382,7 @@ class TestCLI: |
383 | 382 |
def test_204c_key_override_on_command_line( |
384 | 383 |
self, |
385 | 384 |
monkeypatch: pytest.MonkeyPatch, |
385 |
+ running_ssh_agent: str, |
|
386 | 386 |
config: dict[str, Any], |
387 | 387 |
key_index: int, |
388 | 388 |
) -> None: |
... | ... |
@@ -396,6 +396,8 @@ class TestCLI: |
396 | 396 |
return value['expected_signature'] |
397 | 397 |
raise AssertionError |
398 | 398 |
|
399 |
+ with monkeypatch.context(): |
|
400 |
+ monkeypatch.setenv('SSH_AUTH_SOCK', running_ssh_agent) |
|
399 | 401 |
monkeypatch.setattr( |
400 | 402 |
ssh_agent.SSHAgentClient, 'list_keys', tests.list_keys |
401 | 403 |
) |
... | ... |
@@ -1553,13 +1555,15 @@ Boo. |
1553 | 1555 |
param = cli.derivepassphrase_vault.params[0] |
1554 | 1556 |
assert vfunc(ctx, param, input) == input |
1555 | 1557 |
|
1556 |
- @tests.skip_if_no_agent |
|
1557 | 1558 |
@pytest.mark.parametrize('conn_hint', ['none', 'socket', 'client']) |
1558 | 1559 |
def test_227_get_suitable_ssh_keys( |
1559 | 1560 |
self, |
1560 | 1561 |
monkeypatch: pytest.MonkeyPatch, |
1562 |
+ running_ssh_agent: str, |
|
1561 | 1563 |
conn_hint: str, |
1562 | 1564 |
) -> None: |
1565 |
+ with monkeypatch.context(): |
|
1566 |
+ monkeypatch.setenv('SSH_AUTH_SOCK', running_ssh_agent) |
|
1563 | 1567 |
monkeypatch.setattr( |
1564 | 1568 |
ssh_agent.SSHAgentClient, 'list_keys', tests.list_keys |
1565 | 1569 |
) |
... | ... |
@@ -1569,7 +1573,7 @@ Boo. |
1569 | 1573 |
hint = ssh_agent.SSHAgentClient() |
1570 | 1574 |
case 'socket': |
1571 | 1575 |
hint = socket.socket(family=socket.AF_UNIX) |
1572 |
- hint.connect(os.environ['SSH_AUTH_SOCK']) |
|
1576 |
+ hint.connect(running_ssh_agent) |
|
1573 | 1577 |
case _: |
1574 | 1578 |
assert conn_hint == 'none' |
1575 | 1579 |
hint = None |
... | ... |
@@ -1581,7 +1585,9 @@ Boo. |
1581 | 1585 |
except Exception as e: # noqa: BLE001 # pragma: no cover |
1582 | 1586 |
exception = e |
1583 | 1587 |
finally: |
1584 |
- assert exception is None, 'exception querying suitable SSH keys' |
|
1588 |
+ assert ( |
|
1589 |
+ exception is None |
|
1590 |
+ ), 'exception querying suitable SSH keys' |
|
1585 | 1591 |
|
1586 | 1592 |
|
1587 | 1593 |
class TestCLITransition: |
... | ... |
@@ -10,7 +10,6 @@ import base64 |
10 | 10 |
import io |
11 | 11 |
import os |
12 | 12 |
import socket |
13 |
-import subprocess |
|
14 | 13 |
from typing import TYPE_CHECKING |
15 | 14 |
|
16 | 15 |
import click |
... | ... |
@@ -23,6 +22,7 @@ from derivepassphrase import _types, cli, ssh_agent, vault |
23 | 22 |
|
24 | 23 |
if TYPE_CHECKING: |
25 | 24 |
from collections.abc import Iterable, Iterator |
25 |
+ from typing import Literal |
|
26 | 26 |
|
27 | 27 |
|
28 | 28 |
class TestStaticFunctionality: |
... | ... |
@@ -41,6 +41,67 @@ class TestStaticFunctionality: |
41 | 41 |
keydata == public_key_data |
42 | 42 |
), "recorded public key data doesn't match" |
43 | 43 |
|
44 |
+ @pytest.mark.parametrize( |
|
45 |
+ ['line', 'env_name', 'value'], |
|
46 |
+ [ |
|
47 |
+ ( |
|
48 |
+ 'SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170; export SSH_AUTH_SOCK;', # noqa: E501 |
|
49 |
+ 'SSH_AUTH_SOCK', |
|
50 |
+ '/tmp/pageant.user/pageant.27170', |
|
51 |
+ ), |
|
52 |
+ ( |
|
53 |
+ 'SSH_AUTH_SOCK=/tmp/ssh-3CSTC1W5M22A/agent.27270; export SSH_AUTH_SOCK;', # noqa: E501 |
|
54 |
+ 'SSH_AUTH_SOCK', |
|
55 |
+ '/tmp/ssh-3CSTC1W5M22A/agent.27270', |
|
56 |
+ ), |
|
57 |
+ ( |
|
58 |
+ 'SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170; export SSH_AUTH_SOCK', # noqa: E501 |
|
59 |
+ 'SSH_AUTH_SOCK', |
|
60 |
+ '/tmp/pageant.user/pageant.27170', |
|
61 |
+ ), |
|
62 |
+ ( |
|
63 |
+ 'export SSH_AUTH_SOCK=/tmp/ssh-3CSTC1W5M22A/agent.27270;', |
|
64 |
+ 'SSH_AUTH_SOCK', |
|
65 |
+ '/tmp/ssh-3CSTC1W5M22A/agent.27270', |
|
66 |
+ ), |
|
67 |
+ ( |
|
68 |
+ 'export SSH_AUTH_SOCK=/tmp/pageant.user/pageant.27170', |
|
69 |
+ 'SSH_AUTH_SOCK', |
|
70 |
+ '/tmp/pageant.user/pageant.27170', |
|
71 |
+ ), |
|
72 |
+ ( |
|
73 |
+ 'SSH_AGENT_PID=27170; export SSH_AGENT_PID;', |
|
74 |
+ 'SSH_AGENT_PID', |
|
75 |
+ '27170', |
|
76 |
+ ), |
|
77 |
+ ( |
|
78 |
+ 'SSH_AGENT_PID=27170; export SSH_AGENT_PID', |
|
79 |
+ 'SSH_AGENT_PID', |
|
80 |
+ '27170', |
|
81 |
+ ), |
|
82 |
+ ('export SSH_AGENT_PID=27170;', 'SSH_AGENT_PID', '27170'), |
|
83 |
+ ('export SSH_AGENT_PID=27170', 'SSH_AGENT_PID', '27170'), |
|
84 |
+ ( |
|
85 |
+ 'export VARIABLE=value; export OTHER_VARIABLE=other_value;', |
|
86 |
+ 'VARIABLE', |
|
87 |
+ None, |
|
88 |
+ ), |
|
89 |
+ ( |
|
90 |
+ 'VARIABLE=value', |
|
91 |
+ 'VARIABLE', |
|
92 |
+ None, |
|
93 |
+ ), |
|
94 |
+ ], |
|
95 |
+ ) |
|
96 |
+ def test_190_sh_export_line_parsing( |
|
97 |
+ self, line: str, env_name: str, value: str | None |
|
98 |
+ ) -> None: |
|
99 |
+ if value is not None: |
|
100 |
+ assert tests.parse_sh_export_line(line, env_name=env_name) == value |
|
101 |
+ else: |
|
102 |
+ with pytest.raises(ValueError, match='Cannot parse sh line:'): |
|
103 |
+ tests.parse_sh_export_line(line, env_name=env_name) |
|
104 |
+ |
|
44 | 105 |
def test_200_constructor_no_running_agent( |
45 | 106 |
self, monkeypatch: pytest.MonkeyPatch |
46 | 107 |
) -> None: |
... | ... |
@@ -165,7 +226,6 @@ class TestStaticFunctionality: |
165 | 226 |
unstring_prefix(input) |
166 | 227 |
|
167 | 228 |
|
168 |
-@tests.skip_if_no_agent |
|
169 | 229 |
class TestAgentInteraction: |
170 | 230 |
@pytest.mark.parametrize( |
171 | 231 |
'data_dict', |
... | ... |
@@ -173,30 +233,12 @@ class TestAgentInteraction: |
173 | 233 |
ids=tests.SUPPORTED_KEYS.keys(), |
174 | 234 |
) |
175 | 235 |
def test_200_sign_data_via_agent( |
176 |
- self, data_dict: tests.SSHTestKey |
|
236 |
+ self, |
|
237 |
+ ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient, |
|
238 |
+ data_dict: tests.SSHTestKey, |
|
177 | 239 |
) -> None: |
178 |
- private_key = data_dict['private_key'] |
|
179 |
- try: |
|
180 |
- _ = subprocess.run( |
|
181 |
- ['ssh-add', '-t', '30', '-q', '-'], |
|
182 |
- input=private_key, |
|
183 |
- check=True, |
|
184 |
- capture_output=True, |
|
185 |
- ) |
|
186 |
- except subprocess.CalledProcessError as e: |
|
187 |
- pytest.skip( |
|
188 |
- f'uploading test key: {e!r}, stdout={e.stdout!r}, ' |
|
189 |
- f'stderr={e.stderr!r}' |
|
190 |
- ) |
|
191 |
- else: |
|
192 |
- try: |
|
193 |
- client = ssh_agent.SSHAgentClient() |
|
194 |
- except OSError: # pragma: no cover |
|
195 |
- pytest.skip('communication error with the SSH agent') |
|
196 |
- with client: |
|
197 |
- key_comment_pairs = { |
|
198 |
- bytes(k): bytes(c) for k, c in client.list_keys() |
|
199 |
- } |
|
240 |
+ client = ssh_agent_client_with_test_keys_loaded |
|
241 |
+ key_comment_pairs = {bytes(k): bytes(c) for k, c in client.list_keys()} |
|
200 | 242 |
public_key_data = data_dict['public_key_data'] |
201 | 243 |
expected_signature = data_dict['expected_signature'] |
202 | 244 |
derived_passphrase = data_dict['derived_passphrase'] |
... | ... |
@@ -211,8 +253,7 @@ class TestAgentInteraction: |
211 | 253 |
) |
212 | 254 |
assert signature2 == expected_signature, 'SSH signature mismatch' |
213 | 255 |
assert ( |
214 |
- vault.Vault.phrase_from_key(public_key_data) |
|
215 |
- == derived_passphrase |
|
256 |
+ vault.Vault.phrase_from_key(public_key_data) == derived_passphrase |
|
216 | 257 |
), 'SSH signature mismatch' |
217 | 258 |
|
218 | 259 |
@pytest.mark.parametrize( |
... | ... |
@@ -221,30 +262,12 @@ class TestAgentInteraction: |
221 | 262 |
ids=tests.UNSUITABLE_KEYS.keys(), |
222 | 263 |
) |
223 | 264 |
def test_201_sign_data_via_agent_unsupported( |
224 |
- self, data_dict: tests.SSHTestKey |
|
265 |
+ self, |
|
266 |
+ ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient, |
|
267 |
+ data_dict: tests.SSHTestKey, |
|
225 | 268 |
) -> None: |
226 |
- private_key = data_dict['private_key'] |
|
227 |
- try: |
|
228 |
- _ = subprocess.run( |
|
229 |
- ['ssh-add', '-t', '30', '-q', '-'], |
|
230 |
- input=private_key, |
|
231 |
- check=True, |
|
232 |
- capture_output=True, |
|
233 |
- ) |
|
234 |
- except subprocess.CalledProcessError as e: # pragma: no cover |
|
235 |
- pytest.skip( |
|
236 |
- f'uploading test key: {e!r}, stdout={e.stdout!r}, ' |
|
237 |
- f'stderr={e.stderr!r}' |
|
238 |
- ) |
|
239 |
- else: |
|
240 |
- try: |
|
241 |
- client = ssh_agent.SSHAgentClient() |
|
242 |
- except OSError: # pragma: no cover |
|
243 |
- pytest.skip('communication error with the SSH agent') |
|
244 |
- with client: |
|
245 |
- key_comment_pairs = { |
|
246 |
- bytes(k): bytes(c) for k, c in client.list_keys() |
|
247 |
- } |
|
269 |
+ client = ssh_agent_client_with_test_keys_loaded |
|
270 |
+ key_comment_pairs = {bytes(k): bytes(c) for k, c in client.list_keys()} |
|
248 | 271 |
public_key_data = data_dict['public_key_data'] |
249 | 272 |
_ = data_dict['expected_signature'] |
250 | 273 |
if public_key_data not in key_comment_pairs: # pragma: no cover |
... | ... |
@@ -265,8 +288,14 @@ class TestAgentInteraction: |
265 | 288 |
|
266 | 289 |
@pytest.mark.parametrize(['key', 'single'], list(_params())) |
267 | 290 |
def test_210_ssh_key_selector( |
268 |
- self, monkeypatch: pytest.MonkeyPatch, key: bytes, single: bool |
|
291 |
+ self, |
|
292 |
+ monkeypatch: pytest.MonkeyPatch, |
|
293 |
+ running_ssh_agent: str, |
|
294 |
+ key: bytes, |
|
295 |
+ single: bool, |
|
269 | 296 |
) -> None: |
297 |
+ del running_ssh_agent |
|
298 |
+ |
|
270 | 299 |
def key_is_suitable(key: bytes) -> bool: |
271 | 300 |
return key in { |
272 | 301 |
v['public_key_data'] for v in tests.SUPPORTED_KEYS.values() |
... | ... |
@@ -318,9 +347,12 @@ class TestAgentInteraction: |
318 | 347 |
del _params |
319 | 348 |
|
320 | 349 |
def test_300_constructor_bad_running_agent( |
321 |
- self, monkeypatch: pytest.MonkeyPatch |
|
350 |
+ self, |
|
351 |
+ monkeypatch: pytest.MonkeyPatch, |
|
352 |
+ running_ssh_agent: str, |
|
322 | 353 |
) -> None: |
323 |
- monkeypatch.setenv('SSH_AUTH_SOCK', os.environ['SSH_AUTH_SOCK'] + '~') |
|
354 |
+ with monkeypatch.context() as monkeypatch2: |
|
355 |
+ monkeypatch2.setenv('SSH_AUTH_SOCK', running_ssh_agent + '~') |
|
324 | 356 |
sock = socket.socket(family=socket.AF_UNIX) |
325 | 357 |
with pytest.raises(OSError): # noqa: PT011 |
326 | 358 |
ssh_agent.SSHAgentClient(socket=sock) |
... | ... |
@@ -333,8 +365,12 @@ class TestAgentInteraction: |
333 | 365 |
], |
334 | 366 |
) |
335 | 367 |
def test_310_truncated_server_response( |
336 |
- self, monkeypatch: pytest.MonkeyPatch, response: bytes |
|
368 |
+ self, |
|
369 |
+ monkeypatch: pytest.MonkeyPatch, |
|
370 |
+ running_ssh_agent: str, |
|
371 |
+ response: bytes, |
|
337 | 372 |
) -> None: |
373 |
+ del running_ssh_agent |
|
338 | 374 |
client = ssh_agent.SSHAgentClient() |
339 | 375 |
response_stream = io.BytesIO(response) |
340 | 376 |
|
... | ... |
@@ -350,7 +386,6 @@ class TestAgentInteraction: |
350 | 386 |
with pytest.raises(EOFError): |
351 | 387 |
client.request(255, b'') |
352 | 388 |
|
353 |
- @tests.skip_if_no_agent |
|
354 | 389 |
@pytest.mark.parametrize( |
355 | 390 |
['response_code', 'response', 'exc_type', 'exc_pattern'], |
356 | 391 |
[ |
... | ... |
@@ -377,11 +412,14 @@ class TestAgentInteraction: |
377 | 412 |
def test_320_list_keys_error_responses( |
378 | 413 |
self, |
379 | 414 |
monkeypatch: pytest.MonkeyPatch, |
415 |
+ running_ssh_agent: str, |
|
380 | 416 |
response_code: _types.SSH_AGENT, |
381 | 417 |
response: bytes | bytearray, |
382 | 418 |
exc_type: type[Exception], |
383 | 419 |
exc_pattern: str, |
384 | 420 |
) -> None: |
421 |
+ del running_ssh_agent |
|
422 |
+ |
|
385 | 423 |
passed_response_code = response_code |
386 | 424 |
|
387 | 425 |
def request( |
... | ... |
@@ -413,12 +451,12 @@ class TestAgentInteraction: |
413 | 451 |
) |
414 | 452 |
return response |
415 | 453 |
|
454 |
+ with monkeypatch.context() as monkeypatch2: |
|
416 | 455 |
client = ssh_agent.SSHAgentClient() |
417 |
- monkeypatch.setattr(client, 'request', request) |
|
456 |
+ monkeypatch2.setattr(client, 'request', request) |
|
418 | 457 |
with pytest.raises(exc_type, match=exc_pattern): |
419 | 458 |
client.list_keys() |
420 | 459 |
|
421 |
- @tests.skip_if_no_agent |
|
422 | 460 |
@pytest.mark.parametrize( |
423 | 461 |
[ |
424 | 462 |
'key', |
... | ... |
@@ -450,6 +488,7 @@ class TestAgentInteraction: |
450 | 488 |
def test_330_sign_error_responses( |
451 | 489 |
self, |
452 | 490 |
monkeypatch: pytest.MonkeyPatch, |
491 |
+ running_ssh_agent: str, |
|
453 | 492 |
key: bytes | bytearray, |
454 | 493 |
check: bool, |
455 | 494 |
response_code: _types.SSH_AGENT, |
... | ... |
@@ -457,6 +496,7 @@ class TestAgentInteraction: |
457 | 496 |
exc_type: type[Exception], |
458 | 497 |
exc_pattern: str, |
459 | 498 |
) -> None: |
499 |
+ del running_ssh_agent |
|
460 | 500 |
passed_response_code = response_code |
461 | 501 |
|
462 | 502 |
def request( |
... | ... |
@@ -490,18 +530,18 @@ class TestAgentInteraction: |
490 | 530 |
) |
491 | 531 |
return response # pragma: no cover |
492 | 532 |
|
533 |
+ with monkeypatch.context() as monkeypatch2: |
|
493 | 534 |
client = ssh_agent.SSHAgentClient() |
494 |
- monkeypatch.setattr(client, 'request', request) |
|
535 |
+ monkeypatch2.setattr(client, 'request', request) |
|
495 | 536 |
KeyCommentPair = _types.KeyCommentPair # noqa: N806 |
496 | 537 |
loaded_keys = [ |
497 | 538 |
KeyCommentPair(v['public_key_data'], b'no comment') |
498 | 539 |
for v in tests.SUPPORTED_KEYS.values() |
499 | 540 |
] |
500 |
- monkeypatch.setattr(client, 'list_keys', lambda: loaded_keys) |
|
541 |
+ monkeypatch2.setattr(client, 'list_keys', lambda: loaded_keys) |
|
501 | 542 |
with pytest.raises(exc_type, match=exc_pattern): |
502 | 543 |
client.sign(key, b'abc', check_if_key_loaded=check) |
503 | 544 |
|
504 |
- @tests.skip_if_no_agent |
|
505 | 545 |
@pytest.mark.parametrize( |
506 | 546 |
['request_code', 'response_code', 'exc_type', 'exc_pattern'], |
507 | 547 |
[ |
... | ... |
@@ -515,11 +555,14 @@ class TestAgentInteraction: |
515 | 555 |
) |
516 | 556 |
def test_340_request_error_responses( |
517 | 557 |
self, |
558 |
+ running_ssh_agent: str, |
|
518 | 559 |
request_code: _types.SSH_AGENTC, |
519 | 560 |
response_code: _types.SSH_AGENT, |
520 | 561 |
exc_type: type[Exception], |
521 | 562 |
exc_pattern: str, |
522 | 563 |
) -> None: |
564 |
+ del running_ssh_agent |
|
565 |
+ |
|
523 | 566 |
with ( |
524 | 567 |
pytest.raises(exc_type, match=exc_pattern), |
525 | 568 |
ssh_agent.SSHAgentClient() as client, |
526 | 569 |