Hoist and add tests for internal `key_to_phrase` CLI function
Marco Ricci

Marco Ricci commited on 2024-12-19 15:04:11
Zeige 2 geänderte Dateien mit 117 Einfügungen und 33 Löschungen.


Commit d15069d2db4040d444f65d7c3db6a854f4adcb92 extended the internal
`key_to_phrase` CLI function, adding more expansive error handling and
better error messages.  The new error handling still needed to be tested,
however.  When actually writing tests for this function, it turns out to
be cumbersome to trigger the various scenarios directly, because, as an
internal function, this means issuing a full CLI call in a high-level
mock environment.  Instead, we now hoist the function to the outer level,
making it easily accessible to tests and easier to isolate and prepare
a smaller and more specialized mock environment for.

(The primary reason this function was internal before was its use of the
`err` error callback, which still is internal to the CLI, but will now
be supplied explicitly as a callback.)
... ...
@@ -1322,6 +1322,44 @@ def _check_for_misleading_passphrase(
1322 1322
             )
1323 1323
 
1324 1324
 
1325
+def _key_to_phrase(
1326
+    key_: str | bytes | bytearray,
1327
+    /,
1328
+    *,
1329
+    error_callback: Callable[..., NoReturn] = sys.exit,
1330
+) -> bytes | bytearray:
1331
+    key = base64.standard_b64decode(key_)
1332
+    try:
1333
+        with ssh_agent.SSHAgentClient.ensure_agent_subcontext() as client:
1334
+            try:
1335
+                return vault.Vault.phrase_from_key(key, conn=client)
1336
+            except ssh_agent.SSHAgentFailedError as e:
1337
+                try:
1338
+                    keylist = client.list_keys()
1339
+                except ssh_agent.SSHAgentFailedError:
1340
+                    pass
1341
+                except Exception as e2:  # noqa: BLE001
1342
+                    e.__context__ = e2
1343
+                else:
1344
+                    if not any(  # pragma: no branch
1345
+                        k == key for k, _ in keylist
1346
+                    ):
1347
+                        error_callback(
1348
+                            'The requested SSH key is not loaded '
1349
+                            'into the agent.'
1350
+                        )
1351
+                error_callback(e)
1352
+    except KeyError:
1353
+        error_callback('Cannot find running SSH agent; check SSH_AUTH_SOCK')
1354
+    except NotImplementedError:
1355
+        error_callback(
1356
+            'Cannot connect to SSH agent because '
1357
+            'this Python version does not support UNIX domain sockets'
1358
+        )
1359
+    except OSError as e:
1360
+        error_callback('Cannot connect to SSH agent: %s', e.strerror)
1361
+
1362
+
1325 1363
 # Concrete option groups used by this command-line interface.
1326 1364
 class PasswordGenerationOption(OptionGroupOption):
1327 1365
     """Password generation options for the CLI."""
... ...
@@ -1779,36 +1817,6 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1779 1817
         logger.error(msg, *args, stacklevel=stacklevel, **kwargs)
1780 1818
         ctx.exit(1)
1781 1819
 
1782
-    def key_to_phrase(key_: str | bytes | bytearray, /) -> bytes | bytearray:
1783
-        key = base64.standard_b64decode(key_)
1784
-        try:
1785
-            with ssh_agent.SSHAgentClient.ensure_agent_subcontext() as client:
1786
-                try:
1787
-                    return vault.Vault.phrase_from_key(key, conn=client)
1788
-                except ssh_agent.SSHAgentFailedError as e:
1789
-                    try:
1790
-                        keylist = client.list_keys()
1791
-                    except ssh_agent.SSHAgentFailedError:
1792
-                        pass
1793
-                    except Exception as e2:  # noqa: BLE001
1794
-                        e.__context__ = e2
1795
-                    else:
1796
-                        if not any(k == key for k, _ in keylist):
1797
-                            err(
1798
-                                'The requested SSH key is not loaded '
1799
-                                'into the agent.'
1800
-                            )
1801
-                    err(e)
1802
-        except KeyError:
1803
-            err('Cannot find running SSH agent; check SSH_AUTH_SOCK')
1804
-        except NotImplementedError:
1805
-            err(
1806
-                'Cannot connect to SSH agent because '
1807
-                'this Python version does not support UNIX domain sockets'
1808
-            )
1809
-        except OSError as e:
1810
-            err('Cannot connect to SSH agent: %s', e.strerror)
1811
-
1812 1820
     def get_config() -> _types.VaultConfig:
1813 1821
         try:
1814 1822
             return _load_config()
... ...
@@ -2184,9 +2192,13 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
2184 2192
             # cases, set the phrase via vault.Vault.phrase_from_key if
2185 2193
             # a key is given.  Finally, if nothing is set, error out.
2186 2194
             if use_key or use_phrase:
2187
-                kwargs['phrase'] = key_to_phrase(key) if use_key else phrase
2195
+                kwargs['phrase'] = _key_to_phrase(
2196
+                    key, error_callback=err
2197
+                ) if use_key else phrase
2188 2198
             elif kwargs.get('key'):
