1112627d6dd1f23dc066c1a723f5bfad01b31dab
Marco Ricci Add test fixture for manual...

Marco Ricci authored 2 months ago

1) # SPDX-FileCopyrightText: 2024 Marco Ricci <software@the13thletter.info>
2) #
3) # SPDX-License-Identifier: MIT
4) 
5) from __future__ import annotations
6) 
7) import base64
8) import contextlib
9) import operator
10) import os
11) import shutil
12) import subprocess
13) import sys
14) import textwrap
15) from typing import TYPE_CHECKING, TypeVar
16) 
Marco Ricci Set up the "hypothesis" tes...

Marco Ricci authored 2 months ago

17) import hypothesis
Marco Ricci Add test fixture for manual...

Marco Ricci authored 2 months ago

18) import packaging.version
19) import pytest
20) 
21) import tests
22) from derivepassphrase import _types, ssh_agent
23) 
24) if TYPE_CHECKING:
25)     from collections.abc import Iterator
26)     from typing import Literal
27) 
28) startup_ssh_auth_sock = os.environ.get('SSH_AUTH_SOCK', None)
29) 
Marco Ricci Set up the "hypothesis" tes...

Marco Ricci authored 2 months ago

30) # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles
Marco Ricci Fix outstanding formatting...

Marco Ricci authored 2 months ago

31) hypothesis.settings.register_profile('ci', max_examples=1000)
32) hypothesis.settings.register_profile('dev', max_examples=10)
Marco Ricci Set up the "hypothesis" tes...

Marco Ricci authored 2 months ago

33) hypothesis.settings.register_profile(
Marco Ricci Fix outstanding formatting...

Marco Ricci authored 2 months ago

34)     'debug', max_examples=10, verbosity=hypothesis.Verbosity.verbose
Marco Ricci Set up the "hypothesis" tes...

Marco Ricci authored 2 months ago

35) )
36) 
Marco Ricci Add test fixture for manual...

Marco Ricci authored 2 months ago

