7495b213d512315c4e72be5bda1a5cdd7e9030a0
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

1) # SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2) #
3) # SPDX-License-Identifier: MIT
4) 
5) """A bare-bones SSH agent client supporting signing and key listing."""
6) 
7) from __future__ import annotations
8) 
9) import collections
10) import errno
11) import os
12) import socket
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

13) from typing import TYPE_CHECKING
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

14) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

15) from typing_extensions import Self
Marco Ricci Support Python 3.10 and PyP...

Marco Ricci authored 2 months ago

16) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

17) from ssh_agent_client import types as ssh_types
18) 
19) if TYPE_CHECKING:
20)     import types
21)     from collections.abc import Sequence
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

22) 
23) __all__ = ('SSHAgentClient',)
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

24) __author__ = 'Marco Ricci <m@the13thletter.info>'
Marco Ricci Release 0.1.2

Marco Ricci authored 1 month ago

25) __version__ = '0.1.2'
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

26) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

27) # In SSH bytestrings, the "length" of the byte string is stored as
28) # a 4-byte/32-bit unsigned integer at the beginning.
29) HEAD_LEN = 4
30) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

31) _socket = socket
32) 
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

33) 
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

34) class TrailingDataError(RuntimeError):
35)     """The result contained trailing data."""
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

36) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

37)     def __init__(self):
38)         super().__init__('Overlong response from SSH agent')
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

39) 
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

40) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

41) class SSHAgentClient:
42)     """A bare-bones SSH agent client supporting signing and key listing.
43) 
44)     The main use case is requesting the agent sign some data, after
45)     checking that the necessary key is already loaded.
46) 
47)     The main fleshed out methods are `list_keys` and `sign`, which
48)     implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests.  If
49)     you *really* wanted to, there is enough infrastructure in place to
50)     issue other requests as defined in the protocol---it's merely the
51)     wrapper functions and the protocol numbers table that are missing.
52) 
53)     """
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

54) 
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

55)     _connection: socket.socket
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

56) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

57)     def __init__(
58)         self, /, *, socket: socket.socket | None = None, timeout: int = 125
59)     ) -> None:
60)         """Initialize the client.
61) 
62)         Args:
63)             socket:
64)                 An optional socket, connected to the SSH agent.  If not
65)                 given, we query the `SSH_AUTH_SOCK` environment
66)                 variable to auto-discover the correct socket address.
67)             timeout:
68)                 A connection timeout for the SSH agent.  Only used if
69)                 the socket is not yet connected.  The default value
70)                 gives ample time for agent connections forwarded via
71)                 SSH on high-latency networks (e.g. Tor).
72) 
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

73)         Raises:
74)             KeyError:
75)                 The `SSH_AUTH_SOCK` environment was not found.
76)             OSError:
77)                 There was an error setting up a socket connection to the
78)                 agent.
79) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

80)         """
81)         if socket is not None:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

82)             self._connection = socket
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

83)         else:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

84)             self._connection = _socket.socket(family=_socket.AF_UNIX)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

85)         try:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

86)             # Test whether the socket is connected.
87)             self._connection.getpeername()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

88)         except OSError as e:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

89)             # This condition is hard to test purposefully, so exclude
90)             # from coverage.
91)             if e.errno != errno.ENOTCONN:  # pragma: no cover
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

92)                 raise
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

93)             if 'SSH_AUTH_SOCK' not in os.environ:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

94)                 msg = 'SSH_AUTH_SOCK environment variable'
95)                 raise KeyError(msg) from None
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

96)             ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

97)             self._connection.settimeout(timeout)
Marco Ricci Distinguish errors when con...

Marco Ricci authored 2 months ago

98)             self._connection.connect(ssh_auth_sock)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

99) 
100)     def __enter__(self) -> Self:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

101)         """Close socket connection upon context manager completion."""
102)         self._connection.__enter__()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

103)         return self
104) 
105)     def __exit__(
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

106)         self,
107)         exc_type: type[BaseException] | None,
108)         exc_val: BaseException | None,
109)         exc_tb: types.TracebackType | None,
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

110)     ) -> bool:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

111)         """Close socket connection upon context manager completion."""
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

112)         return bool(
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

113)             self._connection.__exit__(exc_type, exc_val, exc_tb)  # type: ignore[func-returns-value]
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

114)         )
115) 
116)     @staticmethod
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

