a101d1f604a3ad6de242989f7a4887b78ab012a1
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

1) # SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2) #
3) # SPDX-License-Identifier: MIT
4) 
5) """A bare-bones SSH agent client supporting signing and key listing."""
6) 
7) from __future__ import annotations
8) 
9) import collections
10) import errno
11) import os
12) import socket
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

13) from typing import TYPE_CHECKING
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

14) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

15) from typing_extensions import Self
Marco Ricci Support Python 3.10 and PyP...

Marco Ricci authored 2 months ago

16) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

17) from ssh_agent_client import types as ssh_types
18) 
19) if TYPE_CHECKING:
20)     import types
21)     from collections.abc import Sequence
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

22) 
23) __all__ = ('SSHAgentClient',)
Marco Ricci Restore __version__ attribu...

Marco Ricci authored 2 months ago

24) __author__ = "Marco Ricci <m@the13thletter.info>"
Marco Ricci Release 0.1.1

Marco Ricci authored 2 months ago

25) __version__ = "0.1.1"
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

26) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

27) # In SSH bytestrings, the "length" of the byte string is stored as
28) # a 4-byte/32-bit unsigned integer at the beginning.
29) HEAD_LEN = 4
30) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

31) _socket = socket
32) 
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

33) class TrailingDataError(RuntimeError):
34)     """The result contained trailing data."""
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

35)     def __init__(self):
36)         super().__init__('Overlong response from SSH agent')
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

37) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

38) class SSHAgentClient:
39)     """A bare-bones SSH agent client supporting signing and key listing.
40) 
41)     The main use case is requesting the agent sign some data, after
42)     checking that the necessary key is already loaded.
43) 
44)     The main fleshed out methods are `list_keys` and `sign`, which
45)     implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests.  If
46)     you *really* wanted to, there is enough infrastructure in place to
47)     issue other requests as defined in the protocol---it's merely the
48)     wrapper functions and the protocol numbers table that are missing.
49) 
50)     """
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

51)     _connection: socket.socket
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

52)     def __init__(
53)         self, /, *, socket: socket.socket | None = None, timeout: int = 125
54)     ) -> None:
55)         """Initialize the client.
56) 
57)         Args:
58)             socket:
59)                 An optional socket, connected to the SSH agent.  If not
60)                 given, we query the `SSH_AUTH_SOCK` environment
61)                 variable to auto-discover the correct socket address.
62)             timeout:
63)                 A connection timeout for the SSH agent.  Only used if
64)                 the socket is not yet connected.  The default value
65)                 gives ample time for agent connections forwarded via
66)                 SSH on high-latency networks (e.g. Tor).
67) 
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

68)         Raises:
69)             KeyError:
70)                 The `SSH_AUTH_SOCK` environment was not found.
71)             OSError:
72)                 There was an error setting up a socket connection to the
73)                 agent.
74) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

75)         """
76)         if socket is not None:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

77)             self._connection = socket
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

78)         else:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

79)             self._connection = _socket.socket(family=_socket.AF_UNIX)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

80)         try:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

81)             # Test whether the socket is connected.
82)             self._connection.getpeername()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

83)         except OSError as e:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

84)             # This condition is hard to test purposefully, so exclude
85)             # from coverage.
86)             if e.errno != errno.ENOTCONN:  # pragma: no cover
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

87)                 raise
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

88)             if 'SSH_AUTH_SOCK' not in os.environ:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

89)                 msg = 'SSH_AUTH_SOCK environment variable'
90)                 raise KeyError(msg) from None
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

91)             ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

92)             self._connection.settimeout(timeout)
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

93)             self._connection.connect(ssh_auth_sock)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

94) 
95)     def __enter__(self) -> Self:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

96)         """Close socket connection upon context manager completion."""
97)         self._connection.__enter__()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

98)         return self
99) 
100)     def __exit__(
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

101)         self,
102)         exc_type: type[BaseException] | None,
103)         exc_val: BaseException | None,
104)         exc_tb: types.TracebackType | None,
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

105)     ) -> bool:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

106)         """Close socket connection upon context manager completion."""
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

107)         return bool(
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

108)             self._connection.__exit__(
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

109)                 exc_type, exc_val, exc_tb)  # type: ignore[func-returns-value]
110)         )
111) 
112)     @staticmethod
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

