Marco Ricci commited on 2024-06-22 21:19:30
Zeige 9 geänderte Dateien mit 1422 Einfügungen und 50 Löschungen.
Add a finished command-line interface, with interface parity with vault v0.3. The implementation has 100% coverage, including the tests... even if the tests still contain a lot of manual code and data duplication. The project settings were updated as well, mostly expanding the scope of what is covered by type checking and what is (not) covered by coverage testing.
... | ... |
@@ -52,6 +52,7 @@ plugins: |
52 | 52 |
python: |
53 | 53 |
import: |
54 | 54 |
- https://docs.python.org/3/objects.inv |
55 |
+ - https://click.palletsprojects.com/en/8.1.x/objects.inv |
|
55 | 56 |
options: |
56 | 57 |
docstring_options: |
57 | 58 |
ignore_init_summary: true |
... | ... |
@@ -82,6 +83,7 @@ nav: |
82 | 83 |
- Module derivepassphrase: reference/derivepassphrase.md |
83 | 84 |
- Module sequin: reference/sequin.md |
84 | 85 |
- Module ssh_agent_client: reference/ssh_agent_client.md |
86 |
+ - Coverage: reference/coverage/index.html |
|
85 | 87 |
#- explanation.md |
86 | 88 |
|
87 | 89 |
markdown_extensions: |
... | ... |
@@ -43,6 +43,7 @@ Source = "https://github.com/the-13th-letter/derivepassphrase" |
43 | 43 |
derivepassphrase = "derivepassphrase.cli:derivepassphrase" |
44 | 44 |
|
45 | 45 |
[tool.mypy] |
46 |
+files = ['src/**/*.py'] |
|
46 | 47 |
mypy_path = '$MYPY_CONFIG_FILE_DIR/src' |
47 | 48 |
explicit_package_bases = true |
48 | 49 |
implicit_reexport = false |
... | ... |
@@ -84,21 +85,32 @@ extra-dependencies = [ |
84 | 85 |
[tool.hatch.envs.types.scripts] |
85 | 86 |
check = "mypy --install-types --non-interactive {args:src/derivepassphrase tests}" |
86 | 87 |
|
88 |
+[tool.coverage.html] |
|
89 |
+directory = "docs/reference/coverage" |
|
90 |
+ |
|
87 | 91 |
[tool.coverage.run] |
88 |
-source_pkgs = ["derivepassphrase", "tests"] |
|
92 |
+source_pkgs = ["derivepassphrase", "sequin", "ssh_agent_client", "tests"] |
|
89 | 93 |
branch = true |
90 | 94 |
parallel = true |
91 | 95 |
omit = [ |
92 |
- "src/derivepassphrase/__main__.py", |
|
96 |
+ "__main__.py", |
|
93 | 97 |
] |
94 | 98 |
|
95 | 99 |
[tool.coverage.paths] |
96 |
-derivepassphrase = ["src/derivepassphrase", "*/derivepassphrase/src/derivepassphrase"] |
|
97 |
-tests = ["tests", "*/derivepassphrase/tests"] |
|
100 |
+src = ["src"] |
|
101 |
+tests = ["tests"] |
|
98 | 102 |
|
99 | 103 |
[tool.coverage.report] |
100 |
-exclude_lines = [ |
|
101 |
- "no cov", |
|
104 |
+skip_covered = false |
|
105 |
+skip_empty = true |
|
106 |
+precision = 3 |
|
107 |
+partial_branches = [ |
|
108 |
+ 'pragma: no branch', |
|
109 |
+] |
|
110 |
+exclude_also = [ |
|
102 | 111 |
"if __name__ == .__main__.:", |
103 |
- "if TYPE_CHECKING:", |
|
112 |
+ 'if (?:typing\.)?TYPE_CHECKING:', |
|
113 |
+ "raise AssertionError", |
|
114 |
+ "raise NotImplementedError", |
|
115 |
+ 'assert False', |
|
104 | 116 |
] |
... | ... |
@@ -8,14 +8,24 @@ |
8 | 8 |
|
9 | 9 |
from __future__ import annotations |
10 | 10 |
|
11 |
+import base64 |
|
12 |
+import collections |
|
13 |
+from collections.abc import MutableMapping |
|
14 |
+import contextlib |
|
11 | 15 |
import inspect |
12 | 16 |
import json |
17 |
+import os |
|
13 | 18 |
import pathlib |
14 |
-from typing import Any, TextIO |
|
19 |
+import socket |
|
20 |
+from typing import ( |
|
21 |
+ Any, assert_never, cast, reveal_type, Iterator, Never, NotRequired, |
|
22 |
+ Sequence, TextIO, TypedDict, TYPE_CHECKING, |
|
23 |
+) |
|
15 | 24 |
|
16 | 25 |
import click |
17 | 26 |
import derivepassphrase as dpp |
18 | 27 |
from derivepassphrase import types as dpp_types |
28 |
+import ssh_agent_client |
|
19 | 29 |
|
20 | 30 |
__author__ = dpp.__author__ |
21 | 31 |
__version__ = dpp.__version__ |
... | ... |
@@ -92,6 +102,202 @@ def _save_config(config: dpp_types.VaultConfig, /) -> None: |
92 | 102 |
json.dump(config, fileobj) |
93 | 103 |
|
94 | 104 |
|
105 |
+def _get_suitable_ssh_keys( |
|
106 |
+ conn: ssh_agent_client.SSHAgentClient | socket.socket | None = None, |
|
107 |
+ / |
|
108 |
+) -> Iterator[ssh_agent_client.types.KeyCommentPair]: |
|
109 |
+ """Yield all SSH keys suitable for passphrase derivation. |
|
110 |
+ |
|
111 |
+ Suitable SSH keys are queried from the running SSH agent (see |
|
112 |
+ [`ssh_agent_client.SSHAgentClient.list_keys`][]). |
|
113 |
+ |
|
114 |
+ Args: |
|
115 |
+ conn: |
|
116 |
+ An optional connection hint to the SSH agent; specifically, |
|
117 |
+ an SSH agent client, or a socket connected to an SSH agent. |
|
118 |
+ |
|
119 |
+ If an existing SSH agent client, then this client will be |
|
120 |
+ queried for the SSH keys, and otherwise left intact. |
|
121 |
+ |
|
122 |
+ If a socket, then a one-shot client will be constructed |
|
123 |
+ based on the socket to query the agent, and deconstructed |
|
124 |
+ afterwards. |
|
125 |
+ |
|
126 |
+ If neither are given, then the agent's socket location is |
|
127 |
+ looked up in the `SSH_AUTH_SOCK` environment variable, and |
|
128 |
+ used to construct/deconstruct a one-shot client, as in the |
|
129 |
+ previous case. |
|
130 |
+ |
|
131 |
+ Yields: |
|
132 |
+ : |
|
133 |
+ Every SSH key from the SSH agent that is suitable for |
|
134 |
+ passphrase derivation. |
|
135 |
+ |
|
136 |
+ Raises: |
|
137 |
+ RuntimeError: |
|
138 |
+ There was an error communicating with the SSH agent. |
|
139 |
+ RuntimeError: |
|
140 |
+ No keys usable for passphrase derivation are loaded into the |
|
141 |
+ SSH agent. |
|
142 |
+ |
|
143 |
+ """ |
|
144 |
+ client: ssh_agent_client.SSHAgentClient |
|
145 |
+ client_context: contextlib.AbstractContextManager |
|
146 |
+ match conn: |
|
147 |
+ case ssh_agent_client.SSHAgentClient(): |
|
148 |
+ client = conn |
|
149 |
+ client_context = contextlib.nullcontext() |
|
150 |
+ case socket.socket() | None: |
|
151 |
+ client = ssh_agent_client.SSHAgentClient(socket=conn) |
|
152 |
+ client_context = client |
|
153 |
+ case _: # pragma: no cover |
|
154 |
+ assert_never(conn) |
|
155 |
+ raise TypeError(f'invalid connection hint: {conn!r}') |
|
156 |
+ with client_context: |
|
157 |
+ try: |
|
158 |
+ all_key_comment_pairs = list(client.list_keys()) |
|
159 |
+ except EOFError as e: # pragma: no cover |
|
160 |
+ raise RuntimeError( |
|
161 |
+ 'error communicating with the SSH agent' |
|
162 |
+ ) from e |
|
163 |
+ suitable_keys = all_key_comment_pairs[:] |
|
164 |
+ for pair in all_key_comment_pairs: |
|
165 |
+ key, comment = pair |
|
166 |
+ if dpp.Vault._is_suitable_ssh_key(key): |
|
167 |
+ yield pair |
|
168 |
+ if not suitable_keys: # pragma: no cover |
|
169 |
+ raise RuntimeError('No usable SSH keys were found') |
|
170 |
+ |
|
171 |
+ |
|
172 |
+def _prompt_for_selection( |
|
173 |
+ items: Sequence[str | bytes], heading: str = 'Possible choices:', |
|
174 |
+ single_choice_prompt: str = 'Confirm this choice?', |
|
175 |
+) -> int: |
|
176 |
+ """Prompt user for a choice among the given items. |
|
177 |
+ |
|
178 |
+ Print the heading, if any, then present the items to the user. If |
|
179 |
+ there are multiple items, prompt the user for a selection, validate |
|
180 |
+ the choice, then return the list index of the selected item. If |
|
181 |
+ there is only a single item, request confirmation for that item |
|
182 |
+ instead, and return the correct index. |
|
183 |
+ |
|
184 |
+ Args: |
|
185 |
+ heading: |
|
186 |
+ A heading for the list of items, to print immediately |
|
187 |
+ before. Defaults to a reasonable standard heading. If |
|
188 |
+ explicitly empty, print no heading. |
|
189 |
+ single_choice_prompt: |
|
190 |
+ The confirmation prompt if there is only a single possible |
|
191 |
+ choice. Defaults to a reasonable standard prompt. |
|
192 |
+ |
|
193 |
+ Returns: |
|
194 |
+ An index into the items sequence, indicating the user's |
|
195 |
+ selection. |
|
196 |
+ |
|
197 |
+ Raises: |
|
198 |
+ IndexError: |
|
199 |
+ The user made an invalid or empty selection, or requested an |
|
200 |
+ abort. |
|
201 |
+ |
|
202 |
+ """ |
|
203 |
+ n = len(items) |
|
204 |
+ if heading: |
|
205 |
+ click.echo(click.style(heading, bold=True)) |
|
206 |
+ for i, x in enumerate(items, start=1): |
|
207 |
+ click.echo(click.style(f'[{i}]', bold=True), nl=False) |
|
208 |
+ click.echo(' ', nl=False) |
|
209 |
+ click.echo(x) |
|
210 |
+ if n > 1: |
|
211 |
+ choices = click.Choice([''] + [str(i) for i in range(1, n + 1)]) |
|
212 |
+ choice = click.prompt( |
|
213 |
+ f'Your selection? (1-{n}, leave empty to abort)', |
|
214 |
+ err=True, type=choices, show_choices=False, |
|
215 |
+ show_default=False, default='') |
|
216 |
+ if not choice: |
|
217 |
+ raise IndexError('empty selection') |
|
218 |
+ return int(choice) - 1 |
|
219 |
+ else: |
|
220 |
+ prompt_suffix = (' ' |
|
221 |
+ if single_choice_prompt.endswith(tuple('?.!')) |
|
222 |
+ else ': ') |
|
223 |
+ try: |
|
224 |
+ click.confirm(single_choice_prompt, |
|
225 |
+ prompt_suffix=prompt_suffix, err=True, |
|
226 |
+ abort=True, default=False, show_default=False) |
|
227 |
+ except click.Abort: |
|
228 |
+ raise IndexError('empty selection') from None |
|
229 |
+ return 0 |
|
230 |
+ |
|
231 |
+ |
|
232 |
+def _select_ssh_key( |
|
233 |
+ conn: ssh_agent_client.SSHAgentClient | socket.socket | None = None, |
|
234 |
+ / |
|
235 |
+) -> bytes | bytearray: |
|
236 |
+ """Interactively select an SSH key for passphrase derivation. |
|
237 |
+ |
|
238 |
+ Suitable SSH keys are queried from the running SSH agent (see |
|
239 |
+ [`ssh_agent_client.SSHAgentClient.list_keys`][]), then the user is |
|
240 |
+ prompted interactively (see [`click.prompt`][]) for a selection. |
|
241 |
+ |
|
242 |
+ Args: |
|
243 |
+ conn: |
|
244 |
+ An optional connection hint to the SSH agent; specifically, |
|
245 |
+ an SSH agent client, or a socket connected to an SSH agent. |
|
246 |
+ |
|
247 |
+ If an existing SSH agent client, then this client will be |
|
248 |
+ queried for the SSH keys, and otherwise left intact. |
|
249 |
+ |
|
250 |
+ If a socket, then a one-shot client will be constructed |
|
251 |
+ based on the socket to query the agent, and deconstructed |
|
252 |
+ afterwards. |
|
253 |
+ |
|
254 |
+ If neither are given, then the agent's socket location is |
|
255 |
+ looked up in the `SSH_AUTH_SOCK` environment variable, and |
|
256 |
+ used to construct/deconstruct a one-shot client, as in the |
|
257 |
+ previous case. |
|
258 |
+ |
|
259 |
+ Returns: |
|
260 |
+ The selected SSH key. |
|
261 |
+ |
|
262 |
+ Raises: |
|
263 |
+ IndexError: |
|
264 |
+ The user made an invalid or empty selection, or requested an |
|
265 |
+ abort. |
|
266 |
+ RuntimeError: |
|
267 |
+ There was an error communicating with the SSH agent. |
|
268 |
+ RuntimeError: |
|
269 |
+ No keys usable for passphrase derivation are loaded into the |
|
270 |
+ SSH agent. |
|
271 |
+ """ |
|
272 |
+ suitable_keys = list(_get_suitable_ssh_keys(conn)) |
|
273 |
+ key_listing: list[str] = [] |
|
274 |
+ unstring_prefix = ssh_agent_client.SSHAgentClient.unstring_prefix |
|
275 |
+ for key, comment in suitable_keys: |
|
276 |
+ keytype = unstring_prefix(key)[0].decode('ASCII') |
|
277 |
+ key_str = base64.standard_b64encode(key).decode('ASCII') |
|
278 |
+ key_prefix = key_str if len(key_str) < 30 else key_str[:27] + '...' |
|
279 |
+ comment_str = comment.decode('UTF-8', errors='replace') |
|
280 |
+ key_listing.append(f'{keytype} {key_prefix} {comment_str}') |
|
281 |
+ choice = _prompt_for_selection( |
|
282 |
+ key_listing, heading='Suitable SSH keys:', |
|
283 |
+ single_choice_prompt='Use this key?') |
|
284 |
+ return suitable_keys[choice].key |
|
285 |
+ |
|
286 |
+ |
|
287 |
+def _prompt_for_passphrase() -> str: |
|
288 |
+ """Interactively prompt for the passphrase. |
|
289 |
+ |
|
290 |
+ Calls [`click.prompt`][] internally. Moved into a separate function |
|
291 |
+ mainly for testing/mocking purposes. |
|
292 |
+ |
|
293 |
+ Returns: |
|
294 |
+ The user input. |
|
295 |
+ |
|
296 |
+ """ |
|
297 |
+ return click.prompt('Passphrase', default='', hide_input=True, |
|
298 |
+ show_default=False, err=True) |
|
299 |
+ |
|
300 |
+ |
|
95 | 301 |
class OptionGroupOption(click.Option): |
96 | 302 |
"""A [`click.Option`][] with an associated group name and group epilog. |
97 | 303 |
|
... | ... |
@@ -240,15 +446,36 @@ def _validate_length( |
240 | 446 |
raise click.BadParameter('not a positive integer') |
241 | 447 |
return int_value |
242 | 448 |
|
449 |
+DEFAULT_NOTES_TEMPLATE = '''\ |
|
450 |
+# Enter notes below the line with the cut mark (ASCII scissors and |
|
451 |
+# dashes). Lines above the cut mark (such as this one) will be ignored. |
|
452 |
+# |
|
453 |
+# If you wish to clear the notes, leave everything beyond the cut mark |
|
454 |
+# blank. However, if you leave the *entire* file blank, also removing |
|
455 |
+# the cut mark, then the edit is aborted, and the old notes contents are |
|
456 |
+# retained. |
|
457 |
+# |
|
458 |
+# - - - - - >8 - - - - - >8 - - - - - >8 - - - - - >8 - - - - - |
|
459 |
+''' |
|
460 |
+DEFAULT_NOTES_MARKER = '# - - - - - >8 - - - - -' |
|
461 |
+ |
|
462 |
+ |
|
243 | 463 |
@click.command( |
244 | 464 |
context_settings={"help_option_names": ["-h", "--help"]}, |
245 | 465 |
cls=CommandWithHelpGroups, |
246 |
- epilog=''' |
|
466 |
+ epilog=r''' |
|
247 | 467 |
WARNING: There is NO WAY to retrieve the generated passphrases |
248 | 468 |
if the master passphrase, the SSH key, or the exact passphrase |
249 | 469 |
settings are lost, short of trying out all possible |
250 | 470 |
combinations. You are STRONGLY advised to keep independent |
251 | 471 |
backups of the settings and the SSH key, if any. |
472 |
+ |
|
473 |
+ Configuration is stored in a directory according to the |
|
474 |
+ DERIVEPASSPHRASE_PATH variable, which defaults to |
|
475 |
+ `~/.derivepassphrase` on UNIX-like systems and |
|
476 |
+ `C:\Users\<user>\AppData\Roaming\Derivepassphrase` on Windows. |
|
477 |
+ The configuration is NOT encrypted, and you are STRONGLY |
|
478 |
+ discouraged from using a stored passphrase. |
|
252 | 479 |
''', |
253 | 480 |
) |
254 | 481 |
@click.option('-p', '--phrase', 'use_phrase', is_flag=True, |
... | ... |
@@ -513,11 +740,166 @@ def derivepassphrase( |
513 | 740 |
opt_str = param.opts[0] |
514 | 741 |
raise click.UsageError( |
515 | 742 |
f'{opt_str} does not take a SERVICE argument') |
516 |
- #if kwargs['length'] is None: |
|
517 |
- # kwargs['length'] = dpp.Vault.__init__.__kwdefaults__['length'] |
|
518 |
- #if kwargs['repeat'] is None: |
|
519 |
- # kwargs['repeat'] = dpp.Vault.__init__.__kwdefaults__['repeat'] |
|
520 |
- click.echo(repr(ctx.params)) |
|
743 |
+ |
|
744 |
+ if edit_notes: |
|
745 |
+ assert service is not None |
|
746 |
+ configuration = get_config() |
|
747 |
+ text = (DEFAULT_NOTES_TEMPLATE + |
|
748 |
+ configuration['services'] |
|
749 |
+ .get(service, cast(dpp_types.VaultConfigServicesSettings, {})) |
|
750 |
+ .get('notes', '')) |
|
751 |
+ notes_value = click.edit(text=text) |
|
752 |
+ if notes_value is not None: |
|
753 |
+ notes_lines = collections.deque(notes_value.splitlines(True)) |
|
754 |
+ while notes_lines: |
|
755 |
+ line = notes_lines.popleft() |
|
756 |
+ if line.startswith(DEFAULT_NOTES_MARKER): |
|
757 |
+ notes_value = ''.join(notes_lines) |
|
758 |
+ break |
|
759 |
+ else: |
|
760 |
+ if not notes_value.strip(): |
|
761 |
+ ctx.fail('not saving new notes: user aborted request') |
|
762 |
+ configuration['services'].setdefault(service, {})['notes'] = ( |
|
763 |
+ notes_value.strip('\n')) |
|
764 |
+ _save_config(configuration) |
|
765 |
+ elif delete_service_settings: |
|
766 |
+ assert service is not None |
|
767 |
+ configuration = get_config() |
|
768 |
+ if service in configuration['services']: |
|
769 |
+ del configuration['services'][service] |
|
770 |
+ _save_config(configuration) |
|
771 |
+ elif delete_globals: |
|
772 |
+ configuration = get_config() |
|
773 |
+ if 'global' in configuration: |
|
774 |
+ del configuration['global'] |
|
775 |
+ _save_config(configuration) |
|
776 |
+ elif clear_all_settings: |
|
777 |
+ _save_config({'services': {}}) |
|
778 |
+ elif import_settings: |
|
779 |
+ try: |
|
780 |
+ # TODO: keep track of auto-close; try os.dup if feasible |
|
781 |
+ infile = (cast(TextIO, import_settings) |
|
782 |
+ if hasattr(import_settings, 'close') |
|
783 |
+ else click.open_file(os.fspath(import_settings), 'rt')) |
|
784 |
+ with infile: |
|
785 |
+ maybe_config = json.load(infile) |
|
786 |
+ except json.JSONDecodeError as e: |
|
787 |
+ ctx.fail(f'Cannot load config: cannot decode JSON: {e}') |
|
788 |
+ except OSError as e: |
|
789 |
+ ctx.fail(f'Cannot load config: {e.strerror}') |
|
790 |
+ if dpp_types.is_vault_config(maybe_config): |
|
791 |
+ _save_config(maybe_config) |
|
792 |
+ else: |
|
793 |
+ ctx.fail('not a valid config') |
|
794 |
+ elif export_settings: |
|
795 |
+ configuration = get_config() |
|
796 |
+ try: |
|
797 |
+ # TODO: keep track of auto-close; try os.dup if feasible |
|
798 |
+ outfile = (cast(TextIO, export_settings) |
|
799 |
+ if hasattr(export_settings, 'close') |
|
800 |
+ else click.open_file(os.fspath(export_settings), 'wt')) |
|
801 |
+ with outfile: |
|
802 |
+ json.dump(configuration, outfile) |
|
803 |
+ except OSError as e: |
|
804 |
+ ctx.fail('cannot write config: {e.strerror}') |
|
805 |
+ else: |
|
806 |
+ configuration = get_config() |
|
807 |
+ # This block could be type checked more stringently, but this |
|
808 |
+ # would probably involve a lot of code repetition. Since we |
|
809 |
+ # have a type guarding function anyway, assert that we didn't |
|
810 |
+ # make any mistakes at the end instead. |
|
811 |
+ global_keys = {'key', 'phrase'} |
|
812 |
+ service_keys = {'key', 'phrase', 'length', 'repeat', 'lower', |
|
813 |
+ 'upper', 'number', 'space', 'dash', 'symbol'} |
|
814 |
+ settings: collections.ChainMap[str, Any] = collections.ChainMap( |
|
815 |
+ {k: v for k, v in locals().items() |
|
816 |
+ if k in service_keys and v is not None}, |
|
817 |
+ cast(dict[str, Any], |
|
818 |
+ configuration['services'].get(service or '', {})), |
|
819 |
+ {}, |
|
820 |
+ cast(dict[str, Any], configuration.get('global', {})) |
|
821 |
+ ) |
|
822 |
+ if use_key: |
|
823 |
+ try: |
|
824 |
+ key = base64.standard_b64encode( |
|
825 |
+ _select_ssh_key()).decode('ASCII') |
|
826 |
+ except IndexError: |
|
827 |
+ ctx.fail('no valid SSH key selected') |
|
828 |
+ except RuntimeError as e: |
|
829 |
+ ctx.fail(str(e)) |
|
830 |
+ elif use_phrase: |
|
831 |
+ maybe_phrase = _prompt_for_passphrase() |
|
832 |
+ if not maybe_phrase: |
|
833 |
+ ctx.fail('no passphrase given') |
|
834 |
+ else: |
|
835 |
+ phrase = maybe_phrase |
|
836 |
+ if store_config_only: |
|
837 |
+ view: collections.ChainMap[str, Any] |
|
838 |
+ view = (collections.ChainMap(*settings.maps[:2]) if service |
|
839 |
+ else settings.parents.parents) |
|
840 |
+ if use_key: |
|
841 |
+ view['key'] = key |
|
842 |
+ for m in view.maps: |
|
843 |
+ m.pop('phrase', '') |
|
844 |
+ elif use_phrase: |
|
845 |
+ view['phrase'] = phrase |
|
846 |
+ for m in view.maps: |
|
847 |
+ m.pop('key', '') |
|
848 |
+ if service: |
|
849 |
+ if not view.maps[0]: |
|
850 |
+ raise click.UsageError('cannot update service settings ' |
|
851 |
+ 'without actual settings') |
|
852 |
+ else: |
|
853 |
+ configuration['services'].setdefault( |
|
854 |
+ service, {}).update(view) # type: ignore[typeddict-item] |
|
855 |
+ else: |
|
856 |
+ if not view.maps[0]: |
|
857 |
+ raise click.UsageError('cannot update global settings ' |
|
858 |
+ 'without actual settings') |
|
859 |
+ else: |
|
860 |
+ configuration.setdefault( |
|
861 |
+ 'global', {}).update(view) # type: ignore[typeddict-item] |
|
862 |
+ assert dpp_types.is_vault_config(configuration), ( |
|
863 |
+ f'invalid vault configuration: {configuration!r}' |
|
864 |
+ ) |
|
865 |
+ _save_config(configuration) |
|
866 |
+ else: |
|
867 |
+ if not service: |
|
868 |
+ raise click.UsageError(f'SERVICE is required') |
|
869 |
+ kwargs: dict[str, Any] = {k: v for k, v in settings.items() |
|
870 |
+ if k in service_keys and v is not None} |
|
871 |
+ # If either --key or --phrase are given, use that setting. |
|
872 |
+ # Otherwise, if both key and phrase are set in the config, |
|
873 |
+ # one must be global (ignore it) and one must be |
|
874 |
+ # service-specific (use that one). Otherwise, if only one of |
|
875 |
+ # key and phrase is set in the config, use that one. In all |
|
876 |
+ # these above cases, set the phrase via |
|
877 |
+ # derivepassphrase.Vault.phrase_from_key if a key is |
|
878 |
+ # given. Finally, if nothing is set, error out. |
|
879 |
+ key_to_phrase = lambda key: dpp.Vault.phrase_from_key( |
|
880 |
+ base64.standard_b64decode(key)) |
|
881 |
+ if use_key or use_phrase: |
|
882 |
+ if use_key: |
|
883 |
+ kwargs['phrase'] = key_to_phrase(key) |
|
884 |
+ else: |
|
885 |
+ kwargs['phrase'] = phrase |
|
886 |
+ kwargs.pop('key', '') |
|
887 |
+ elif kwargs.get('phrase') and kwargs.get('key'): |
|
888 |
+ if any('key' in m for m in settings.maps[:2]): |
|
889 |
+ kwargs['phrase'] = key_to_phrase(kwargs.pop('key')) |
|
890 |
+ else: |
|
891 |
+ kwargs.pop('key') |
|
892 |
+ elif kwargs.get('key'): |
|
893 |
+ kwargs['phrase'] = key_to_phrase(kwargs.pop('key')) |
|
894 |
+ elif kwargs.get('phrase'): |
|
895 |
+ pass |
|
896 |
+ else: |
|
897 |
+ raise click.UsageError( |
|
898 |
+ 'no passphrase or key given on command-line ' |
|
899 |
+ 'or in configuration') |
|
900 |
+ vault = dpp.Vault(**kwargs) |
|
901 |
+ result = vault.generate(service) |
|
902 |
+ click.echo(result.decode('ASCII')) |
|
521 | 903 |
|
522 | 904 |
|
523 | 905 |
if __name__ == '__main__': |
... | ... |
@@ -2,17 +2,308 @@ |
2 | 2 |
# |
3 | 3 |
# SPDX-License-Identifier: MIT |
4 | 4 |
|
5 |
+from __future__ import annotations |
|
6 |
+ |
|
7 |
+import base64 |
|
8 |
+import contextlib |
|
9 |
+import errno |
|
10 |
+import json |
|
11 |
+import os |
|
12 |
+from typing import Any, cast, TYPE_CHECKING, NamedTuple |
|
13 |
+ |
|
5 | 14 |
import click.testing |
6 |
-import derivepassphrase |
|
7 |
-import derivepassphrase.cli |
|
15 |
+import derivepassphrase as dpp |
|
16 |
+import derivepassphrase.cli as cli |
|
17 |
+import ssh_agent_client.types |
|
8 | 18 |
import pytest |
9 | 19 |
|
10 | 20 |
DUMMY_SERVICE = 'service1' |
11 | 21 |
DUMMY_PASSPHRASE = b'my secret passphrase\n' |
22 |
+DUMMY_CONFIG_SETTINGS = {"length": 10, "upper": 1, "lower": 1, "repeat": 5, |
|
23 |
+ "number": 1, "space": 1, "dash": 1, "symbol": 1} |
|
24 |
+DUMMY_RESULT_PASSPHRASE = b'.2V_QJkd o' |
|
25 |
+DUMMY_RESULT_KEY1 = b'E<b<{ -7iG' |
|
26 |
+DUMMY_PHRASE_FROM_KEY1_RAW = ( |
|
27 |
+ b'\x00\x00\x00\x0bssh-ed25519' |
|
28 |
+ b'\x00\x00\x00@\xf0\x98\x19\x80l\x1a\x97\xd5&\x03n' |
|
29 |
+ b'\xcc\xe3e\x8f\x86f\x07\x13\x19\x13\t!33\xf9\xe46S' |
|
30 |
+ b'\x1d\xaf\xfd\r\x08\x1f\xec\xf8s\x9b\x8c_U9\x16|ST,' |
|
31 |
+ b'\x1eR\xbb0\xed\x7f\x89\xe2/iQU\xd8\x9e\xa6\x02' |
|
32 |
+) |
|
33 |
+DUMMY_PHRASE_FROM_KEY1 = b'8JgZgGwal9UmA27M42WPhmYHExkTCSEzM/nkNlMdr/0NCB/s+HObjF9VORZ8U1QsHlK7MO1/ieIvaVFV2J6mAg==' |
|
34 |
+ |
|
35 |
+# See Ed25519 and RSA test keys in test_key_signing.py |
|
36 |
+DUMMY_KEY1 = 'AAAAC3NzaC1lZDI1NTE5AAAAIIF4gWgm1gJIXw//Mkhv5MEwidwcakUGCekJD/vCEml2' |
|
37 |
+DUMMY_KEY2 = 'AAAAB3NzaC1yc2EAAAADAQABAAABgQCxoe7pezhxWy4NI0mUwKqg9WCYOAS+IjxN9eYcqpfcmQiojcuy9XsiN/xYJ1O94SrsKS5mEia2xHnYA4RUChTyYNcM2v6cnnBQ/N/VQhpGMN7SVxdbhKUXTWFCwbjBgO6rGyHB6WtoH8vd7TOEPt+NgcXwhsWyoaUUdYTA62V+GF9vEmxMaC4ubgDz+B0QkPnauSoNxmkhcIe0lsLNb1pClZyz88PDnKXCX/d0HuN/HJ+sbPg7dCvOyqFYSyKn3uY6bCXqoIdurxXzH3O7z0P8f5sbmKOrGGKNuNxVRbeVl/D/3uDL0nqsbfUc1qvkfwbJwtMXC4IV6kOZMSk2BAsqh7x48gQ+rhYeEVSi8F3CWs4HJQoqrGt7K9a3mCSlMBHP70u3w6ME7eumoryxlUofewTd17ZEkzdX08l2ZlKzZvwQUrc+xQZ2Uw8z2mfW6Ti4gi0pYGaig7Ke4PwuXpo/C5YAWfeXycsvJZ2uaYRjMdZeJGNAnHLUGLkBscw5aI8=' |
|
38 |
+ |
|
39 |
+ |
|
40 |
+class IncompatibleConfiguration(NamedTuple): |
|
41 |
+ other_options: list[tuple[str, ...]] |
|
42 |
+ needs_service: bool | None |
|
43 |
+ input: bytes | None |
|
44 |
+ |
|
45 |
+class SingleConfiguration(NamedTuple): |
|
46 |
+ needs_service: bool | None |
|
47 |
+ input: bytes | None |
|
48 |
+ check_success: bool |
|
49 |
+ |
|
50 |
+class OptionCombination(NamedTuple): |
|
51 |
+ options: list[str] |
|
52 |
+ incompatible: bool |
|
53 |
+ needs_service: bool | None |
|
54 |
+ input: bytes | None |
|
55 |
+ check_success: bool |
|
56 |
+ |
|
57 |
+PASSWORD_GENERATION_OPTIONS: list[tuple[str, ...]] = [ |
|
58 |
+ ('--phrase',), ('--key',), ('--length', '20'), ('--repeat', '20'), |
|
59 |
+ ('--lower', '1'), ('--upper', '1'), ('--number', '1'), |
|
60 |
+ ('--space', '1'), ('--dash', '1'), ('--symbol', '1') |
|
61 |
+] |
|
62 |
+CONFIGURATION_OPTIONS: list[tuple[str, ...]] = [ |
|
63 |
+ ('--notes',), ('--config',), ('--delete',), ('--delete-globals',), |
|
64 |
+ ('--clear',) |
|
65 |
+] |
|
66 |
+CONFIGURATION_COMMANDS: list[tuple[str, ...]] = [ |
|
67 |
+ ('--notes',), ('--delete',), ('--delete-globals',), ('--clear',) |
|
68 |
+] |
|
69 |
+STORAGE_OPTIONS: list[tuple[str, ...]] = [ |
|
70 |
+ ('--export', '-'), ('--import', '-') |
|
71 |
+] |
|
72 |
+INCOMPATIBLE: dict[tuple[str, ...], IncompatibleConfiguration] = { |
|
73 |
+ ('--phrase',): IncompatibleConfiguration( |
|
74 |
+ [('--key',)] + CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
75 |
+ True, DUMMY_PASSPHRASE), |
|
76 |
+ ('--key',): IncompatibleConfiguration( |
|
77 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
78 |
+ True, DUMMY_PASSPHRASE), |
|
79 |
+ ('--length', '20'): IncompatibleConfiguration( |
|
80 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
81 |
+ True, DUMMY_PASSPHRASE), |
|
82 |
+ ('--repeat', '20'): IncompatibleConfiguration( |
|
83 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
84 |
+ True, DUMMY_PASSPHRASE), |
|
85 |
+ ('--lower', '1'): IncompatibleConfiguration( |
|
86 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
87 |
+ True, DUMMY_PASSPHRASE), |
|
88 |
+ ('--upper', '1'): IncompatibleConfiguration( |
|
89 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
90 |
+ True, DUMMY_PASSPHRASE), |
|
91 |
+ ('--number', '1'): IncompatibleConfiguration( |
|
92 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
93 |
+ True, DUMMY_PASSPHRASE), |
|
94 |
+ ('--space', '1'): IncompatibleConfiguration( |
|
95 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
96 |
+ True, DUMMY_PASSPHRASE), |
|
97 |
+ ('--dash', '1'): IncompatibleConfiguration( |
|
98 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
99 |
+ True, DUMMY_PASSPHRASE), |
|
100 |
+ ('--symbol', '1'): IncompatibleConfiguration( |
|
101 |
+ CONFIGURATION_COMMANDS + STORAGE_OPTIONS, |
|
102 |
+ True, DUMMY_PASSPHRASE), |
|
103 |
+ ('--notes',): IncompatibleConfiguration( |
|
104 |
+ [('--config',), ('--delete',), ('--delete-globals',), |
|
105 |
+ ('--clear',)] + STORAGE_OPTIONS, |
|
106 |
+ True, None), |
|
107 |
+ ('--config', '-p'): IncompatibleConfiguration( |
|
108 |
+ [('--delete',), ('--delete-globals',), |
|
109 |
+ ('--clear',)] + STORAGE_OPTIONS, |
|
110 |
+ None, DUMMY_PASSPHRASE), |
|
111 |
+ ('--delete',): IncompatibleConfiguration( |
|
112 |
+ [('--delete-globals',), ('--clear',)] + STORAGE_OPTIONS, True, None), |
|
113 |
+ ('--delete-globals',): IncompatibleConfiguration( |
|
114 |
+ [('--clear',)] + STORAGE_OPTIONS, False, None), |
|
115 |
+ ('--clear',): IncompatibleConfiguration(STORAGE_OPTIONS, False, None), |
|
116 |
+ ('--export', '-'): IncompatibleConfiguration( |
|
117 |
+ [('--import', '-')], False, None), |
|
118 |
+ ('--import', '-'): IncompatibleConfiguration( |
|
119 |
+ [], False, None), |
|
120 |
+} |
|
121 |
+SINGLES: dict[tuple[str, ...], SingleConfiguration] = { |
|
122 |
+ ('--phrase',): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
123 |
+ ('--key',): SingleConfiguration(True, None, False), |
|
124 |
+ ('--length', '20'): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
125 |
+ ('--repeat', '20'): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
126 |
+ ('--lower', '1'): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
127 |
+ ('--upper', '1'): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
128 |
+ ('--number', '1'): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
129 |
+ ('--space', '1'): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
130 |
+ ('--dash', '1'): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
131 |
+ ('--symbol', '1'): SingleConfiguration(True, DUMMY_PASSPHRASE, True), |
|
132 |
+ ('--notes',): SingleConfiguration(True, None, False), |
|
133 |
+ ('--config', '-p'): SingleConfiguration(None, DUMMY_PASSPHRASE, False), |
|
134 |
+ ('--delete',): SingleConfiguration(True, None, False), |
|
135 |
+ ('--delete-globals',): SingleConfiguration(False, None, True), |
|
136 |
+ ('--clear',): SingleConfiguration(False, None, True), |
|
137 |
+ ('--export', '-'): SingleConfiguration(False, None, True), |
|
138 |
+ ('--import', '-'): SingleConfiguration(False, b'{"services": {}}', True), |
|
139 |
+} |
|
140 |
+INTERESTING_OPTION_COMBINATIONS: list[OptionCombination] = [] |
|
141 |
+config: OptionCombination | SingleConfiguration |
|
142 |
+for opt, config in INCOMPATIBLE.items(): |
|
143 |
+ for opt2 in config.other_options: |
|
144 |
+ INTERESTING_OPTION_COMBINATIONS.extend([ |
|
145 |
+ OptionCombination(options=list(opt + opt2), incompatible=True, |
|
146 |
+ needs_service=config.needs_service, |
|
147 |
+ input=config.input, check_success=False), |
|
148 |
+ OptionCombination(options=list(opt2 + opt), incompatible=True, |
|
149 |
+ needs_service=config.needs_service, |
|
150 |
+ input=config.input, check_success=False) |
|
151 |
+ ]) |
|
152 |
+for opt, config in SINGLES.items(): |
|
153 |
+ INTERESTING_OPTION_COMBINATIONS.append( |
|
154 |
+ OptionCombination(options=list(opt), incompatible=False, |
|
155 |
+ needs_service=config.needs_service, |
|
156 |
+ input=config.input, |
|
157 |
+ check_success=config.check_success)) |
|
158 |
+ |
|
159 |
+@contextlib.contextmanager |
|
160 |
+def isolated_config( |
|
161 |
+ monkeypatch: Any, runner: click.testing.CliRunner, config: Any, |
|
162 |
+): |
|
163 |
+ with runner.isolated_filesystem(): |
|
164 |
+ monkeypatch.setenv('HOME', os.getcwd()) |
|
165 |
+ monkeypatch.setenv('USERPROFILE', os.getcwd()) |
|
166 |
+ monkeypatch.delenv(cli.prog_name.replace(' ', '_').upper() + '_PATH', |
|
167 |
+ raising=False) |
|
168 |
+ os.makedirs(os.path.dirname(cli._config_filename()), exist_ok=True) |
|
169 |
+ with open(cli._config_filename(), 'wt') as outfile: |
|
170 |
+ json.dump(config, outfile) |
|
171 |
+ yield |
|
172 |
+ |
|
173 |
+ |
|
174 |
+def test_100_save_bad_config(monkeypatch: Any) -> None: |
|
175 |
+ runner = click.testing.CliRunner() |
|
176 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, config={}): |
|
177 |
+ with pytest.raises(ValueError, match='Invalid vault config'): |
|
178 |
+ cli._save_config(None) # type: ignore |
|
179 |
+ |
|
180 |
+ |
|
181 |
+def test_101_prompt_for_selection_multiple(monkeypatch: Any) -> None: |
|
182 |
+ @click.command() |
|
183 |
+ @click.option('--heading', default='Our menu:') |
|
184 |
+ @click.argument('items', nargs=-1) |
|
185 |
+ def driver(heading, items): |
|
186 |
+ # from https://montypython.fandom.com/wiki/Spam#The_menu |
|
187 |
+ items = items or [ |
|
188 |
+ 'Egg and bacon', |
|
189 |
+ 'Egg, sausage and bacon', |
|
190 |
+ 'Egg and spam', |
|
191 |
+ 'Egg, bacon and spam', |
|
192 |
+ 'Egg, bacon, sausage and spam', |
|
193 |
+ 'Spam, bacon, sausage and spam', |
|
194 |
+ 'Spam, egg, spam, spam, bacon and spam', |
|
195 |
+ 'Spam, spam, spam, egg and spam', |
|
196 |
+ ('Spam, spam, spam, spam, spam, spam, baked beans, ' |
|
197 |
+ 'spam, spam, spam and spam'), |
|
198 |
+ ('Lobster thermidor aux crevettes with a mornay sauce ' |
|
199 |
+ 'garnished with truffle paté, brandy ' |
|
200 |
+ 'and a fried egg on top and spam'), |
|
201 |
+ ] |
|
202 |
+ index = cli._prompt_for_selection(items, heading=heading) |
|
203 |
+ click.echo('A fine choice: ', nl=False) |
|
204 |
+ click.echo(items[index]) |
|
205 |
+ click.echo('(Note: Vikings strictly optional.)') |
|
206 |
+ runner = click.testing.CliRunner(mix_stderr=True) |
|
207 |
+ result = runner.invoke(driver, [], input='9') |
|
208 |
+ assert result.exit_code == 0, 'driver program failed' |
|
209 |
+ assert result.stdout == '''\ |
|
210 |
+Our menu: |
|
211 |
+[1] Egg and bacon |
|
212 |
+[2] Egg, sausage and bacon |
|
213 |
+[3] Egg and spam |
|
214 |
+[4] Egg, bacon and spam |
|
215 |
+[5] Egg, bacon, sausage and spam |
|
216 |
+[6] Spam, bacon, sausage and spam |
|
217 |
+[7] Spam, egg, spam, spam, bacon and spam |
|
218 |
+[8] Spam, spam, spam, egg and spam |
|
219 |
+[9] Spam, spam, spam, spam, spam, spam, baked beans, spam, spam, spam and spam |
|
220 |
+[10] Lobster thermidor aux crevettes with a mornay sauce garnished with truffle paté, brandy and a fried egg on top and spam |
|
221 |
+Your selection? (1-10, leave empty to abort): 9 |
|
222 |
+A fine choice: Spam, spam, spam, spam, spam, spam, baked beans, spam, spam, spam and spam |
|
223 |
+(Note: Vikings strictly optional.) |
|
224 |
+''', 'driver program produced unexpected output' |
|
225 |
+ result = runner.invoke(driver, ['--heading='], input='', |
|
226 |
+ catch_exceptions=True) |
|
227 |
+ assert result.exit_code > 0, 'driver program succeeded?!' |
|
228 |
+ assert result.stdout == '''\ |
|
229 |
+[1] Egg and bacon |
|
230 |
+[2] Egg, sausage and bacon |
|
231 |
+[3] Egg and spam |
|
232 |
+[4] Egg, bacon and spam |
|
233 |
+[5] Egg, bacon, sausage and spam |
|
234 |
+[6] Spam, bacon, sausage and spam |
|
235 |
+[7] Spam, egg, spam, spam, bacon and spam |
|
236 |
+[8] Spam, spam, spam, egg and spam |
|
237 |
+[9] Spam, spam, spam, spam, spam, spam, baked beans, spam, spam, spam and spam |
|
238 |
+[10] Lobster thermidor aux crevettes with a mornay sauce garnished with truffle paté, brandy and a fried egg on top and spam |
|
239 |
+Your selection? (1-10, leave empty to abort): \n''', ( |
|
240 |
+ 'driver program produced unexpected output' |
|
241 |
+ ) |
|
242 |
+ assert isinstance(result.exception, IndexError), ( |
|
243 |
+ 'driver program did not raise IndexError?!' |
|
244 |
+ ) |
|
245 |
+ |
|
246 |
+ |
|
247 |
+def test_102_prompt_for_selection_single(monkeypatch: Any) -> None: |
|
248 |
+ @click.command() |
|
249 |
+ @click.option('--item', default='baked beans') |
|
250 |
+ @click.argument('prompt') |
|
251 |
+ def driver(item, prompt): |
|
252 |
+ try: |
|
253 |
+ cli._prompt_for_selection([item], heading='', |
|
254 |
+ single_choice_prompt=prompt) |
|
255 |
+ except IndexError as e: |
|
256 |
+ click.echo('Boo.') |
|
257 |
+ raise e |
|
258 |
+ else: |
|
259 |
+ click.echo('Great!') |
|
260 |
+ runner = click.testing.CliRunner(mix_stderr=True) |
|
261 |
+ result = runner.invoke(driver, ['Will replace with spam. Confirm, y/n?'], |
|
262 |
+ input='y') |
|
263 |
+ assert result.exit_code == 0, 'driver program failed' |
|
264 |
+ assert result.stdout == '''\ |
|
265 |
+[1] baked beans |
|
266 |
+Will replace with spam. Confirm, y/n? y |
|
267 |
+Great! |
|
268 |
+''', 'driver program produced unexpected output' |
|
269 |
+ result = runner.invoke(driver, |
|
270 |
+ ['Will replace with spam, okay? ' + |
|
271 |
+ '(Please say "y" or "n".)'], |
|
272 |
+ input='') |
|
273 |
+ assert result.exit_code > 0, 'driver program succeeded?!' |
|
274 |
+ assert result.stdout == '''\ |
|
275 |
+[1] baked beans |
|
276 |
+Will replace with spam, okay? (Please say "y" or "n".): |
|
277 |
+Boo. |
|
278 |
+''', 'driver program produced unexpected output' |
|
279 |
+ assert isinstance(result.exception, IndexError), ( |
|
280 |
+ 'driver program did not raise IndexError?!' |
|
281 |
+ ) |
|
282 |
+ |
|
283 |
+ |
|
284 |
+def test_103_prompt_for_passphrase(monkeypatch: Any) -> None: |
|
285 |
+ monkeypatch.setattr(click, 'prompt', |
|
286 |
+ lambda *a, **kw: json.dumps({'args': a, 'kwargs': kw})) |
|
287 |
+ res = json.loads(cli._prompt_for_passphrase()) |
|
288 |
+ assert 'args' in res and 'kwargs' in res, ( |
|
289 |
+ 'missing arguments to passphrase prompt' |
|
290 |
+ ) |
|
291 |
+ assert res['args'][:1] == ['Passphrase'], ( |
|
292 |
+ 'missing arguments to passphrase prompt' |
|
293 |
+ ) |
|
294 |
+ assert (res['kwargs'].get('default') == '' |
|
295 |
+ and not res['kwargs'].get('show_default', True)), ( |
|
296 |
+ 'missing arguments to passphrase prompt' |
|
297 |
+ ) |
|
298 |
+ assert res['kwargs'].get('err') and res['kwargs'].get('hide_input'), ( |
|
299 |
+ 'missing arguments to passphrase prompt' |
|
300 |
+ ) |
|
301 |
+ |
|
12 | 302 |
|
13 | 303 |
def test_200_help_output(): |
14 | 304 |
runner = click.testing.CliRunner(mix_stderr=False) |
15 |
- result = runner.invoke(derivepassphrase.cli.derivepassphrase, ['--help']) |
|
305 |
+ result = runner.invoke(cli.derivepassphrase, ['--help'], |
|
306 |
+ catch_exceptions=False) |
|
16 | 307 |
assert result.exit_code == 0 |
17 | 308 |
assert 'Password generation:\n' in result.output, ( |
18 | 309 |
'Option groups not respected in help text.' |
... | ... |
@@ -21,36 +312,20 @@ def test_200_help_output(): |
21 | 312 |
'Option group epilog not printed.' |
22 | 313 |
) |
23 | 314 |
|
24 |
-@pytest.mark.parametrize(['option'], |
|
25 |
- [('--lower',), ('--upper',), ('--number',), |
|
26 |
- ('--space',), ('--dash',), ('--symbol',), |
|
27 |
- ('--repeat',), ('--length',)]) |
|
28 |
-def test_201_invalid_argument_range(option): |
|
29 |
- runner = click.testing.CliRunner(mix_stderr=False) |
|
30 |
- result = runner.invoke(derivepassphrase.cli.derivepassphrase, |
|
31 |
- [option, '-42', '-p', DUMMY_SERVICE], |
|
32 |
- input=DUMMY_PASSPHRASE) |
|
33 |
- assert result.exit_code > 0, ( |
|
34 |
- f'program unexpectedly succeeded' |
|
35 |
- ) |
|
36 |
- assert result.stderr_bytes, ( |
|
37 |
- f'program did not print any error message' |
|
38 |
- ) |
|
39 |
- assert b'Error: Invalid value' in result.stderr_bytes, ( |
|
40 |
- f'program did not print the expected error message' |
|
41 |
- ) |
|
42 |
- |
|
43 | 315 |
@pytest.mark.parametrize(['charset_name'], |
44 | 316 |
[('lower',), ('upper',), ('number',), ('space',), |
45 | 317 |
('dash',), ('symbol',)]) |
46 |
-@pytest.mark.xfail(reason='implementation not written yet') |
|
47 |
-def test_202_disable_character_set(charset_name): |
|
318 |
+def test_201_disable_character_set( |
|
319 |
+ monkeypatch: Any, charset_name: str |
|
320 |
+) -> None: |
|
321 |
+ monkeypatch.setattr(cli, '_prompt_for_passphrase', |
|
322 |
+ lambda *a, **kw: DUMMY_PASSPHRASE.decode('UTF-8')) |
|
48 | 323 |
option = f'--{charset_name}' |
49 |
- charset = derivepassphrase.Vault._CHARSETS[charset_name].decode('ascii') |
|
324 |
+ charset = dpp.Vault._CHARSETS[charset_name].decode('ascii') |
|
50 | 325 |
runner = click.testing.CliRunner(mix_stderr=False) |
51 |
- result = runner.invoke(derivepassphrase.cli.derivepassphrase, |
|
326 |
+ result = runner.invoke(cli.derivepassphrase, |
|
52 | 327 |
[option, '0', '-p', DUMMY_SERVICE], |
53 |
- input=DUMMY_PASSPHRASE) |
|
328 |
+ input=DUMMY_PASSPHRASE, catch_exceptions=False) |
|
54 | 329 |
assert result.exit_code == 0, ( |
55 | 330 |
f'program died unexpectedly with exit code {result.exit_code}' |
56 | 331 |
) |
... | ... |
@@ -63,12 +338,13 @@ def test_202_disable_character_set(charset_name): |
63 | 338 |
f'{result.stdout!r}' |
64 | 339 |
) |
65 | 340 |
|
66 |
-@pytest.mark.xfail(reason='implementation not written yet') |
|
67 |
-def test_203_disable_repetition(): |
|
341 |
+def test_202_disable_repetition(monkeypatch: Any) -> None: |
|
342 |
+ monkeypatch.setattr(cli, '_prompt_for_passphrase', |
|
343 |
+ lambda *a, **kw: DUMMY_PASSPHRASE.decode('UTF-8')) |
|
68 | 344 |
runner = click.testing.CliRunner(mix_stderr=False) |
69 |
- result = runner.invoke(derivepassphrase.cli.derivepassphrase, |
|
345 |
+ result = runner.invoke(cli.derivepassphrase, |
|
70 | 346 |
['--repeat', '0', '-p', DUMMY_SERVICE], |
71 |
- input=DUMMY_PASSPHRASE) |
|
347 |
+ input=DUMMY_PASSPHRASE, catch_exceptions=False) |
|
72 | 348 |
assert result.exit_code == 0, ( |
73 | 349 |
f'program died unexpectedly with exit code {result.exit_code}' |
74 | 350 |
) |
... | ... |
@@ -78,6 +354,544 @@ def test_203_disable_repetition(): |
78 | 354 |
passphrase = result.stdout.rstrip('\r\n') |
79 | 355 |
for i in range(len(passphrase) - 1): |
80 | 356 |
assert passphrase[i:i+1] != passphrase[i+1:i+2], ( |
81 |
- f'derived password contains repeated character at position {i}: ' |
|
82 |
- f'{result.stdout!r}' |
|
357 |
+ f'derived password contains repeated character ' |
|
358 |
+ f'at position {i}: {result.stdout!r}' |
|
359 |
+ ) |
|
360 |
+ |
|
361 |
+@pytest.mark.parametrize(['command_line', 'config', 'result_config'], [ |
|
362 |
+ (['--delete-globals'], |
|
363 |
+ {'global': {'phrase': 'abc'}, 'services': {}}, {'services': {}}), |
|
364 |
+ (['--delete', DUMMY_SERVICE], |
|
365 |
+ {'global': {'phrase': 'abc'}, |
|
366 |
+ 'services': {DUMMY_SERVICE: {'notes': '...'}}}, |
|
367 |
+ {'global': {'phrase': 'abc'}, 'services': {}}), |
|
368 |
+ (['--clear'], |
|
369 |
+ {'global': {'phrase': 'abc'}, |
|
370 |
+ 'services': {DUMMY_SERVICE: {'notes': '...'}}}, |
|
371 |
+ {'services': {}}), |
|
372 |
+]) |
|
373 |
+def test_203_repeated_config_deletion( |
|
374 |
+ monkeypatch: Any, command_line: list[str], |
|
375 |
+ config: dpp.types.VaultConfig, result_config: dpp.types.VaultConfig, |
|
376 |
+) -> None: |
|
377 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
378 |
+ for start_config in [config, result_config]: |
|
379 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
380 |
+ config=start_config): |
|
381 |
+ result = runner.invoke(cli.derivepassphrase, command_line, |
|
382 |
+ catch_exceptions=False) |
|
383 |
+ assert (result.exit_code, result.stderr_bytes) == (0, b''), ( |
|
384 |
+ 'program exited with failure' |
|
385 |
+ ) |
|
386 |
+ with open(cli._config_filename(), 'rt') as infile: |
|
387 |
+ config_readback = json.load(infile) |
|
388 |
+ assert config_readback == result_config |
|
389 |
+ |
|
390 |
+def test_204_phrase_from_key_manually() -> None: |
|
391 |
+ assert ( |
|
392 |
+ dpp.Vault(phrase=DUMMY_PHRASE_FROM_KEY1, **DUMMY_CONFIG_SETTINGS) |
|
393 |
+ .generate(DUMMY_SERVICE) == DUMMY_RESULT_KEY1 |
|
394 |
+ ) |
|
395 |
+ |
|
396 |
+@pytest.mark.parametrize(['config'], [ |
|
397 |
+ pytest.param({'global': {'key': DUMMY_KEY1}, |
|
398 |
+ 'services': {DUMMY_SERVICE: DUMMY_CONFIG_SETTINGS}}, |
|
399 |
+ id='global'), |
|
400 |
+ pytest.param({'global': {'phrase': DUMMY_PASSPHRASE.rstrip(b'\n').decode('ASCII')}, |
|
401 |
+ 'services': {DUMMY_SERVICE: {'key': DUMMY_KEY1, |
|
402 |
+ **DUMMY_CONFIG_SETTINGS}}}, |
|
403 |
+ id='service'), |
|
404 |
+]) |
|
405 |
+def test_204a_key_from_config( |
|
406 |
+ monkeypatch: Any, config: dpp.types.VaultConfig, |
|
407 |
+) -> None: |
|
408 |
+ def phrase_from_key(key: bytes) -> bytes: |
|
409 |
+ if key == base64.standard_b64decode(DUMMY_KEY1): # pragma: no branch |
|
410 |
+ return DUMMY_PHRASE_FROM_KEY1 |
|
411 |
+ raise KeyError(key) # pragma: no cover |
|
412 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
413 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
414 |
+ config=config): |
|
415 |
+ monkeypatch.setattr(dpp.Vault, 'phrase_from_key', |
|
416 |
+ phrase_from_key) |
|
417 |
+ result = runner.invoke(cli.derivepassphrase, [DUMMY_SERVICE], |
|
418 |
+ catch_exceptions=False) |
|
419 |
+ assert (result.exit_code, result.stderr_bytes) == (0, b''), ( |
|
420 |
+ 'program exited with failure' |
|
421 |
+ ) |
|
422 |
+ assert result.stdout_bytes.rstrip(b'\n') != DUMMY_RESULT_PASSPHRASE, ( |
|
423 |
+ 'program generated unexpected result (phrase instead of key)' |
|
424 |
+ ) |
|
425 |
+ assert result.stdout_bytes.rstrip(b'\n') == DUMMY_RESULT_KEY1, ( |
|
426 |
+ 'program generated unexpected result (wrong settings?)' |
|
427 |
+ ) |
|
428 |
+ |
|
429 |
+def test_204b_key_from_command_line(monkeypatch: Any) -> None: |
|
430 |
+ KeyCommentPair = ssh_agent_client.types.KeyCommentPair |
|
431 |
+ key_list = [ |
|
432 |
+ KeyCommentPair(base64.standard_b64decode(DUMMY_KEY1), b'no comment'), |
|
433 |
+ KeyCommentPair(base64.standard_b64decode(DUMMY_KEY2), b'a comment'), |
|
434 |
+ ] |
|
435 |
+ def _suitable_ssh_keys(conn: Any) -> Iterator[KeyCommentPair]: |
|
436 |
+ yield from key_list |
|
437 |
+ def phrase_from_key(key: bytes) -> bytes: |
|
438 |
+ if key == base64.standard_b64decode(DUMMY_KEY1): # pragma: no branch |
|
439 |
+ return DUMMY_PHRASE_FROM_KEY1 |
|
440 |
+ raise KeyError(key) # pragma: no cover |
|
441 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
442 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
443 |
+ config={'services': {DUMMY_SERVICE: DUMMY_CONFIG_SETTINGS}}): |
|
444 |
+ monkeypatch.setattr(cli, '_get_suitable_ssh_keys', _suitable_ssh_keys) |
|
445 |
+ monkeypatch.setattr(dpp.Vault, 'phrase_from_key', |
|
446 |
+ phrase_from_key) |
|
447 |
+ result = runner.invoke(cli.derivepassphrase, |
|
448 |
+ ['-k', DUMMY_SERVICE], |
|
449 |
+ input=b'1\n', catch_exceptions=False) |
|
450 |
+ assert result.exit_code == 0, 'program exited with failure' |
|
451 |
+ assert result.stdout_bytes, 'program output expected' |
|
452 |
+ last_line = result.stdout_bytes.splitlines(True)[-1] |
|
453 |
+ assert last_line.rstrip(b'\n') != DUMMY_RESULT_PASSPHRASE, ( |
|
454 |
+ 'program generated unexpected result (phrase instead of key)' |
|
455 |
+ ) |
|
456 |
+ assert last_line.rstrip(b'\n') == DUMMY_RESULT_KEY1, ( |
|
457 |
+ 'program generated unexpected result (wrong settings?)' |
|
458 |
+ ) |
|
459 |
+ |
|
460 |
+def test_205_service_phrase_if_key_in_global_config(monkeypatch: Any) -> None: |
|
461 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
462 |
+ with isolated_config( |
|
463 |
+ monkeypatch=monkeypatch, runner=runner, |
|
464 |
+ config={ |
|
465 |
+ 'global': {'key': DUMMY_KEY1}, |
|
466 |
+ 'services': { |
|
467 |
+ DUMMY_SERVICE: { |
|
468 |
+ 'phrase': DUMMY_PASSPHRASE.rstrip(b'\n').decode('ASCII'), |
|
469 |
+ **DUMMY_CONFIG_SETTINGS}}} |
|
470 |
+ ): |
|
471 |
+ result = runner.invoke(cli.derivepassphrase, [DUMMY_SERVICE], |
|
472 |
+ catch_exceptions=False) |
|
473 |
+ assert result.exit_code == 0, 'program exited with failure' |
|
474 |
+ assert result.stdout_bytes, 'program output expected' |
|
475 |
+ last_line = result.stdout_bytes.splitlines(True)[-1] |
|
476 |
+ assert last_line.rstrip(b'\n') != DUMMY_RESULT_KEY1, ( |
|
477 |
+ 'program generated unexpected result (key instead of phrase)' |
|
478 |
+ ) |
|
479 |
+ assert last_line.rstrip(b'\n') == DUMMY_RESULT_PASSPHRASE, ( |
|
480 |
+ 'program generated unexpected result (wrong settings?)' |
|
481 |
+ ) |
|
482 |
+ |
|
483 |
+@pytest.mark.parametrize(['option'], |
|
484 |
+ [('--lower',), ('--upper',), ('--number',), |
|
485 |
+ ('--space',), ('--dash',), ('--symbol',), |
|
486 |
+ ('--repeat',), ('--length',)]) |
|
487 |
+def test_210_invalid_argument_range(option: str) -> None: |
|
488 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
489 |
+ value: str | int |
|
490 |
+ for value in '-42', 'invalid': |
|
491 |
+ result = runner.invoke(cli.derivepassphrase, |
|
492 |
+ [option, cast(str, value), '-p', DUMMY_SERVICE], |
|
493 |
+ input=DUMMY_PASSPHRASE, catch_exceptions=False) |
|
494 |
+ assert result.exit_code > 0, ( |
|
495 |
+ 'program unexpectedly succeeded' |
|
496 |
+ ) |
|
497 |
+ assert result.stderr_bytes, ( |
|
498 |
+ 'program did not print any error message' |
|
499 |
+ ) |
|
500 |
+ assert b'Error: Invalid value' in result.stderr_bytes, ( |
|
501 |
+ 'program did not print the expected error message' |
|
502 |
+ ) |
|
503 |
+ |
|
504 |
+@pytest.mark.parametrize(['vfunc', 'input'], [ |
|
505 |
+ (cli._validate_occurrence_constraint, 20), |
|
506 |
+ (cli._validate_length, 20), |
|
507 |
+]) |
|
508 |
+def test_210a_validate_constraints_manually( |
|
509 |
+ vfunc: Callable[[click.Context, click.Parameter, Any], int | None], |
|
510 |
+ input: int, |
|
511 |
+) -> None: |
|
512 |
+ ctx = cli.derivepassphrase.make_context(cli.prog_name, []) |
|
513 |
+ param = cli.derivepassphrase.params[0] |
|
514 |
+ assert vfunc(ctx, param, input) == input |
|
515 |
+ |
|
516 |
+@pytest.mark.parametrize( |
|
517 |
+ ['options', 'service', 'input', 'check_success'], |
|
518 |
+ [(o.options, o.needs_service, o.input, o.check_success) |
|
519 |
+ for o in INTERESTING_OPTION_COMBINATIONS if not o.incompatible], |
|
520 |
+) |
|
521 |
+def test_211_service_needed( |
|
522 |
+ monkeypatch: Any, options: list[str], |
|
523 |
+ service: bool | None, input: bytes | None, check_success: bool, |
|
524 |
+) -> None: |
|
525 |
+ def _prompt1(*args, **kwargs): |
|
526 |
+ """Needed for --config handling.""" |
|
527 |
+ return DUMMY_PASSPHRASE.decode('UTF-8') |
|
528 |
+ monkeypatch.setattr(cli, '_prompt_for_passphrase', _prompt1) |
|
529 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
530 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
531 |
+ config={'global': {'phrase': 'abc'}, |
|
532 |
+ 'services': {}}): |
|
533 |
+ result = runner.invoke(cli.derivepassphrase, |
|
534 |
+ options if service |
|
535 |
+ else options + [DUMMY_SERVICE], |
|
536 |
+ input=input, catch_exceptions=False) |
|
537 |
+ if service is not None: |
|
538 |
+ assert result.exit_code > 0, ( |
|
539 |
+ 'program unexpectedly succeeded' |
|
540 |
+ ) |
|
541 |
+ assert result.stderr_bytes, ( |
|
542 |
+ 'program did not print any error message' |
|
543 |
+ ) |
|
544 |
+ err_msg = (b' requires a SERVICE' if service |
|
545 |
+ else b' does not take a SERVICE argument') |
|
546 |
+ assert err_msg in result.stderr_bytes, ( |
|
547 |
+ 'program did not print the expected error message' |
|
548 |
+ ) |
|
549 |
+ else: |
|
550 |
+ assert (result.exit_code, result.stderr_bytes) == (0, b''), ( |
|
551 |
+ 'program unexpectedly failed' |
|
552 |
+ ) |
|
553 |
+ if check_success: |
|
554 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
555 |
+ config={'global': {'phrase': 'abc'}, |
|
556 |
+ 'services': {}}): |
|
557 |
+ def _prompt2(*args, **kwargs): |
|
558 |
+ return DUMMY_PASSPHRASE.decode('UTF-8') |
|
559 |
+ monkeypatch.setattr(cli, '_prompt_for_passphrase', _prompt2) |
|
560 |
+ result = runner.invoke(cli.derivepassphrase, |
|
561 |
+ options + [DUMMY_SERVICE] |
|
562 |
+ if service else options, |
|
563 |
+ input=input, catch_exceptions=False) |
|
564 |
+ assert (result.exit_code, result.stderr_bytes) == (0, b''), ( |
|
565 |
+ 'program unexpectedly failed' |
|
566 |
+ ) |
|
567 |
+ |
|
568 |
+@pytest.mark.parametrize( |
|
569 |
+ ['options', 'service', 'input'], |
|
570 |
+ [(o.options, o.needs_service, o.input) |
|
571 |
+ for o in INTERESTING_OPTION_COMBINATIONS if o.incompatible], |
|
572 |
+) |
|
573 |
+def test_212_incompatible_options( |
|
574 |
+ options: list[str], service: bool | None, input: bytes | None, |
|
575 |
+) -> None: |
|
576 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
577 |
+ result = runner.invoke(cli.derivepassphrase, |
|
578 |
+ options + [DUMMY_SERVICE] if service else options, |
|
579 |
+ input=DUMMY_PASSPHRASE, catch_exceptions=False) |
|
580 |
+ assert result.exit_code > 0, ( |
|
581 |
+ 'program unexpectedly succeeded' |
|
582 |
+ ) |
|
583 |
+ assert result.stderr_bytes, ( |
|
584 |
+ 'program did not print any error message' |
|
585 |
+ ) |
|
586 |
+ assert b'mutually exclusive with ' in result.stderr_bytes, ( |
|
587 |
+ 'program did not print the expected error message' |
|
588 |
+ ) |
|
589 |
+ |
|
590 |
+def test_213_import_bad_config_not_vault_config(monkeypatch: Any) -> None: |
|
591 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
592 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
593 |
+ config={'services': {}}): |
|
594 |
+ result = runner.invoke(cli.derivepassphrase, ['--import', '-'], |
|
595 |
+ input=b'null', catch_exceptions=False) |
|
596 |
+ assert result.exit_code > 0, ( |
|
597 |
+ 'program unexpectedly succeeded' |
|
598 |
+ ) |
|
599 |
+ assert result.stderr_bytes, ( |
|
600 |
+ 'program did not print any error message' |
|
601 |
+ ) |
|
602 |
+ assert b'not a valid config' in result.stderr_bytes, ( |
|
603 |
+ 'program did not print the expected error message' |
|
604 |
+ ) |
|
605 |
+ |
|
606 |
+def test_213a_import_bad_config_not_json_data(monkeypatch: Any) -> None: |
|
607 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
608 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
609 |
+ config={'services': {}}): |
|
610 |
+ result = runner.invoke(cli.derivepassphrase, ['--import', '-'], |
|
611 |
+ input=b'This string is not valid JSON.', |
|
612 |
+ catch_exceptions=False) |
|
613 |
+ assert result.exit_code > 0, ( |
|
614 |
+ 'program unexpectedly succeeded' |
|
615 |
+ ) |
|
616 |
+ assert result.stderr_bytes, ( |
|
617 |
+ 'program did not print any error message' |
|
618 |
+ ) |
|
619 |
+ assert b'cannot decode JSON' in result.stderr_bytes, ( |
|
620 |
+ 'program did not print the expected error message' |
|
621 |
+ ) |
|
622 |
+ |
|
623 |
+def test_213b_import_bad_config_not_a_file(monkeypatch: Any) -> None: |
|
624 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
625 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
626 |
+ config={'services': {}}): |
|
627 |
+ with open(cli._config_filename(), 'wt') as outfile: |
|
628 |
+ print('This string is not valid JSON.', file=outfile) |
|
629 |
+ result = runner.invoke( |
|
630 |
+ cli.derivepassphrase, |
|
631 |
+ ['--import', os.path.dirname(cli._config_filename())], |
|
632 |
+ catch_exceptions=False) |
|
633 |
+ assert result.exit_code > 0, ( |
|
634 |
+ 'program unexpectedly succeeded' |
|
635 |
+ ) |
|
636 |
+ assert result.stderr_bytes, ( |
|
637 |
+ 'program did not print any error message' |
|
638 |
+ ) |
|
639 |
+ # Don't test the actual error message, because it is subject to |
|
640 |
+ # locale settings. TODO: find a way anyway. |
|
641 |
+ |
|
642 |
+def test_214_export_settings_no_stored_settings(monkeypatch: Any) -> None: |
|
643 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
644 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
645 |
+ config={'services': {}}): |
|
646 |
+ try: |
|
647 |
+ os.remove(cli._config_filename()) |
|
648 |
+ except FileNotFoundError: # pragma: no cover |
|
649 |
+ pass |
|
650 |
+ result = runner.invoke(cli.derivepassphrase, ['--export', '-'], |
|
651 |
+ catch_exceptions=False) |
|
652 |
+ assert (result.exit_code, result.stderr_bytes) == (0, b''), ( |
|
653 |
+ 'program exited with failure' |
|
654 |
+ ) |
|
655 |
+ |
|
656 |
+def test_214a_export_settings_bad_stored_config(monkeypatch: Any) -> None: |
|
657 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
658 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
659 |
+ config={}): |
|
660 |
+ result = runner.invoke(cli.derivepassphrase, ['--export', '-'], |
|
661 |
+ input=b'null', catch_exceptions=False) |
|
662 |
+ assert result.exit_code > 0, ( |
|
663 |
+ 'program unexpectedly succeeded' |
|
664 |
+ ) |
|
665 |
+ assert result.stderr_bytes, ( |
|
666 |
+ 'program did not print any error message' |
|
667 |
+ ) |
|
668 |
+ assert b'cannot load config' in result.stderr_bytes, ( |
|
669 |
+ 'program did not print the expected error message' |
|
670 |
+ ) |
|
671 |
+ |
|
672 |
+def test_214b_export_settings_not_a_file(monkeypatch: Any) -> None: |
|
673 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
674 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
675 |
+ config={'services': {}}): |
|
676 |
+ try: |
|
677 |
+ os.remove(cli._config_filename()) |
|
678 |
+ except FileNotFoundError: # pragma: no cover |
|
679 |
+ pass |
|
680 |
+ os.makedirs(cli._config_filename()) |
|
681 |
+ result = runner.invoke(cli.derivepassphrase, ['--export', '-'], |
|
682 |
+ input=b'null', catch_exceptions=False) |
|
683 |
+ assert result.exit_code > 0, ( |
|
684 |
+ 'program unexpectedly succeeded' |
|
685 |
+ ) |
|
686 |
+ assert result.stderr_bytes, ( |
|
687 |
+ 'program did not print any error message' |
|
688 |
+ ) |
|
689 |
+ assert b'cannot load config' in result.stderr_bytes, ( |
|
690 |
+ 'program did not print the expected error message' |
|
691 |
+ ) |
|
692 |
+ |
|
693 |
+def test_214c_export_settings_target_not_a_file(monkeypatch: Any) -> None: |
|
694 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
695 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
696 |
+ config={'services': {}}): |
|
697 |
+ dname = os.path.dirname(cli._config_filename()) |
|
698 |
+ result = runner.invoke(cli.derivepassphrase, |
|
699 |
+ ['--export', dname], |
|
700 |
+ input=b'null', catch_exceptions=False) |
|
701 |
+ assert result.exit_code > 0, ( |
|
702 |
+ 'program unexpectedly succeeded' |
|
703 |
+ ) |
|
704 |
+ assert result.stderr_bytes, ( |
|
705 |
+ 'program did not print any error message' |
|
706 |
+ ) |
|
707 |
+ assert b'cannot write config' in result.stderr_bytes, ( |
|
708 |
+ 'program did not print the expected error message' |
|
709 |
+ ) |
|
710 |
+ |
|
711 |
+def test_220_edit_notes_successfully(monkeypatch: Any) -> None: |
|
712 |
+ edit_result = ''' |
|
713 |
+ |
|
714 |
+# - - - - - >8 - - - - - >8 - - - - - >8 - - - - - >8 - - - - - |
|
715 |
+contents go here |
|
716 |
+''' |
|
717 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
718 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
719 |
+ config={'global': {'phrase': 'abc'}, |
|
720 |
+ 'services': {}}): |
|
721 |
+ monkeypatch.setattr(click, 'edit', |
|
722 |
+ lambda *a, **kw: edit_result) |
|
723 |
+ result = runner.invoke(cli.derivepassphrase, ['--notes', 'sv'], |
|
724 |
+ catch_exceptions=False) |
|
725 |
+ assert (result.exit_code, result.stderr_bytes) == (0, b''), ( |
|
726 |
+ 'program exited with failure' |
|
727 |
+ ) |
|
728 |
+ with open(cli._config_filename(), 'rt') as infile: |
|
729 |
+ config = json.load(infile) |
|
730 |
+ assert config == {'global': {'phrase': 'abc'}, |
|
731 |
+ 'services': {'sv': {'notes': 'contents go here'}}} |
|
732 |
+ |
|
733 |
+def test_221_edit_notes_noop(monkeypatch: Any) -> None: |
|
734 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
735 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
736 |
+ config={'global': {'phrase': 'abc'}, |
|
737 |
+ 'services': {}}): |
|
738 |
+ monkeypatch.setattr(click, 'edit', lambda *a, **kw: None) |
|
739 |
+ result = runner.invoke(cli.derivepassphrase, ['--notes', 'sv'], |
|
740 |
+ catch_exceptions=False) |
|
741 |
+ assert (result.exit_code, result.stderr_bytes) == (0, b''), ( |
|
742 |
+ 'program exited with failure' |
|
743 |
+ ) |
|
744 |
+ with open(cli._config_filename(), 'rt') as infile: |
|
745 |
+ config = json.load(infile) |
|
746 |
+ assert config == {'global': {'phrase': 'abc'}, 'services': {}} |
|
747 |
+ |
|
748 |
+def test_222_edit_notes_marker_removed(monkeypatch: Any) -> None: |
|
749 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
750 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
751 |
+ config={'global': {'phrase': 'abc'}, |
|
752 |
+ 'services': {}}): |
|
753 |
+ monkeypatch.setattr(click, 'edit', lambda *a, **kw: 'long\ntext') |
|
754 |
+ result = runner.invoke(cli.derivepassphrase, ['--notes', 'sv'], |
|
755 |
+ catch_exceptions=False) |
|
756 |
+ assert (result.exit_code, result.stderr_bytes) == (0, b''), ( |
|
757 |
+ 'program exited with failure' |
|
758 |
+ ) |
|
759 |
+ with open(cli._config_filename(), 'rt') as infile: |
|
760 |
+ config = json.load(infile) |
|
761 |
+ assert config == {'global': {'phrase': 'abc'}, |
|
762 |
+ 'services': {'sv': {'notes': 'long\ntext'}}} |
|
763 |
+ |
|
764 |
+def test_223_edit_notes_abort(monkeypatch: Any) -> None: |
|
765 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
766 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
767 |
+ config={'global': {'phrase': 'abc'}, |
|
768 |
+ 'services': {}}): |
|
769 |
+ monkeypatch.setattr(click, 'edit', lambda *a, **kw: '\n\n') |
|
770 |
+ result = runner.invoke(cli.derivepassphrase, ['--notes', 'sv'], |
|
771 |
+ catch_exceptions=False) |
|
772 |
+ assert result.exit_code != 0, 'program unexpectedly succeeded' |
|
773 |
+ assert b'user aborted request' in result.stderr_bytes, ( |
|
774 |
+ 'expected error message missing' |
|
775 |
+ ) |
|
776 |
+ with open(cli._config_filename(), 'rt') as infile: |
|
777 |
+ config = json.load(infile) |
|
778 |
+ assert config == {'global': {'phrase': 'abc'}, 'services': {}} |
|
779 |
+ |
|
780 |
+@pytest.mark.parametrize(['command_line', 'input', 'result_config'], [ |
|
781 |
+ ( |
|
782 |
+ ['--phrase'], |
|
783 |
+ b'my passphrase\n', |
|
784 |
+ {'global': {'phrase': 'my passphrase'}, 'services': {}}, |
|
785 |
+ ), |
|
786 |
+ ( |
|
787 |
+ ['--key'], |
|
788 |
+ b'1\n', |
|
789 |
+ {'global': {'key': DUMMY_KEY1}, 'services': {}}, |
|
790 |
+ ), |
|
791 |
+ ( |
|
792 |
+ ['--phrase', 'sv'], |
|
793 |
+ b'my passphrase\n', |
|
794 |
+ {'global': {'phrase': 'abc'}, |
|
795 |
+ 'services': {'sv': {'phrase': 'my passphrase'}}}, |
|
796 |
+ ), |
|
797 |
+ ( |
|
798 |
+ ['--key', 'sv'], |
|
799 |
+ b'1\n', |
|
800 |
+ {'global': {'phrase': 'abc'}, |
|
801 |
+ 'services': {'sv': {'key': DUMMY_KEY1}}}, |
|
802 |
+ ), |
|
803 |
+ ( |
|
804 |
+ ['--key', '--length', '15', 'sv'], |
|
805 |
+ b'1\n', |
|
806 |
+ {'global': {'phrase': 'abc'}, |
|
807 |
+ 'services': {'sv': {'key': DUMMY_KEY1, 'length': 15}}}, |
|
808 |
+ ), |
|
809 |
+]) |
|
810 |
+def test_224_store_config_good( |
|
811 |
+ monkeypatch: Any, command_line: list[str], input: bytes, |
|
812 |
+ result_config: Any, |
|
813 |
+) -> None: |
|
814 |
+ KeyCommentPair = ssh_agent_client.types.KeyCommentPair |
|
815 |
+ key_list = [ |
|
816 |
+ KeyCommentPair(base64.standard_b64decode(DUMMY_KEY1), b'no comment'), |
|
817 |
+ KeyCommentPair(base64.standard_b64decode(DUMMY_KEY2), b'a comment'), |
|
818 |
+ ] |
|
819 |
+ def _suitable_ssh_keys(conn: Any) -> Iterator[KeyCommentPair]: |
|
820 |
+ yield from key_list |
|
821 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
822 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
823 |
+ config={'global': {'phrase': 'abc'}, |
|
824 |
+ 'services': {}}): |
|
825 |
+ monkeypatch.setattr(cli, '_get_suitable_ssh_keys', _suitable_ssh_keys) |
|
826 |
+ result = runner.invoke(cli.derivepassphrase, |
|
827 |
+ ['--config'] + command_line, |
|
828 |
+ catch_exceptions=False, input=input) |
|
829 |
+ assert result.exit_code == 0, 'program exited with failure' |
|
830 |
+ with open(cli._config_filename(), 'rt') as infile: |
|
831 |
+ config = json.load(infile) |
|
832 |
+ assert config == result_config, ( |
|
833 |
+ 'stored config does not match expectation' |
|
834 |
+ ) |
|
835 |
+ |
|
836 |
+@pytest.mark.parametrize(['command_line', 'input', 'err_text'], [ |
|
837 |
+ ([], b'', b'cannot update global settings without actual settings'), |
|
838 |
+ (['sv'], b'', b'cannot update service settings without actual settings'), |
|
839 |
+ (['--phrase', 'sv'], b'', b'no passphrase given'), |
|
840 |
+ (['--key'], b'', b'no valid SSH key selected'), |
|
841 |
+]) |
|
842 |
+def test_225_store_config_fail( |
|
843 |
+ monkeypatch: Any, command_line: list[str], input: bytes, err_text: str, |
|
844 |
+) -> None: |
|
845 |
+ KeyCommentPair = ssh_agent_client.types.KeyCommentPair |
|
846 |
+ key_list = [ |
|
847 |
+ KeyCommentPair(base64.standard_b64decode(DUMMY_KEY1), b'no comment'), |
|
848 |
+ KeyCommentPair(base64.standard_b64decode(DUMMY_KEY2), b'a comment'), |
|
849 |
+ ] |
|
850 |
+ def _suitable_ssh_keys(conn: Any) -> Iterator[KeyCommentPair]: |
|
851 |
+ yield from key_list |
|
852 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
853 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
854 |
+ config={'global': {'phrase': 'abc'}, |
|
855 |
+ 'services': {}}): |
|
856 |
+ monkeypatch.setattr(cli, '_get_suitable_ssh_keys', _suitable_ssh_keys) |
|
857 |
+ result = runner.invoke(cli.derivepassphrase, |
|
858 |
+ ['--config'] + command_line, |
|
859 |
+ catch_exceptions=False, input=input) |
|
860 |
+ assert result.exit_code != 0, 'program unexpectedly succeeded?!' |
|
861 |
+ assert err_text in result.stderr_bytes, ( |
|
862 |
+ 'expected error message missing' |
|
863 |
+ ) |
|
864 |
+ |
|
865 |
+def test_225a_store_config_fail_manual_no_ssh_key_selection( |
|
866 |
+ monkeypatch: Any, |
|
867 |
+) -> None: |
|
868 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
869 |
+ with isolated_config(monkeypatch=monkeypatch, runner=runner, |
|
870 |
+ config={'global': {'phrase': 'abc'}, |
|
871 |
+ 'services': {}}): |
|
872 |
+ def raiser(): |
|
873 |
+ raise RuntimeError('custom error message') |
|
874 |
+ monkeypatch.setattr(cli, '_select_ssh_key', raiser) |
|
875 |
+ result = runner.invoke(cli.derivepassphrase, ['--key', '--config'], |
|
876 |
+ catch_exceptions=False) |
|
877 |
+ assert result.exit_code != 0, 'program unexpectedly succeeded' |
|
878 |
+ assert b'custom error message' in result.stderr_bytes, ( |
|
879 |
+ 'expected error message missing' |
|
880 |
+ ) |
|
881 |
+ |
|
882 |
+def test_226_no_arguments() -> None: |
|
883 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
884 |
+ result = runner.invoke(cli.derivepassphrase, [], catch_exceptions=False) |
|
885 |
+ assert result.exit_code != 0, 'program unexpectedly succeeded' |
|
886 |
+ assert b'SERVICE is required' in result.stderr_bytes, ( |
|
887 |
+ 'expected error message missing' |
|
888 |
+ ) |
|
889 |
+ |
|
890 |
+def test_226a_no_passphrase_or_key() -> None: |
|
891 |
+ runner = click.testing.CliRunner(mix_stderr=False) |
|
892 |
+ result = runner.invoke(cli.derivepassphrase, [DUMMY_SERVICE], |
|
893 |
+ catch_exceptions=False) |
|
894 |
+ assert result.exit_code != 0, 'program unexpectedly succeeded' |
|
895 |
+ assert b'no passphrase or key given' in result.stderr_bytes, ( |
|
896 |
+ 'expected error message missing' |
|
83 | 897 |
) |
... | ... |
@@ -4,16 +4,25 @@ |
4 | 4 |
|
5 | 5 |
"""Test OpenSSH key loading and signing.""" |
6 | 6 |
|
7 |
+from __future__ import annotations |
|
8 |
+ |
|
9 |
+import click |
|
7 | 10 |
import pytest |
8 | 11 |
|
9 | 12 |
import derivepassphrase |
13 |
+import derivepassphrase.cli |
|
10 | 14 |
import ssh_agent_client |
11 | 15 |
|
12 | 16 |
import base64 |
17 |
+import errno |
|
18 |
+import io |
|
13 | 19 |
import os |
14 | 20 |
import socket |
15 | 21 |
import subprocess |
16 | 22 |
|
23 |
+if not os.environ.get('SSH_AUTH_SOCK'): # pragma: no cover |
|
24 |
+ pytest.skip('no running SSH agent detected', allow_module_level=True) |
|
25 |
+ |
|
17 | 26 |
SUPPORTED = { |
18 | 27 |
'ed25519': { |
19 | 28 |
'private_key': rb'''-----BEGIN OPENSSH PRIVATE KEY----- |
... | ... |
@@ -327,6 +336,14 @@ def test_client_string(input, expected): |
327 | 336 |
string = ssh_agent_client.SSHAgentClient.string |
328 | 337 |
assert bytes(string(input)) == expected |
329 | 338 |
|
339 |
+@pytest.mark.parametrize(['input', 'exc_type', 'exc_pattern'], [ |
|
340 |
+ ('some string', TypeError, 'invalid payload type'), |
|
341 |
+]) |
|
342 |
+def test_client_string_exceptions(input, exc_type, exc_pattern): |
|
343 |
+ string = ssh_agent_client.SSHAgentClient.string |
|
344 |
+ with pytest.raises(exc_type, match=exc_pattern): |
|
345 |
+ string(input) |
|
346 |
+ |
|
330 | 347 |
@pytest.mark.parametrize(['input', 'expected'], [ |
331 | 348 |
(b'\x00\x00\x00\x07ssh-rsa', b'ssh-rsa'), |
332 | 349 |
( |
... | ... |
@@ -396,6 +413,7 @@ def test_sign_data_via_agent(keytype, data_dict): |
396 | 413 |
for k, c in client.list_keys()} |
397 | 414 |
public_key_data = data_dict['public_key_data'] |
398 | 415 |
expected_signature = data_dict['expected_signature'] |
416 |
+ derived_passphrase = data_dict['derived_passphrase'] |
|
399 | 417 |
if public_key_data not in key_comment_pairs: # pragma: no cover |
400 | 418 |
pytest.skip('prerequisite SSH key not loaded') |
401 | 419 |
signature = bytes(client.sign( |
... | ... |
@@ -405,8 +423,8 @@ def test_sign_data_via_agent(keytype, data_dict): |
405 | 423 |
payload=derivepassphrase.Vault._UUID, key=public_key_data)) |
406 | 424 |
assert signature2 == expected_signature, 'SSH signature mismatch' |
407 | 425 |
assert ( |
408 |
- derivepassphrase.Vault.phrase_from_signature(public_key_data) == |
|
409 |
- expected_signature |
|
426 |
+ derivepassphrase.Vault.phrase_from_key(public_key_data) == |
|
427 |
+ derived_passphrase |
|
410 | 428 |
), 'SSH signature mismatch' |
411 | 429 |
|
412 | 430 |
@pytest.mark.parametrize(['keytype', 'data_dict'], list(UNSUITABLE.items())) |
... | ... |
@@ -439,4 +457,142 @@ def test_sign_data_via_agent_unsupported(keytype, data_dict): |
439 | 457 |
payload=derivepassphrase.Vault._UUID, key=public_key_data)) |
440 | 458 |
assert signature != signature2, 'SSH signature repeatable?!' |
441 | 459 |
with pytest.raises(ValueError, match='unsuitable SSH key'): |
442 |
- derivepassphrase.Vault.phrase_from_signature(public_key_data) |
|
460 |
+ derivepassphrase.Vault.phrase_from_key(public_key_data) |
|
461 |
+ |
|
462 |
+@pytest.mark.parametrize(['this_data', 'all_data'], |
|
463 |
+ [(v, tuple(SUPPORTED.values())) |
|
464 |
+ for v in SUPPORTED.values()]) |
|
465 |
+def test_ssh_key_selector(monkeypatch, this_data, all_data): |
|
466 |
+ successfully_uploaded_keys: list[bytes] = [] |
|
467 |
+ for data in all_data: |
|
468 |
+ private_key = data['private_key'] |
|
469 |
+ try: |
|
470 |
+ result = subprocess.run(['ssh-add', '-t', '60', '-q', '-'], |
|
471 |
+ input=private_key, check=True, |
|
472 |
+ capture_output=True) |
|
473 |
+ except subprocess.CalledProcessError as e: |
|
474 |
+ if data == this_data: |
|
475 |
+ pytest.skip( |
|
476 |
+ f"uploading non-optional test key: {e!r}, " |
|
477 |
+ f"stdout={e.stdout!r}, stderr={e.stderr!r}" |
|
478 |
+ ) |
|
479 |
+ else: |
|
480 |
+ successfully_uploaded_keys.append(data['public_key_data']) |
|
481 |
+ index = 1 + successfully_uploaded_keys.index(this_data['public_key_data']) |
|
482 |
+ b64_key = base64.standard_b64encode( |
|
483 |
+ this_data['public_key_data']).decode('ASCII') |
|
484 |
+ n = len(successfully_uploaded_keys) |
|
485 |
+ text = (f'Your selection? (1-{n}, leave empty to abort): {index}\n' |
|
486 |
+ if n > 1 else 'Use this key? yes\n') |
|
487 |
+ |
|
488 |
+ @click.command() |
|
489 |
+ def driver(): |
|
490 |
+ key = derivepassphrase.cli._select_ssh_key() |
|
491 |
+ click.echo(base64.standard_b64encode(key).decode('ASCII')) |
|
492 |
+ |
|
493 |
+ runner = click.testing.CliRunner(mix_stderr=True) |
|
494 |
+ result = runner.invoke(driver, [], |
|
495 |
+ input=(f'{index}\n' if n > 1 else f'yes\n'), |
|
496 |
+ catch_exceptions=True) |
|
497 |
+ assert result.exit_code == 0, 'driver program failed?!' |
|
498 |
+ assert result.stdout.startswith('Suitable SSH keys:\n'), ( |
|
499 |
+ 'missing expected output' |
|
500 |
+ ) |
|
501 |
+ assert text in result.stdout, 'missing expected output' |
|
502 |
+ assert result.stdout.endswith(f'\n{b64_key}\n'), 'missing expected output' |
|
503 |
+ |
|
504 |
+@pytest.mark.parametrize(['conn_hint'], [('none',), ('socket',), ('client',)]) |
|
505 |
+def test_get_suitable_ssh_keys(conn_hint): |
|
506 |
+ hint: ssh_agent_client.SSHAgentClient | socket.socket | None |
|
507 |
+ match conn_hint: |
|
508 |
+ case 'client': |
|
509 |
+ hint = ssh_agent_client.SSHAgentClient() |
|
510 |
+ case 'socket': |
|
511 |
+ hint = socket.socket(family=socket.AF_UNIX) |
|
512 |
+ hint.connect(os.environ['SSH_AUTH_SOCK']) |
|
513 |
+ case _: |
|
514 |
+ assert conn_hint == 'none' |
|
515 |
+ hint = None |
|
516 |
+ exception: type[Exception] | None = None |
|
517 |
+ try: |
|
518 |
+ list(derivepassphrase.cli._get_suitable_ssh_keys(hint)) |
|
519 |
+ except RuntimeError: # pragma: no cover |
|
520 |
+ pass |
|
521 |
+ except Exception as e: # pragma: no cover |
|
522 |
+ exception = e |
|
523 |
+ finally: |
|
524 |
+ assert exception == None, 'exception querying suitable SSH keys' |
|
525 |
+ |
|
526 |
+def test_constructor_no_running_agent(monkeypatch): |
|
527 |
+ monkeypatch.delenv('SSH_AUTH_SOCK', raising=False) |
|
528 |
+ sock = socket.socket(family=socket.AF_UNIX) |
|
529 |
+ with pytest.raises(RuntimeError, match='missing SSH_AUTH_SOCK'): |
|
530 |
+ ssh_agent_client.SSHAgentClient(socket=sock) |
|
531 |
+ |
|
532 |
+def test_constructor_bad_running_agent(monkeypatch): |
|
533 |
+ monkeypatch.setenv('SSH_AUTH_SOCK', os.environ['SSH_AUTH_SOCK'] + '~') |
|
534 |
+ sock = socket.socket(family=socket.AF_UNIX) |
|
535 |
+ with pytest.raises(RuntimeError, match='unusable SSH_AUTH_SOCK'): |
|
536 |
+ ssh_agent_client.SSHAgentClient(socket=sock) |
|
537 |
+ |
|
538 |
+@pytest.mark.parametrize(['response'], [ |
|
539 |
+ (b'\x00\x00',), |
|
540 |
+ (b'\x00\x00\x00\x1f some bytes missing',), |
|
541 |
+]) |
|
542 |
+def test_truncated_server_response(monkeypatch, response): |
|
543 |
+ client = ssh_agent_client.SSHAgentClient() |
|
544 |
+ response_stream = io.BytesIO(response) |
|
545 |
+ class PseudoSocket(object): |
|
546 |
+ pass |
|
547 |
+ pseudo_socket = PseudoSocket() |
|
548 |
+ pseudo_socket.sendall = lambda *a, **kw: None |
|
549 |
+ pseudo_socket.recv = response_stream.read |
|
550 |
+ monkeypatch.setattr(client, '_connection', pseudo_socket) |
|
551 |
+ with pytest.raises(EOFError): |
|
552 |
+ client.request(255, b'') |
|
553 |
+ |
|
554 |
+@pytest.mark.parametrize( |
|
555 |
+ ['response_code', 'response', 'exc_type', 'exc_pattern'], |
|
556 |
+ [ |
|
557 |
+ (255, b'', RuntimeError, 'error return from SSH agent:'), |
|
558 |
+ (12, b'\x00\x00\x00\x01', EOFError, 'truncated response'), |
|
559 |
+ (12, b'\x00\x00\x00\x00abc', RuntimeError, 'overlong response'), |
|
560 |
+ ] |
|
561 |
+) |
|
562 |
+def test_list_keys_error_responses(monkeypatch, response_code, response, |
|
563 |
+ exc_type, exc_pattern): |
|
564 |
+ client = ssh_agent_client.SSHAgentClient() |
|
565 |
+ monkeypatch.setattr(client, 'request', |
|
566 |
+ lambda *a, **kw: (response_code, response)) |
|
567 |
+ with pytest.raises(exc_type, match=exc_pattern): |
|
568 |
+ client.list_keys() |
|
569 |
+ |
|
570 |
+@pytest.mark.parametrize( |
|
571 |
+ ['key', 'check', 'response', 'exc_type', 'exc_pattern'], |
|
572 |
+ [ |
|
573 |
+ ( |
|
574 |
+ b'invalid-key', |
|
575 |
+ True, |
|
576 |
+ (255, b''), |
|
577 |
+ RuntimeError, |
|
578 |
+ 'target SSH key not loaded into agent', |
|
579 |
+ ), |
|
580 |
+ ( |
|
581 |
+ SUPPORTED['ed25519']['public_key_data'], |
|
582 |
+ True, |
|
583 |
+ (255, b''), |
|
584 |
+ RuntimeError, |
|
585 |
+ 'signing data failed:', |
|
586 |
+ ) |
|
587 |
+ ] |
|
588 |
+) |
|
589 |
+def test_sign_error_responses(monkeypatch, key, check, response, exc_type, |
|
590 |
+ exc_pattern): |
|
591 |
+ client = ssh_agent_client.SSHAgentClient() |
|
592 |
+ monkeypatch.setattr(client, 'request', lambda a, b: response) |
|
593 |
+ KeyCommentPair = ssh_agent_client.types.KeyCommentPair |
|
594 |
+ loaded_keys = [KeyCommentPair(v['public_key_data'], b'no comment') |
|
595 |
+ for v in SUPPORTED.values()] |
|
596 |
+ monkeypatch.setattr(client, 'list_keys', lambda: loaded_keys) |
|
597 |
+ with pytest.raises(exc_type, match=exc_pattern): |
|
598 |
+ client.sign(key, b'abc', check_if_key_loaded=check) |
... | ... |
@@ -25,6 +25,7 @@ def test_big_endian_number(sequence, base, expected): |
25 | 25 |
@pytest.mark.parametrize(['exc_type', 'exc_pattern', 'sequence' , 'base'], [ |
26 | 26 |
(ValueError, 'invalid base 3 digit:', [-1], 3), |
27 | 27 |
(ValueError, 'invalid base:', [0], 1), |
28 |
+ (TypeError, 'not an integer:', [0.0, 1.0, 0.0, 1.0], 2), |
|
28 | 29 |
]) |
29 | 30 |
def test_big_endian_number_exceptions(exc_type, exc_pattern, sequence, base): |
30 | 31 |
with pytest.raises(exc_type, match=exc_pattern): |
... | ... |
@@ -103,3 +104,5 @@ def test_internal_generating(): |
103 | 104 |
assert seq._generate_inner(1) == 0 |
104 | 105 |
with pytest.raises(ValueError, match='invalid target range'): |
105 | 106 |
seq._generate_inner(0) |
107 |
+ with pytest.raises(ValueError, match='invalid base:'): |
|
108 |
+ seq._generate_inner(16, base=1) |
|
106 | 109 |