Support specifying the SSH agent socket provider via CLI or configuration
Marco Ricci

Marco Ricci commited on 2026-02-08 13:00:41
Zeige 3 geänderte Dateien mit 125 Einfügungen und 3 Löschungen.


Add a designated entry `vault.ssh-agent-socket-provider` to the
`derivepassphrase` main configuration and the new option
`--ssh-agent-socket-provider` to the `derivepassphrase vault`
command-line interface.  These both specify a specific SSH agent socket
provider to use, instead of the built-in default provider list.  The
command-line option has precedence over the user configuration.

In complementary work, streamline the construction of SSH agent client
contexts and the querying of configuration files in `derivepassphrase
vault`'s main program: provide a `get_configured_connection_hint` for
the former, and harmonize function signatures and call responsibilities
for the latter.
... ...
@@ -26,6 +26,7 @@ import shlex
26 26
 import sys
27 27
 import threading
28 28
 import unicodedata
29
+from collections.abc import Sequence
29 30
 from typing import TYPE_CHECKING, cast
30 31
 
31 32
 import click
... ...
@@ -48,7 +49,6 @@ if TYPE_CHECKING:
48 49
     from collections.abc import (
49 50
         Iterator,
50 51
         Mapping,
51
-        Sequence,
52 52
     )
53 53
     from contextlib import AbstractContextManager
