11a492e7d9c86097a31c97d78d9751f1d82db377
Marco Ricci Update copyright notices to...

Marco Ricci authored 2 months ago

1) # SPDX-FileCopyrightText: 2025 Marco Ricci <software@the13thletter.info>
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

2) #
Marco Ricci Update copyright notices to...

Marco Ricci authored 2 months ago

3) # SPDX-License-Identifier: Zlib
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

4) 
5) from __future__ import annotations
6) 
7) import base64
8) import contextlib
9) import operator
10) import os
11) import shutil
Marco Ricci Fail gracefully if UNIX dom...

Marco Ricci authored 5 months ago

12) import socket
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

13) import subprocess
14) from typing import TYPE_CHECKING, TypeVar
15) 
Marco Ricci Set up the "hypothesis" tes...

Marco Ricci authored 5 months ago

16) import hypothesis
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

17) import packaging.version
18) import pytest
19) 
20) import tests
21) from derivepassphrase import _types, ssh_agent
22) 
23) if TYPE_CHECKING:
24)     from collections.abc import Iterator
25) 
26) startup_ssh_auth_sock = os.environ.get('SSH_AUTH_SOCK', None)
27) 
Marco Ricci Set up the "hypothesis" tes...

Marco Ricci authored 5 months ago

28) # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
Marco Ricci Fix outstanding formatting...

Marco Ricci authored 5 months ago

29) hypothesis.settings.register_profile('ci', max_examples=1000)
30) hypothesis.settings.register_profile('dev', max_examples=10)
Marco Ricci Set up the "hypothesis" tes...

Marco Ricci authored 5 months ago

31) hypothesis.settings.register_profile(
Marco Ricci Fix outstanding formatting...

Marco Ricci authored 5 months ago

32)     'debug', max_examples=10, verbosity=hypothesis.Verbosity.verbose
Marco Ricci Set up the "hypothesis" tes...

Marco Ricci authored 5 months ago

33) )
34) 
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

35) 
36) # https://docs.pytest.org/en/stable/explanation/fixtures.html#a-note-about-fixture-cleanup
37) # https://github.com/pytest-dev/pytest/issues/5243#issuecomment-491522595
38) @pytest.fixture(scope='session', autouse=True)
39) def term_handler() -> Iterator[None]:  # pragma: no cover
40)     try:
41)         import signal  # noqa: PLC0415
42) 
43)         sigint_handler = signal.getsignal(signal.SIGINT)
44)     except (ImportError, OSError):
45)         return
46)     else:
47)         orig_term = signal.signal(signal.SIGTERM, sigint_handler)
48)         yield
49)         signal.signal(signal.SIGTERM, orig_term)
50) 
51) 
Marco Ricci Fail gracefully if UNIX dom...

Marco Ricci authored 5 months ago

52) @pytest.fixture(scope='session')
53) def skip_if_no_af_unix_support() -> None:  # pragma: no cover
54)     """Skip the test if Python does not support AF_UNIX.
55) 
56)     Implemented as a fixture instead of a mark because it has
57)     consequences for other fixtures, and because another "autouse"
58)     session fixture may want to force/simulate non-support of
59)     [`socket.AF_UNIX`][].
60) 
61)     """
62)     if not hasattr(socket, 'AF_UNIX'):
63)         pytest.skip('socket module does not support AF_UNIX')
64) 
65) 
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

66) def _spawn_pageant(  # pragma: no cover
67)     executable: str | None, env: dict[str, str]
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

68) ) -> subprocess.Popen[str] | None:
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

69)     """Spawn an isolated Pageant, if possible.
70) 
71)     We attempt to detect whether Pageant is usable, i.e. whether Pageant
72)     has output buffering problems when announcing its authentication
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

73)     socket.  This is the case for Pageant 0.81 and earlier.
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

74) 
75)     Args:
76)         executable:
77)             The path to the Pageant executable.
78)         env:
79)             The new environment for Pageant.  Should typically not
80)             include an SSH_AUTH_SOCK variable.
81) 
82)     Returns:
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

83)         The spawned Pageant subprocess.  If the executable is `None`, or
84)         if we detect that Pageant cannot be sensibly controlled as
85)         a subprocess, then return `None` directly.
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

86) 
87)         It is the caller's responsibility to clean up the spawned
88)         subprocess.
89) 
90)     """
91)     if executable is None:  # pragma: no cover
92)         return None
93) 
94)     # Apparently, Pageant 0.81 and lower running in debug mode does
95)     # not actively flush its output.  As a result, the first two
96)     # lines, which set the SSH_AUTH_SOCK and the SSH_AGENT_PID, only
97)     # print once the output buffer is flushed, whenever that is.
98)     #
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

