Refactor pytest fixtures concerning SSH agent spawning
Marco Ricci

Marco Ricci commited on 2025-01-21 21:02:31
Zeige 1 geänderte Dateien mit 103 Einfügungen und 163 Löschungen.


The two pytest fixtures `running_ssh_agent` and `spawn_ssh_agent` both
deal with providing access to SSH agents, spawning them if necessary.
They differ in the data they provide to the test functions (just the
environment setup, or an actual connected `SSHAgentClient`) and in their
parametrization (`running_ssh_agent` spawns any one, `spawn_ssh_agent`
spawns all).  However, despite the spawning logic being basically
identical, both methods contained incompatible implementations of the
logic, separately for named agent spawning and for system agent
spawning.

So, consolidate these functions into a single spawning function, and
move all necessary supporting code (context manager for subprocess
termination, typing protocol for spawn function) to the top level.
... ...
@@ -11,7 +11,7 @@ import os
11 11
 import shutil
12 12
 import socket
13 13
 import subprocess
14
-from typing import TYPE_CHECKING, TypeVar
14
+from typing import TYPE_CHECKING, Protocol, TypeVar
15 15
 
16 16
 import hypothesis
17 17
 import packaging.version
... ...
@@ -63,6 +63,16 @@ def skip_if_no_af_unix_support() -> None:  # pragma: no cover
63 63
         pytest.skip('socket module does not support AF_UNIX')
64 64
 
65 65
 
66
+class SpawnFunc(Protocol):
67
+    """Spawns an SSH agent, if possible."""
68
+
69
+    def __call__(
70
+        self,
71
+        executable: str | None,
72
+        env: dict[str, str],
73
+    ) -> subprocess.Popen[str] | None: ...
74
+
75
+
66 76
 def _spawn_pageant(  # pragma: no cover
67 77
     executable: str | None, env: dict[str, str]
68 78
 ) -> subprocess.Popen[str] | None:
... ...
@@ -169,7 +179,7 @@ def _spawn_openssh_agent(  # pragma: no cover
169 179
     )
170 180
 
171 181
 
