Support exporting the `vault` configuration as a POSIX shell script
Marco Ricci

Marco Ricci commited on 2024-12-21 00:23:33
Zeige 3 geänderte Dateien mit 411 Einfügungen und 8 Löschungen.


When exporting `vault` configurations via `--export`, a new option
`--export-as=FORMAT` allows selecting the export format (defaulting to
`json`).  Setting `FORMAT` as `sh` selects the new POSIX shell script
export format, which yields a sh(1)-compatible script to reimport the
existing configuration as a series of calls to the `derivepassphrase`
command.

Because the output is very regular, the test suite also includes an
interpreter specifically for the sh(1) subset emitted during an `sh`
export, on top of the usual adaptations to existing tests for the new
functionality.
... ...
@@ -0,0 +1,6 @@
1
+### Added
2
+
3
+  - `derivepassphrase vault --export` can now also export the current
4
+    configuration as a POSIX `sh` script, using the `--export-as=sh` option.
5
+    The default (and previous behavior) is `--export-as=json`.
6
+
... ...
@@ -18,6 +18,7 @@ import inspect
18 18
 import json
19 19
 import logging
20 20
 import os
21
+import shlex
21 22
 import sys
22 23
 import unicodedata
23 24
 import warnings
... ...
@@ -1361,6 +1362,78 @@ def _key_to_phrase(
1361 1362
         error_callback('Cannot connect to SSH agent: %s', e.strerror)
1362 1363
 
1363 1364
 
1365
+def _print_config_as_sh_script(
1366
+    config: _types.VaultConfig,
1367
+    /,
1368
+    *,
1369
+    outfile: TextIO,
1370
+    prog_name_list: Sequence[str],
1371
+) -> None:
1372
+    service_keys = (
1373
+        'length',
1374
+        'repeat',
1375
+        'lower',
1376
+        'upper',
1377
+        'number',
1378
+        'space',
1379
+        'dash',
1380
+        'symbol',
1381
+    )
1382
+    print('#!/bin/sh -e', file=outfile)
1383
+    print(file=outfile)
1384
+    print(shlex.join([*prog_name_list, '--clear']), file=outfile)
1385
+    sv_obj_pairs: list[
1386
+        tuple[
1387
+            str | None,
1388
+            _types.VaultConfigGlobalSettings
1389
+            | _types.VaultConfigServicesSettings,
1390
+        ],
1391
+    ] = list(config['services'].items())
1392
+    if config.get('global', {}):
1393
+        sv_obj_pairs.insert(0, (None, config['global']))
1394
+    for sv, sv_obj in sv_obj_pairs:
1395
+        this_service_keys = tuple(k for k in service_keys if k in sv_obj)
1396
+        this_other_keys = tuple(k for k in sv_obj if k not in service_keys)
1397
+        if this_other_keys:
1398
+            other_sv_obj = {k: sv_obj[k] for k in this_other_keys}  # type: ignore[literal-required]
1399
+            dumped_config = json.dumps(
1400
+                (
1401
+                    {'services': {sv: other_sv_obj}}
1402
+                    if sv is not None
1403
+                    else {'global': other_sv_obj, 'services': {}}
1404
+                ),
1405
+                ensure_ascii=False,
1406
+                indent=None,
1407
+            )
1408
+            print(
1409
+                shlex.join([*prog_name_list, '--import', '-']) + " <<'HERE'",
1410
+                dumped_config,
1411
+                'HERE',
1412
+                sep='\n',
1413
+                file=outfile,
1414
+            )
1415
+        if not this_service_keys and not this_other_keys and sv:
1416
+            dumped_config = json.dumps(
1417
+                {'services': {sv: {}}},
1418
+                ensure_ascii=False,
1419
+                indent=None,
1420
+            )
1421
+            print(
1422
+                shlex.join([*prog_name_list, '--import', '-']) + " <<'HERE'",
1423
+                dumped_config,
1424
+                'HERE',
1425
+                sep='\n',
1426
+                file=outfile,
1427
+            )
1428
+        elif this_service_keys:
1429
+            tokens = [*prog_name_list, '--config']
1430
+            for key in this_service_keys:
1431
+                tokens.extend([f'--{key}', str(sv_obj[key])])  # type: ignore[literal-required]
1432
+            if sv is not None:
1433
+                tokens.extend(['--', sv])
1434
+            print(shlex.join(tokens), file=outfile)
1435
+
1436
+
1364 1437
 # Concrete option groups used by this command-line interface.
1365 1438
 class PasswordGenerationOption(OptionGroupOption):
1366 1439
     """Password generation options for the CLI."""
... ...
@@ -1657,6 +1730,13 @@ DEFAULT_NOTES_MARKER = '# - - - - - >8 - - - - -'
1657 1730
     ),