99)     # This has been reported to the PuTTY developers.  It is fixed in
100)     # version 0.82, though the PuTTY developers consider this to be an
101)     # abuse of debug mode.  A new foreground mode (`--foreground`), also
102)     # introduced in 0.82, provides the desired behavior: no forking, and
103)     # immediately parsable instructions for SSH_AUTH_SOCK and
104)     # SSH_AGENT_PID.
105) 
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

106)     help_output = subprocess.run(
107)         ['pageant', '--help'],
108)         executable=executable,
109)         env=env,
110)         capture_output=True,
111)         text=True,
112)         check=False,
113)     ).stdout
114)     help_lines = help_output.splitlines(True)
115)     pageant_version_string = (
116)         help_lines[1].strip().removeprefix('Release ')
117)         if len(help_lines) >= 2
118)         else ''
119)     )
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

120)     v0_82 = packaging.version.Version('0.82')
121)     pageant_version = packaging.version.Version(pageant_version_string)
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

122) 
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

123)     if pageant_version < v0_82:  # pragma: no cover
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

124)         return None
125) 
126)     return subprocess.Popen(
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

127)         ['pageant', '--foreground', '-s'],
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

128)         executable=executable,
129)         stdin=subprocess.DEVNULL,
130)         stdout=subprocess.PIPE,
131)         shell=False,
132)         env=env,
133)         text=True,
134)         bufsize=1,
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

135)     )
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

136) 
137) 
138) def _spawn_openssh_agent(  # pragma: no cover
139)     executable: str | None, env: dict[str, str]
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

140) ) -> subprocess.Popen[str] | None:
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

141)     """Spawn an isolated OpenSSH agent, if possible.
142) 
143)     Args:
144)         executable:
145)             The path to the OpenSSH agent executable.
146)         env:
147)             The new environment for the OpenSSH agent.  Should typically
148)             not include an SSH_AUTH_SOCK variable.
149) 
150)     Returns:
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

151)         The spawned OpenSSH agent subprocess.  If the executable is
152)         `None`, then return `None` directly.
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

153) 
154)         It is the caller's responsibility to clean up the spawned
155)         subprocess.
156) 
157)     """
158)     if executable is None:
159)         return None
160)     return subprocess.Popen(
161)         ['ssh-agent', '-D', '-s'],
162)         executable=executable,
163)         stdin=subprocess.DEVNULL,
164)         stdout=subprocess.PIPE,
165)         shell=False,
166)         env=env,
167)         text=True,
168)         bufsize=1,
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

169)     )
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

170) 
171) 
172) def _spawn_system_agent(  # pragma: no cover
173)     executable: str | None, env: dict[str, str]
174) ) -> None:
175)     """Placeholder function. Does nothing."""
176) 
177) 
178) _spawn_handlers = [
179)     ('pageant', _spawn_pageant, tests.KnownSSHAgent.Pageant),
180)     ('ssh-agent', _spawn_openssh_agent, tests.KnownSSHAgent.OpenSSHAgent),
181)     ('(system)', _spawn_system_agent, tests.KnownSSHAgent.UNKNOWN),
182) ]
183) 
184) 
185) @pytest.fixture
Marco Ricci Fail gracefully if UNIX dom...

Marco Ricci authored 5 months ago

186) def running_ssh_agent(  # pragma: no cover
187)     skip_if_no_af_unix_support: None,
Marco Ricci Let the `running_ssh_agent`...

Marco Ricci authored 3 months ago

188) ) -> Iterator[tests.RunningSSHAgentInfo]:
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