37) 
38) # https://docs.pytest.org/en/stable/explanation/fixtures.html#a-note-about-fixture-cleanup
39) # https://github.com/pytest-dev/pytest/issues/5243#issuecomment-491522595
40) @pytest.fixture(scope='session', autouse=True)
41) def term_handler() -> Iterator[None]:  # pragma: no cover
42)     try:
43)         import signal  # noqa: PLC0415
44) 
45)         sigint_handler = signal.getsignal(signal.SIGINT)
46)     except (ImportError, OSError):
47)         return
48)     else:
49)         orig_term = signal.signal(signal.SIGTERM, sigint_handler)
50)         yield
51)         signal.signal(signal.SIGTERM, orig_term)
52) 
53) 
54) def _spawn_pageant(  # pragma: no cover
55)     executable: str | None, env: dict[str, str]
56) ) -> tuple[subprocess.Popen[str], bool] | None:
57)     """Spawn an isolated Pageant, if possible.
58) 
59)     We attempt to detect whether Pageant is usable, i.e. whether Pageant
60)     has output buffering problems when announcing its authentication
61)     socket.
62) 
63)     Args:
64)         executable:
65)             The path to the Pageant executable.
66)         env:
67)             The new environment for Pageant.  Should typically not
68)             include an SSH_AUTH_SOCK variable.
69) 
70)     Returns:
71)         (tuple[subprocess.Popen, bool] | None):
72)         A 2-tuple `(proc, debug_output)`, where `proc` is the spawned
73)         Pageant subprocess, and `debug_output` indicates whether Pageant
74)         will continue to emit debug output (that needs to be actively
75)         read) or not.
76) 
77)         It is the caller's responsibility to clean up the spawned
78)         subprocess.
79) 
80)         If the executable is `None`, or if we detect that Pageant is too
81)         old to properly flush its output, which prevents readers from
82)         learning the SSH_AUTH_SOCK setting needed to connect to Pageant
83)         in the first place, then return `None` directly.
84) 
85)     """
86)     if executable is None:  # pragma: no cover
87)         return None
88) 
89)     pageant_features = {'flush': False, 'foreground': False}
90) 
91)     # Apparently, Pageant 0.81 and lower running in debug mode does
92)     # not actively flush its output.  As a result, the first two
93)     # lines, which set the SSH_AUTH_SOCK and the SSH_AGENT_PID, only
94)     # print once the output buffer is flushed, whenever that is.
95)     #
96)     # This has been reported to the PuTTY developers.
97)     #
98)     # For testing purposes, I currently build a version of Pageant with
99)     # output flushing fixed and with a `--foreground` option.  This is
100)     # detected here.
101)     help_output = subprocess.run(
102)         ['pageant', '--help'],
103)         executable=executable,
104)         env=env,
105)         capture_output=True,
106)         text=True,
107)         check=False,
108)     ).stdout
109)     help_lines = help_output.splitlines(True)
110)     pageant_version_string = (
111)         help_lines[1].strip().removeprefix('Release ')
112)         if len(help_lines) >= 2
113)         else ''
114)     )
115)     v0_81 = packaging.version.Version('0.81')
116)     if pageant_version_string not in {'', 'Unidentified build'}:
117)         # TODO(the-13th-letter): Once a fixed Pageant is released,
118)         # remove the check for build information in the version string.
119)         # https://github.com/the-13th-letter/derivepassphrase/issues/14
120)         pageant_version_string_numeric, local_segment_list = (
121)             pageant_version_string.split('+', 1)
122)             if '+' in pageant_version_string
123)             else (pageant_version_string, '')
124)         )
125)         local_segments = frozenset(local_segment_list.split('+'))
126)         pageant_version = packaging.version.Version(
127)             pageant_version_string_numeric
128)         )
129)         for key in pageant_features:
130)             pageant_features[key] = pageant_version > v0_81 or (
131)                 pageant_version == v0_81 and key in local_segments
132)             )
133) 
134)     if not pageant_features['flush']:  # pragma: no cover
135)         return None
136) 
137)     # Because Pageant's debug mode prints debugging information on
138)     # standard output, and because we yield control to a different
139)     # thread of execution, we cannot read-and-discard Pageant's output
140)     # here.  Instead, spawn a consumer process and connect it to
141)     # Pageant's standard output; see _spawn_data_sink.
142)     #
143)     # This will hopefully not be necessary with newer Pageants:
144)     # a feature request for a `--foreground` option that just avoids the
145)     # forking behavior has been submitted.
146) 
147)     return subprocess.Popen(
148)         [
149)             'pageant',
150)             '--foreground' if pageant_features['foreground'] else '--debug',
151)             '-s',
152)         ],
153)         executable=executable,
154)         stdin=subprocess.DEVNULL,
155)         stdout=subprocess.PIPE,
156)         shell=False,
157)         env=env,
158)         text=True,
159)         bufsize=1,
160)     ), not pageant_features['foreground']
161) 
162) 
163) def _spawn_openssh_agent(  # pragma: no cover
164)     executable: str | None, env: dict[str, str]
165) ) -> tuple[subprocess.Popen[str], Literal[False]] | None:
166)     """Spawn an isolated OpenSSH agent, if possible.
167) 
168)     We attempt to detect whether Pageant is usable, i.e. whether Pageant
169)     has output buffering problems when announcing its authentication
170)     socket.
171) 
172)     Args:
173)         executable:
174)             The path to the OpenSSH agent executable.
175)         env:
176)             The new environment for the OpenSSH agent.  Should typically
177)             not include an SSH_AUTH_SOCK variable.
178) 
179)     Returns:
180)         (tuple[subprocess.Popen, Literal[False]] | None):
181)         A 2-tuple `(proc, debug_output)`, where `proc` is the spawned
182)         OpenSSH agent subprocess, and `debug_output` indicates whether
183)         the OpenSSH agent will continue to emit debug output that needs
184)         to be actively read (which it doesn't, so this is always false).
185) 
186)         It is the caller's responsibility to clean up the spawned
187)         subprocess.
188) 
189)         If the executable is `None`, then return `None` directly.
190) 
191)     """
192)     if executable is None:
193)         return None
194)     return subprocess.Popen(
195)         ['ssh-agent', '-D', '-s'],
196)         executable=executable,
197)         stdin=subprocess.DEVNULL,
198)         stdout=subprocess.PIPE,
199)         shell=False,
200)         env=env,
201)         text=True,
202)         bufsize=1,
203)     ), False
204) 
205) 
206) def _spawn_system_agent(  # pragma: no cover
207)     executable: str | None, env: dict[str, str]
208) ) -> None:
209)     """Placeholder function. Does nothing."""
210) 
211) 
212) _spawn_handlers = [
213)     ('pageant', _spawn_pageant, tests.KnownSSHAgent.Pageant),
214)     ('ssh-agent', _spawn_openssh_agent, tests.KnownSSHAgent.OpenSSHAgent),
215)     ('(system)', _spawn_system_agent, tests.KnownSSHAgent.UNKNOWN),
216) ]
217) 
218) 
219) @pytest.fixture
220) def running_ssh_agent() -> Iterator[str]:  # pragma: no cover
221)     """Ensure a running SSH agent, if possible, as a pytest fixture.
222) 
223)     Check for a running SSH agent, or spawn a new one if possible.  We
224)     know how to spawn OpenSSH's agent and PuTTY's Pageant.  If spawned
225)     this way, the agent does not persist beyond the test.
226) 
227)     This fixture can neither guarantee a particular running agent, nor
228)     can it guarantee a particular set of loaded keys.
229) 
230)     Yields:
231)         str:
232)             The value of the SSH_AUTH_SOCK environment variable, to be
233)             used to connect to the running agent.
234) 
235)     Raises:
236)         pytest.skip.Exception:
237)             If no agent is running or can be spawned, skip this test.
238) 
239)     """
240)     exit_stack = contextlib.ExitStack()
241)     Popen = TypeVar('Popen', bound=subprocess.Popen)
242) 
243)     @contextlib.contextmanager
244)     def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
245)         try:
246)             yield proc
247)         finally:
248)             proc.terminate()
249)             proc.wait()
250) 
251)     with pytest.MonkeyPatch.context() as monkeypatch:
252)         # pytest's fixture system does not seem to guarantee that
253)         # environment variables are set up correctly if nested and
254)         # parametrized fixtures are used: it is possible that "outer"
255)         # parametrized fixtures are torn down only after other "outer"
256)         # fixtures of the same parameter set have run.  So set
257)         # SSH_AUTH_SOCK explicitly to the value saved at interpreter
258)         # startup.  This is then verified with *a lot* of further assert
259)         # statements.
260)         if startup_ssh_auth_sock:  # pragma: no cover
261)             monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
262)         else:  # pragma: no cover
263)             monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
264)         for exec_name, spawn_func, _ in _spawn_handlers:
265)             match exec_name:
266)                 case '(system)':
267)                     assert (
268)                         os.environ.get('SSH_AUTH_SOCK', None)
269)                         == startup_ssh_auth_sock
270)                     ), 'SSH_AUTH_SOCK mismatch when checking for running agent'
271)                     try:
272)                         with ssh_agent.SSHAgentClient() as client:
273)                             client.list_keys()
274)                     except (KeyError, OSError):
275)                         continue
276)                     yield os.environ['SSH_AUTH_SOCK']
277)                     assert (
278)                         os.environ.get('SSH_AUTH_SOCK', None)
279)                         == startup_ssh_auth_sock
280)                     ), 'SSH_AUTH_SOCK mismatch after returning from running agent'  # noqa: E501
281)                 case _:
282)                     assert (
283)                         os.environ.get('SSH_AUTH_SOCK', None)
284)                         == startup_ssh_auth_sock
285)                     ), f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'  # noqa: E501
286)                     spawn_data = spawn_func(  # type: ignore[operator]
287)                         executable=shutil.which(exec_name), env={}
288)                     )
289)                     if spawn_data is None:
290)                         continue
291)                     proc: subprocess.Popen[str]
292)                     emits_debug_output: bool
293)                     proc, emits_debug_output = spawn_data
294)                     with exit_stack:
295)                         exit_stack.enter_context(terminate_on_exit(proc))
296)                         assert (
297)                             os.environ.get('SSH_AUTH_SOCK', None)
298)                             == startup_ssh_auth_sock
299)                         ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
300)                         assert proc.stdout is not None
301)                         ssh_auth_sock_line = proc.stdout.readline()
302)                         try:
303)                             ssh_auth_sock = tests.parse_sh_export_line(
304)                                 ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
305)                             )
306)                         except ValueError:  # pragma: no cover
307)                             continue
308)                         pid_line = proc.stdout.readline()
309)                         if (
310)                             'pid' not in pid_line.lower()
311)                             and '_pid' not in pid_line.lower()
312)                         ):  # pragma: no cover
313)                             pytest.skip(
314)                                 f'Cannot parse agent output: {pid_line!r}'
315)                             )
316)                         proc2 = _spawn_data_sink(
317)                             emits_debug_output=emits_debug_output, proc=proc
318)                         )
319)                         if proc2 is not None:  # pragma: no cover
320)                             exit_stack.enter_context(terminate_on_exit(proc2))
321)                         assert (
322)                             os.environ.get('SSH_AUTH_SOCK', None)
323)                             == startup_ssh_auth_sock
324)                         ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name} helper'  # noqa: E501
325)                         monkeypatch2 = exit_stack.enter_context(
326)                             pytest.MonkeyPatch.context()
327)                         )
328)                         monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
329)                         yield ssh_auth_sock
330)                     assert (
331)                         os.environ.get('SSH_AUTH_SOCK', None)
332)                         == startup_ssh_auth_sock
333)                     ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
334)             return
335)         pytest.skip('No SSH agent running or spawnable')
336) 
337) 
338) def _spawn_data_sink(  # pragma: no cover
339)     emits_debug_output: bool, *, proc: subprocess.Popen[str]
340) ) -> subprocess.Popen[str] | None:
341)     """Spawn a data sink to read and discard standard input.
342) 
343)     Necessary for certain SSH agents that emit copious debugging output.
344) 
345)     On UNIX, we can use `cat`, redirected to `/dev/null`.  Otherwise,
346)     the most robust thing to do is to spawn Python and repeatedly call
347)     `.read()` on `sys.stdin.buffer`.
348) 
349)     """
350)     if not emits_debug_output:
351)         return None
352)     if proc.stdout is None:
353)         return None
354)     sink_script = textwrap.dedent("""
355)     import sys
356)     while sys.stdin.buffer.read(4096):
357)         pass
358)     """)
359)     return subprocess.Popen(
360)         (
361)             ['cat']
362)             if os.name == 'posix'
363)             else [sys.executable or 'python3', '-c', sink_script]
364)         ),
365)         executable=sys.executable or None,
366)         stdin=proc.stdout.fileno(),
367)         stdout=subprocess.DEVNULL,
368)         shell=False,
369)         text=True,
370)     )
371) 
372) 
373) @pytest.fixture(params=_spawn_handlers, ids=operator.itemgetter(0))
Marco Ricci Fix outstanding formatting...

