Add test fixture for manual...
Marco Ricci authored 2 months ago
|
37)
38) # https://docs.pytest.org/en/stable/explanation/fixtures.html#a-note-about-fixture-cleanup
39) # https://github.com/pytest-dev/pytest/issues/5243#issuecomment-491522595
40) @pytest.fixture(scope='session', autouse=True)
41) def term_handler() -> Iterator[None]: # pragma: no cover
42) try:
43) import signal # noqa: PLC0415
44)
45) sigint_handler = signal.getsignal(signal.SIGINT)
46) except (ImportError, OSError):
47) return
48) else:
49) orig_term = signal.signal(signal.SIGTERM, sigint_handler)
50) yield
51) signal.signal(signal.SIGTERM, orig_term)
52)
53)
54) def _spawn_pageant( # pragma: no cover
55) executable: str | None, env: dict[str, str]
56) ) -> tuple[subprocess.Popen[str], bool] | None:
57) """Spawn an isolated Pageant, if possible.
58)
59) We attempt to detect whether Pageant is usable, i.e. whether Pageant
60) has output buffering problems when announcing its authentication
61) socket.
62)
63) Args:
64) executable:
65) The path to the Pageant executable.
66) env:
67) The new environment for Pageant. Should typically not
68) include an SSH_AUTH_SOCK variable.
69)
70) Returns:
71) (tuple[subprocess.Popen, bool] | None):
72) A 2-tuple `(proc, debug_output)`, where `proc` is the spawned
73) Pageant subprocess, and `debug_output` indicates whether Pageant
74) will continue to emit debug output (that needs to be actively
75) read) or not.
76)
77) It is the caller's responsibility to clean up the spawned
78) subprocess.
79)
80) If the executable is `None`, or if we detect that Pageant is too
81) old to properly flush its output, which prevents readers from
82) learning the SSH_AUTH_SOCK setting needed to connect to Pageant
83) in the first place, then return `None` directly.
84)
85) """
86) if executable is None: # pragma: no cover
87) return None
88)
89) pageant_features = {'flush': False, 'foreground': False}
90)
91) # Apparently, Pageant 0.81 and lower running in debug mode does
92) # not actively flush its output. As a result, the first two
93) # lines, which set the SSH_AUTH_SOCK and the SSH_AGENT_PID, only
94) # print once the output buffer is flushed, whenever that is.
95) #
96) # This has been reported to the PuTTY developers.
97) #
98) # For testing purposes, I currently build a version of Pageant with
99) # output flushing fixed and with a `--foreground` option. This is
100) # detected here.
101) help_output = subprocess.run(
102) ['pageant', '--help'],
103) executable=executable,
104) env=env,
105) capture_output=True,
106) text=True,
107) check=False,
108) ).stdout
109) help_lines = help_output.splitlines(True)
110) pageant_version_string = (
111) help_lines[1].strip().removeprefix('Release ')
112) if len(help_lines) >= 2
113) else ''
114) )
115) v0_81 = packaging.version.Version('0.81')
116) if pageant_version_string not in {'', 'Unidentified build'}:
117) # TODO(the-13th-letter): Once a fixed Pageant is released,
118) # remove the check for build information in the version string.
119) # https://github.com/the-13th-letter/derivepassphrase/issues/14
120) pageant_version_string_numeric, local_segment_list = (
121) pageant_version_string.split('+', 1)
122) if '+' in pageant_version_string
123) else (pageant_version_string, '')
124) )
125) local_segments = frozenset(local_segment_list.split('+'))
126) pageant_version = packaging.version.Version(
127) pageant_version_string_numeric
128) )
129) for key in pageant_features:
130) pageant_features[key] = pageant_version > v0_81 or (
131) pageant_version == v0_81 and key in local_segments
132) )
133)
134) if not pageant_features['flush']: # pragma: no cover
135) return None
136)
137) # Because Pageant's debug mode prints debugging information on
138) # standard output, and because we yield control to a different
139) # thread of execution, we cannot read-and-discard Pageant's output
140) # here. Instead, spawn a consumer process and connect it to
141) # Pageant's standard output; see _spawn_data_sink.
142) #
143) # This will hopefully not be necessary with newer Pageants:
144) # a feature request for a `--foreground` option that just avoids the
145) # forking behavior has been submitted.
146)
147) return subprocess.Popen(
148) [
149) 'pageant',
150) '--foreground' if pageant_features['foreground'] else '--debug',
151) '-s',
152) ],
153) executable=executable,
154) stdin=subprocess.DEVNULL,
155) stdout=subprocess.PIPE,
156) shell=False,
157) env=env,
158) text=True,
159) bufsize=1,
160) ), not pageant_features['foreground']
161)
162)
163) def _spawn_openssh_agent( # pragma: no cover
164) executable: str | None, env: dict[str, str]
165) ) -> tuple[subprocess.Popen[str], Literal[False]] | None:
166) """Spawn an isolated OpenSSH agent, if possible.
167)
168) We attempt to detect whether Pageant is usable, i.e. whether Pageant
169) has output buffering problems when announcing its authentication
170) socket.
171)
172) Args:
173) executable:
174) The path to the OpenSSH agent executable.
175) env:
176) The new environment for the OpenSSH agent. Should typically
177) not include an SSH_AUTH_SOCK variable.
178)
179) Returns:
180) (tuple[subprocess.Popen, Literal[False]] | None):
181) A 2-tuple `(proc, debug_output)`, where `proc` is the spawned
182) OpenSSH agent subprocess, and `debug_output` indicates whether
183) the OpenSSH agent will continue to emit debug output that needs
184) to be actively read (which it doesn't, so this is always false).
185)
186) It is the caller's responsibility to clean up the spawned
187) subprocess.
188)
189) If the executable is `None`, then return `None` directly.
190)
191) """
192) if executable is None:
193) return None
194) return subprocess.Popen(
195) ['ssh-agent', '-D', '-s'],
196) executable=executable,
197) stdin=subprocess.DEVNULL,
198) stdout=subprocess.PIPE,
199) shell=False,
200) env=env,
201) text=True,
202) bufsize=1,
203) ), False
204)
205)
206) def _spawn_system_agent( # pragma: no cover
207) executable: str | None, env: dict[str, str]
208) ) -> None:
209) """Placeholder function. Does nothing."""
210)
211)
212) _spawn_handlers = [
213) ('pageant', _spawn_pageant, tests.KnownSSHAgent.Pageant),
214) ('ssh-agent', _spawn_openssh_agent, tests.KnownSSHAgent.OpenSSHAgent),
215) ('(system)', _spawn_system_agent, tests.KnownSSHAgent.UNKNOWN),
216) ]
217)
218)
219) @pytest.fixture
220) def running_ssh_agent() -> Iterator[str]: # pragma: no cover
221) """Ensure a running SSH agent, if possible, as a pytest fixture.
222)
223) Check for a running SSH agent, or spawn a new one if possible. We
224) know how to spawn OpenSSH's agent and PuTTY's Pageant. If spawned
225) this way, the agent does not persist beyond the test.
226)
227) This fixture can neither guarantee a particular running agent, nor
228) can it guarantee a particular set of loaded keys.
229)
230) Yields:
231) str:
232) The value of the SSH_AUTH_SOCK environment variable, to be
233) used to connect to the running agent.
234)
235) Raises:
236) pytest.skip.Exception:
237) If no agent is running or can be spawned, skip this test.
238)
239) """
240) exit_stack = contextlib.ExitStack()
241) Popen = TypeVar('Popen', bound=subprocess.Popen)
242)
243) @contextlib.contextmanager
244) def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
245) try:
246) yield proc
247) finally:
248) proc.terminate()
249) proc.wait()
250)
251) with pytest.MonkeyPatch.context() as monkeypatch:
252) # pytest's fixture system does not seem to guarantee that
253) # environment variables are set up correctly if nested and
254) # parametrized fixtures are used: it is possible that "outer"
255) # parametrized fixtures are torn down only after other "outer"
256) # fixtures of the same parameter set have run. So set
257) # SSH_AUTH_SOCK explicitly to the value saved at interpreter
258) # startup. This is then verified with *a lot* of further assert
259) # statements.
260) if startup_ssh_auth_sock: # pragma: no cover
261) monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
262) else: # pragma: no cover
263) monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
264) for exec_name, spawn_func, _ in _spawn_handlers:
265) match exec_name:
266) case '(system)':
267) assert (
268) os.environ.get('SSH_AUTH_SOCK', None)
269) == startup_ssh_auth_sock
270) ), 'SSH_AUTH_SOCK mismatch when checking for running agent'
271) try:
272) with ssh_agent.SSHAgentClient() as client:
273) client.list_keys()
274) except (KeyError, OSError):
275) continue
276) yield os.environ['SSH_AUTH_SOCK']
277) assert (
278) os.environ.get('SSH_AUTH_SOCK', None)
279) == startup_ssh_auth_sock
280) ), 'SSH_AUTH_SOCK mismatch after returning from running agent' # noqa: E501
281) case _:
282) assert (
283) os.environ.get('SSH_AUTH_SOCK', None)
284) == startup_ssh_auth_sock
285) ), f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}' # noqa: E501
286) spawn_data = spawn_func( # type: ignore[operator]
287) executable=shutil.which(exec_name), env={}
288) )
289) if spawn_data is None:
290) continue
291) proc: subprocess.Popen[str]
292) emits_debug_output: bool
293) proc, emits_debug_output = spawn_data
294) with exit_stack:
295) exit_stack.enter_context(terminate_on_exit(proc))
296) assert (
297) os.environ.get('SSH_AUTH_SOCK', None)
298) == startup_ssh_auth_sock
299) ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
300) assert proc.stdout is not None
301) ssh_auth_sock_line = proc.stdout.readline()
302) try:
303) ssh_auth_sock = tests.parse_sh_export_line(
304) ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
305) )
306) except ValueError: # pragma: no cover
307) continue
308) pid_line = proc.stdout.readline()
309) if (
310) 'pid' not in pid_line.lower()
311) and '_pid' not in pid_line.lower()
312) ): # pragma: no cover
313) pytest.skip(
314) f'Cannot parse agent output: {pid_line!r}'
315) )
316) proc2 = _spawn_data_sink(
317) emits_debug_output=emits_debug_output, proc=proc
318) )
319) if proc2 is not None: # pragma: no cover
320) exit_stack.enter_context(terminate_on_exit(proc2))
321) assert (
322) os.environ.get('SSH_AUTH_SOCK', None)
323) == startup_ssh_auth_sock
324) ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name} helper' # noqa: E501
325) monkeypatch2 = exit_stack.enter_context(
326) pytest.MonkeyPatch.context()
327) )
328) monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
329) yield ssh_auth_sock
330) assert (
331) os.environ.get('SSH_AUTH_SOCK', None)
332) == startup_ssh_auth_sock
333) ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
334) return
335) pytest.skip('No SSH agent running or spawnable')
336)
337)
338) def _spawn_data_sink( # pragma: no cover
339) emits_debug_output: bool, *, proc: subprocess.Popen[str]
340) ) -> subprocess.Popen[str] | None:
341) """Spawn a data sink to read and discard standard input.
342)
343) Necessary for certain SSH agents that emit copious debugging output.
344)
345) On UNIX, we can use `cat`, redirected to `/dev/null`. Otherwise,
346) the most robust thing to do is to spawn Python and repeatedly call
347) `.read()` on `sys.stdin.buffer`.
348)
349) """
350) if not emits_debug_output:
351) return None
352) if proc.stdout is None:
353) return None
354) sink_script = textwrap.dedent("""
355) import sys
356) while sys.stdin.buffer.read(4096):
357) pass
358) """)
359) return subprocess.Popen(
360) (
361) ['cat']
362) if os.name == 'posix'
363) else [sys.executable or 'python3', '-c', sink_script]
364) ),
365) executable=sys.executable or None,
366) stdin=proc.stdout.fileno(),
367) stdout=subprocess.DEVNULL,
368) shell=False,
369) text=True,
370) )
371)
372)
373) @pytest.fixture(params=_spawn_handlers, ids=operator.itemgetter(0))
|
Add test fixture for manual...
Marco Ricci authored 2 months ago
|
438) with client:
439) assert (
440) os.environ.get('SSH_AUTH_SOCK', None)
441) == startup_ssh_auth_sock
442) ), 'SSH_AUTH_SOCK mismatch before setting up for running agent' # noqa: E501
443) yield tests.SpawnedSSHAgentInfo(agent_type, client, False)
444) assert (
445) os.environ.get('SSH_AUTH_SOCK', None)
446) == startup_ssh_auth_sock
447) ), 'SSH_AUTH_SOCK mismatch after returning from running agent'
448) return
449)
450) case _:
451) assert (
452) os.environ.get('SSH_AUTH_SOCK', None)
453) == startup_ssh_auth_sock
454) ), f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}' # noqa: E501
455) spawn_data = spawn_func(
456) executable=shutil.which(exec_name), env=agent_env
457) )
458) assert (
459) os.environ.get('SSH_AUTH_SOCK', None)
460) == startup_ssh_auth_sock
461) ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
462) if spawn_data is None: # pragma: no cover
463) pytest.skip(f'Cannot spawn usable {exec_name}')
464) proc, emits_debug_output = spawn_data
465) with exit_stack:
466) exit_stack.enter_context(terminate_on_exit(proc))
467) assert proc.stdout is not None
468) ssh_auth_sock_line = proc.stdout.readline()
469) try:
470) ssh_auth_sock = tests.parse_sh_export_line(
471) ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
472) )
473) except ValueError: # pragma: no cover
474) pytest.skip(
475) f'Cannot parse agent output: {ssh_auth_sock_line}'
476) )
477) pid_line = proc.stdout.readline()
478) if (
479) 'pid' not in pid_line.lower()
480) and '_pid' not in pid_line.lower()
481) ): # pragma: no cover
482) pytest.skip(f'Cannot parse agent output: {pid_line!r}')
483) assert (
484) os.environ.get('SSH_AUTH_SOCK', None)
485) == startup_ssh_auth_sock
486) ), f'SSH_AUTH_SOCK mismatch before spawning {exec_name} helper' # noqa: E501
487) proc2 = _spawn_data_sink(
488) emits_debug_output=emits_debug_output, proc=proc
489) )
490) if proc2 is not None: # pragma: no cover
491) exit_stack.enter_context(terminate_on_exit(proc2))
492) assert (
493) os.environ.get('SSH_AUTH_SOCK', None)
494) == startup_ssh_auth_sock
495) ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name} helper' # noqa: E501
496) monkeypatch2 = exit_stack.enter_context(
497) pytest.MonkeyPatch.context()
498) )
499) monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
500) try:
501) client = ssh_agent.SSHAgentClient()
502) except OSError as exc: # pragma: no cover
503) pytest.skip(
504) f'Cannot talk to SSH agent: '
505) f'{exc.strerror}: {exc.filename!r}'
506) )
507) exit_stack.enter_context(client)
508) yield tests.SpawnedSSHAgentInfo(agent_type, client, True)
509) assert (
510) os.environ.get('SSH_AUTH_SOCK', None)
511) == startup_ssh_auth_sock
512) ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
513) return
514)
515)
516) @pytest.fixture
517) def ssh_agent_client_with_test_keys_loaded( # noqa: C901
518) spawn_ssh_agent: tests.SpawnedSSHAgentInfo,
519) ) -> Iterator[ssh_agent.SSHAgentClient]:
|