189)     """Ensure a running SSH agent, if possible, as a pytest fixture.
190) 
191)     Check for a running SSH agent, or spawn a new one if possible.  We
192)     know how to spawn OpenSSH's agent and PuTTY's Pageant.  If spawned
193)     this way, the agent does not persist beyond the test.
194) 
195)     This fixture can neither guarantee a particular running agent, nor
196)     can it guarantee a particular set of loaded keys.
197) 
198)     Yields:
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

199)         A 2-tuple `(ssh_auth_sock, agent_type)`, where `ssh_auth_sock`
200)         is the value of the `SSH_AUTH_SOCK` environment variable, to be
201)         used to connect to the running agent, and `agent_type` is the
202)         agent type.
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

203) 
204)     Raises:
205)         pytest.skip.Exception:
206)             If no agent is running or can be spawned, skip this test.
207) 
208)     """
Marco Ricci Fail gracefully if UNIX dom...

Marco Ricci authored 5 months ago

209)     del skip_if_no_af_unix_support
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

210)     exit_stack = contextlib.ExitStack()
211)     Popen = TypeVar('Popen', bound=subprocess.Popen)
212) 
213)     @contextlib.contextmanager
214)     def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
215)         try:
216)             yield proc
217)         finally:
218)             proc.terminate()
219)             proc.wait()
220) 
221)     with pytest.MonkeyPatch.context() as monkeypatch:
222)         # pytest's fixture system does not seem to guarantee that
223)         # environment variables are set up correctly if nested and
224)         # parametrized fixtures are used: it is possible that "outer"
225)         # parametrized fixtures are torn down only after other "outer"
226)         # fixtures of the same parameter set have run.  So set
227)         # SSH_AUTH_SOCK explicitly to the value saved at interpreter
228)         # startup.  This is then verified with *a lot* of further assert
229)         # statements.
230)         if startup_ssh_auth_sock:  # pragma: no cover
231)             monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
232)         else:  # pragma: no cover
233)             monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
Marco Ricci Let the `running_ssh_agent`...

Marco Ricci authored 3 months ago

234)         for exec_name, spawn_func, agent_type in _spawn_handlers:
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

235)             # Use match/case here once Python 3.9 becomes unsupported.
236)             if exec_name == '(system)':
237)                 assert (
238)                     os.environ.get('SSH_AUTH_SOCK', None)
239)                     == startup_ssh_auth_sock
240)                 ), 'SSH_AUTH_SOCK mismatch when checking for running agent'
241)                 try:
242)                     with ssh_agent.SSHAgentClient() as client:
243)                         client.list_keys()
244)                 except (KeyError, OSError):
245)                     continue
Marco Ricci Let the `running_ssh_agent`...

Marco Ricci authored 3 months ago

246)                 yield tests.RunningSSHAgentInfo(
247)                     os.environ['SSH_AUTH_SOCK'], agent_type
248)                 )
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

249)                 assert (
250)                     os.environ.get('SSH_AUTH_SOCK', None)
251)                     == startup_ssh_auth_sock
252)                 ), 'SSH_AUTH_SOCK mismatch after returning from running agent'
253)             else:
254)                 assert (
255)                     os.environ.get('SSH_AUTH_SOCK', None)
256)                     == startup_ssh_auth_sock
Marco Ricci Update ruff to v0.8.x, refo...

Marco Ricci authored 2 months ago

257)                 ), (
258)                     f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'
259)                 )
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

260)                 proc = spawn_func(executable=shutil.which(exec_name), env={})
261)                 if proc is None:
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

262)                     continue
263)                 with exit_stack:
264)                     exit_stack.enter_context(terminate_on_exit(proc))
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

265)                     assert (
266)                         os.environ.get('SSH_AUTH_SOCK', None)
267)                         == startup_ssh_auth_sock
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

268)                     ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
269)                     assert proc.stdout is not None
270)                     ssh_auth_sock_line = proc.stdout.readline()
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

271)                     try:
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

272)                         ssh_auth_sock = tests.parse_sh_export_line(
273)                             ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
274)                         )
275)                     except ValueError:  # pragma: no cover
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

276)                         continue
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

