Marco Ricci commited on 2024-08-16 13:27:20
Zeige 4 geänderte Dateien mit 62 Einfügungen und 21 Löschungen.
Add the generic `SSH_AGENT_FAILURE` and `SSH_AGENT_SUCCESS` status codes (as `_types.SSH_AGENT.FAILURE` and `_types.SSH_AGENT.SUCCESS`, respectively), and add an `SSHAgentFailedError` class to signal error returns from the SSH agent. The error class may emit a specific or a generic error message, depending on the status code and error message passed. Rewrite the tests to use actual failure status codes, instead of random magic numbers. Rewrite it further to deal with the `_types.SSH_AGENT` enum instead of numeric codes. This also makes the tests more readable.
... | ... |
@@ -183,6 +183,10 @@ class SSH_AGENT(enum.Enum): # noqa: N801 |
183 | 183 |
"""SSH agent protocol numbers: server replies. |
184 | 184 |
|
185 | 185 |
Attributes: |
186 |
+ FAILURE: |
|
187 |
+ Generic failure code. |
|
188 |
+ SUCCESS: |
|
189 |
+ Generic success code. |
|
186 | 190 |
IDENTITIES_ANSWER: |
187 | 191 |
Successful answer to `SSH_AGENTC.REQUEST_IDENTITIES`. |
188 | 192 |
SIGN_RESPONSE: |
... | ... |
@@ -190,5 +194,7 @@ class SSH_AGENT(enum.Enum): # noqa: N801 |
190 | 194 |
|
191 | 195 |
""" |
192 | 196 |
|
197 |
+ FAILURE: int = 5 |
|
198 |
+ SUCCESS: int = 6 |
|
193 | 199 |
IDENTITIES_ANSWER: int = 12 |
194 | 200 |
SIGN_RESPONSE: int = 14 |
... | ... |
@@ -168,6 +168,8 @@ def _get_suitable_ssh_keys( |
168 | 168 |
SSH agent. |
169 | 169 |
RuntimeError: |
170 | 170 |
There was an error communicating with the SSH agent. |
171 |
+ SSHAgentFailedError: |
|
172 |
+ The agent failed to supply a list of loaded keys. |
|
171 | 173 |
|
172 | 174 |
""" |
173 | 175 |
client: ssh_agent.SSHAgentClient |
... | ... |
@@ -310,6 +312,8 @@ def _select_ssh_key( |
310 | 312 |
SSH agent. |
311 | 313 |
RuntimeError: |
312 | 314 |
There was an error communicating with the SSH agent. |
315 |
+ SSHAgentFailedError: |
|
316 |
+ The agent failed to supply a list of loaded keys. |
|
313 | 317 |
""" |
314 | 318 |
suitable_keys = list(_get_suitable_ssh_keys(conn)) |
315 | 319 |
key_listing: list[str] = [] |
... | ... |
@@ -1036,7 +1040,11 @@ def derivepassphrase( |
1036 | 1040 |
f'Cannot connect to SSH agent: {e.strerror}: ' |
1037 | 1041 |
f'{e.filename!r}' |
1038 | 1042 |
) |
1039 |
- except (LookupError, RuntimeError) as e: |
|
1043 |
+ except ( |
|
1044 |
+ LookupError, |
|
1045 |
+ RuntimeError, |
|
1046 |
+ ssh_agent.SSHAgentFailedError, |
|
1047 |
+ ) as e: |
|
1040 | 1048 |
err(str(e)) |
1041 | 1049 |
elif use_phrase: |
1042 | 1050 |
maybe_phrase = _prompt_for_passphrase() |
... | ... |
@@ -37,6 +37,24 @@ class TrailingDataError(RuntimeError): |
37 | 37 |
super().__init__('Overlong response from SSH agent') |
38 | 38 |
|
39 | 39 |
|
40 |
+class SSHAgentFailedError(RuntimeError): |
|
41 |
+ """The SSH agent failed to complete the requested operation.""" |
|
42 |
+ |
|
43 |
+ def __str__(self) -> str: |
|
44 |
+ match self.args: |
|
45 |
+ case (_types.SSH_AGENT.FAILURE.value, b''): # pragma: no branch |
|
46 |
+ return 'The SSH agent failed to complete the request' |
|
47 |
+ case (_, _msg) if _msg: # pragma: no cover |
|
48 |
+ code = self.args[0] |
|
49 |
+ msg = self.args[1].decode('utf-8', 'surrogateescape') |
|
50 |
+ return f'[Code {code:d}] {msg:s}' |
|
51 |
+ case _: # pragma: no cover |
|
52 |
+ return repr(self) |
|
53 |
+ |
|
54 |
+ def __repr__(self) -> str: # pragma: no cover |
|
55 |
+ return f'{self.__class__.__name__}{self.args!r}' |
|
56 |
+ |
|
57 |
+ |
|
40 | 58 |
class SSHAgentClient: |
41 | 59 |
"""A bare-bones SSH agent client supporting signing and key listing. |
42 | 60 |
|
... | ... |
@@ -284,7 +302,7 @@ class SSHAgentClient: |
284 | 302 |
The response from the SSH agent is truncated or missing. |
285 | 303 |
TrailingDataError: |
286 | 304 |
The response from the SSH agent is too long. |
287 |
- RuntimeError: |
|
305 |
+ SSHAgentFailedError: |
|
288 | 306 |
The agent failed to complete the request. |
289 | 307 |
|
290 | 308 |
""" |
... | ... |
@@ -292,11 +310,7 @@ class SSHAgentClient: |
292 | 310 |
_types.SSH_AGENTC.REQUEST_IDENTITIES.value, b'' |
293 | 311 |
) |
294 | 312 |
if response_code != _types.SSH_AGENT.IDENTITIES_ANSWER.value: |
295 |
- msg = ( |
|
296 |
- f'error return from SSH agent: ' |
|
297 |
- f'{response_code = }, {response = }' |
|
298 |
- ) |
|
299 |
- raise RuntimeError(msg) |
|
313 |
+ raise SSHAgentFailedError(response_code, response) |
|
300 | 314 |
response_stream = collections.deque(response) |
301 | 315 |
|
302 | 316 |
def shift(num: int) -> bytes: |
... | ... |
@@ -362,7 +376,7 @@ class SSHAgentClient: |
362 | 376 |
The response from the SSH agent is truncated or missing. |
363 | 377 |
TrailingDataError: |
364 | 378 |
The response from the SSH agent is too long. |
365 |
- RuntimeError: |
|
379 |
+ SSHAgentFailedError: |
|
366 | 380 |
The agent failed to complete the request. |
367 | 381 |
KeyError: |
368 | 382 |
`check_if_key_loaded` is true, and the `key` was not |
... | ... |
@@ -381,6 +395,5 @@ class SSHAgentClient: |
381 | 395 |
_types.SSH_AGENTC.SIGN_REQUEST.value, request_data |
382 | 396 |
) |
383 | 397 |
if response_code != _types.SSH_AGENT.SIGN_RESPONSE.value: |
384 |
- msg = f'signing data failed: {response_code = }, {response = }' |
|
385 |
- raise RuntimeError(msg) |
|
398 |
+ raise SSHAgentFailedError(response_code, response) |
|
386 | 399 |
return self.unstring(response) |
... | ... |
@@ -360,10 +360,20 @@ class TestAgentInteraction: |
360 | 360 |
@pytest.mark.parametrize( |
361 | 361 |
['response_code', 'response', 'exc_type', 'exc_pattern'], |
362 | 362 |
[ |
363 |
- (255, b'', RuntimeError, 'error return from SSH agent:'), |
|
364 |
- (12, b'\x00\x00\x00\x01', EOFError, 'truncated response'), |
|
365 | 363 |
( |
366 |
- 12, |
|
364 |
+ _types.SSH_AGENT.FAILURE, |
|
365 |
+ b'', |
|
366 |
+ ssh_agent.SSHAgentFailedError, |
|
367 |
+ 'failed to complete the request', |
|
368 |
+ ), |
|
369 |
+ ( |
|
370 |
+ _types.SSH_AGENT.IDENTITIES_ANSWER, |
|
371 |
+ b'\x00\x00\x00\x01', |
|
372 |
+ EOFError, |
|
373 |
+ 'truncated response', |
|
374 |
+ ), |
|
375 |
+ ( |
|
376 |
+ _types.SSH_AGENT.IDENTITIES_ANSWER, |
|
367 | 377 |
b'\x00\x00\x00\x00abc', |
368 | 378 |
ssh_agent.TrailingDataError, |
369 | 379 |
'Overlong response', |
... | ... |
@@ -373,7 +383,7 @@ class TestAgentInteraction: |
373 | 383 |
def test_320_list_keys_error_responses( |
374 | 384 |
self, |
375 | 385 |
monkeypatch: Any, |
376 |
- response_code: int, |
|
386 |
+ response_code: _types.SSH_AGENT, |
|
377 | 387 |
response: bytes | bytearray, |
378 | 388 |
exc_type: type[Exception], |
379 | 389 |
exc_pattern: str, |
... | ... |
@@ -382,7 +392,7 @@ class TestAgentInteraction: |
382 | 392 |
monkeypatch.setattr( |
383 | 393 |
client, |
384 | 394 |
'request', |
385 |
- lambda *a, **kw: (response_code, response), # noqa: ARG005 |
|
395 |
+ lambda *a, **kw: (response_code.value, response), # noqa: ARG005 |
|
386 | 396 |
) |
387 | 397 |
with pytest.raises(exc_type, match=exc_pattern): |
388 | 398 |
client.list_keys() |
... | ... |
@@ -394,16 +404,16 @@ class TestAgentInteraction: |
394 | 404 |
( |
395 | 405 |
b'invalid-key', |
396 | 406 |
True, |
397 |
- (255, b''), |
|
407 |
+ (_types.SSH_AGENT.FAILURE, b''), |
|
398 | 408 |
KeyError, |
399 | 409 |
'target SSH key not loaded into agent', |
400 | 410 |
), |
401 | 411 |
( |
402 | 412 |
tests.SUPPORTED_KEYS['ed25519']['public_key_data'], |
403 | 413 |
True, |
404 |
- (255, b''), |
|
405 |
- RuntimeError, |
|
406 |
- 'signing data failed:', |
|
414 |
+ (_types.SSH_AGENT.FAILURE, b''), |
|
415 |
+ ssh_agent.SSHAgentFailedError, |
|
416 |
+ 'failed to complete the request', |
|
407 | 417 |
), |
408 | 418 |
], |
409 | 419 |
) |
... | ... |
@@ -412,12 +422,16 @@ class TestAgentInteraction: |
412 | 422 |
monkeypatch: Any, |
413 | 423 |
key: bytes | bytearray, |
414 | 424 |
check: bool, |
415 |
- response: tuple[int, bytes | bytearray], |
|
425 |
+ response: tuple[_types.SSH_AGENT, bytes | bytearray], |
|
416 | 426 |
exc_type: type[Exception], |
417 | 427 |
exc_pattern: str, |
418 | 428 |
) -> None: |
419 | 429 |
client = ssh_agent.SSHAgentClient() |
420 |
- monkeypatch.setattr(client, 'request', lambda a, b: response) # noqa: ARG005 |
|
430 |
+ monkeypatch.setattr( |
|
431 |
+ client, |
|
432 |
+ 'request', |
|
433 |
+ lambda a, b: (response[0].value, response[1]), # noqa: ARG005 |
|
434 |
+ ) |
|
421 | 435 |
KeyCommentPair = _types.KeyCommentPair # noqa: N806 |
422 | 436 |
loaded_keys = [ |
423 | 437 |
KeyCommentPair(v['public_key_data'], b'no comment') |
424 | 438 |