22396e0a2949d6766ead804647dbe17e0a0300f4
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

1) # SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2) #
3) # SPDX-License-Identifier: MIT
4) 
5) """A bare-bones SSH agent client supporting signing and key listing."""
6) 
7) from __future__ import annotations
8) 
9) import collections
10) import enum
11) import errno
12) import os
13) import pathlib
14) import socket
15) 
16) from collections.abc import Sequence, MutableSequence
17) from typing import Any, NamedTuple, Self, TypeAlias
18) from ssh_agent_client.types import KeyCommentPair, SSH_AGENT, SSH_AGENTC
19) 
# Names exported by `from <module> import *`.
__all__ = ('SSHAgentClient',)
# Package metadata.
__author__ = 'Marco Ricci <m@the13thletter.info>'
__version__ = "0.1.0"

# Alias for the standard `socket` module.  `SSHAgentClient.__init__`
# takes a keyword parameter that is also called `socket`, which shadows
# the module inside that method; the alias keeps the module reachable.
_socket = socket
25) 
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

class TrailingDataError(RuntimeError):
    """The result contained trailing data.

    Signals that a response was longer than its own framing announced,
    i.e. unparsed bytes were left over after every expected field had
    been read.

    """
28) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

29) class SSHAgentClient:
30)     """A bare-bones SSH agent client supporting signing and key listing.
31) 
32)     The main use case is requesting the agent sign some data, after
33)     checking that the necessary key is already loaded.
34) 
35)     The main fleshed out methods are `list_keys` and `sign`, which
36)     implement the `REQUEST_IDENTITIES` and `SIGN_REQUEST` requests.  If
37)     you *really* wanted to, there is enough infrastructure in place to
38)     issue other requests as defined in the protocol---it's merely the
39)     wrapper functions and the protocol numbers table that are missing.
40) 
41)     """
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

42)     _connection: socket.socket
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

43)     def __init__(
44)         self, /, *, socket: socket.socket | None = None, timeout: int = 125
45)     ) -> None:
46)         """Initialize the client.
47) 
48)         Args:
49)             socket:
50)                 An optional socket, connected to the SSH agent.  If not
51)                 given, we query the `SSH_AUTH_SOCK` environment
52)                 variable to auto-discover the correct socket address.
53)             timeout:
54)                 A connection timeout for the SSH agent.  Only used if
55)                 the socket is not yet connected.  The default value
56)                 gives ample time for agent connections forwarded via
57)                 SSH on high-latency networks (e.g. Tor).
58) 
59)         """
60)         if socket is not None:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

61)             self._connection = socket
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

62)         else:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

63)             self._connection = _socket.socket(family=_socket.AF_UNIX)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

64)         try:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

65)             # Test whether the socket is connected.
66)             self._connection.getpeername()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

67)         except OSError as e:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

68)             # This condition is hard to test purposefully, so exclude
69)             # from coverage.
70)             if e.errno != errno.ENOTCONN:  # pragma: no cover
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

71)                 raise
72)             try:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

73)                 ssh_auth_sock = os.environ['SSH_AUTH_SOCK']
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

74)             except KeyError as e:
75)                 raise RuntimeError(
76)                     "Can't find running ssh-agent: missing SSH_AUTH_SOCK"
77)                 ) from e
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

78)             self._connection.settimeout(timeout)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

79)             try:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

80)                 self._connection.connect(ssh_auth_sock)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

81)             except FileNotFoundError as e:
82)                 raise RuntimeError(
83)                     "Can't find running ssh-agent: unusable SSH_AUTH_SOCK"
84)                 ) from e
85) 
86)     def __enter__(self) -> Self:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

87)         """Close socket connection upon context manager completion."""
88)         self._connection.__enter__()
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

89)         return self
90) 
91)     def __exit__(
92)         self, exc_type: Any, exc_val: Any, exc_tb: Any
93)     ) -> bool:
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

94)         """Close socket connection upon context manager completion."""
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

95)         return bool(
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

96)             self._connection.__exit__(
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

97)                 exc_type, exc_val, exc_tb)  # type: ignore[func-returns-value]
98)         )
99) 
100)     @staticmethod
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