277)                     pid_line = proc.stdout.readline()
278)                     if (
279)                         'pid' not in pid_line.lower()
280)                         and '_pid' not in pid_line.lower()
281)                     ):  # pragma: no cover
282)                         pytest.skip(f'Cannot parse agent output: {pid_line!r}')
283)                     monkeypatch2 = exit_stack.enter_context(
284)                         pytest.MonkeyPatch.context()
285)                     )
286)                     monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
Marco Ricci Let the `running_ssh_agent`...

Marco Ricci authored 3 months ago

287)                     yield tests.RunningSSHAgentInfo(ssh_auth_sock, agent_type)
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

288)                 assert (
289)                     os.environ.get('SSH_AUTH_SOCK', None)
290)                     == startup_ssh_auth_sock
291)                 ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

292)             return
293)         pytest.skip('No SSH agent running or spawnable')
294) 
295) 
296) @pytest.fixture(params=_spawn_handlers, ids=operator.itemgetter(0))
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

297) def spawn_ssh_agent(
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

298)     request: pytest.FixtureRequest,
Marco Ricci Fail gracefully if UNIX dom...

Marco Ricci authored 5 months ago

299)     skip_if_no_af_unix_support: None,
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

300) ) -> Iterator[tests.SpawnedSSHAgentInfo]:
301)     """Spawn an isolated SSH agent, if possible, as a pytest fixture.
302) 
303)     Spawn a new SSH agent isolated from other SSH use by other
304)     processes, if possible.  We know how to spawn OpenSSH's agent and
305)     PuTTY's Pageant, and the "(system)" fallback agent.
306) 
307)     Yields:
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

308)         A [named tuple][collections.namedtuple] containing information
309)         about the spawned agent, e.g. the software product, a client
310)         connected to the agent, and whether the agent is isolated from
311)         other clients.
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

312) 
313)     Raises:
314)         pytest.skip.Exception:
315)             If the agent cannot be spawned, skip this test.
316) 
317)     """
Marco Ricci Fail gracefully if UNIX dom...

Marco Ricci authored 5 months ago

318)     del skip_if_no_af_unix_support
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

319)     agent_env = os.environ.copy()
320)     agent_env.pop('SSH_AUTH_SOCK', None)
321)     exit_stack = contextlib.ExitStack()
322)     Popen = TypeVar('Popen', bound=subprocess.Popen)
323) 
324)     @contextlib.contextmanager
325)     def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
326)         try:
327)             yield proc
328)         finally:
329)             proc.terminate()
330)             proc.wait()
331) 
332)     with pytest.MonkeyPatch.context() as monkeypatch:
333)         # pytest's fixture system does not seem to guarantee that
334)         # environment variables are set up correctly if nested and
335)         # parametrized fixtures are used: it is possible that "outer"
336)         # parametrized fixtures are torn down only after other "outer"
337)         # fixtures of the same parameter set have run.  So set
338)         # SSH_AUTH_SOCK explicitly to the value saved at interpreter
339)         # startup.  This is then verified with *a lot* of further assert
340)         # statements.
341)         if startup_ssh_auth_sock:  # pragma: no cover
342)             monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
343)         else:  # pragma: no cover
344)             monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
345)         exec_name, spawn_func, agent_type = request.param
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

346)         # Use match/case here once Python 3.9 becomes unsupported.
347)         if exec_name == '(system)':
348)             assert (
349)                 os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
350)             ), 'SSH_AUTH_SOCK mismatch when checking for running agent'
351)             try:
352)                 client = ssh_agent.SSHAgentClient()
353)                 client.list_keys()
354)             except KeyError:  # pragma: no cover
355)                 pytest.skip('SSH agent is not running')
356)             except OSError as exc:  # pragma: no cover
357)                 pytest.skip(
358)                     f'Cannot talk to SSH agent: '
359)                     f'{exc.strerror}: {exc.filename!r}'
360)                 )
361)             with client:
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

362)                 assert (
363)                     os.environ.get('SSH_AUTH_SOCK', None)
364)                     == startup_ssh_auth_sock
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

