Consolidate `types` submodules into combined `_types` submodule
Marco Ricci

Marco Ricci commited on 2024-07-28 22:26:40
Zeige 10 geänderte Dateien mit 92 Einfügungen und 116 Löschungen.


Merge the submodules `types` and `ssh_agent.types` into a new submodule
`_types`.  This naming avoids a clash with the standard library, which
in turn causes bizarre import errors when other code tries to import and
then introspect the parent modules.

Due to the new and shorter naming, some module or package imports have
been rephrased, which causes deceivingly large-looking formatting
changes.
... ...
@@ -152,9 +152,10 @@ parametrize-names-type = 'list'
152 152
 
153 153
 [tool.ruff.lint.extend-per-file-ignores]
154 154
 "**/tests/**/*" = [
155
-  'SLF001',
156 155
   'A002',
157 156
   'FBT001',
157
+  'PLC2701',
158
+  'SLF001',
158 159
 ]
159 160
 
160 161
 [tool.hatch.envs.hatch-static-analysis]
... ...
@@ -6,7 +6,8 @@
6 6
 
7 7
 from __future__ import annotations
8 8
 
9
-from typing import TypeGuard
9
+import enum
10
+from typing import NamedTuple, TypeGuard
10 11
 
11 12
 from typing_extensions import (
12 13
     Any,
... ...
@@ -15,10 +16,13 @@ from typing_extensions import (
15 16
     TypedDict,
16 17
 )
17 18
 
18
-import derivepassphrase
19
-
20
-__author__ = derivepassphrase.__author__
21
-__version__ = derivepassphrase.__version__
19
+__all__ = (
20
+    'SSH_AGENT',
21
+    'SSH_AGENTC',
22
+    'KeyCommentPair',
23
+    'VaultConfig',
24
+    'is_vault_config',
25
+)
22 26
 
23 27
 
24 28
 class VaultConfigGlobalSettings(TypedDict, total=False):
... ...
@@ -145,3 +149,46 @@ def is_vault_config(obj: Any) -> TypeGuard[VaultConfig]:
145 149
         if 'key' in service and 'phrase' in service:
146 150
             return False
147 151
     return True
152
+
153
+
154
+class KeyCommentPair(NamedTuple):
155
+    """SSH key plus comment pair.  For typing purposes.
156
+
157
+    Attributes:
158
+        key: SSH key.
159
+        comment: SSH key comment.
160
+
161
+    """
162
+
163
+    key: bytes | bytearray
164
+    comment: bytes | bytearray
165
+
166
+
167
+class SSH_AGENTC(enum.Enum):  # noqa: N801
168
+    """SSH agent protocol numbers: client requests.
169
+
170
+    Attributes:
171
+        REQUEST_IDENTITIES:
172
+            List identities.  Expecting `SSH_AGENT.IDENTITIES_ANSWER`.
173
+        SIGN_REQUEST:
174
+            Sign data.  Expecting `SSH_AGENT.SIGN_RESPONSE`.
175
+
176
+    """
177
+
178
+    REQUEST_IDENTITIES: int = 11
179
+    SIGN_REQUEST: int = 13
180
+
181
+
182
+class SSH_AGENT(enum.Enum):  # noqa: N801
183
+    """SSH agent protocol numbers: server replies.
184
+
185
+    Attributes:
186
+        IDENTITIES_ANSWER:
187
+            Successful answer to `SSH_AGENTC.REQUEST_IDENTITIES`.
188
+        SIGN_RESPONSE:
189
+            Successful answer to `SSH_AGENTC.SIGN_REQUEST`.
190
+
191
+    """
192
+
193
+    IDENTITIES_ANSWER: int = 12
194
+    SIGN_RESPONSE: int = 14
... ...
@@ -27,8 +27,7 @@ from typing_extensions import (
27 27
 )
28 28
 
29 29
 import derivepassphrase as dpp
30
-from derivepassphrase import ssh_agent, vault
31
-from derivepassphrase import types as dpp_types
30
+from derivepassphrase import _types, ssh_agent, vault
32 31
 
33 32
 if TYPE_CHECKING:
34 33
     import pathlib
... ...
@@ -68,7 +67,7 @@ def _config_filename() -> str | bytes | pathlib.Path:
68 67
     return os.path.join(path, 'settings.json')
69 68
 
70 69
 
71
-def _load_config() -> dpp_types.VaultConfig:
70
+def _load_config() -> _types.VaultConfig:
72 71
     """Load a vault(1)-compatible config from the application directory.
73 72
 
74 73
     The filename is obtained via
... ...
@@ -90,12 +89,12 @@ def _load_config() -> dpp_types.VaultConfig:
90 89
     filename = _config_filename()
91 90
     with open(filename, 'rb') as fileobj:
92 91
         data = json.load(fileobj)
93
-    if not dpp_types.is_vault_config(data):
92
+    if not _types.is_vault_config(data):
94 93
         raise ValueError(_INVALID_VAULT_CONFIG)
95 94
     return data
96 95
 
97 96
 
98
-def _save_config(config: dpp_types.VaultConfig, /) -> None:
97
+def _save_config(config: _types.VaultConfig, /) -> None:
99 98
     """Save a vault(1)-compatible config to the application directory.
100 99
 
101 100
     The filename is obtained via
... ...
@@ -113,7 +112,7 @@ def _save_config(config: dpp_types.VaultConfig, /) -> None:
113 112
             The data cannot be stored as a vault(1)-compatible config.
114 113
 
115 114
     """
116
-    if not dpp_types.is_vault_config(config):
115
+    if not _types.is_vault_config(config):
117 116
         raise ValueError(_INVALID_VAULT_CONFIG)
118 117
     filename = _config_filename()
119 118
     filedir = os.path.dirname(os.path.abspath(filename))
... ...
@@ -128,7 +127,7 @@ def _save_config(config: dpp_types.VaultConfig, /) -> None:
128 127
 
129 128
 def _get_suitable_ssh_keys(
130 129
     conn: ssh_agent.SSHAgentClient | socket.socket | None = None, /
131
-) -> Iterator[ssh_agent.types.KeyCommentPair]:
130
+) -> Iterator[_types.KeyCommentPair]:
132 131
     """Yield all SSH keys suitable for passphrase derivation.
133 132
 
134 133
     Suitable SSH keys are queried from the running SSH agent (see
... ...
@@ -845,7 +844,7 @@ def derivepassphrase(
845 844
                     opt_str, f'mutually exclusive with {other_str}', ctx=ctx
846 845
                 )
847 846
 
848
-    def get_config() -> dpp_types.VaultConfig:
847
+    def get_config() -> _types.VaultConfig:
849 848
         try:
850 849
             return _load_config()
851 850
         except FileNotFoundError:
... ...
@@ -853,7 +852,7 @@ def derivepassphrase(
853 852
         except Exception as e:  # noqa: BLE001
854 853
             ctx.fail(f'cannot load config: {e}')
855 854
 
856
-    configuration: dpp_types.VaultConfig
855
+    configuration: _types.VaultConfig
857 856
 
858 857
     check_incompatible_options('--phrase', '--key')
859 858
     for group in (ConfigurationOption, StorageManagementOption):
... ...
@@ -903,7 +902,7 @@ def derivepassphrase(
903 902
         assert service is not None
904 903
         configuration = get_config()
905 904
         text = DEFAULT_NOTES_TEMPLATE + configuration['services'].get(
906
-            service, cast(dpp_types.VaultConfigServicesSettings, {})
905
+            service, cast(_types.VaultConfigServicesSettings, {})
907 906
         ).get('notes', '')
908 907
         notes_value = click.edit(text=text)
909 908
         if notes_value is not None:
... ...
@@ -947,7 +946,7 @@ def derivepassphrase(
947 946
             ctx.fail(f'Cannot load config: cannot decode JSON: {e}')
948 947
         except OSError as e:
949 948
             ctx.fail(f'Cannot load config: {e.strerror}')
950
-        if dpp_types.is_vault_config(maybe_config):
949
+        if _types.is_vault_config(maybe_config):
951 950
             _save_config(maybe_config)
952 951
         else:
953 952
             ctx.fail('not a valid config')
... ...
@@ -1037,7 +1036,7 @@ def derivepassphrase(
1037 1036
                 configuration['services'].setdefault(service, {}).update(view)  # type: ignore[typeddict-item]
1038 1037
             else:
1039 1038
                 configuration.setdefault('global', {}).update(view)  # type: ignore[typeddict-item]
1040
-            assert dpp_types.is_vault_config(
1039
+            assert _types.is_vault_config(
1041 1040
                 configuration
1042 1041
             ), f'invalid vault configuration: {configuration!r}'
1043 1042
             _save_config(configuration)
... ...
@@ -14,7 +14,7 @@ from typing import TYPE_CHECKING
14 14
 
15 15
 from typing_extensions import Self
16 16
 
17
-from derivepassphrase.ssh_agent import types
17
+from derivepassphrase import _types
18 18
 
19 19
 if TYPE_CHECKING:
20 20
     from collections.abc import Sequence
... ...
@@ -273,7 +273,7 @@ class SSHAgentClient:
273 273
             raise EOFError(msg)
274 274
         return response[0], response[1:]
275 275
 
276
-    def list_keys(self) -> Sequence[types.KeyCommentPair]:
276
+    def list_keys(self) -> Sequence[_types.KeyCommentPair]:
277 277
         """Request a list of keys known to the SSH agent.
278 278
 
279 279
         Returns:
... ...
@@ -289,9 +289,9 @@ class SSHAgentClient:
289 289
 
290 290
         """
291 291
         response_code, response = self.request(
292
-            types.SSH_AGENTC.REQUEST_IDENTITIES.value, b''
292
+            _types.SSH_AGENTC.REQUEST_IDENTITIES.value, b''
293 293
         )
294
-        if response_code != types.SSH_AGENT.IDENTITIES_ANSWER.value:
294
+        if response_code != _types.SSH_AGENT.IDENTITIES_ANSWER.value:
295 295
             msg = (
296 296
                 f'error return from SSH agent: '
297 297
                 f'{response_code = }, {response = }'
... ...
@@ -312,7 +312,7 @@ class SSHAgentClient:
312 312
             return bytes(buf)
313 313
 
314 314
         key_count = int.from_bytes(shift(4), 'big')
315
-        keys: collections.deque[types.KeyCommentPair]
315
+        keys: collections.deque[_types.KeyCommentPair]
316 316
         keys = collections.deque()
317 317
         for _ in range(key_count):
318 318
             key_size = int.from_bytes(shift(4), 'big')
... ...
@@ -320,7 +320,7 @@ class SSHAgentClient:
320 320
             comment_size = int.from_bytes(shift(4), 'big')
321 321
             comment = shift(comment_size)
322 322
             # Both `key` and `comment` are not wrapped as SSH strings.
323
-            keys.append(types.KeyCommentPair(key, comment))
323
+            keys.append(_types.KeyCommentPair(key, comment))
324 324
         if response_stream:
325 325
             raise TrailingDataError
326 326
         return keys
... ...
@@ -378,9 +378,9 @@ class SSHAgentClient:
378 378
         request_data.extend(self.string(payload))
379 379
         request_data.extend(self.uint32(flags))
380 380
         response_code, response = self.request(
381
-            types.SSH_AGENTC.SIGN_REQUEST.value, request_data
381
+            _types.SSH_AGENTC.SIGN_REQUEST.value, request_data
382 382
         )
383
-        if response_code != types.SSH_AGENT.SIGN_RESPONSE.value:
383
+        if response_code != _types.SSH_AGENT.SIGN_RESPONSE.value:
384 384
             msg = f'signing data failed: {response_code = }, {response = }'
385 385
             raise RuntimeError(msg)
386 386
         return self.unstring(response)
... ...
@@ -1,55 +0,0 @@
1
-# SPDX-FileCopyrightText: 2024 Marco Ricci <m@the13thletter.info>
2
-#
3
-# SPDX-License-Identifier: MIT
4
-
5
-"""Common typing declarations for the parent module."""
6
-
7
-from __future__ import annotations
8
-
9
-import enum
10
-from typing import NamedTuple
11
-
12
-__all__ = ('SSH_AGENT', 'SSH_AGENTC', 'KeyCommentPair')
13
-
14
-
15
-class KeyCommentPair(NamedTuple):
16
-    """SSH key plus comment pair.  For typing purposes.
17
-
18
-    Attributes:
19
-        key: SSH key.
20
-        comment: SSH key comment.
21
-
22
-    """
23
-
24
-    key: bytes | bytearray
25
-    comment: bytes | bytearray
26
-
27
-
28
-class SSH_AGENTC(enum.Enum):  # noqa: N801
29
-    """SSH agent protocol numbers: client requests.
30
-
31
-    Attributes:
32
-        REQUEST_IDENTITIES:
33
-            List identities.  Expecting `SSH_AGENT.IDENTITIES_ANSWER`.
34
-        SIGN_REQUEST:
35
-            Sign data.  Expecting `SSH_AGENT.SIGN_RESPONSE`.
36
-
37
-    """
38
-
39
-    REQUEST_IDENTITIES: int = 11
40
-    SIGN_REQUEST: int = 13
41
-
42
-
43
-class SSH_AGENT(enum.Enum):  # noqa: N801
44
-    """SSH agent protocol numbers: server replies.
45
-
46
-    Attributes:
47
-        IDENTITIES_ANSWER:
48
-            Successful answer to `SSH_AGENTC.REQUEST_IDENTITIES`.
49
-        SIGN_RESPONSE:
50
-            Successful answer to `SSH_AGENTC.SIGN_REQUEST`.
51
-
52
-    """
53
-
54
-    IDENTITIES_ANSWER: int = 12
55
-    SIGN_RESPONSE: int = 14
... ...
@@ -12,11 +12,7 @@ from typing import TYPE_CHECKING
12 12
 
13 13
 import pytest
14 14
 
15
-import derivepassphrase
16
-import derivepassphrase.cli
17
-import derivepassphrase.ssh_agent
18
-import derivepassphrase.ssh_agent.types
19
-import derivepassphrase.types
15
+from derivepassphrase import _types, cli
20 16
 
21 17
 __all__ = ()
22 18
 
... ...
@@ -355,11 +351,9 @@ skip_if_no_agent = pytest.mark.skipif(
355 351
 )
356 352
 
357 353
 
358
-def list_keys(
359
-    self: Any = None,
360
-) -> list[derivepassphrase.ssh_agent.types.KeyCommentPair]:
354
+def list_keys(self: Any = None) -> list[_types.KeyCommentPair]:
361 355
     del self  # Unused.
362
-    Pair = derivepassphrase.ssh_agent.types.KeyCommentPair  # noqa: N806
356
+    Pair = _types.KeyCommentPair  # noqa: N806
363 357
     list1 = [
364 358
         Pair(value['public_key_data'], f'{key} test key'.encode('ASCII'))
365 359
         for key, value in SUPPORTED_KEYS.items()
... ...
@@ -371,11 +365,9 @@ def list_keys(
371 365
     return list1 + list2
372 366
 
373 367
 
374
-def list_keys_singleton(
375
-    self: Any = None,
376
-) -> list[derivepassphrase.ssh_agent.types.KeyCommentPair]:
368
+def list_keys_singleton(self: Any = None) -> list[_types.KeyCommentPair]:
377 369
     del self  # Unused.
378
-    Pair = derivepassphrase.ssh_agent.types.KeyCommentPair  # noqa: N806
370
+    Pair = _types.KeyCommentPair  # noqa: N806
379 371
     list1 = [
380 372
         Pair(value['public_key_data'], f'{key} test key'.encode('ASCII'))
381 373
         for key, value in SUPPORTED_KEYS.items()
... ...
@@ -383,11 +375,9 @@ def list_keys_singleton(
383 375
     return list1[:1]
384 376
 
385 377
 
386
-def suitable_ssh_keys(
387
-    conn: Any,
388
-) -> Iterator[derivepassphrase.ssh_agent.types.KeyCommentPair]:
378
+def suitable_ssh_keys(conn: Any) -> Iterator[_types.KeyCommentPair]:
389 379
     del conn  # Unused.
390
-    Pair = derivepassphrase.ssh_agent.types.KeyCommentPair  # noqa: N806
380
+    Pair = _types.KeyCommentPair  # noqa: N806
391 381
     yield from [
392 382
         Pair(DUMMY_KEY1, b'no comment'),
393 383
         Pair(DUMMY_KEY2, b'a comment'),
... ...
@@ -406,21 +396,14 @@ def isolated_config(
406 396
     runner: click.testing.CliRunner,
407 397
     config: Any,
408 398
 ) -> Iterator[None]:
409
-    prog_name = derivepassphrase.cli.PROG_NAME
399
+    prog_name = cli.PROG_NAME
410 400
     env_name = prog_name.replace(' ', '_').upper() + '_PATH'
411 401
     with runner.isolated_filesystem():
412 402
         monkeypatch.setenv('HOME', os.getcwd())
413 403
         monkeypatch.setenv('USERPROFILE', os.getcwd())
414 404
         monkeypatch.delenv(env_name, raising=False)
415
-        os.makedirs(
416
-            os.path.dirname(derivepassphrase.cli._config_filename()),
417
-            exist_ok=True,
418
-        )
419
-        with open(
420
-            derivepassphrase.cli._config_filename(),
421
-            'w',
422
-            encoding='UTF-8',
423
-        ) as outfile:
405
+        os.makedirs(os.path.dirname(cli._config_filename()), exist_ok=True)
406
+        with open(cli._config_filename(), 'w', encoding='UTF-8') as outfile:
424 407
             json.dump(config, outfile)
425 408
         yield
426 409
 
... ...
@@ -17,7 +17,7 @@ from typing_extensions import NamedTuple
17 17
 
18 18
 import derivepassphrase as dpp
19 19
 import tests
20
-from derivepassphrase import cli, ssh_agent
20
+from derivepassphrase import _types, cli, ssh_agent
21 21
 
22 22
 if TYPE_CHECKING:
23 23
     from collections.abc import Callable
... ...
@@ -308,7 +308,7 @@ class TestCLI:
308 308
     def test_204a_key_from_config(
309 309
         self,
310 310
         monkeypatch: Any,
311
-        config: dpp.types.VaultConfig,
311
+        config: _types.VaultConfig,
312 312
     ) -> None:
313 313
         runner = click.testing.CliRunner(mix_stderr=False)
314 314
         with tests.isolated_config(
... ...
@@ -1282,8 +1282,8 @@ Boo.
1282 1282
         self,
1283 1283
         monkeypatch: Any,
1284 1284
         command_line: list[str],
1285
-        config: dpp.types.VaultConfig,
1286
-        result_config: dpp.types.VaultConfig,
1285
+        config: _types.VaultConfig,
1286
+        result_config: _types.VaultConfig,
1287 1287
     ) -> None:
1288 1288
         runner = click.testing.CliRunner(mix_stderr=False)
1289 1289
         for start_config in [config, result_config]:
... ...
@@ -19,7 +19,7 @@ import pytest
19 19
 from typing_extensions import Any
20 20
 
21 21
 import tests
22
-from derivepassphrase import cli, ssh_agent, vault
22
+from derivepassphrase import _types, cli, ssh_agent, vault
23 23
 
24 24
 if TYPE_CHECKING:
25 25
     from collections.abc import Iterator
... ...
@@ -418,7 +418,7 @@ class TestAgentInteraction:
418 418
     ) -> None:
419 419
         client = ssh_agent.SSHAgentClient()
420 420
         monkeypatch.setattr(client, 'request', lambda a, b: response)  # noqa: ARG005
421
-        KeyCommentPair = ssh_agent.types.KeyCommentPair  # noqa: N806
421
+        KeyCommentPair = _types.KeyCommentPair  # noqa: N806
422 422
         loaded_keys = [
423 423
             KeyCommentPair(v['public_key_data'], b'no comment')
424 424
             for v in tests.SUPPORTED_KEYS.values()
... ...
@@ -7,7 +7,7 @@ from __future__ import annotations
7 7
 import pytest
8 8
 from typing_extensions import Any
9 9
 
10
-import derivepassphrase.types
10
+from derivepassphrase import _types
11 11
 
12 12
 
13 13
 @pytest.mark.parametrize(
... ...
@@ -89,7 +89,7 @@ import derivepassphrase.types
89 89
     ],
90 90
 )
91 91
 def test_200_is_vault_config(obj: Any, comment: str) -> None:
92
-    is_vault_config = derivepassphrase.types.is_vault_config
92
+    is_vault_config = _types.is_vault_config
93 93
     assert is_vault_config(obj) == (not comment), (
94 94
         'failed to complain about: ' + comment
95 95
         if comment
... ...
@@ -0,0 +1 @@
1
+Combine and consolidate `derivepassphrase.types` and `derivepassphrase.ssh_agent.types` into a new submodule `derivepassphrase._types`.  Despite the name, the module is public.
0 2