5403acefe8c2e2872ab3f884401115ab2419caf5
Marco Ricci Add prototype implementation

Marco Ricci authored 5 months ago

1) # SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2) #
3) # SPDX-License-Identifier: MIT
4) 
5) """A bare-bones SSH agent client supporting signing and key listing."""
6) 
7) from __future__ import annotations
8) 
9) import collections
10) import enum
11) import errno
12) import os
13) import pathlib
14) import socket
15) 
16) from collections.abc import Sequence, MutableSequence
17) from typing import Any, NamedTuple, Self, TypeAlias
18) from ssh_agent_client.types import KeyCommentPair, SSH_AGENT, SSH_AGENTC
19) 
20) __all__ = ('SSHAgentClient',)
21) 
22) _socket = socket
23) 
24) class SSHAgentClient:
25)     """A bare-bones SSH agent client supporting signing and key listing.
26) 
27)     The main use case is requesting the agent sign some data, after
28)     checking that the necessary key is already loaded.
29) 
30)     The main fleshed out methods are `list_keys` and `sign`, which
31)     implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests.  If
32)     you *really* wanted to, there is enough infrastructure in place to
33)     issue other requests as defined in the protocol---it's merely the
34)     wrapper functions and the protocol numbers table that are missing.
35) 
36)     Attributes:
37)         connection:
38)             The socket connected to the SSH agent.
39)         ssh_auth_sock:
40)             The UNIX domain socket the SSH agent is listening on.  Unset
41)             if socket auto-discovery is not used.
42) 
43)     """
44)     connection: socket.socket
45)     ssh_auth_sock: str | None
46)     def __init__(
47)         self, /, *, socket: socket.socket | None = None, timeout: int = 125
48)     ) -> None:
49)         """Initialize the client.
50) 
51)         Args:
52)             socket:
53)                 An optional socket, connected to the SSH agent.  If not
54)                 given, we query the `SSH_AUTH_SOCK` environment
55)                 variable to auto-discover the correct socket address.
56)             timeout:
57)                 A connection timeout for the SSH agent.  Only used if
58)                 the socket is not yet connected.  The default value
59)                 gives ample time for agent connections forwarded via
60)                 SSH on high-latency networks (e.g. Tor).
61) 
62)         """
63)         self.ssh_auth_sock = None
64)         if socket is not None:
65)             self.connection = socket
66)         else:
67)             self.connection = _socket.socket(family=_socket.AF_UNIX)
68)         try:
69)             self.connection.getpeername()
70)         except OSError as e:
71)             if e.errno != errno.ENOTCONN:
72)                 raise
73)             try:
74)                 self.ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
75)             except KeyError as e:
76)                 raise RuntimeError(
77)                     "Can't find running ssh-agent: missing SSH_AUTH_SOCK"
78)                 ) from e
79)             self.connection.settimeout(timeout)
80)             try:
81)                 self.connection.connect(self.ssh_auth_sock)
82)             except FileNotFoundError as e:
83)                 raise RuntimeError(
84)                     "Can't find running ssh-agent: unusable SSH_AUTH_SOCK"
85)                 ) from e
86) 
87)     def __enter__(self) -> Self:
88)         """Context management: defer to `self.connection`."""
89)         self.connection.__enter__()
90)         return self
91) 
92)     def __exit__(
93)         self, exc_type: Any, exc_val: Any, exc_tb: Any
94)     ) -> bool:
95)         """Context management: defer to `self.connection`."""
96)         return bool(
97)             self.connection.__exit__(
98)                 exc_type, exc_val, exc_tb)  # type: ignore[func-returns-value]
99)         )
100) 
101)     @staticmethod
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 5 months ago

102)     def uint32(num: int, /) -> bytes:
103)         r"""Format the number as a `uint32`, as per the agent protocol.
104) 
105)         Args:
106)             num: A number.
107) 
108)         Returns:
109)             The number in SSH agent wire protocol format, i.e. as
110)             a 32-bit big endian number.
111) 
112)         Raises:
113)             OverflowError:
114)                 As per [`int.to_bytes`][].
115) 
116)         Examples:
117)             >>> SSHAgentClient.uint32(16777216)
118)             b'\x01\x00\x00\x00'
119) 
120)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 5 months ago

121)         return int.to_bytes(num, 4, 'big', signed=False)
122) 
123)     @classmethod
124)     def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 5 months ago

125)         r"""Format the payload as an SSH string, as per the agent protocol.
126) 
127)         Args:
128)             payload: A byte string.
129) 
130)         Returns:
131)             The payload, framed in the SSH agent wire protocol format.
132) 
133)         Examples:
134)             >>> bytes(SSHAgentClient.string(b'ssh-rsa'))
135)             b'\x00\x00\x00\x07ssh-rsa'
136) 
137)         """
Marco Ricci Fix numerous argument type...

Marco Ricci authored 5 months ago

138)         try:
139)             ret = bytearray()
140)             ret.extend(cls.uint32(len(payload)))
141)             ret.extend(payload)
142)             return ret
143)         except Exception as e:
144)             raise TypeError('invalid payload type') from e
Marco Ricci Add prototype implementation

Marco Ricci authored 5 months ago

145) 
146)     @classmethod
147)     def unstring(cls, bytestring: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 5 months ago

148)         r"""Unpack an SSH string.
149) 
150)         Args:
151)             bytestring: A framed byte string.
152) 
153)         Returns:
154)             The unframed byte string, i.e., the payload.
155) 
156)         Raises:
157)             ValueError:
158)                 The bytestring is not an SSH string.
159) 
160)         Examples:
161)             >>> bytes(SSHAgentClient.unstring(b'\x00\x00\x00\x07ssh-rsa'))
162)             b'ssh-rsa'
163)             >>> bytes(SSHAgentClient.unstring(SSHAgentClient.string(b'ssh-ed25519')))
164)             b'ssh-ed25519'
165) 
166)         """