113)     def uint32(num: int, /) -> bytes:
114)         r"""Format the number as a `uint32`, as per the agent protocol.
115) 
116)         Args:
117)             num: A number.
118) 
119)         Returns:
120)             The number in SSH agent wire protocol format, i.e. as
121)             a 32-bit big endian number.
122) 
123)         Raises:
124)             OverflowError:
125)                 As per [`int.to_bytes`][].
126) 
127)         Examples:
128)             >>> SSHAgentClient.uint32(16777216)
129)             b'\x01\x00\x00\x00'
130) 
131)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

132)         return int.to_bytes(num, 4, 'big', signed=False)
133) 
134)     @classmethod
135)     def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

136)         r"""Format the payload as an SSH string, as per the agent protocol.
137) 
138)         Args:
139)             payload: A byte string.
140) 
141)         Returns:
142)             The payload, framed in the SSH agent wire protocol format.
143) 
144)         Examples:
145)             >>> bytes(SSHAgentClient.string(b'ssh-rsa'))
146)             b'\x00\x00\x00\x07ssh-rsa'
147) 
148)         """
Marco Ricci Fix numerous argument type...

Marco Ricci authored 3 months ago

149)         try:
150)             ret = bytearray()
151)             ret.extend(cls.uint32(len(payload)))
152)             ret.extend(payload)
153)         except Exception as e:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

154)             msg = 'invalid payload type'
155)             raise TypeError(msg) from e
156)         return ret
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

157) 
158)     @classmethod
159)     def unstring(cls, bytestring: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

160)         r"""Unpack an SSH string.
161) 
162)         Args:
163)             bytestring: A framed byte string.
164) 
165)         Returns:
166)             The unframed byte string, i.e., the payload.
167) 
168)         Raises:
169)             ValueError:
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

170)                 The byte string is not an SSH string.
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

171) 
172)         Examples:
173)             >>> bytes(SSHAgentClient.unstring(b'\x00\x00\x00\x07ssh-rsa'))
174)             b'ssh-rsa'
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 2 months ago

175)             >>> bytes(SSHAgentClient.unstring(
176)             ...     SSHAgentClient.string(b'ssh-ed25519')))
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

177)             b'ssh-ed25519'
178) 
179)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

180)         n = len(bytestring)
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

181)         msg = 'malformed SSH byte string'
182)         if (
183)             n < HEAD_LEN
184)             or n != HEAD_LEN + int.from_bytes(bytestring[:HEAD_LEN], 'big',
185)                                               signed=False)
186)         ):
187)             raise ValueError(msg)
188)         return bytestring[HEAD_LEN:]
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

189) 
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

190)     @classmethod
191)     def unstring_prefix(
192)         cls, bytestring: bytes | bytearray, /
193)     ) -> tuple[bytes | bytearray, bytes | bytearray]:
194)         r"""Unpack an SSH string at the beginning of the byte string.
195) 
196)         Args:
197)             bytestring:
198)                 A (general) byte string, beginning with a framed/SSH
199)                 byte string.
200) 
201)         Returns:
202)             A 2-tuple `(a, b)`, where `a` is the unframed byte
203)             string/payload at the beginning of input byte string, and
204)             `b` is the remainder of the input byte string.
205) 
206)         Raises:
207)             ValueError:
208)                 The byte string does not begin with an SSH string.
209) 
210)         Examples:
211)             >>> a, b = SSHAgentClient.unstring_prefix(
212)             ...     b'\x00\x00\x00\x07ssh-rsa____trailing data')
213)             >>> (bytes(a), bytes(b))
214)             (b'ssh-rsa', b'____trailing data')
215)             >>> a, b = SSHAgentClient.unstring_prefix(
216)             ...     SSHAgentClient.string(b'ssh-ed25519'))
217)             >>> (bytes(a), bytes(b))
218)             (b'ssh-ed25519', b'')
219) 
220)         """
221)         n = len(bytestring)
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

222)         msg = 'malformed SSH byte string'
223)         if n < HEAD_LEN:
224)             raise ValueError(msg)
225)         m = int.from_bytes(bytestring[:HEAD_LEN], 'big', signed=False)
226)         if m + HEAD_LEN > n:
227)             raise ValueError(msg)
228)         return (bytestring[HEAD_LEN:m + HEAD_LEN], bytestring[m + HEAD_LEN:])
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

229) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

230)     def request(
231)         self, code: int, payload: bytes | bytearray, /
232)     ) -> tuple[int, bytes | bytearray]:
233)         """Issue a generic request to the SSH agent.
234) 
235)         Args:
236)             code:
237)                 The request code.  See the SSH agent protocol for
238)                 protocol numbers to use here (and which protocol numbers
239)                 to expect in a response).
240)             payload:
241)                 A byte string containing the payload, or "contents", of
242)                 the request.  Request-specific.  `request` will add any
243)                 necessary wire framing around the request code and the
244)                 payload.
245) 
246)         Returns:
247)             A 2-tuple consisting of the response code and the payload,
248)             with all wire framing removed.
249) 
250)         Raises:
251)             EOFError:
252)                 The response from the SSH agent is truncated or missing.
253) 
254)         """
255)         request_message = bytearray([code])
256)         request_message.extend(payload)
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

257)         self._connection.sendall(self.string(request_message))
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

