Fix user interface errors if the SSH agent does not work
Marco Ricci

Marco Ricci commited on 2024-12-13 14:52:17
Zeige 2 geänderte Dateien mit 38 Einfügungen und 8 Löschungen.


While the configuration and interactive selection of an SSH key is
properly guarded against (i.e., any exceptions are wrapped into proper
command-line error messages), the calls to `vault.Vault.phrase_from_key`
were not similarly guarded.  This has now been fixed.

The fix also uncovered an incorrectly specified mock function in the
test suite, which has been repaired as well.
... ...
@@ -1703,6 +1703,36 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1703 1703
         logger.error(msg, *args, stacklevel=stacklevel, **kwargs)
1704 1704
         ctx.exit(1)
1705 1705
 
1706
+    def key_to_phrase(key_: str | bytes | bytearray, /) -> bytes | bytearray:
1707
+        key = base64.standard_b64decode(key_)
1708
+        try:
1709
+            with ssh_agent.SSHAgentClient.ensure_agent_subcontext() as client:
1710
+                try:
1711
+                    return vault.Vault.phrase_from_key(key, conn=client)
1712
+                except ssh_agent.SSHAgentFailedError as e:
1713
+                    try:
1714
+                        keylist = client.list_keys()
1715
+                    except ssh_agent.SSHAgentFailedError:
1716
+                        pass
1717
+                    except Exception as e2:  # noqa: BLE001
1718
+                        e.__context__ = e2
1719
+                    else:
1720
+                        if not any(k == key for k, _ in keylist):
1721
+                            err(
1722
+                                'The requested SSH key is not loaded '
1723
+                                'into the agent.'
1724
+                            )
1725
+                    err(e)
1726
+        except KeyError:
1727
+            err('Cannot find running SSH agent; check SSH_AUTH_SOCK')
1728
+        except NotImplementedError:
1729
+            err(
1730
+                'Cannot connect to SSH agent because '
1731
+                'this Python version does not support UNIX domain sockets'
1732
+            )
1733
+        except OSError as e:
1734
+            err('Cannot connect to SSH agent: %s', e.strerror)
1735
+
1706 1736
     def get_config() -> _types.VaultConfig:
1707 1737
         try:
1708 1738
             return _load_config()
... ...
@@ -2047,13 +2077,6 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
2047 2077
                 if k in service_keys and v is not None
2048 2078
             }
2049 2079
 
2050
-            def key_to_phrase(
2051
-                key: str | bytes | bytearray,
2052
-            ) -> bytes | bytearray:
2053
-                return vault.Vault.phrase_from_key(
2054
-                    base64.standard_b64decode(key)
2055
-                )
2056
-
2057 2080
             if use_phrase:
2058 2081
                 form = cast(
2059 2082
                     Literal['NFC', 'NFD', 'NFKC', 'NFKD'],
... ...
@@ -30,6 +30,7 @@ from derivepassphrase import _types, cli, ssh_agent, vault
30 30
 __all__ = ()
31 31
 
32 32
 if TYPE_CHECKING:
33
+    import socket
33 34
     from collections.abc import Callable, Iterator, Mapping, Sequence
34 35
 
35 36
     import click.testing
... ...
@@ -1412,7 +1413,13 @@ def suitable_ssh_keys(conn: Any) -> Iterator[_types.KeyCommentPair]:
1412 1413
     ]
1413 1414
 
1414 1415
 
1415
-def phrase_from_key(key: bytes) -> bytes:
1416
+def phrase_from_key(
1417
+    key: bytes,
1418
+    /,
1419
+    *,
1420
+    conn: ssh_agent.SSHAgentClient | socket.socket | None = None,
1421
+) -> bytes:
1422
+    del conn
1416 1423
     if key == DUMMY_KEY1:  # pragma: no branch
1417 1424
         return DUMMY_PHRASE_FROM_KEY1
1418 1425
     raise KeyError(key)  # pragma: no cover
1419 1426