Marco Ricci authored 2 months ago

374) def spawn_ssh_agent(  # noqa: C901
Marco Ricci Add test fixture for manual...

Marco Ricci authored 2 months ago

375)     request: pytest.FixtureRequest,
376) ) -> Iterator[tests.SpawnedSSHAgentInfo]:
377)     """Spawn an isolated SSH agent, if possible, as a pytest fixture.
378) 
379)     Spawn a new SSH agent isolated from other SSH use by other
380)     processes, if possible.  We know how to spawn OpenSSH's agent and
381)     PuTTY's Pageant, and the "(system)" fallback agent.
382) 
383)     Yields:
384)         (tests.SpawnedSSHAgentInfo):
Marco Ricci Fix bad docstring reference...

Marco Ricci authored 2 months ago

385)             A [named tuple][collections.namedtuple] containing
Marco Ricci Add test fixture for manual...

Marco Ricci authored 2 months ago

386)             information about the spawned agent, e.g. the software
387)             product, a client connected to the agent, and whether the
388)             agent is isolated from other clients.
389) 
390)     Raises:
391)         pytest.skip.Exception:
392)             If the agent cannot be spawned, skip this test.
393) 
394)     """
395)     agent_env = os.environ.copy()
396)     agent_env.pop('SSH_AUTH_SOCK', None)
397)     exit_stack = contextlib.ExitStack()
398)     Popen = TypeVar('Popen', bound=subprocess.Popen)
399) 
400)     @contextlib.contextmanager
401)     def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
402)         try:
403)             yield proc
404)         finally:
405)             proc.terminate()
406)             proc.wait()
407) 
408)     with pytest.MonkeyPatch.context() as monkeypatch:
409)         # pytest's fixture system does not seem to guarantee that
410)         # environment variables are set up correctly if nested and
411)         # parametrized fixtures are used: it is possible that "outer"
412)         # parametrized fixtures are torn down only after other "outer"
413)         # fixtures of the same parameter set have run.  So set
414)         # SSH_AUTH_SOCK explicitly to the value saved at interpreter
415)         # startup.  This is then verified with *a lot* of further assert
416)         # statements.
417)         if startup_ssh_auth_sock:  # pragma: no cover
418)             monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
419)         else:  # pragma: no cover
420)             monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
421)         exec_name, spawn_func, agent_type = request.param
422)         match exec_name:
423)             case '(system)':
424)                 assert (
425)                     os.environ.get('SSH_AUTH_SOCK', None)
426)                     == startup_ssh_auth_sock
427)                 ), 'SSH_AUTH_SOCK mismatch when checking for running agent'
428)                 try:
429)                     client = ssh_agent.SSHAgentClient()
430)                     client.list_keys()
431)                 except KeyError:  # pragma: no cover
432)                     pytest.skip('SSH agent is not running')
Marco Ricci Remove debugging-only code...

