Report SSH agent socket providers in `--version` output
Marco Ricci

Marco Ricci commited on 2026-01-24 23:50:57
Zeige 5 geänderte Dateien mit 142 Einfügungen und 1 Löschungen.


In the output of `derivepassphrase vault --version`, output the list of
supported and the list of unavailable SSH agent socket providers.
Report all aliases as well.  We intend to reuse the names later on in
the user configuration file, so it is important to expose them
somewhere.

To determine whether an entry key is a base name or an alias, we build
a topological sorting over the entry keys, using tools from the standard
library introduced in Python 3.9.  (Older Python versions could be
retrofitted with a backport, if need be.)

Open problem: The topological sorter checks for cycles, but there is
otherwise no deeper error checking yet.  In particular, these entry keys
may be supplied by a malicious third party, but there are no
sanitization checks yet for terminal sequences or similar when
outputting the key during the `--version` call.  How to handle this?
... ...
@@ -1143,11 +1143,39 @@ def vault_version_option_callback(
1143 1143
             _types.Feature.SSH_KEY,
1144 1144
         ]
1145 1145
         click.echo()
1146
+
1147
+        from derivepassphrase.ssh_agent import socketprovider  # noqa: PLC0415
1148
+
1149
+        socket_providers: dict[str, bool] = {}
1150
+        for key, names in socketprovider.SocketProvider.grouped().items():
1151
+            if isinstance(key, _types.BuiltinSSHAgentSocketProvider):
1152
+                if not key.is_known_fake_agent():
1153
+                    other_names = set(names) - {key}
1154
+                    formatted_key = (
1155
+                        "{key!s} ({aliases_key!s} {aliases})".format(
1156
+                            key=key,
1157
+                            aliases_key=_msg.TranslatedString(
1158
+                                _msg.Label.FEATURE_ITEM_ALIASES
1159
+                            ),
1160
+                            aliases=", ".join(sorted(other_names, key=str)),
1161
+                        )
1162
+                        if other_names
1163
+                        else str(key)
1164
+                    )
1165
+                    socket_providers[formatted_key] = key.test()
1166
+            elif key:  # pragma: no cover [external]
1167
+                socket_providers[key] = True
1146 1168
         version_info_types: dict[_msg.Label, list[str]] = {
1147 1169
             _msg.Label.SUPPORTED_FEATURES: [k for k in features if k.test()],
1148 1170
             _msg.Label.UNAVAILABLE_FEATURES: [
1149 1171
                 k for k in features if not k.test()
1150 1172
             ],
1173
+            _msg.Label.SUPPORTED_SSH_AGENT_SOCKET_PROVIDERS: sorted(
1174
+                k for k, v in socket_providers.items() if v
1175
+            ),
1176
+            _msg.Label.UNAVAILABLE_SSH_AGENT_SOCKET_PROVIDERS: sorted(
1177
+                k for k, v in socket_providers.items() if not v
1178
+            ),
1151 1179
         }
1152 1180
         print_version_info_types(version_info_types, ctx=ctx)
1153 1181
         ctx.exit()
... ...
@@ -1342,6 +1342,16 @@ class Label(enum.Enum):
1342 1342
         "Supported foreign configuration formats:",
1343 1343
     )
1344 1344
     """"""
1345
+    SUPPORTED_SSH_AGENT_SOCKET_PROVIDERS = commented(
1346
+        "This is part of the version output, emitting lists of supported "
1347
+        "SSH agent socket providers.  "
1348
+        "A comma-separated English list of items follows, "
1349
+        "with standard English punctuation.",
1350
+    )(
1351
+        "Label :: Info Message:: Table row header",
1352
+        "Supported SSH agent socket providers:",
1353
+    )
1354
+    """"""
1345 1355
     SUPPORTED_SUBCOMMANDS = commented(
1346 1356
         "This is part of the version output, emitting lists of supported "
1347 1357
         "subcommands.  "
... ...
@@ -1382,6 +1392,16 @@ class Label(enum.Enum):
1382 1392
         "Known foreign configuration formats:",
1383 1393
     )
1384 1394
     """"""