365)                 ), 'SSH_AUTH_SOCK mismatch before setting up for running agent'
366)                 yield tests.SpawnedSSHAgentInfo(agent_type, client, False)
367)             assert (
368)                 os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
369)             ), 'SSH_AUTH_SOCK mismatch after returning from running agent'
370)             return
371) 
372)         else:
373)             assert (
374)                 os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
Marco Ricci Update ruff to v0.8.x, refo...

Marco Ricci authored 2 months ago

375)             ), (
376)                 f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'
377)             )
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

378)             proc = spawn_func(
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

379)                 executable=shutil.which(exec_name), env=agent_env
380)             )
381)             assert (
382)                 os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
383)             ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

384)             if proc is None:  # pragma: no cover
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

385)                 pytest.skip(f'Cannot spawn usable {exec_name}')
386)             with exit_stack:
387)                 exit_stack.enter_context(terminate_on_exit(proc))
388)                 assert proc.stdout is not None
389)                 ssh_auth_sock_line = proc.stdout.readline()
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

390)                 try:
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

391)                     ssh_auth_sock = tests.parse_sh_export_line(
392)                         ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
393)                     )
394)                 except ValueError:  # pragma: no cover
Marco Ricci Remove debugging-only code...

Marco Ricci authored 5 months ago

395)                     pytest.skip(
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

396)                         f'Cannot parse agent output: {ssh_auth_sock_line}'
Marco Ricci Remove debugging-only code...

Marco Ricci authored 5 months ago

397)                     )
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

398)                 pid_line = proc.stdout.readline()
399)                 if (
400)                     'pid' not in pid_line.lower()
401)                     and '_pid' not in pid_line.lower()
402)                 ):  # pragma: no cover
403)                     pytest.skip(f'Cannot parse agent output: {pid_line!r}')
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

404)                 assert (
405)                     os.environ.get('SSH_AUTH_SOCK', None)
406)                     == startup_ssh_auth_sock
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

407)                 ), f'SSH_AUTH_SOCK mismatch before spawning {exec_name} helper'
408)                 monkeypatch2 = exit_stack.enter_context(
409)                     pytest.MonkeyPatch.context()
410)                 )
411)                 monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
412)                 try:
413)                     client = ssh_agent.SSHAgentClient()
414)                 except OSError as exc:  # pragma: no cover
415)                     pytest.skip(
416)                         f'Cannot talk to SSH agent: '
417)                         f'{exc.strerror}: {exc.filename!r}'
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

418)                     )
Marco Ricci Add support for Python 3.9

Marco Ricci authored 5 months ago

419)                 exit_stack.enter_context(client)
420)                 yield tests.SpawnedSSHAgentInfo(agent_type, client, True)
421)             assert (
422)                 os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
423)             ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
424)             return
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

425) 
426) 
427) @pytest.fixture
428) def ssh_agent_client_with_test_keys_loaded(  # noqa: C901
429)     spawn_ssh_agent: tests.SpawnedSSHAgentInfo,
430) ) -> Iterator[ssh_agent.SSHAgentClient]:
Marco Ricci Remove debugging-only code...

Marco Ricci authored 5 months ago

431)     """Provide an SSH agent with loaded test keys, as a pytest fixture.
432) 
433)     Use the `spawn_ssh_agent` fixture to acquire a usable SSH agent,
434)     upload the known test keys into the agent, and return a connected
435)     client.
436) 
437)     The agent may reject several of the test keys due to unsupported or
438)     obsolete key types.  Rejected keys will be silently ignored, unless
439)     all keys are rejected; then the test will be skipped.  You must not
440)     automatically assume any particular key is present in the agent.
441) 
442)     Yields:
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

443)         A [named tuple][collections.namedtuple] containing
444)         information about the spawned agent, e.g. the software
445)         product, a client connected to the agent, and whether the
446)         agent is isolated from other clients.
Marco Ricci Remove debugging-only code...

Marco Ricci authored 5 months ago

447) 
448)     Raises:
449)         OSError:
450)             There was a communication or a socket setup error with the
451)             agent.
452)         pytest.skip.Exception:
453)             If the agent is unusable or if it rejected all test keys,
454)             skip this test.
455) 
456)     Warning:
457)         It is the fixture's responsibility to clean up the SSH agent
458)         client after the test.  Closing the client's socket connection
459)         beforehand (e.g. by using the client as a context manager) may
460)         lead to exceptions being thrown upon fixture teardown.
461) 
462)     """
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