258)         chunk = self._connection.recv(HEAD_LEN)
259)         if len(chunk) < HEAD_LEN:
260)             msg = 'cannot read response length'
261)             raise EOFError(msg)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

262)         response_length = int.from_bytes(chunk, 'big', signed=False)
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

263)         response = self._connection.recv(response_length)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

264)         if len(response) < response_length:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

265)             msg = 'truncated response from SSH agent'
266)             raise EOFError(msg)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

267)         return response[0], response[1:]
268) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

269)     def list_keys(self) -> Sequence[ssh_types.KeyCommentPair]:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

270)         """Request a list of keys known to the SSH agent.
271) 
272)         Returns:
273)             A read-only sequence of key/comment pairs.
274) 
275)         Raises:
276)             EOFError:
277)                 The response from the SSH agent is truncated or missing.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

278)             TrailingDataError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

279)                 The response from the SSH agent is too long.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

280)             RuntimeError:
281)                 The agent failed to complete the request.
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

282) 
283)         """
284)         response_code, response = self.request(
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

285)             ssh_types.SSH_AGENTC.REQUEST_IDENTITIES.value, b'')
286)         if response_code != ssh_types.SSH_AGENT.IDENTITIES_ANSWER.value:
287)             msg = (f'error return from SSH agent: '
288)                    f'{response_code = }, {response = }')
289)             raise RuntimeError(msg)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

290)         response_stream = collections.deque(response)
291)         def shift(num: int) -> bytes:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

292)             buf = collections.deque(b'')
293)             for _ in range(num):
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

294)                 try:
295)                     val = response_stream.popleft()
296)                 except IndexError:
297)                     response_stream.extendleft(reversed(buf))
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

298)                     msg = 'truncated response from SSH agent'
299)                     raise EOFError(msg) from None
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

300)                 buf.append(val)
301)             return bytes(buf)
302)         key_count = int.from_bytes(shift(4), 'big')
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

303)         keys: collections.deque[ssh_types.KeyCommentPair]
Marco Ricci Fix Google code style viola...

Marco Ricci authored 2 months ago

304)         keys = collections.deque()
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

305)         for _ in range(key_count):
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

306)             key_size = int.from_bytes(shift(4), 'big')
307)             key = shift(key_size)
308)             comment_size = int.from_bytes(shift(4), 'big')
309)             comment = shift(comment_size)
310)             # Both `key` and `comment` are not wrapped as SSH strings.
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

311)             keys.append(ssh_types.KeyCommentPair(key, comment))
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

312)         if response_stream:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

313)             raise TrailingDataError
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

314)         return keys
315) 
316)     def sign(
317)         self, /, key: bytes | bytearray, payload: bytes | bytearray,
318)         *, flags: int = 0, check_if_key_loaded: bool = False,
319)     ) -> bytes | bytearray:
320)         """Request the SSH agent sign the payload with the key.
321) 
322)         Args:
323)             key:
324)                 The public SSH key to sign the payload with, in the same
325)                 format as returned by, e.g., the `list_keys` method.
326)                 The corresponding private key must have previously been
327)                 loaded into the agent to successfully issue a signature.
328)             payload:
329)                 A byte string of data to sign.
330)             flags:
331)                 Optional flags for the signing request.  Currently
332)                 passed on as-is to the agent.  In real-world usage, this
333)                 could be used, e.g., to request more modern hash
334)                 algorithms when signing with RSA keys.  (No such
335)                 real-world usage is currently implemented.)
336)             check_if_key_loaded:
337)                 If true, check beforehand (via `list_keys`) if the
338)                 corresponding key has been loaded into the agent.
339) 
340)         Returns:
341)             The binary signature of the payload under the given key.
342) 
343)         Raises:
344)             EOFError:
345)                 The response from the SSH agent is truncated or missing.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

346)             TrailingDataError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

347)                 The response from the SSH agent is too long.
348)             RuntimeError:
349)                 The agent failed to complete the request.
Marco Ricci Raise KeyError when signing...

Marco Ricci authored 2 months ago

350)             KeyError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

351)                 `check_if_key_loaded` is true, and the `key` was not
352)                 loaded into the agent.
353) 
354)         """
355)         if check_if_key_loaded:
356)             loaded_keys = frozenset({pair.key for pair in self.list_keys()})
357)             if bytes(key) not in loaded_keys:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

358)                 msg = 'target SSH key not loaded into agent'
359)                 raise KeyError(msg)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

360)         request_data = bytearray(self.string(key))
361)         request_data.extend(self.string(payload))
362)         request_data.extend(self.uint32(flags))
363)         response_code, response = self.request(
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

364)             ssh_types.SSH_AGENTC.SIGN_REQUEST.value, request_data)
365)         if response_code != ssh_types.SSH_AGENT.SIGN_RESPONSE.value:
366)             msg = f'signing data failed: {response_code = }, {response = }'
367)             raise RuntimeError(msg)