1658 1731
     cls=CompatibilityOption,
1659 1732
 )
1733
+@click.option(
1734
+    '--export-as',
1735
+    type=click.Choice(['JSON', 'sh']),
1736
+    default='JSON',
1737
+    help='when exporting, export as JSON (default) or POSIX sh',
1738
+    cls=CompatibilityOption,
1739
+)
1660 1740
 @click.version_option(version=dpp.__version__, prog_name=PROG_NAME)
1661 1741
 @standard_logging_options
1662 1742
 @click.argument('service', required=False)
... ...
@@ -1685,6 +1765,7 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1685 1765
     import_settings: TextIO | pathlib.Path | os.PathLike[str] | None = None,
1686 1766
     overwrite_config: bool = False,
1687 1767
     unset_settings: Sequence[str] = (),
1768
+    export_as: Literal['json', 'sh'] = 'json',
1688 1769
 ) -> None:
1689 1770
     """Derive a passphrase using the vault(1) derivation scheme.
1690 1771
 
... ...
@@ -1785,6 +1866,10 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1785 1866
             Command-line argument `--unset`.  If given together with
1786 1867
             `--config`, unsets the specified settings (in addition to
1787 1868
             any other changes requested).
1869
+        export_as:
1870
+            Command-line argument `--export-as`.  If given together with
1871
+            `--export`, selects the format to export the current
1872
+            configuration as: JSON ("json", default) or POSIX sh ("sh").
1788 1873
 
1789 1874
     """  # noqa: D301
1790 1875
     logger = logging.getLogger(PROG_NAME)
... ...
@@ -2120,6 +2205,23 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
2120 2205
             # and for programmatic use, our caller may want accurate
2121 2206
             # error information.
2122 2207
             with outfile:
2208
+                if export_as == 'sh':
2209
+                    this_ctx = ctx
2210
+                    prog_name_pieces = collections.deque([
2211
+                        this_ctx.info_name or 'vault',
2212
+                    ])
2213
+                    while (
2214
+                        this_ctx.parent is not None
2215
+                        and this_ctx.parent.info_name is not None
2216
+                    ):
2217
+                        prog_name_pieces.appendleft(this_ctx.parent.info_name)
2218
+                        this_ctx = this_ctx.parent
2219
+                    _print_config_as_sh_script(
2220
+                        configuration,
2221
+                        outfile=outfile,
2222
+                        prog_name_list=prog_name_pieces,
2223
+                    )
2224
+                else:
2123 2225
                     json.dump(configuration, outfile)
2124 2226
         except OSError as e:
2125 2227
             err('Cannot store config: %s: %r', e.strerror, e.filename)
... ...
@@ -8,14 +8,16 @@ import base64
8 8
 import contextlib
9 9
 import copy
10 10
 import errno
11
+import io
11 12
 import json
12 13
 import logging
13 14
 import os
15
+import shlex
14 16
 import shutil
15 17
 import socket
16 18
 import textwrap
17 19
 import warnings
18
-from typing import TYPE_CHECKING, NoReturn
20
+from typing import TYPE_CHECKING
19 21
 
20 22
 import click.testing
21 23
 import hypothesis
... ...
@@ -27,7 +29,8 @@ import tests
27 29
 from derivepassphrase import _types, cli, ssh_agent, vault
28 30
 
29 31
 if TYPE_CHECKING:
30
-    from collections.abc import Callable, Iterable
32
+    from collections.abc import Callable, Iterable, Iterator
33
+    from typing import NoReturn
31 34
 
32 35
 DUMMY_SERVICE = tests.DUMMY_SERVICE
33 36
 DUMMY_PASSPHRASE = tests.DUMMY_PASSPHRASE
... ...
@@ -227,6 +230,56 @@ def is_harmless_config_import_warning(record: tuple[str, int, str]) -> bool:
227 230
     return any(tests.warning_emitted(w, [record]) for w in possible_warnings)
228 231
 
229 232
 
233
+def vault_config_exporter_shell_interpreter(  # noqa: C901
234
+    script: str | Iterable[str],
235
+    /,
236
+    *,
237
+    prog_name_list: list[str] | None = None,
238
+    command: click.BaseCommand | None = None,
239
+    runner: click.testing.CliRunner | None = None,
240
+) -> Iterator[click.testing.Result]:
241
+    if isinstance(script, str):  # pragma: no cover
242
+        script = script.splitlines(False)
243
+    if prog_name_list is None:  # pragma: no cover
244
+        prog_name_list = ['derivepassphrase', 'vault']
245
+    if command is None:  # pragma: no cover
246
+        command = cli.derivepassphrase_vault
247
+    if runner is None:  # pragma: no cover
248
+        runner = click.testing.CliRunner(mix_stderr=False)
249
+    n = len(prog_name_list)
250
+    it = iter(script)
251
+    while True:
252
+        try:
253
+            raw_line = next(it)
254
+        except StopIteration:
255
+            break
256
+        else:
257
+            line = shlex.split(raw_line)
258
+        input_buffer: list[str] = []
259
+        if line[:n] != prog_name_list:
260
+            continue
261
+        line[:n] = []
262
+        if line and line[-1] == '<<HERE':
263
+            # naive HERE document support
264
+            while True:
265
+                try:
266
+                    raw_line = next(it)
267
+                except StopIteration as exc:  # pragma: no cover
268
+                    msg = 'incomplete here document'
269
+                    raise EOFError(msg) from exc
270
+                else:
271
+                    if raw_line == 'HERE':
272
+                        break
273
+                    input_buffer.append(raw_line)
274
+            line.pop()
275
+        yield runner.invoke(
276
+            command,
277
+            line,
278
+            catch_exceptions=False,
279
+            input=(''.join(x + '\n' for x in input_buffer) or None),
280
+        )
281
+
282
+
230 283
 class TestCLI:
231 284
     def test_200_help_output(self, monkeypatch: pytest.MonkeyPatch) -> None:
232 285
         runner = click.testing.CliRunner(mix_stderr=False)
... ...
@@ -866,25 +919,45 @@ class TestCLI:
866 919
             error=os.strerror(errno.EISDIR)
867 920
         ), 'expected error exit and known error message'
868 921
 
922
+    @pytest.mark.parametrize(
923
+        'export_options',
924
+        [
925
+            [],
926
+            ['--export-as=sh'],
927
+        ],
928
+    )
869 929
     def test_214_export_settings_no_stored_settings(
870 930
         self,
871 931
         monkeypatch: pytest.MonkeyPatch,
932
+        export_options: list[str],
872 933
     ) -> None:
873 934
         runner = click.testing.CliRunner(mix_stderr=False)
874 935
         with tests.isolated_config(monkeypatch=monkeypatch, runner=runner):
875 936
             with contextlib.suppress(FileNotFoundError):
876 937
                 os.remove(cli._config_filename(subsystem='vault'))
877 938
             _result = runner.invoke(
878
-                cli.derivepassphrase_vault,
879
-                ['--export', '-'],
939
+                # Test parent context navigation by not calling
940
+                # `cli.derivepassphrase_vault` directly.  Used e.g. in
941
+                # the `--export-as=sh` section to autoconstruct the
942
+                # program name correctly.
943
+                cli.derivepassphrase,
944
+                ['vault', '--export', '-', *export_options],
880 945
                 catch_exceptions=False,
881 946
             )
882 947
         result = tests.ReadableResult.parse(_result)
883 948
         assert result.clean_exit(empty_stderr=True), 'expected clean exit'