2189
-                kwargs['phrase'] = key_to_phrase(kwargs['key'])
2199
+                kwargs['phrase'] = _key_to_phrase(
2200
+                    kwargs['key'], error_callback=err
2201
+                )
2190 2202
             elif kwargs.get('phrase'):
2191 2203
                 pass
2192 2204
             else:
... ...
@@ -4,6 +4,7 @@
4 4
 
5 5
 from __future__ import annotations
6 6
 
7
+import base64
7 8
 import contextlib
8 9
 import copy
9 10
 import errno
... ...
@@ -14,7 +15,7 @@ import shutil
14 15
 import socket
15 16
 import textwrap
16 17
 import warnings
17
-from typing import TYPE_CHECKING
18
+from typing import TYPE_CHECKING, NoReturn
18 19
 
19 20
 import click.testing
20 21
 import hypothesis
... ...
@@ -2116,6 +2117,77 @@ Boo.
2116 2117
                     exception is None
2117 2118
                 ), 'exception querying suitable SSH keys'
2118 2119
 
2120
+    def test_400_key_to_phrase(
2121
+        self,
2122
+        monkeypatch: pytest.MonkeyPatch,
2123
+        skip_if_no_af_unix_support: None,
2124
+        ssh_agent_client_with_test_keys_loaded: ssh_agent.SSHAgentClient,
2125
+    ) -> None:
2126
+
2127
+        class CustomError(RuntimeError):
2128
+            pass
2129
+
2130
+        def err(*args: Any, **_kwargs: Any) -> NoReturn:
2131
+            args = args or ('custom error message',)
2132
+            raise CustomError(*args)
2133
+
2134
+        def fail(*_args: Any, **_kwargs: Any) -> Any:
2135
+            raise ssh_agent.SSHAgentFailedError(
2136
+                _types.SSH_AGENT.FAILURE.value,
2137
+                b'',
2138
+            )
2139
+
2140
+        del skip_if_no_af_unix_support
2141
+        monkeypatch.setattr(ssh_agent.SSHAgentClient, 'sign', fail)
2142
+        loaded_keys = list(ssh_agent_client_with_test_keys_loaded.list_keys())
2143
+        loaded_key = base64.standard_b64encode(loaded_keys[0][0])
2144
+        with monkeypatch.context() as mp:
2145
+            mp.setattr(
2146
+                ssh_agent.SSHAgentClient,
2147
+                'list_keys',
2148
+                lambda *_a, **_kw: [],
2149
+            )
2150
+            with pytest.raises(CustomError, match='not loaded into the agent'):
2151
+                cli._key_to_phrase(loaded_key, error_callback=err)
2152
+        with monkeypatch.context() as mp:
2153
+            mp.setattr(ssh_agent.SSHAgentClient, 'list_keys', fail)
2154
+            with pytest.raises(
2155
+                CustomError, match='SSH agent failed to complete'
2156
+            ):
2157
+                cli._key_to_phrase(loaded_key, error_callback=err)
2158
+        with monkeypatch.context() as mp:
2159
+            mp.setattr(ssh_agent.SSHAgentClient, 'list_keys', err)
2160
+            with pytest.raises(
2161
+                CustomError, match='SSH agent failed to complete'
2162
+            ) as excinfo:
2163
+                cli._key_to_phrase(loaded_key, error_callback=err)
2164
+            assert excinfo.value.args
2165
+            assert isinstance(
2166
+                excinfo.value.args[0], ssh_agent.SSHAgentFailedError
2167
+            )
2168
+            assert excinfo.value.args[0].__context__ is not None
2169
+            assert isinstance(excinfo.value.args[0].__context__, CustomError)
2170
+        with monkeypatch.context() as mp:
2171
+            mp.delenv('SSH_AUTH_SOCK', raising=True)
2172
+            with pytest.raises(
2173
+                CustomError, match='Cannot find running SSH agent'
2174
+            ):
2175
+                cli._key_to_phrase(loaded_key, error_callback=err)
2176
+        with monkeypatch.context() as mp:
2177
+            mp.setenv(
2178
+                'SSH_AUTH_SOCK', os.environ['SSH_AUTH_SOCK'] + '~'
2179
+            )
2180
+            with pytest.raises(
2181
+                CustomError, match='Cannot connect to SSH agent'
2182
+            ):
2183
+                cli._key_to_phrase(loaded_key, error_callback=err)
2184
+        with monkeypatch.context() as mp:
2185
+            mp.delattr(socket, 'AF_UNIX', raising=True)
2186
+            with pytest.raises(
2187
+                CustomError, match='does not support UNIX domain sockets'
2188
+            ):
2189
+                cli._key_to_phrase(loaded_key, error_callback=err)
2190
+
2119 2191
 
2120 2192
 class TestCLITransition:
2121 2193
     def test_100_help_output(self, monkeypatch: pytest.MonkeyPatch) -> None:
2122 2194