Add prototype implementation
Marco Ricci authored 5 months ago
|
1) # SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2) #
3) # SPDX-License-Identifier: MIT
4)
5) """A bare-bones SSH agent client supporting signing and key listing."""
6)
7) from __future__ import annotations
8)
9) import collections
10) import enum
11) import errno
12) import os
13) import pathlib
14) import socket
15)
16) from collections.abc import Sequence, MutableSequence
17) from typing import Any, NamedTuple, Self, TypeAlias
18) from ssh_agent_client.types import KeyCommentPair, SSH_AGENT, SSH_AGENTC
19)
20) __all__ = ('SSHAgentClient',)
21)
22) _socket = socket
23)
24) class SSHAgentClient:
25) """A bare-bones SSH agent client supporting signing and key listing.
26)
27) The main use case is requesting the agent sign some data, after
28) checking that the necessary key is already loaded.
29)
30) The main fleshed out methods are `list_keys` and `sign`, which
31) implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests. If
32) you *really* wanted to, there is enough infrastructure in place to
33) issue other requests as defined in the protocol---it's merely the
34) wrapper functions and the protocol numbers table that are missing.
35)
36) Attributes:
37) connection:
38) The socket connected to the SSH agent.
39) ssh_auth_sock:
40) The UNIX domain socket the SSH agent is listening on. Unset
41) if socket auto-discovery is not used.
42)
43) """
44) connection: socket.socket
45) ssh_auth_sock: str | None
46) def __init__(
47) self, /, *, socket: socket.socket | None = None, timeout: int = 125
48) ) -> None:
49) """Initialize the client.
50)
51) Args:
52) socket:
53) An optional socket, connected to the SSH agent. If not
54) given, we query the `SSH_AUTH_SOCK` environment
55) variable to auto-discover the correct socket address.
56) timeout:
57) A connection timeout for the SSH agent. Only used if
58) the socket is not yet connected. The default value
59) gives ample time for agent connections forwarded via
60) SSH on high-latency networks (e.g. Tor).
61)
62) """
63) self.ssh_auth_sock = None
64) if socket is not None:
65) self.connection = socket
66) else:
67) self.connection = _socket.socket(family=_socket.AF_UNIX)
68) try:
69) self.connection.getpeername()
70) except OSError as e:
71) if e.errno != errno.ENOTCONN:
72) raise
73) try:
74) self.ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
75) except KeyError as e:
76) raise RuntimeError(
77) "Can't find running ssh-agent: missing SSH_AUTH_SOCK"
78) ) from e
79) self.connection.settimeout(timeout)
80) try:
81) self.connection.connect(self.ssh_auth_sock)
82) except FileNotFoundError as e:
83) raise RuntimeError(
84) "Can't find running ssh-agent: unusable SSH_AUTH_SOCK"
85) ) from e
86)
87) def __enter__(self) -> Self:
88) """Context management: defer to `self.connection`."""
89) self.connection.__enter__()
90) return self
91)
92) def __exit__(
93) self, exc_type: Any, exc_val: Any, exc_tb: Any
94) ) -> bool:
95) """Context management: defer to `self.connection`."""
96) return bool(
97) self.connection.__exit__(
98) exc_type, exc_val, exc_tb) # type: ignore[func-returns-value]
99) )
100)
101) @staticmethod
102) def uint32(num, /) -> bytes:
103) """Format the number as a `uint32`, as per the agent protocol."""
104) return int.to_bytes(num, 4, 'big', signed=False)
105)
106) @classmethod
107) def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray:
108) """Format the payload as an SSH string, as per the agent protocol."""
|