463)     agent_type, client, isolated = spawn_ssh_agent
464)     successfully_loaded_keys: set[str] = set()
465) 
466)     def prepare_payload(
467)         payload: bytes | bytearray,
468)         *,
469)         isolated: bool = True,
470)         time_to_live: int = 30,
471)     ) -> tuple[_types.SSH_AGENTC, bytes]:
472)         return_code = (
473)             _types.SSH_AGENTC.ADD_IDENTITY
474)             if isolated
475)             else _types.SSH_AGENTC.ADD_ID_CONSTRAINED
476)         )
477)         lifetime_constraint = (
478)             b''
479)             if isolated
480)             else b'\x01' + ssh_agent.SSHAgentClient.uint32(time_to_live)
481)         )
482)         return (return_code, bytes(payload) + lifetime_constraint)
483) 
484)     try:
Marco Ricci Convert `tests.SSHTestKey`...

Marco Ricci authored 2 months ago

485)         for key_type, key_struct in tests.ALL_KEYS.items():
486)             private_key_data = key_struct.private_key_blob
487)             if private_key_data is None:  # pragma: no cover
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

488)                 continue
489)             request_code, payload = prepare_payload(
490)                 private_key_data, isolated=isolated, time_to_live=30
491)             )
492)             try:
493)                 try:
494)                     client.request(
495)                         request_code,
496)                         payload,
497)                         response_code=_types.SSH_AGENT.SUCCESS,
498)                     )
499)                 except ssh_agent.SSHAgentFailedError:  # pragma: no cover
500)                     # Pageant can fail to accept a key for two separate
501)                     # reasons:
502)                     #
503)                     # - Pageant refuses to accept a key it already holds
Marco Ricci Explicitly support Pageant...

Marco Ricci authored 3 months ago

504)                     #   in memory.  Verify this by listing keys.
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

505)                     # - Pageant does not support key constraints (see
506)                     #   references below).
507)                     #
508)                     # https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/pageant-timeout.html
509)                     # https://www.chiark.greenend.org.uk/~sgtatham/putty/wishlist/pageant-key-confirm.html
510)                     current_loaded_keys = frozenset({
511)                         pair.key for pair in client.list_keys()
512)                     })
513)                     if agent_type == tests.KnownSSHAgent.Pageant and (
Marco Ricci Convert `tests.SSHTestKey`...

Marco Ricci authored 2 months ago

514)                         key_struct.public_key_data in current_loaded_keys
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

515)                     ):
516)                         pass
517)                     elif agent_type == tests.KnownSSHAgent.Pageant and (
518)                         not isolated
519)                     ):
520)                         request_code, payload = prepare_payload(
521)                             private_key_data, isolated=True
522)                         )
523)                         client.request(
524)                             request_code,
525)                             payload,
526)                             response_code=_types.SSH_AGENT.SUCCESS,
527)                         )
528)                     else:
529)                         raise
530)             except (
531)                 EOFError,
532)                 OSError,
533)                 ssh_agent.SSHAgentFailedError,
534)             ):  # pragma: no cover
535)                 pass
536)             else:  # pragma: no cover
537)                 successfully_loaded_keys.add(key_type)
538)         yield client
539)     finally:
Marco Ricci Convert `tests.SSHTestKey`...

Marco Ricci authored 2 months ago

540)         for key_type, key_struct in tests.ALL_KEYS.items():
Marco Ricci Add test fixture for manual...

Marco Ricci authored 5 months ago

541)             if not isolated and (
542)                 key_type in successfully_loaded_keys
543)             ):  # pragma: no cover
544)                 # The public key blob is the base64-encoded part in
545)                 # the "public key line".
546)                 public_key = base64.standard_b64decode(
Marco Ricci Convert `tests.SSHTestKey`...

Marco Ricci authored 2 months ago

547)                     key_struct.public_key.split(None, 2)[1]