Marco Ricci authored 2 months ago

433)                 except OSError as exc:  # pragma: no cover
434)                     pytest.skip(
435)                         f'Cannot talk to SSH agent: '
436)                         f'{exc.strerror}: {exc.filename!r}'
437)                     )
Marco Ricci Add test fixture for manual...

Marco Ricci authored 2 months ago

438)                 with client:
439)                     assert (
440)                         os.environ.get('SSH_AUTH_SOCK', None)
441)                         == startup_ssh_auth_sock
442)                     ), 'SSH_AUTH_SOCK mismatch before setting up for running agent'  # noqa: E501
443)                     yield tests.SpawnedSSHAgentInfo(agent_type, client, False)
444)                 assert (
445)                     os.environ.get('SSH_AUTH_SOCK', None)
446)                     == startup_ssh_auth_sock
447)                 ), 'SSH_AUTH_SOCK mismatch after returning from running agent'
448)                 return
449) 
450)             case _:
451)                 assert (
452)                     os.environ.get('SSH_AUTH_SOCK', None)
453)                     == startup_ssh_auth_sock
454)                 ), f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'  # noqa: E501
455)                 spawn_data = spawn_func(
456)                     executable=shutil.which(exec_name), env=agent_env
457)                 )
458)                 assert (
459)                     os.environ.get('SSH_AUTH_SOCK', None)
460)                     == startup_ssh_auth_sock
461)                 ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
462)                 if spawn_data is None:  # pragma: no cover
463)                     pytest.skip(f'Cannot spawn usable {exec_name}')
464)                 proc, emits_debug_output = spawn_data
465)                 with exit_stack:
466)                     exit_stack.enter_context(terminate_on_exit(proc))
467)                     assert proc.stdout is not None
468)                     ssh_auth_sock_line = proc.stdout.readline()
469)                     try:
470)                         ssh_auth_sock = tests.parse_sh_export_line(
471)                             ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
472)                         )
473)                     except ValueError:  # pragma: no cover
474)                         pytest.skip(
475)                             f'Cannot parse agent output: {ssh_auth_sock_line}'
476)                         )
477)                     pid_line = proc.stdout.readline()
478)                     if (
479)                         'pid' not in pid_line.lower()
480)                         and '_pid' not in pid_line.lower()
481)                     ):  # pragma: no cover
482)                         pytest.skip(f'Cannot parse agent output: {pid_line!r}')
483)                     assert (
484)                         os.environ.get('SSH_AUTH_SOCK', None)
485)                         == startup_ssh_auth_sock
486)                     ), f'SSH_AUTH_SOCK mismatch before spawning {exec_name} helper'  # noqa: E501
487)                     proc2 = _spawn_data_sink(
488)                         emits_debug_output=emits_debug_output, proc=proc
489)                     )
490)                     if proc2 is not None:  # pragma: no cover
491)                         exit_stack.enter_context(terminate_on_exit(proc2))
492)                     assert (
493)                         os.environ.get('SSH_AUTH_SOCK', None)
494)                         == startup_ssh_auth_sock
495)                     ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name} helper'  # noqa: E501
496)                     monkeypatch2 = exit_stack.enter_context(
497)                         pytest.MonkeyPatch.context()
498)                     )
499)                     monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
500)                     try:
501)                         client = ssh_agent.SSHAgentClient()
502)                     except OSError as exc:  # pragma: no cover
503)                         pytest.skip(
504)                             f'Cannot talk to SSH agent: '
505)                             f'{exc.strerror}: {exc.filename!r}'
506)                         )
507)                     exit_stack.enter_context(client)
508)                     yield tests.SpawnedSSHAgentInfo(agent_type, client, True)
509)                 assert (
510)                     os.environ.get('SSH_AUTH_SOCK', None)
511)                     == startup_ssh_auth_sock
512)                 ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
513)                 return
514) 
515) 
516) @pytest.fixture
517) def ssh_agent_client_with_test_keys_loaded(  # noqa: C901
518)     spawn_ssh_agent: tests.SpawnedSSHAgentInfo,
519) ) -> Iterator[ssh_agent.SSHAgentClient]:
Marco Ricci Remove debugging-only code...