54 54
     from typing import (
... ...
@@ -1068,6 +1068,68 @@ def check_for_misleading_passphrase(
1068 1068
             )
1069 1069
 
1070 1070
 
1071
+def get_configured_connection_hint(
1072
+    conn: ssh_agent.SSHAgentClient
1073
+    | _types.SSHAgentSocket
1074
+    | Sequence[str]
1075
+    | None = None,
1076
+    /,
1077
+    *,
1078
+    main_config: dict[str, Any],
1079
+) -> ssh_agent.SSHAgentClient | _types.SSHAgentSocket | Sequence[str] | None:
1080
+    """Return a suitable connection hint for the SSH agent client.
1081
+
1082
+    If a connection hint is already known, return that connection hint.
1083
+    Otherwise, look up configured SSH agent socket providers in the user
1084
+    configuration and return those as a hint, if possible.
1085
+
1086
+    Args:
1087
+        conn:
1088
+            An optional connection hint to the SSH agent, to be used if
1089
+            non-trivial.
1090
+        main_config:
1091
+            The parsed main user configuration.
1092
+
1093
+    Returns:
1094
+        A connection hint suitable for use with
1095
+        [`ssh_agent.SSHAgentClient.ensure_agent_subcontext`][].
1096
+
1097
+    """
1098
+
1099
+    def handle_return_value(
1100
+        val: ssh_agent.SSHAgentClient
1101
+        | _types.SSHAgentSocket
1102
+        | str
1103
+        | Sequence[str]
1104
+        | None,
1105
+    ) -> (
1106
+        ssh_agent.SSHAgentClient | _types.SSHAgentSocket | Sequence[str] | None
1107
+    ):  # pragma: no cover [external]
1108
+        # A separate function, to easily exclude it from coverage,
1109
+        # because it's just type handling, which the type checker
1110
+        # already checks.
1111
+        #
1112
+        # Strings should be packed into singleton tuples, and general
1113
+        # sequences should be turned into readonly tuples.  All other
1114
+        # types of values can be passed through.  But because strings
1115
+        # are sequences as well, we need to check for strings
1116
+        # separately.
1117
+        if isinstance(val, str):
1118
+            return (val,)
1119
+        if isinstance(val, Sequence):
1120
+            return tuple(val)
1121
+        return val
1122
+
1123
+    return handle_return_value(
1124
+        conn
1125
+        if conn is not None
1126
+        else cast(
1127
+            "str | Sequence[str] | None",
1128
+            main_config.get("vault", {}).get("ssh-agent-socket-provider"),
1129
+        )
1130
+    )
1131
+
1132
+
1071 1133
 def key_to_phrase(
1072 1134
     key: str | Buffer,
1073 1135
     /,
... ...
@@ -902,6 +902,15 @@ class Label(enum.Enum):
902 902
         "PATH",
903 903
     )
904 904
     """"""
905
+    VAULT_COMPATIBILITY_METAVAR_SSH_AGENT_SOCKET_PROVIDER = commented(
906
+        "This metavar is used in "
907
+        "Label.DERIVEPASSPHRASE_VAULT_SSH_AGENT_SOCKET_PROVIDER_HELP_TEXT, "
908
+        'yielding e.g. "Use PROVIDER as the SSH agent socket provider.".  ',
909
+    )(
910
+        "Label :: Help text :: Metavar :: vault",
911
+        "PROVIDER",
912
+    )
913
+    """"""
905 914
     VAULT_METAVAR_SERVICE = commented(
906 915
         'This metavar is used as "service_metavar" in multiple help texts, '
907 916
         "such as Label.DERIVEPASSPHRASE_VAULT_CONFIG_HELP_TEXT, "
... ...
@@ -1122,6 +1131,15 @@ class Label(enum.Enum):
1122 1131
         flags="python-brace-format",
1123 1132
     )
1124 1133
     """"""
1134
+    DERIVEPASSPHRASE_VAULT_SSH_AGENT_SOCKET_PROVIDER_HELP_TEXT = commented(
1135
+        "The metavar is"
1136
+        "Label.VAULT_COMPATIBILITY_METAVAR_SSH_AGENT_SOCKET_PROVIDER.",
1137
+    )(
1138
+        "Label :: Help text :: One-line description",
1139
+        "Use {metavar} as the SSH agent socket provider.",
1140
+        flags="python-brace-format",
1141
+    )
1142
+    """"""
1125 1143
     DERIVEPASSPHRASE_VAULT_SPACE_HELP_TEXT = commented(
1126 1144
         "The metavar is Label.PASSPHRASE_GENERATION_METAVAR_NUMBER.",
1127 1145
     )(
... ...
@@ -1053,6 +1053,7 @@ class _VaultContext:  # noqa: PLR0904
1053 1053
         *,
1054 1054
         empty_service_permitted: bool,
1055 1055
         configuration: _types.VaultConfig | None = None,
1056
+        main_config: dict[str, Any] | None = None,
1056 1057
     ) -> collections.ChainMap[str, Any]:
1057 1058
         """Query the master passphrase or master SSH key, if changed.
1058 1059
 
... ...
@@ -1076,6 +1077,9 @@ class _VaultContext:  # noqa: PLR0904
1076 1077
                 some callers need access to the full configuration *and*
1077 1078
                 need the slice within the effective configuration to
1078 1079
                 refer to the same object.
1080
+            main_config:
1081
+                The user configuration, parsed from disk.  If not given,
1082
+                we read the configuration from disk ourselves.
1079 1083
 
1080 1084
         Returns:
1081 1085
             The effective configuration for the (possibly empty) given
... ...
@@ -1098,6 +1102,8 @@ class _VaultContext:  # noqa: PLR0904
1098 1102
         use_phrase = self.ctx.params["use_phrase"]
1099 1103
         if configuration is None:
1100 1104
             configuration = self.get_config()
1105
+        if main_config is None:  # pragma: no cover [unused]
1106
+            main_config = self.get_user_config()
1101 1107
         service_keys_on_commandline = {
1102 1108
             "length",
1103 1109
             "repeat",
... ...
@@ -1129,8 +1135,13 @@ class _VaultContext:  # noqa: PLR0904
1129 1135
             )
1130 1136
             raise click.UsageError(str(err_msg))
1131 1137
         if use_key:
1138
+            conn = cli_helpers.get_configured_connection_hint(
1139
+                self.ctx.params["ssh_agent_socket_provider"],
1140
+                main_config=main_config,
1141
+            )
1132 1142
             settings.maps[0]["key"] = base64.standard_b64encode(
1133 1143
                 cli_helpers.select_ssh_key(
1144
+                    conn,
1134 1145
                     ctx=self.ctx,
1135 1146
                     error_callback=self.err,
1136 1147
                     warning_callback=self.warning,
... ...
@@ -1182,7 +1193,9 @@ class _VaultContext:  # noqa: PLR0904
1182 1193
         configuration = self.get_config()
1183 1194
         user_config = self.get_user_config()
1184 1195
         settings = self.run_subop_query_phrase_or_key_change(
1185
-            configuration=configuration, empty_service_permitted=True
1196
+            empty_service_permitted=True,
1197
+            configuration=configuration,
1198
+            main_config=user_config,
1186 1199
         )
1187 1200
         overrides = settings.maps[0]
1188 1201
         view: collections.ChainMap[str, Any]
... ...
@@ -1343,7 +1356,8 @@ class _VaultContext:  # noqa: PLR0904
1343 1356
         print_notes_before = self.ctx.params["print_notes_before"]
1344 1357
         user_config = self.get_user_config()
1345 1358
         settings = self.run_subop_query_phrase_or_key_change(
1346
-            empty_service_permitted=False
1359
+            empty_service_permitted=False,
1360
+            main_config=user_config,
1347 1361
         )
1348 1362
         if use_phrase:
1349 1363
             try:
... ...
@@ -1370,18 +1384,28 @@ class _VaultContext:  # noqa: PLR0904
1370 1384
         # cases, set the phrase via vault.Vault.phrase_from_key if
1371 1385
         # a key is given.  Finally, if nothing is set, error out.
1372 1386
         if use_key:
1387
+            conn = cli_helpers.get_configured_connection_hint(
1388
+                self.ctx.params["ssh_agent_socket_provider"],
1389
+                main_config=user_config,
1390
+            )
1373 1391
             phrase = cli_helpers.key_to_phrase(
1374 1392
                 cast("str", overrides["key"]),
1375 1393
                 error_callback=self.err,
1376 1394
                 warning_callback=self.warning,
1395
+                conn=conn,
1377 1396
             )
1378 1397
         elif use_phrase:
1379 1398
             phrase = cast("str", overrides["phrase"])
1380 1399
         elif settings.get("key"):
1400
+            conn = cli_helpers.get_configured_connection_hint(
1401
+                self.ctx.params["ssh_agent_socket_provider"],
1402
+                main_config=user_config,
1403
+            )
1381 1404
             phrase = cli_helpers.key_to_phrase(
1382 1405
                 cast("str", settings["key"]),
1383 1406
                 error_callback=self.err,
1384 1407
                 warning_callback=self.warning,
1408
+                conn=conn,
1385 1409
             )
1386 1410
         elif settings.get("phrase"):
1387 1411
             phrase = cast("str", settings["phrase"])
... ...
@@ -1719,6 +1743,20 @@ class _VaultContext:  # noqa: PLR0904
1719 1743
     ),
1720 1744
     cls=cli_machinery.CompatibilityOption,
1721 1745
 )
1746
+@click.option(
1747
+    "--ssh-agent-socket-provider",
1748
+    "ssh_agent_socket_provider",
1749
+    metavar=_msg.TranslatedString(
1750
+        _msg.Label.VAULT_COMPATIBILITY_METAVAR_SSH_AGENT_SOCKET_PROVIDER
1751
+    ),
1752
+    help=_msg.TranslatedString(
1753
+        _msg.Label.DERIVEPASSPHRASE_VAULT_SSH_AGENT_SOCKET_PROVIDER_HELP_TEXT,
1754
+        metavar=_msg.TranslatedString(
1755
+            _msg.Label.VAULT_COMPATIBILITY_METAVAR_SSH_AGENT_SOCKET_PROVIDER
1756
+        ),
1757
+    ),
1758
+    cls=cli_machinery.CompatibilityOption,
1759
+)
1722 1760
 @cli_machinery.version_option(cli_machinery.vault_version_option_callback)
1723 1761
 @cli_machinery.color_forcing_pseudo_option
1724 1762
 @cli_machinery.standard_logging_options
... ...
@@ -1837,6 +1875,10 @@ def derivepassphrase_vault(
1837 1875
             Command-line arguments `--print-notes-before` (True) and
1838 1876
             `--print-notes-after` (False).  Controls whether the service
1839 1877
             notes (if any) are printed before the passphrase, or after.
1878
+        ssh_agent_socket_provider (str):
1879
+            Command-line argument `--ssh-agent-socket-provider`.  Names
1880
+            an explicit SSH agent socket provider to use, overriding any
1881
+            user configuration entries or platform defaults.
1840 1882
 
1841 1883
     """
1842 1884
     vault_context = _VaultContext(ctx)
1843 1885