1395
+    UNAVAILABLE_SSH_AGENT_SOCKET_PROVIDERS = commented(
1396
+        "This is part of the version output, emitting lists of known, "
1397
+        "unavailable SSH agent socket providers.  "
1398
+        "A comma-separated English list of items follows, "
1399
+        "with standard English punctuation.",
1400
+    )(
1401
+        "Label :: Info Message:: Table row header",
1402
+        "Known SSH agent socket providers:",
1403
+    )
1404
+    """"""
1385 1405
     CONFIRM_THIS_CHOICE_PROMPT_TEXT = commented(
1386 1406
         'There is no support for "yes" or "no" in other languages '
1387 1407
         "than English, so it is advised that your translation makes it "
... ...
@@ -1122,6 +1122,14 @@ class BuiltinSSHAgentSocketProvider(str, enum.Enum):
1122 1122
                 pass
1123 1123
         return ret
1124 1124
 
1125
+    def is_known_fake_agent(self, *_args: Any, **_kwargs: Any) -> bool:  # noqa: ANN401
1126
+        """Return true if this SSH agent is a known fake agent."""
1127
+        return self in {
1128
+            BuiltinSSHAgentSocketProvider.STUB_AGENT,
1129
+            BuiltinSSHAgentSocketProvider.STUB_AGENT_WITH_ADDRESS,
1130
+            BuiltinSSHAgentSocketProvider.STUB_AGENT_WITH_ADDRESS_AND_DETERMINISTIC_DSA,
1131
+        }
1132
+
1125 1133
     __str__ = str.__str__
1126 1134
     __format__ = str.__format__  # type: ignore[assignment]
1127 1135
 
... ...
@@ -10,7 +10,10 @@ import collections
10 10
 import ctypes
11 11
 import enum
12 12
 import errno
13
+import graphlib
13 14
 import hashlib
15
+import itertools
16
+import operator
14 17
 import os
15 18
 import socket
16 19
 from ctypes.wintypes import (  # type: ignore[attr-defined]
... ...
@@ -27,7 +30,8 @@ from typing import TYPE_CHECKING, cast
27 30
 from derivepassphrase import _types
28 31
 
29 32
 if TYPE_CHECKING:
30
-    from collections.abc import Callable
33
+    from collections.abc import Callable, Mapping
34
+    from collections.abc import Set as AbstractSet
31 35
     from typing import ClassVar
32 36
 
33 37
     from typing_extensions import (
... ...
@@ -903,6 +907,59 @@ class SocketProvider:
903 907
             raise NotImplementedError(msg)
904 908
         return ret
905 909
 
910
+    @classmethod
911
+    def grouped(cls) -> Mapping[str, AbstractSet[str]]:
912
+        """Calculate a mapping of canonical socket provider entries.
913
+
914
+        Specifically, determine the non-alias entries in the socket
915
+        provider registry, and map each such non-alias entry to its set
916
+        of aliases.
917
+
918
+        Returns:
919
+            A mapping of non-alias entry names to sets of alias entry
920
+            names.
921
+
922
+        Warning:
923
+            The results are undefined if the registry has been modified
924
+            by any means other than the [`register`][] decorator.
925
+
926
+        """
927
+        known_socket_provider_values = frozenset({
928
+            v for v in cls.registry.values() if not isinstance(v, str)
929
+        })
930
+        canonical: dict[str, str] = {}
931
+        sorter: graphlib.TopologicalSorter[
932
+            _types.SSHAgentSocketProvider | str | None
933
+        ] = graphlib.TopologicalSorter()
934
+        k: _types.SSHAgentSocketProvider | str | None
935
+        v: _types.SSHAgentSocketProvider | str | None
936
+        for k, v in cls.registry.items():
937
+            sorter.add(k, v)
938
+        sorter.prepare()
939
+        while sorter:
940
+            entries = sorter.get_ready()
941
+            for k in entries:
942
+                if not isinstance(k, str):
943
+                    assert k in known_socket_provider_values
944
+                    sorter.done(k)
945
+                    continue
946
+                v = cls.registry[k]
947
+                canonical[k] = (
948
+                    canonical[v]  # alias
949
+                    if isinstance(v, str)
950
+                    else k  # actual entry
951
+                )
952
+                sorter.done(k)
953
+
954
+        key_of_entry = operator.itemgetter(0)
955
+        by_value = operator.itemgetter(1)
956
+
957
+        sorted_entries = sorted(canonical.items(), key=by_value)
958
+        return {
959
+            k: frozenset(map(key_of_entry, v))
960
+            for k, v in itertools.groupby(sorted_entries, key=by_value)
961
+        }
962
+
906 963
     ENTRY_POINT_GROUP_NAME = "derivepassphrase.ssh_agent_socket_providers"
907 964
     """
908 965
     The group name under which [entry
... ...
@@ -36,6 +36,7 @@ class VersionOutputData(NamedTuple):
36 36
     extras: frozenset[str]
37 37
     subcommands: frozenset[str]
38 38
     features: dict[str, bool]
39
+    ssh_agent_socket_providers: dict[str, bool]
39 40
 
40 41
 
41 42
 def _label_text(e: cli_messages.Label, /) -> str:
... ...
@@ -60,6 +61,12 @@ class KnownLineType(str, enum.Enum):
60 61
     )
61 62
     SUPPORTED_FEATURES = _label_text(cli_messages.Label.SUPPORTED_FEATURES)
62 63
     UNAVAILABLE_FEATURES = _label_text(cli_messages.Label.UNAVAILABLE_FEATURES)
64
+    SUPPORTED_SSH_AGENT_SOCKET_PROVIDERS = _label_text(
65
+        cli_messages.Label.SUPPORTED_SSH_AGENT_SOCKET_PROVIDERS
66
+    )
67
+    UNAVAILABLE_SSH_AGENT_SOCKET_PROVIDERS = _label_text(
68
+        cli_messages.Label.UNAVAILABLE_SSH_AGENT_SOCKET_PROVIDERS
69
+    )
63 70
     ENABLED_EXTRAS = _label_text(cli_messages.Label.ENABLED_PEP508_EXTRAS)
64 71
 
65 72
 
... ...
@@ -191,6 +198,7 @@ PEP 508 extras: export.
191 198
                     subcommands=frozenset(),
192 199
                     features={},
193 200
                     extras=frozenset({"export"}),
201
+                    ssh_agent_socket_providers={},
194 202
                 ),
195 203
                 id="derivepassphrase-0.4.0-export",
196 204
             ),
... ...
@@ -215,6 +223,7 @@ No PEP 508 extras are active.
215 223
                     subcommands=frozenset({"export", "vault"}),
216 224
                     features={},
217 225
                     extras=frozenset({}),
226
+                    ssh_agent_socket_providers={},
218 227
                 ),
219 228
                 id="derivepassphrase-0.5-plain",
220 229
             ),
... ...
@@ -237,6 +246,8 @@ Known foreign configuration formats: divination v3.141592,
237 246
 Supported subcommands: delete-all-files, dump-core.
238 247
 Supported features: delete-while-open.
239 248
 Known features: backups-are-nice-to-have.
249
+Supported SSH agent socket providers: agents-of-shield.
250
+Known SSH agent socket providers: agent-smith.
240 251
 PEP 508 extras: annoying-popups, delete-all-files,
241 252
     dump-core-depending-on-the-phase-of-the-moon.
242 253
 
... ...
@@ -269,6 +280,10 @@ PEP 508 extras: annoying-popups, delete-all-files,
269 280
                         "delete-all-files",
270 281
                         "dump-core-depending-on-the-phase-of-the-moon",
271 282
                     }),
283
+                    ssh_agent_socket_providers={
284
+                        "agents-of-shield": True,
285
+                        "agent-smith": False,
286
+                    },
272 287
                 ),
273 288
                 id="inventpassphrase",
274 289
             ),
... ...
@@ -305,6 +320,7 @@ Supported subcommands: export, spectre ({aliases!s} master-password, mpw),
305 320
                     }),
306 321
                     features={},
307 322
                     extras=frozenset(),
323
+                    ssh_agent_socket_providers={},
308 324
                 ),
309 325
                 id="aliases",
310 326
             ),
... ...
@@ -476,6 +492,7 @@ def parse_version_output(  # noqa: C901
476 492
     subcommands: set[str] = set()
477 493
     extras: set[str] = set()
478 494
     features: dict[str, bool] = {}
495
+    ssh_agent_socket_providers: dict[str, bool] = {}
479 496
     if len(paragraphs) < 2:  # pragma: no cover
480 497
         return VersionOutputData(
481 498
             derivation_schemes=schemes,
... ...
@@ -483,6 +500,7 @@ def parse_version_output(  # noqa: C901
483 500
             subcommands=frozenset(subcommands),
484 501
             extras=frozenset(extras),
485 502
             features=features,
503
+            ssh_agent_socket_providers=ssh_agent_socket_providers,
486 504
         )
487 505
     for line in paragraphs[1]:
488 506
         line_type, _, value = line.partition(":")
... ...
@@ -506,6 +524,15 @@ def parse_version_output(  # noqa: C901
506 524
                 features[item] = True
507 525
             elif line_type == KnownLineType.UNAVAILABLE_FEATURES:
508 526
                 features[item] = False
527
+            elif (
528
+                line_type == KnownLineType.SUPPORTED_SSH_AGENT_SOCKET_PROVIDERS
529
+            ):
530
+                ssh_agent_socket_providers[item] = True
531
+            elif (
532
+                line_type
533
+                == KnownLineType.UNAVAILABLE_SSH_AGENT_SOCKET_PROVIDERS
534
+            ):
535
+                ssh_agent_socket_providers[item] = False
509 536
             else:
510 537
                 raise AssertionError(  # noqa: TRY003
511 538
                     f"Unknown version info line type: {line_type!r}"  # noqa: EM102
... ...
@@ -516,6 +543,7 @@ def parse_version_output(  # noqa: C901
516 543
         subcommands=frozenset(subcommands),
517 544
         extras=frozenset(extras),
518 545
         features=features,
546
+        ssh_agent_socket_providers=ssh_agent_socket_providers,
519 547
     )
520 548
 
521 549
 
522 550