Marco Ricci authored 2 months ago

520)     """Provide an SSH agent with loaded test keys, as a pytest fixture.
521) 
522)     Use the `spawn_ssh_agent` fixture to acquire a usable SSH agent,
523)     upload the known test keys into the agent, and return a connected
524)     client.
525) 
526)     The agent may reject several of the test keys due to unsupported or
527)     obsolete key types.  Rejected keys will be silently ignored, unless
528)     all keys are rejected; then the test will be skipped.  You must not
529)     automatically assume any particular key is present in the agent.
530) 
531)     Yields:
532)         (ssh_agent.SSHAgentClient):
Marco Ricci Fix bad docstring reference...

Marco Ricci authored 2 months ago

533)             A [named tuple][collections.namedtuple] containing
Marco Ricci Remove debugging-only code...

Marco Ricci authored 2 months ago

534)             information about the spawned agent, e.g. the software
535)             product, a client connected to the agent, and whether the
536)             agent is isolated from other clients.
537) 
538)     Raises:
539)         OSError:
540)             There was a communication or a socket setup error with the
541)             agent.
542)         pytest.skip.Exception:
543)             If the agent is unusable or if it rejected all test keys,
544)             skip this test.
545) 
546)     Warning:
547)         It is the fixture's responsibility to clean up the SSH agent
548)         client after the test.  Closing the client's socket connection
549)         beforehand (e.g. by using the client as a context manager) may
550)         lead to exceptions being thrown upon fixture teardown.
551) 
552)     """