117)     def uint32(num: int, /) -> bytes:
118)         r"""Format the number as a `uint32`, as per the agent protocol.
119) 
120)         Args:
121)             num: A number.
122) 
123)         Returns:
124)             The number in SSH agent wire protocol format, i.e. as
125)             a 32-bit big endian number.
126) 
127)         Raises:
128)             OverflowError:
129)                 As per [`int.to_bytes`][].
130) 
131)         Examples:
132)             >>> SSHAgentClient.uint32(16777216)
133)             b'\x01\x00\x00\x00'
134) 
135)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

136)         return int.to_bytes(num, 4, 'big', signed=False)
137) 
138)     @classmethod
139)     def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

140)         r"""Format the payload as an SSH string, as per the agent protocol.
141) 
142)         Args:
143)             payload: A byte string.
144) 
145)         Returns:
146)             The payload, framed in the SSH agent wire protocol format.
147) 
148)         Examples:
149)             >>> bytes(SSHAgentClient.string(b'ssh-rsa'))
150)             b'\x00\x00\x00\x07ssh-rsa'
151) 
152)         """
Marco Ricci Fix numerous argument type...

Marco Ricci authored 3 months ago

153)         try:
154)             ret = bytearray()
155)             ret.extend(cls.uint32(len(payload)))
156)             ret.extend(payload)
157)         except Exception as e:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

158)             msg = 'invalid payload type'
159)             raise TypeError(msg) from e
160)         return ret
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

161) 
162)     @classmethod
163)     def unstring(cls, bytestring: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

164)         r"""Unpack an SSH string.
165) 
166)         Args:
167)             bytestring: A framed byte string.
168) 
169)         Returns:
170)             The unframed byte string, i.e., the payload.
171) 
172)         Raises:
173)             ValueError:
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

174)                 The byte string is not an SSH string.
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

175) 
176)         Examples:
177)             >>> bytes(SSHAgentClient.unstring(b'\x00\x00\x00\x07ssh-rsa'))
178)             b'ssh-rsa'
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

179)             >>> bytes(
180)             ...     SSHAgentClient.unstring(SSHAgentClient.string(b'ssh-ed25519'))
181)             ... )
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

182)             b'ssh-ed25519'
183) 
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

184)         """  # noqa: E501
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

185)         n = len(bytestring)
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

186)         msg = 'malformed SSH byte string'
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

187)         if n < HEAD_LEN or n != HEAD_LEN + int.from_bytes(
188)             bytestring[:HEAD_LEN], 'big', signed=False
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

189)         ):
190)             raise ValueError(msg)
191)         return bytestring[HEAD_LEN:]
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

192) 
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

193)     @classmethod
194)     def unstring_prefix(
195)         cls, bytestring: bytes | bytearray, /
196)     ) -> tuple[bytes | bytearray, bytes | bytearray]:
197)         r"""Unpack an SSH string at the beginning of the byte string.
198) 
199)         Args:
200)             bytestring:
201)                 A (general) byte string, beginning with a framed/SSH
202)                 byte string.
203) 
204)         Returns:
205)             A 2-tuple `(a, b)`, where `a` is the unframed byte
206)             string/payload at the beginning of input byte string, and
207)             `b` is the remainder of the input byte string.
208) 
209)         Raises:
210)             ValueError:
211)                 The byte string does not begin with an SSH string.
212) 
213)         Examples:
214)             >>> a, b = SSHAgentClient.unstring_prefix(
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

215)             ...     b'\x00\x00\x00\x07ssh-rsa____trailing data'
216)             ... )
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

217)             >>> (bytes(a), bytes(b))
218)             (b'ssh-rsa', b'____trailing data')
219)             >>> a, b = SSHAgentClient.unstring_prefix(
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

220)             ...     SSHAgentClient.string(b'ssh-ed25519')
221)             ... )
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

222)             >>> (bytes(a), bytes(b))
223)             (b'ssh-ed25519', b'')
224) 
225)         """
226)         n = len(bytestring)
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

227)         msg = 'malformed SSH byte string'
228)         if n < HEAD_LEN:
229)             raise ValueError(msg)
230)         m = int.from_bytes(bytestring[:HEAD_LEN], 'big', signed=False)
231)         if m + HEAD_LEN > n:
232)             raise ValueError(msg)
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

233)         return (
234)             bytestring[HEAD_LEN : m + HEAD_LEN],
235)             bytestring[m + HEAD_LEN :],
236)         )
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

237) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

238)     def request(
239)         self, code: int, payload: bytes | bytearray, /
240)     ) -> tuple[int, bytes | bytearray]:
241)         """Issue a generic request to the SSH agent.
242) 
243)         Args:
244)             code:
245)                 The request code.  See the SSH agent protocol for
246)                 protocol numbers to use here (and which protocol numbers
247)                 to expect in a response).
248)             payload:
249)                 A byte string containing the payload, or "contents", of
250)                 the request.  Request-specific.  `request` will add any
251)                 necessary wire framing around the request code and the
252)                 payload.
253) 
254)         Returns:
255)             A 2-tuple consisting of the response code and the payload,
256)             with all wire framing removed.
257) 
258)         Raises:
259)             EOFError:
260)                 The response from the SSH agent is truncated or missing.
261) 
262)         """
263)         request_message = bytearray([code])
264)         request_message.extend(payload)
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

265)         self._connection.sendall(self.string(request_message))
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

266)         chunk = self._connection.recv(HEAD_LEN)
267)         if len(chunk) < HEAD_LEN:
268)             msg = 'cannot read response length'
269)             raise EOFError(msg)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

270)         response_length = int.from_bytes(chunk, 'big', signed=False)
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

271)         response = self._connection.recv(response_length)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

272)         if len(response) < response_length:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

273)             msg = 'truncated response from SSH agent'
274)             raise EOFError(msg)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

275)         return response[0], response[1:]
276) 
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

277)     def list_keys(self) -> Sequence[ssh_types.KeyCommentPair]:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

278)         """Request a list of keys known to the SSH agent.
279) 
280)         Returns:
281)             A read-only sequence of key/comment pairs.
282) 
283)         Raises:
284)             EOFError:
285)                 The response from the SSH agent is truncated or missing.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