172
-def _spawn_system_agent(  # pragma: no cover
182
+def _spawn_noop(  # pragma: no cover
173 183
     executable: str | None, env: dict[str, str]
174 184
 ) -> None:
175 185
     """Placeholder function. Does nothing."""
... ...
@@ -178,9 +188,87 @@ def _spawn_system_agent(  # pragma: no cover
178 188
 _spawn_handlers = [
179 189
     ('pageant', _spawn_pageant, tests.KnownSSHAgent.Pageant),
180 190
     ('ssh-agent', _spawn_openssh_agent, tests.KnownSSHAgent.OpenSSHAgent),
181
-    ('(system)', _spawn_system_agent, tests.KnownSSHAgent.UNKNOWN),
191
+    ('(system)', _spawn_noop, tests.KnownSSHAgent.UNKNOWN),
182 192
 ]
183 193
 
194
+Popen = TypeVar('Popen', bound=subprocess.Popen)
195
+
196
+
197
+@contextlib.contextmanager
198
+def _terminate_on_exit(proc: Popen) -> Iterator[Popen]:
199
+    try:
200
+        yield proc
201
+    finally:
202
+        proc.terminate()
203
+        proc.wait()
204
+
205
+
206
+class CannotSpawnError(RuntimeError):
207
+    pass
208
+
209
+
210
+def _spawn_named_agent(
211
+    exec_name: str,
212
+    spawn_func: SpawnFunc,
213
+    agent_type: tests.KnownSSHAgent,
214
+) -> Iterator[tests.SpawnedSSHAgentInfo]:  # pragma: no cover
215
+    # pytest's fixture system does not seem to guarantee that
216
+    # environment variables are set up correctly if nested and
217
+    # parametrized fixtures are used: it is possible that "outer"
218
+    # parametrized fixtures are torn down only after other "outer"
219
+    # fixtures of the same parameter set have run.  So our fixtures set
220
+    # SSH_AUTH_SOCK explicitly to the value saved at interpreter
221
+    # startup.
222
+    #
223
+    # Here, we verify at most major steps that SSH_AUTH_SOCK didn't
224
+    # change under our nose.
225
+    assert os.environ.get('SSH_AUTH_SOCK') == startup_ssh_auth_sock, (
226
+        f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'
227
+    )
228
+    exit_stack = contextlib.ExitStack()
229
+    agent_env = os.environ.copy()
230
+    ssh_auth_sock = agent_env.pop('SSH_AUTH_SOCK', None)
231
+    proc = spawn_func(executable=shutil.which(exec_name), env=agent_env)
232
+    with exit_stack:
233
+        if spawn_func is _spawn_noop:
234
+            ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
235
+        elif proc is None:  # pragma: no cover
236
+            err_msg = f'Cannot spawn usable {exec_name}'
237
+            raise CannotSpawnError(err_msg)
238
+        else:
239
+            exit_stack.enter_context(_terminate_on_exit(proc))
240
+            assert os.environ.get('SSH_AUTH_SOCK') == startup_ssh_auth_sock, (
241
+                f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
242
+            )
243
+            assert proc.stdout is not None
244
+            ssh_auth_sock_line = proc.stdout.readline()
245
+            try:
246
+                ssh_auth_sock = tests.parse_sh_export_line(
247
+                    ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
248
+                )
249
+            except ValueError:  # pragma: no cover
250
+                err_msg = f'Cannot parse agent output: {ssh_auth_sock_line!r}'
251
+                raise CannotSpawnError(err_msg) from None
252
+            pid_line = proc.stdout.readline()
253
+            if (
254
+                'pid' not in pid_line.lower()
255
+                and '_pid' not in pid_line.lower()
256
+            ):  # pragma: no cover
257
+                err_msg = f'Cannot parse agent output: {pid_line!r}'
258
+                raise CannotSpawnError(err_msg)
259
+        monkeypatch = exit_stack.enter_context(pytest.MonkeyPatch.context())
260
+        monkeypatch.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
261
+        client = exit_stack.enter_context(
262
+            ssh_agent.SSHAgentClient.ensure_agent_subcontext()
263
+        )
264
+        client.list_keys()  # sanity test
265
+        yield tests.SpawnedSSHAgentInfo(
266
+            agent_type, client, spawn_func is not _spawn_noop
267
+        )
268
+    assert os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock, (
269
+        f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
270
+    )
271
+
184 272
 
185 273
 @pytest.fixture
186 274
 def running_ssh_agent(  # pragma: no cover
... ...
@@ -207,16 +295,6 @@ def running_ssh_agent(  # pragma: no cover
207 295
 
208 296
     """
209 297
     del skip_if_no_af_unix_support
210
-    exit_stack = contextlib.ExitStack()
211
-    Popen = TypeVar('Popen', bound=subprocess.Popen)
212
-
213
-    @contextlib.contextmanager
214
-    def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
215
-        try:
216
-            yield proc
217
-        finally:
218
-            proc.terminate()
219
-            proc.wait()
220 298
 
221 299
     with pytest.MonkeyPatch.context() as monkeypatch:
222 300
         # pytest's fixture system does not seem to guarantee that
... ...
@@ -225,70 +303,21 @@ def running_ssh_agent(  # pragma: no cover
225 303
         # parametrized fixtures are torn down only after other "outer"
226 304
         # fixtures of the same parameter set have run.  So set
227 305
         # SSH_AUTH_SOCK explicitly to the value saved at interpreter
228
-        # startup.  This is then verified with *a lot* of further assert
229
-        # statements.
230
-        if startup_ssh_auth_sock:  # pragma: no cover
306
+        # startup.
307
+        if startup_ssh_auth_sock:
231 308
             monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
232
-        else:  # pragma: no cover
309
+        else:
233 310
             monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
234 311
         for exec_name, spawn_func, agent_type in _spawn_handlers:
235
-            # Use match/case here once Python 3.9 becomes unsupported.
236
-            if exec_name == '(system)':
237
-                assert (
238
-                    os.environ.get('SSH_AUTH_SOCK', None)
239
-                    == startup_ssh_auth_sock
240
-                ), 'SSH_AUTH_SOCK mismatch when checking for running agent'
241 312
             try:
242
-                    with ssh_agent.SSHAgentClient() as client:
243
-                        client.list_keys()
244
-                except (KeyError, OSError):
245
-                    continue
313
+                for _agent_info in _spawn_named_agent(
314
+                    exec_name, spawn_func, agent_type
315
+                ):
246 316
                     yield tests.RunningSSHAgentInfo(
247 317
                         os.environ['SSH_AUTH_SOCK'], agent_type
248 318
                     )
249
-                assert (
250
-                    os.environ.get('SSH_AUTH_SOCK', None)
251
-                    == startup_ssh_auth_sock
252
-                ), 'SSH_AUTH_SOCK mismatch after returning from running agent'
253
-            else:
254
-                assert (
255
-                    os.environ.get('SSH_AUTH_SOCK', None)
256
-                    == startup_ssh_auth_sock
257
-                ), (
258
-                    f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'
259
-                )
260
-                proc = spawn_func(executable=shutil.which(exec_name), env={})
261
-                if proc is None:
262
-                    continue
263
-                with exit_stack:
264
-                    exit_stack.enter_context(terminate_on_exit(proc))
265
-                    assert (
266
-                        os.environ.get('SSH_AUTH_SOCK', None)
267
-                        == startup_ssh_auth_sock
268
-                    ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
269
-                    assert proc.stdout is not None
270
-                    ssh_auth_sock_line = proc.stdout.readline()
271
-                    try:
272
-                        ssh_auth_sock = tests.parse_sh_export_line(
273
-                            ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
274
-                        )
275
-                    except ValueError:  # pragma: no cover
319
+            except (KeyError, OSError, CannotSpawnError):
276 320
                 continue
277
-                    pid_line = proc.stdout.readline()
278
-                    if (
279
-                        'pid' not in pid_line.lower()
280
-                        and '_pid' not in pid_line.lower()
281
-                    ):  # pragma: no cover
282
-                        pytest.skip(f'Cannot parse agent output: {pid_line!r}')
283
-                    monkeypatch2 = exit_stack.enter_context(
284
-                        pytest.MonkeyPatch.context()
285
-                    )
286
-                    monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
287
-                    yield tests.RunningSSHAgentInfo(ssh_auth_sock, agent_type)
288
-                assert (
289
-                    os.environ.get('SSH_AUTH_SOCK', None)
290
-                    == startup_ssh_auth_sock
291
-                ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
292 321
             return
293 322
         pytest.skip('No SSH agent running or spawnable')
294 323
 
... ...
@@ -297,7 +326,7 @@ def running_ssh_agent(  # pragma: no cover
297 326
 def spawn_ssh_agent(
298 327
     request: pytest.FixtureRequest,
299 328
     skip_if_no_af_unix_support: None,
300
-) -> Iterator[tests.SpawnedSSHAgentInfo]:
329
+) -> Iterator[tests.SpawnedSSHAgentInfo]:  # pragma: no cover
301 330
     """Spawn an isolated SSH agent, if possible, as a pytest fixture.
302 331
 
303 332
     Spawn a new SSH agent isolated from other SSH use by other
... ...
@@ -316,19 +345,6 @@ def spawn_ssh_agent(
316 345
 
317 346
     """
318 347
     del skip_if_no_af_unix_support
319
-    agent_env = os.environ.copy()
320
-    agent_env.pop('SSH_AUTH_SOCK', None)
321
-    exit_stack = contextlib.ExitStack()
322
-    Popen = TypeVar('Popen', bound=subprocess.Popen)
323
-
324
-    @contextlib.contextmanager
325
-    def terminate_on_exit(proc: Popen) -> Iterator[Popen]:
326
-        try:
327
-            yield proc
328
-        finally:
329
-            proc.terminate()
330
-            proc.wait()
331
-
332 348
     with pytest.MonkeyPatch.context() as monkeypatch:
333 349
         # pytest's fixture system does not seem to guarantee that
334 350
         # environment variables are set up correctly if nested and
... ...
@@ -336,91 +352,15 @@ def spawn_ssh_agent(
336 352
         # parametrized fixtures are torn down only after other "outer"
337 353
         # fixtures of the same parameter set have run.  So set
338 354
         # SSH_AUTH_SOCK explicitly to the value saved at interpreter
339
-        # startup.  This is then verified with *a lot* of further assert
340
-        # statements.
355
+        # startup.
341 356
         if startup_ssh_auth_sock:  # pragma: no cover
342 357
             monkeypatch.setenv('SSH_AUTH_SOCK', startup_ssh_auth_sock)
343 358
         else:  # pragma: no cover
344 359
             monkeypatch.delenv('SSH_AUTH_SOCK', raising=False)
345
-        exec_name, spawn_func, agent_type = request.param
346
-        # Use match/case here once Python 3.9 becomes unsupported.
347
-        if exec_name == '(system)':
348
-            assert (
349
-                os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
350
-            ), 'SSH_AUTH_SOCK mismatch when checking for running agent'
351
-            try:
352
-                client = ssh_agent.SSHAgentClient()
353
-                client.list_keys()
354
-            except KeyError:  # pragma: no cover
355
-                pytest.skip('SSH agent is not running')
356
-            except OSError as exc:  # pragma: no cover
357
-                pytest.skip(
358
-                    f'Cannot talk to SSH agent: '
359
-                    f'{exc.strerror}: {exc.filename!r}'
360
-                )
361
-            with client:
362
-                assert (
363
-                    os.environ.get('SSH_AUTH_SOCK', None)
364
-                    == startup_ssh_auth_sock
365
-                ), 'SSH_AUTH_SOCK mismatch before setting up for running agent'
366
-                yield tests.SpawnedSSHAgentInfo(agent_type, client, False)
367
-            assert (
368
-                os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
369
-            ), 'SSH_AUTH_SOCK mismatch after returning from running agent'
370
-            return
371
-
372
-        else:
373
-            assert (
374
-                os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
375
-            ), (
376
-                f'SSH_AUTH_SOCK mismatch when checking for spawnable {exec_name}'
377
-            )
378
-            proc = spawn_func(
379
-                executable=shutil.which(exec_name), env=agent_env
380
-            )
381
-            assert (
382
-                os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
383
-            ), f'SSH_AUTH_SOCK mismatch after spawning {exec_name}'
384
-            if proc is None:  # pragma: no cover
385
-                pytest.skip(f'Cannot spawn usable {exec_name}')
386
-            with exit_stack:
387
-                exit_stack.enter_context(terminate_on_exit(proc))
388
-                assert proc.stdout is not None
389
-                ssh_auth_sock_line = proc.stdout.readline()
390
-                try:
391
-                    ssh_auth_sock = tests.parse_sh_export_line(
392
-                        ssh_auth_sock_line, env_name='SSH_AUTH_SOCK'
393
-                    )
394
-                except ValueError:  # pragma: no cover
395
-                    pytest.skip(
396
-                        f'Cannot parse agent output: {ssh_auth_sock_line}'
397
-                    )
398
-                pid_line = proc.stdout.readline()
399
-                if (
400
-                    'pid' not in pid_line.lower()
401
-                    and '_pid' not in pid_line.lower()
402
-                ):  # pragma: no cover
403
-                    pytest.skip(f'Cannot parse agent output: {pid_line!r}')
404
-                assert (
405
-                    os.environ.get('SSH_AUTH_SOCK', None)
406
-                    == startup_ssh_auth_sock
407
-                ), f'SSH_AUTH_SOCK mismatch before spawning {exec_name} helper'
408
-                monkeypatch2 = exit_stack.enter_context(
409
-                    pytest.MonkeyPatch.context()
410
-                )
411
-                monkeypatch2.setenv('SSH_AUTH_SOCK', ssh_auth_sock)
412 360
         try:
413
-                    client = ssh_agent.SSHAgentClient()
414
-                except OSError as exc:  # pragma: no cover
415
-                    pytest.skip(
416
-                        f'Cannot talk to SSH agent: '
417
-                        f'{exc.strerror}: {exc.filename!r}'
418
-                    )
419
-                exit_stack.enter_context(client)
420
-                yield tests.SpawnedSSHAgentInfo(agent_type, client, True)
421
-            assert (
422
-                os.environ.get('SSH_AUTH_SOCK', None) == startup_ssh_auth_sock
423
-            ), f'SSH_AUTH_SOCK mismatch after tearing down {exec_name}'
361
+            yield from _spawn_named_agent(*request.param)
362
+        except (KeyError, OSError, CannotSpawnError) as exc:
363
+            pytest.skip(exc.args[0])
424 364
         return
425 365
 
426 366
 
427 367