884 949
 
950
+    @pytest.mark.parametrize(
951
+        'export_options',
952
+        [
953
+            [],
954
+            ['--export-as=sh'],
955
+        ],
956
+    )
885 957
     def test_214a_export_settings_bad_stored_config(
886 958
         self,
887 959
         monkeypatch: pytest.MonkeyPatch,
960
+        export_options: list[str],
888 961
     ) -> None:
889 962
         runner = click.testing.CliRunner(mix_stderr=False)
890 963
         with tests.isolated_vault_config(
... ...
@@ -892,7 +965,7 @@ class TestCLI:
892 965
         ):
893 966
             _result = runner.invoke(
894 967
                 cli.derivepassphrase_vault,
895
-                ['--export', '-'],
968
+                ['--export', '-', *export_options],
896 969
                 input='null',
897 970
                 catch_exceptions=False,
898 971
             )
... ...
@@ -901,9 +974,17 @@ class TestCLI:
901 974
             error='Cannot load config'
902 975
         ), 'expected error exit and known error message'
903 976
 
977
+    @pytest.mark.parametrize(
978
+        'export_options',
979
+        [
980
+            [],
981
+            ['--export-as=sh'],
982
+        ],
983
+    )
904 984
     def test_214b_export_settings_not_a_file(
905 985
         self,
906 986
         monkeypatch: pytest.MonkeyPatch,
987
+        export_options: list[str],
907 988
     ) -> None:
908 989
         runner = click.testing.CliRunner(mix_stderr=False)
909 990
         with tests.isolated_config(monkeypatch=monkeypatch, runner=runner):
... ...
@@ -912,7 +993,7 @@ class TestCLI:
912 993
             os.makedirs(cli._config_filename(subsystem='vault'))
913 994
             _result = runner.invoke(
914 995
                 cli.derivepassphrase_vault,
915
-                ['--export', '-'],
996
+                ['--export', '-', *export_options],
916 997
                 input='null',
917 998
                 catch_exceptions=False,
918 999
             )
... ...
@@ -921,16 +1002,24 @@ class TestCLI:
921 1002
             error='Cannot load config'
922 1003
         ), 'expected error exit and known error message'
923 1004
 
1005
+    @pytest.mark.parametrize(
1006
+        'export_options',
1007
+        [
1008
+            [],
1009
+            ['--export-as=sh'],
1010
+        ],
1011
+    )
924 1012
     def test_214c_export_settings_target_not_a_file(
925 1013
         self,
926 1014
         monkeypatch: pytest.MonkeyPatch,
1015
+        export_options: list[str],
927 1016
     ) -> None:
928 1017
         runner = click.testing.CliRunner(mix_stderr=False)
929 1018
         with tests.isolated_config(monkeypatch=monkeypatch, runner=runner):
930 1019
             dname = cli._config_filename(subsystem=None)
931 1020
             _result = runner.invoke(
932 1021
                 cli.derivepassphrase_vault,
933
-                ['--export', os.fsdecode(dname)],
1022
+                ['--export', os.fsdecode(dname), *export_options],
934 1023
                 input='null',
935 1024
                 catch_exceptions=False,
936 1025
             )
... ...
@@ -939,9 +1028,17 @@ class TestCLI:
939 1028
             error='Cannot store config'
940 1029
         ), 'expected error exit and known error message'
941 1030
 
1031
+    @pytest.mark.parametrize(
1032
+        'export_options',
1033
+        [
1034
+            [],
1035
+            ['--export-as=sh'],
1036
+        ],
1037
+    )
942 1038
     def test_214d_export_settings_settings_directory_not_a_directory(
943 1039
         self,
944 1040
         monkeypatch: pytest.MonkeyPatch,
1041
+        export_options: list[str],
945 1042
     ) -> None:
946 1043
         runner = click.testing.CliRunner(mix_stderr=False)
947 1044
         with tests.isolated_config(monkeypatch=monkeypatch, runner=runner):
... ...
@@ -951,7 +1048,7 @@ class TestCLI:
951 1048
                 print('Obstruction!!', file=outfile)