286)             TrailingDataError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

287)                 The response from the SSH agent is too long.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

288)             RuntimeError:
289)                 The agent failed to complete the request.
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

290) 
291)         """
292)         response_code, response = self.request(
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

293)             ssh_types.SSH_AGENTC.REQUEST_IDENTITIES.value, b''
294)         )
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

295)         if response_code != ssh_types.SSH_AGENT.IDENTITIES_ANSWER.value:
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

296)             msg = (
297)                 f'error return from SSH agent: '
298)                 f'{response_code = }, {response = }'
299)             )
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

300)             raise RuntimeError(msg)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

301)         response_stream = collections.deque(response)
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

302) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

303)         def shift(num: int) -> bytes:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

304)             buf = collections.deque(b'')
305)             for _ in range(num):
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

306)                 try:
307)                     val = response_stream.popleft()
308)                 except IndexError:
309)                     response_stream.extendleft(reversed(buf))
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

310)                     msg = 'truncated response from SSH agent'
311)                     raise EOFError(msg) from None
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

312)                 buf.append(val)
313)             return bytes(buf)
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

314) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

315)         key_count = int.from_bytes(shift(4), 'big')
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

316)         keys: collections.deque[ssh_types.KeyCommentPair]
Marco Ricci Fix Google code style viola...

Marco Ricci authored 2 months ago

317)         keys = collections.deque()
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

318)         for _ in range(key_count):
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

319)             key_size = int.from_bytes(shift(4), 'big')
320)             key = shift(key_size)
321)             comment_size = int.from_bytes(shift(4), 'big')
322)             comment = shift(comment_size)
323)             # Both `key` and `comment` are not wrapped as SSH strings.
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

324)             keys.append(ssh_types.KeyCommentPair(key, comment))
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

325)         if response_stream:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

326)             raise TrailingDataError
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

327)         return keys
328) 
329)     def sign(
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

330)         self,
331)         /,
332)         key: bytes | bytearray,
333)         payload: bytes | bytearray,
334)         *,
335)         flags: int = 0,
336)         check_if_key_loaded: bool = False,
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

337)     ) -> bytes | bytearray:
338)         """Request the SSH agent sign the payload with the key.
339) 
340)         Args:
341)             key:
342)                 The public SSH key to sign the payload with, in the same
343)                 format as returned by, e.g., the `list_keys` method.
344)                 The corresponding private key must have previously been
345)                 loaded into the agent to successfully issue a signature.
346)             payload:
347)                 A byte string of data to sign.
348)             flags:
349)                 Optional flags for the signing request.  Currently
350)                 passed on as-is to the agent.  In real-world usage, this
351)                 could be used, e.g., to request more modern hash
352)                 algorithms when signing with RSA keys.  (No such
353)                 real-world usage is currently implemented.)
354)             check_if_key_loaded:
355)                 If true, check beforehand (via `list_keys`) if the
356)                 corresponding key has been loaded into the agent.
357) 
358)         Returns:
359)             The binary signature of the payload under the given key.
360) 
361)         Raises:
362)             EOFError:
363)                 The response from the SSH agent is truncated or missing.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

364)             TrailingDataError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

365)                 The response from the SSH agent is too long.
366)             RuntimeError:
367)                 The agent failed to complete the request.
Marco Ricci Raise KeyError when signing...

Marco Ricci authored 2 months ago

368)             KeyError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

369)                 `check_if_key_loaded` is true, and the `key` was not
370)                 loaded into the agent.
371) 
372)         """
373)         if check_if_key_loaded:
374)             loaded_keys = frozenset({pair.key for pair in self.list_keys()})
375)             if bytes(key) not in loaded_keys:
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

376)                 msg = 'target SSH key not loaded into agent'
377)                 raise KeyError(msg)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

378)         request_data = bytearray(self.string(key))
379)         request_data.extend(self.string(payload))
380)         request_data.extend(self.uint32(flags))
381)         response_code, response = self.request(
Marco Ricci Reformat everything with ruff

Marco Ricci authored 1 month ago

382)             ssh_types.SSH_AGENTC.SIGN_REQUEST.value, request_data
383)         )
Marco Ricci Fix style issues with ruff...

Marco Ricci authored 1 month ago

384)         if response_code != ssh_types.SSH_AGENT.SIGN_RESPONSE.value:
385)             msg = f'signing data failed: {response_code = }, {response = }'
386)             raise RuntimeError(msg)