Allow the user to overwrite the current vault configuration
Marco Ricci

Marco Ricci commited on 2024-12-12 11:59:47
Zeige 2 geänderte Dateien mit 68 Einfügungen und 12 Löschungen.


In the "vault" derivation scheme, commands that edit or import
a configuration actually merge with the existing configuration, for
compatibility with the original vault(1).  However, sometimes it is
desired to replace the existing configuration instead, when importing
or when setting a (service or global) configuration.  We introduce a new
`--overwrite-existing` command-line option that signals exactly that,
and a `--merge-existing` option which signals the (previously implicit)
default merging behavior.
... ...
@@ -1299,6 +1299,12 @@ class StorageManagementOption(OptionGroupOption):
1299 1299
     """
1300 1300
 
1301 1301
 
1302
+class CompatibilityOption(OptionGroupOption):
1303
+    """Compatibility and incompatibility options for the CLI."""
1304
+
1305
+    option_group_name = 'Options concerning compatibility with other tools'
1306
+
1307
+
1302 1308
 def _validate_occurrence_constraint(
1303 1309
     ctx: click.Context,
1304 1310
     param: click.Parameter,
... ...
@@ -1530,6 +1536,13 @@ DEFAULT_NOTES_MARKER = '# - - - - - >8 - - - - -'
1530 1536
     help='import saved settings from file PATH',
1531 1537
     cls=StorageManagementOption,
1532 1538
 )
1539
+@click.option(
1540
+    '--overwrite-existing/--merge-existing',
1541
+    'overwrite_config',
1542
+    default=False,
1543
+    help='overwrite or merge (default) the existing configuration',
1544
+    cls=CompatibilityOption,
1545
+)
1533 1546
 @click.version_option(version=dpp.__version__, prog_name=PROG_NAME)
1534 1547
 @standard_logging_options
1535 1548
 @click.argument('service', required=False)
... ...
@@ -1556,6 +1569,7 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1556 1569
     clear_all_settings: bool = False,
1557 1570
     export_settings: TextIO | pathlib.Path | os.PathLike[str] | None = None,
1558 1571
     import_settings: TextIO | pathlib.Path | os.PathLike[str] | None = None,
1572
+    overwrite_config: bool = False,
1559 1573
 ) -> None:
1560 1574
     """Derive a passphrase using the vault(1) derivation scheme.
1561 1575
 
... ...
@@ -1647,6 +1661,11 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1647 1661
             must be open for reading and yield `str` values.  Otherwise,
1648 1662
             a filename to open for reading.  Using `-` for standard
1649 1663
             input is supported.
1664
+        overwrite_config:
1665
+            Command-line arguments `--overwrite-existing` (True) and
1666
+            `--merge-existing` (False).  Controls whether config saving
1667
+            and config importing overwrite existing configurations, or
1668
+            merge them section-wise instead.
1650 1669
 