952 1049
             _result = runner.invoke(
953 1050
                 cli.derivepassphrase_vault,
954
-                ['--export', '-'],
1051
+                ['--export', '-', *export_options],
955 1052
                 input='null',
956 1053
                 catch_exceptions=False,
957 1054
             )
... ...
@@ -2050,6 +2147,204 @@ Boo.
2050 2147
             assert f'FutureWarning: {THE_FUTURE}' in record_tuples[1][2]
2051 2148
             assert f'UserWarning: {JUST_TESTING}' in record_tuples[2][2]
2052 2149
 
2150
+    def _export_as_sh_helper(
2151
+        self,
2152
+        config: Any,
2153
+    ) -> None:
2154
+        prog_name_list = ('derivepassphrase', 'vault')
2155
+        with io.StringIO() as outfile:
2156
+            cli._print_config_as_sh_script(
2157
+                config, outfile=outfile, prog_name_list=prog_name_list
2158
+            )
2159
+            script = outfile.getvalue()
2160
+        runner = click.testing.CliRunner(mix_stderr=False)
2161
+        monkeypatch = pytest.MonkeyPatch()
2162
+        with tests.isolated_vault_config(
2163
+            runner=runner,
2164
+            monkeypatch=monkeypatch,
2165
+            vault_config={'services': {}},
2166
+        ):
2167
+            for _result in vault_config_exporter_shell_interpreter(script):
2168
+                result = tests.ReadableResult.parse(_result)
2169
+                assert result.clean_exit()
2170
+            assert cli._load_config() == config
2171
+
2172
+    @hypothesis.given(
2173
+        global_config_settable=tests.vault_full_service_config(),
2174
+        global_config_importable=strategies.fixed_dictionaries(
2175
+            {},
2176
+            optional={
2177
+                'key': strategies.text(
2178
+                    alphabet=strategies.characters(
2179
+                        min_codepoint=32,
2180
+                        max_codepoint=126,
2181
+                    ),
2182
+                    max_size=128,
2183
+                ),
2184
+                'phrase': strategies.text(
2185
+                    alphabet=strategies.characters(
2186
+                        min_codepoint=32,
2187
+                        max_codepoint=126,
2188
+                    ),
2189
+                    max_size=64,
2190
+                ),
2191
+            },
2192
+        ),
2193
+    )
2194
+    def test_130a_export_as_sh_global(
2195
+        self,
2196
+        global_config_settable: _types.VaultConfigServicesSettings,
2197
+        global_config_importable: _types.VaultConfigServicesSettings,
2198
+    ) -> None:
2199
+        config: _types.VaultConfig = {
2200
+            'global': global_config_settable | global_config_importable,
2201
+            'services': {},
2202
+        }
2203
+        assert _types.clean_up_falsy_vault_config_values(config) is not None
2204
+        assert _types.is_vault_config(config)
2205
+        return self._export_as_sh_helper(config)
2206
+
2207
+    @hypothesis.given(
2208
+        global_config_importable=strategies.fixed_dictionaries(
2209
+            {},
2210
+            optional={
2211
+                'key': strategies.text(
2212
+                    alphabet=strategies.characters(
2213
+                        min_codepoint=32,
2214
+                        max_codepoint=126,
2215
+                    ),
2216
+                    max_size=128,
2217
+                ),
2218
+                'phrase': strategies.text(
2219
+                    alphabet=strategies.characters(
2220
+                        min_codepoint=32,
2221
+                        max_codepoint=126,
2222
+                    ),
2223
+                    max_size=64,
2224
+                ),
2225
+            },
2226
+        ),
2227
+    )
2228
+    def test_130b_export_as_sh_global_only_imports(
2229
+        self,
2230
+        global_config_importable: _types.VaultConfigServicesSettings,
2231
+    ) -> None:
2232
+        config: _types.VaultConfig = {
2233
+            'global': global_config_importable,
2234
+            'services': {},
2235
+        }
2236
+        assert _types.clean_up_falsy_vault_config_values(config) is not None
2237
+        assert _types.is_vault_config(config)
2238
+        if not config['global']:
2239
+            config.pop('global')
2240
+        return self._export_as_sh_helper(config)
2241
+
2242
+    @hypothesis.given(
2243
+        service_name=strategies.text(
2244
+            alphabet=strategies.characters(
2245
+                min_codepoint=32,
2246
+                max_codepoint=126,
2247
+            ),
2248
+            min_size=4,
2249
+            max_size=64,
2250
+        ),
2251
+        service_config_settable=tests.vault_full_service_config(),
2252
+        service_config_importable=strategies.fixed_dictionaries(
2253
+            {},
2254
+            optional={
2255
+                'key': strategies.text(
2256
+                    alphabet=strategies.characters(
2257
+                        min_codepoint=32,
2258
+                        max_codepoint=126,
2259
+                    ),
2260
+                    max_size=128,
2261
+                ),
2262
+                'phrase': strategies.text(
2263
+                    alphabet=strategies.characters(
2264
+                        min_codepoint=32,
2265
+                        max_codepoint=126,
2266
+                    ),
2267
+                    max_size=64,
2268
+                ),
2269
+                'notes': strategies.text(
2270
+                    alphabet=strategies.characters(
2271
+                        min_codepoint=32,
2272
+                        max_codepoint=126,
2273
+                        include_characters=('\n', '\f', '\t'),
2274
+                    ),
2275
+                    max_size=256,
2276
+                ),
2277
+            },
2278
+        ),
2279
+    )
2280
+    def test_130c_export_as_sh_service(
2281
+        self,
2282
+        service_name: str,
2283
+        service_config_settable: _types.VaultConfigServicesSettings,
2284
+        service_config_importable: _types.VaultConfigServicesSettings,
2285
+    ) -> None:
2286
+        config: _types.VaultConfig = {
2287
+            'services': {
2288
+                service_name: (
2289
+                    service_config_settable | service_config_importable
2290
+                ),
2291
+            },
2292
+        }
2293
+        assert _types.clean_up_falsy_vault_config_values(config) is not None
2294
+        assert _types.is_vault_config(config)
2295
+        return self._export_as_sh_helper(config)
2296
+
2297
+    @hypothesis.given(
2298
+        service_name=strategies.text(
2299
+            alphabet=strategies.characters(
2300
+                min_codepoint=32,
2301
+                max_codepoint=126,
2302
+            ),
2303
+            min_size=4,
2304
+            max_size=64,
2305
+        ),
2306
+        service_config_importable=strategies.fixed_dictionaries(
2307
+            {},
2308
+            optional={
2309
+                'key': strategies.text(
2310
+                    alphabet=strategies.characters(
2311
+                        min_codepoint=32,
2312
+                        max_codepoint=126,
2313
+                    ),
2314
+                    max_size=128,
2315
+                ),
2316
+                'phrase': strategies.text(
2317
+                    alphabet=strategies.characters(
2318
+                        min_codepoint=32,
2319
+                        max_codepoint=126,
2320
+                    ),
2321
+                    max_size=64,
2322
+                ),
2323
+                'notes': strategies.text(
2324
+                    alphabet=strategies.characters(
2325
+                        min_codepoint=32,
2326
+                        max_codepoint=126,
2327
+                        include_characters=('\n', '\f', '\t'),
2328
+                    ),
2329
+                    max_size=256,
2330
+                ),
2331
+            },
2332
+        ),
2333
+    )
2334
+    def test_130d_export_as_sh_service_only_imports(
2335
+        self,
2336
+        service_name: str,
2337
+        service_config_importable: _types.VaultConfigServicesSettings,
2338
+    ) -> None:
2339
+        config: _types.VaultConfig = {
2340
+            'services': {
2341
+                service_name: service_config_importable,
2342
+            },
2343
+        }
2344
+        assert _types.clean_up_falsy_vault_config_values(config) is not None
2345
+        assert _types.is_vault_config(config)
2346
+        return self._export_as_sh_helper(config)
2347
+
2053 2348
     @pytest.mark.parametrize(
2054 2349
         ['command_line', 'config', 'result_config'],
2055 2350
         [
2056 2351