101)     def uint32(num: int, /) -> bytes:
102)         r"""Format the number as a `uint32`, as per the agent protocol.
103) 
104)         Args:
105)             num: A number.
106) 
107)         Returns:
108)             The number in SSH agent wire protocol format, i.e. as
109)             a 32-bit big endian number.
110) 
111)         Raises:
112)             OverflowError:
113)                 As per [`int.to_bytes`][].
114) 
115)         Examples:
116)             >>> SSHAgentClient.uint32(16777216)
117)             b'\x01\x00\x00\x00'
118) 
119)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

120)         return int.to_bytes(num, 4, 'big', signed=False)
121) 
122)     @classmethod
123)     def string(cls, payload: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

124)         r"""Format the payload as an SSH string, as per the agent protocol.
125) 
126)         Args:
127)             payload: A byte string.
128) 
129)         Returns:
130)             The payload, framed in the SSH agent wire protocol format.
131) 
132)         Examples:
133)             >>> bytes(SSHAgentClient.string(b'ssh-rsa'))
134)             b'\x00\x00\x00\x07ssh-rsa'
135) 
136)         """
Marco Ricci Fix numerous argument type...

Marco Ricci authored 3 months ago

137)         try:
138)             ret = bytearray()
139)             ret.extend(cls.uint32(len(payload)))
140)             ret.extend(payload)
141)             return ret
142)         except Exception as e:
143)             raise TypeError('invalid payload type') from e
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

144) 
145)     @classmethod
146)     def unstring(cls, bytestring: bytes | bytearray, /) -> bytes | bytearray:
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

147)         r"""Unpack an SSH string.
148) 
149)         Args:
150)             bytestring: A framed byte string.
151) 
152)         Returns:
153)             The unframed byte string, i.e., the payload.
154) 
155)         Raises:
156)             ValueError:
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

157)                 The byte string is not an SSH string.
Marco Ricci Add unit tests, both new an...

Marco Ricci authored 3 months ago

158) 
159)         Examples:
160)             >>> bytes(SSHAgentClient.unstring(b'\x00\x00\x00\x07ssh-rsa'))
161)             b'ssh-rsa'
162)             >>> bytes(SSHAgentClient.unstring(SSHAgentClient.string(b'ssh-ed25519')))
163)             b'ssh-ed25519'
164) 
165)         """
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

166)         n = len(bytestring)
167)         if n < 4:
168)             raise ValueError('malformed SSH byte string')
169)         elif n != 4 + int.from_bytes(bytestring[:4], 'big', signed=False):
170)             raise ValueError('malformed SSH byte string')
171)         return bytestring[4:]
172) 
Marco Ricci Add function for SSH framed...

Marco Ricci authored 2 months ago

173)     @classmethod
174)     def unstring_prefix(
175)         cls, bytestring: bytes | bytearray, /
176)     ) -> tuple[bytes | bytearray, bytes | bytearray]:
177)         r"""Unpack an SSH string at the beginning of the byte string.
178) 
179)         Args:
180)             bytestring:
181)                 A (general) byte string, beginning with a framed/SSH
182)                 byte string.
183) 
184)         Returns:
185)             A 2-tuple `(a, b)`, where `a` is the unframed byte
186)             string/payload at the beginning of input byte string, and
187)             `b` is the remainder of the input byte string.
188) 
189)         Raises:
190)             ValueError:
191)                 The byte string does not begin with an SSH string.
192) 
193)         Examples:
194)             >>> a, b = SSHAgentClient.unstring_prefix(
195)             ...     b'\x00\x00\x00\x07ssh-rsa____trailing data')
196)             >>> (bytes(a), bytes(b))
197)             (b'ssh-rsa', b'____trailing data')
198)             >>> a, b = SSHAgentClient.unstring_prefix(
199)             ...     SSHAgentClient.string(b'ssh-ed25519'))
200)             >>> (bytes(a), bytes(b))
201)             (b'ssh-ed25519', b'')
202) 
203)         """
204)         n = len(bytestring)
205)         if n < 4:
206)             raise ValueError('malformed SSH byte string')
207)         m = int.from_bytes(bytestring[:4], 'big', signed=False)
208)         if m + 4 > n:
209)             raise ValueError('malformed SSH byte string')
210)         return (bytestring[4:m + 4], bytestring[m + 4:])
211) 
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

212)     def request(
213)         self, code: int, payload: bytes | bytearray, /
214)     ) -> tuple[int, bytes | bytearray]:
215)         """Issue a generic request to the SSH agent.
216) 
217)         Args:
218)             code:
219)                 The request code.  See the SSH agent protocol for
220)                 protocol numbers to use here (and which protocol numbers
221)                 to expect in a response).
222)             payload:
223)                 A byte string containing the payload, or "contents", of
224)                 the request.  Request-specific.  `request` will add any
225)                 necessary wire framing around the request code and the
226)                 payload.
227) 
228)         Returns:
229)             A 2-tuple consisting of the response code and the payload,
230)             with all wire framing removed.
231) 
232)         Raises:
233)             EOFError:
234)                 The response from the SSH agent is truncated or missing.
235) 
236)         """
237)         request_message = bytearray([code])
238)         request_message.extend(payload)
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

239)         self._connection.sendall(self.string(request_message))
240)         chunk = self._connection.recv(4)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

241)         if len(chunk) < 4:
242)             raise EOFError('cannot read response length')
243)         response_length = int.from_bytes(chunk, 'big', signed=False)
Marco Ricci Remove public attributes of...

Marco Ricci authored 2 months ago

244)         response = self._connection.recv(response_length)
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

245)         if len(response) < response_length:
246)             raise EOFError('truncated response from SSH agent')
247)         return response[0], response[1:]
248) 
249)     def list_keys(self) -> Sequence[KeyCommentPair]:
250)         """Request a list of keys known to the SSH agent.
251) 
252)         Returns:
253)             A read-only sequence of key/comment pairs.
254) 
255)         Raises:
256)             EOFError:
257)                 The response from the SSH agent is truncated or missing.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