1651 1670
     """  # noqa: D301
1652 1671
     logger = logging.getLogger(PROG_NAME)
... ...
@@ -1665,6 +1684,8 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1665 1684
                 group = StorageManagementOption
1666 1685
             elif isinstance(param, LoggingOption):
1667 1686
                 group = LoggingOption
1687
+            elif isinstance(param, CompatibilityOption):
1688
+                group = CompatibilityOption
1668 1689
             elif isinstance(param, OptionGroupOption):
1669 1690
                 raise AssertionError(  # noqa: DOC501,TRY003,TRY004
1670 1691
                     f'Unknown option group for {param!r}'  # noqa: EM102
... ...
@@ -1897,8 +1918,12 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1897 1918
                 cast(dict[str, Any], value),
1898 1919
                 form=form,
1899 1920
             )
1921
+        if overwrite_config:
1922
+            put_config(maybe_config)
1923
+        else:
1900 1924
             configuration = get_config()
1901
-        merged_config: collections.ChainMap[str, Any] = collections.ChainMap(
1925
+            merged_config: collections.ChainMap[str, Any] = (
1926
+                collections.ChainMap(
1902 1927
                     {
1903 1928
                         'services': collections.ChainMap(
1904 1929
                             maybe_config['services'],
... ...
@@ -1912,6 +1937,7 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
1912 1937
                     if 'global' in configuration
1913 1938
                     else {},
1914 1939
                 )
1940
+            )
1915 1941
             new_config: Any = {
1916 1942
                 k: dict(v) if isinstance(v, collections.ChainMap) else v
1917 1943
                 for k, v in sorted(merged_config.items())
... ...
@@ -2022,10 +2048,14 @@ def derivepassphrase_vault(  # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915
2022 2048
                     f'actual settings'
2023 2049
                 )
2024 2050
                 raise click.UsageError(msg)
2025
-            if service:
2026
-                configuration['services'].setdefault(service, {}).update(view)  # type: ignore[typeddict-item]
2027
-            else:
2028
-                configuration.setdefault('global', {}).update(view)  # type: ignore[typeddict-item]
2051
+            subtree: dict[str, Any] = (
2052
+                configuration['services'].setdefault(service, {})  # type: ignore[assignment]
2053
+                if service
2054
+                else configuration.setdefault('global', {})
2055
+            )
2056
+            if overwrite_config:
2057
+                subtree.clear()
2058
+            subtree.update(view)
2029 2059
             assert _types.is_vault_config(
2030 2060
                 configuration
2031 2061
             ), f'Invalid vault configuration: {configuration!r}'
... ...
@@ -2525,19 +2525,26 @@ class ConfigMergingStateMachine(stateful.RuleBasedStateMachine):
2525 2525
 
2526 2526
     @stateful.rule(
2527 2527
         settings_obj=stateful.consumes(settings),
2528
+        overwrite=strategies.booleans(),
2528 2529
     )
2529 2530
     def set_globals(
2530 2531
         self,
2531 2532
         settings_obj: _types.VaultConfigGlobalSettings,
2533
+        overwrite: bool,
2532 2534
     ) -> None:
2533
-        self.current_config['global'] = settings_obj
2535
+        if overwrite:
2536
+            self.current_config['global'] = {}
2537
+        self.current_config.setdefault('global', {}).update(settings_obj)
2534 2538
         assert _types.is_vault_config(self.current_config)
2535 2539
         # NOTE: This relies on settings_obj containing only the keys
2536 2540
         # "length", "repeat", "upper", "lower", "number", "space",
2537 2541
         # "dash" and "symbol".
2538 2542
         _result = self.runner.invoke(
2539 2543
             cli.derivepassphrase_vault,
2540
-            ['--config']
2544
+            [
2545
+                '--config',
2546
+                '--overwrite-existing' if overwrite else '--merge-existing',
2547
+            ]
2541 2548
             + [f'--{key}={value}' for key, value in settings_obj.items()],
2542 2549
             catch_exceptions=False,
2543 2550
         )
... ...
@@ -2556,20 +2563,29 @@ class ConfigMergingStateMachine(stateful.RuleBasedStateMachine):
2556 2563
     @stateful.rule(
2557 2564
         service=known_services,
2558 2565
         settings_obj=stateful.consumes(settings),
2566
+        overwrite=strategies.booleans(),
2559 2567
     )
2560 2568
     def set_service(
2561 2569
         self,
2562 2570
         service: str,
2563 2571
         settings_obj: _types.VaultConfigServicesSettings,
2572
+        overwrite: bool,
2564 2573
     ) -> None:
2565
-        self.current_config['services'][service] = settings_obj
2574
+        if overwrite:
2575
+            self.current_config['services'][service] = {}
2576
+        self.current_config['services'].setdefault(service, {}).update(
2577
+            settings_obj
2578
+        )
2566 2579
         assert _types.is_vault_config(self.current_config)
2567 2580
         # NOTE: This relies on settings_obj containing only the keys
2568 2581
         # "length", "repeat", "upper", "lower", "number", "space",
2569 2582
         # "dash" and "symbol".
2570 2583
         _result = self.runner.invoke(
2571 2584
             cli.derivepassphrase_vault,
2572
-            ['--config']
2585
+            [
2586
+                '--config',
2587
+                '--overwrite-existing' if overwrite else '--merge-existing',
2588
+            ]
2573 2589
             + [f'--{key}={value}' for key, value in settings_obj.items()]
2574 2590
             + ['--', service],
2575 2591
             catch_exceptions=False,
... ...
@@ -2627,13 +2643,23 @@ class ConfigMergingStateMachine(stateful.RuleBasedStateMachine):
2627 2643
 
2628 2644
     @stateful.rule(
2629 2645
         config=stateful.consumes(configurations),
2646
+        overwrite=strategies.booleans(),
2647
+    )
2648
+    def import_configuraton(
2649
+        self,
2650
+        config: _types.VaultConfig,
2651
+        overwrite: bool,
2652
+    ) -> None:
2653
+        self.current_config = (
2654
+            self.fold_configs(config, self.current_config)
2655
+            if not overwrite
2656
+            else config
2630 2657
         )
2631
-    def import_configuraton(self, config: _types.VaultConfig) -> None:
2632
-        self.current_config = self.fold_configs(config, self.current_config)
2633 2658
         assert _types.is_vault_config(self.current_config)
2634 2659
         _result = self.runner.invoke(
2635 2660
             cli.derivepassphrase_vault,
2636
-            ['--import', '-'],
2661
+            ['--import', '-']
2662
+            + (['--overwrite-existing'] if overwrite else []),
2637 2663
             input=json.dumps(config),
2638 2664
             catch_exceptions=False,
2639 2665
         )
2640 2666