258)             TrailingDataError:
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

259)                 The response from the SSH agent is too long.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

260)             RuntimeError:
261)                 The agent failed to complete the request.
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

262) 
263)         """
264)         response_code, response = self.request(
265)             SSH_AGENTC.REQUEST_IDENTITIES.value, b'')
266)         if response_code != SSH_AGENT.IDENTITIES_ANSWER.value:
267)             raise RuntimeError(
268)                 f'error return from SSH agent: '
269)                 f'{response_code = }, {response = }'
270)             )
271)         response_stream = collections.deque(response)
272)         def shift(num: int) -> bytes:
273)             buf = collections.deque(bytes())
274)             for i in range(num):
275)                 try:
276)                     val = response_stream.popleft()
277)                 except IndexError:
278)                     response_stream.extendleft(reversed(buf))
279)                     raise EOFError(
280)                         'truncated response from SSH agent'
281)                     ) from None
282)                 buf.append(val)
283)             return bytes(buf)
284)         key_count = int.from_bytes(shift(4), 'big')
285)         keys: collections.deque[KeyCommentPair] = collections.deque()
286)         for i in range(key_count):
287)             key_size = int.from_bytes(shift(4), 'big')
288)             key = shift(key_size)
289)             comment_size = int.from_bytes(shift(4), 'big')
290)             comment = shift(comment_size)
291)             # Both `key` and `comment` are not wrapped as SSH strings.
292)             keys.append(KeyCommentPair(key, comment))
293)         if response_stream:
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

294)             raise TrailingDataError('overlong response from SSH agent')
Marco Ricci Add prototype implementation

Marco Ricci authored 4 months ago

295)         return keys
296) 
297)     def sign(
298)         self, /, key: bytes | bytearray, payload: bytes | bytearray,
299)         *, flags: int = 0, check_if_key_loaded: bool = False,
300)     ) -> bytes | bytearray:
301)         """Request the SSH agent sign the payload with the key.
302) 
303)         Args:
304)             key:
305)                 The public SSH key to sign the payload with, in the same
306)                 format as returned by, e.g., the `list_keys` method.
307)                 The corresponding private key must have previously been
308)                 loaded into the agent to successfully issue a signature.
309)             payload:
310)                 A byte string of data to sign.
311)             flags:
312)                 Optional flags for the signing request.  Currently
313)                 passed on as-is to the agent.  In real-world usage, this
314)                 could be used, e.g., to request more modern hash
315)                 algorithms when signing with RSA keys.  (No such
316)                 real-world usage is currently implemented.)
317)             check_if_key_loaded:
318)                 If true, check beforehand (via `list_keys`) if the
319)                 corresponding key has been loaded into the agent.
320) 
321)         Returns:
322)             The binary signature of the payload under the given key.
323) 
324)         Raises:
325)             EOFError:
326)                 The response from the SSH agent is truncated or missing.
Marco Ricci Introduce TrailingDataError...

Marco Ricci authored 2 months ago

327)             TrailingDataError: