Marco Ricci commited on 2025-01-20 18:00:12
Zeige 10 geänderte Dateien mit 175 Einfügungen und 203 Löschungen.
The interface was already supported by all the user-facing code (interestingly enough) and by almost all of the internal code. Using `pathlib.Path` objects directly instead of manually dealing with string or byte string collections does cut down on a lot of the otherwise explicit bookkeeping and context management. Some changes to the tests are necessary because other (or more) functions need to be mocked, or other ways of testing successful or ensuring unsuccessful operations are needed.
... | ... |
@@ -247,9 +247,7 @@ select = [ |
247 | 247 |
'CPY', 'C4', 'DTZ', 'T10', 'DJ', 'EM', 'EXE', 'FA', |
248 | 248 |
'ISC', 'ICN', 'LOG', 'G', 'INP', 'PIE', 'T20', 'PYI', |
249 | 249 |
'PT', 'Q', 'RET', 'SLF', 'SLOT', 'SIM', 'TID', 'TC', |
250 |
- 'INT', 'ARG', |
|
251 |
- # We currently do not use pathlib. Disable 'PTH'. |
|
252 |
- 'TD', |
|
250 |
+ 'INT', 'ARG', 'PTH', 'TD', |
|
253 | 251 |
# We use TODOs and FIXMEs as notes for later, and don't want the |
254 | 252 |
# linter to nag about every occurrence. Disable 'FIX'. |
255 | 253 |
# |
... | ... |
@@ -24,6 +24,7 @@ import functools |
24 | 24 |
import gettext |
25 | 25 |
import inspect |
26 | 26 |
import os |
27 |
+import pathlib |
|
27 | 28 |
import string |
28 | 29 |
import sys |
29 | 30 |
import textwrap |
... | ... |
@@ -48,7 +49,7 @@ PROG_NAME = 'derivepassphrase' |
48 | 49 |
|
49 | 50 |
|
50 | 51 |
def load_translations( |
51 |
- localedirs: list[str] | None = None, |
|
52 |
+ localedirs: list[str | bytes | os.PathLike] | None = None, |
|
52 | 53 |
languages: Sequence[str] | None = None, |
53 | 54 |
class_: type[gettext.NullTranslations] | None = None, |
54 | 55 |
) -> gettext.NullTranslations: # pragma: no cover |
... | ... |
@@ -71,29 +72,36 @@ def load_translations( |
71 | 72 |
Returns: |
72 | 73 |
A (potentially dummy) translation catalog. |
73 | 74 |
|
75 |
+ Raises: |
|
76 |
+ RuntimeError: |
|
77 |
+ `APPDATA` (on Windows) or `XDG_DATA_HOME` (otherwise) is not |
|
78 |
+ set. We attempted to compute the default value, but failed |
|
79 |
+ to determine the home directory. |
|
80 |
+ |
|
74 | 81 |
""" |
75 | 82 |
if localedirs is None: |
76 | 83 |
if sys.platform.startswith('win'): |
77 |
- xdg_data_home = os.environ.get( |
|
78 |
- 'APPDATA', |
|
79 |
- os.path.expanduser('~'), |
|
84 |
+ xdg_data_home = ( |
|
85 |
+ pathlib.Path(os.environ['APPDATA']) |
|
86 |
+ if os.environ.get('APPDATA') |
|
87 |
+ else pathlib.Path('~').expanduser() |
|
80 | 88 |
) |
81 | 89 |
elif os.environ.get('XDG_DATA_HOME'): |
82 |
- xdg_data_home = os.environ['XDG_DATA_HOME'] |
|
90 |
+ xdg_data_home = pathlib.Path(os.environ['XDG_DATA_HOME']) |
|
83 | 91 |
else: |
84 |
- xdg_data_home = os.path.join( |
|
85 |
- os.path.expanduser('~'), '.local', 'share' |
|
92 |
+ xdg_data_home = ( |
|
93 |
+ pathlib.Path('~').expanduser() / '.local' / '.share' |
|
86 | 94 |
) |
87 | 95 |
localedirs = [ |
88 |
- os.path.join(xdg_data_home, 'locale'), |
|
89 |
- os.path.join(sys.prefix, 'share', 'locale'), |
|
90 |
- os.path.join(sys.base_prefix, 'share', 'locale'), |
|
96 |
+ pathlib.Path(xdg_data_home, 'locale'), |
|
97 |
+ pathlib.Path(sys.prefix, 'share', 'locale'), |
|
98 |
+ pathlib.Path(sys.base_prefix, 'share', 'locale'), |
|
91 | 99 |
] |
92 | 100 |
for localedir in localedirs: |
93 | 101 |
with contextlib.suppress(OSError): |
94 | 102 |
return gettext.translation( |
95 | 103 |
PROG_NAME, |
96 |
- localedir=localedir, |
|
104 |
+ localedir=os.fsdecode(localedir), |
|
97 | 105 |
languages=languages, |
98 | 106 |
class_=class_, |
99 | 107 |
) |
... | ... |
@@ -17,6 +17,7 @@ import inspect |
17 | 17 |
import json |
18 | 18 |
import logging |
19 | 19 |
import os |
20 |
+import pathlib |
|
20 | 21 |
import shlex |
21 | 22 |
import sys |
22 | 23 |
import unicodedata |
... | ... |
@@ -50,7 +51,6 @@ else: |
50 | 51 |
import tomli as tomllib |
51 | 52 |
|
52 | 53 |
if TYPE_CHECKING: |
53 |
- import pathlib |
|
54 | 54 |
import socket |
55 | 55 |
import types |
56 | 56 |
from collections.abc import ( |
... | ... |
@@ -1468,7 +1468,7 @@ def derivepassphrase_export_vault( |
1468 | 1468 |
|
1469 | 1469 |
def _config_filename( |
1470 | 1470 |
subsystem: str | None = 'old settings.json', |
1471 |
-) -> str | bytes | pathlib.Path: |
|
1471 |
+) -> pathlib.Path: |
|
1472 | 1472 |
"""Return the filename of the configuration file for the subsystem. |
1473 | 1473 |
|
1474 | 1474 |
The (implicit default) file is currently named `settings.json`, |
... | ... |
@@ -1495,9 +1495,9 @@ def _config_filename( |
1495 | 1495 |
The subsystem will be mandatory to specify. |
1496 | 1496 |
|
1497 | 1497 |
""" |
1498 |
- path: str | bytes | pathlib.Path |
|
1499 |
- path = os.getenv(PROG_NAME.upper() + '_PATH') or click.get_app_dir( |
|
1500 |
- PROG_NAME, force_posix=True |
|
1498 |
+ path = pathlib.Path( |
|
1499 |
+ os.getenv(PROG_NAME.upper() + '_PATH') |
|
1500 |
+ or click.get_app_dir(PROG_NAME, force_posix=True) |
|
1501 | 1501 |
) |
1502 | 1502 |
# Use match/case here once Python 3.9 becomes unsupported. |
1503 | 1503 |
if subsystem is None: |
... | ... |
@@ -1511,7 +1511,7 @@ def _config_filename( |
1511 | 1511 |
else: # pragma: no cover |
1512 | 1512 |
msg = f'Unknown configuration subsystem: {subsystem!r}' |
1513 | 1513 |
raise AssertionError(msg) |
1514 |
- return os.path.join(path, filename) |
|
1514 |
+ return path / filename |
|
1515 | 1515 |
|
1516 | 1516 |
|
1517 | 1517 |
def _load_config() -> _types.VaultConfig: |
... | ... |
@@ -1532,7 +1532,7 @@ def _load_config() -> _types.VaultConfig: |
1532 | 1532 |
|
1533 | 1533 |
""" |
1534 | 1534 |
filename = _config_filename(subsystem='vault') |
1535 |
- with open(filename, 'rb') as fileobj: |
|
1535 |
+ with filename.open('rb') as fileobj: |
|
1536 | 1536 |
data = json.load(fileobj) |
1537 | 1537 |
if not _types.is_vault_config(data): |
1538 | 1538 |
raise ValueError(_INVALID_VAULT_CONFIG) |
... | ... |
@@ -1563,12 +1563,12 @@ def _migrate_and_load_old_config() -> tuple[ |
1563 | 1563 |
""" |
1564 | 1564 |
new_filename = _config_filename(subsystem='vault') |
1565 | 1565 |
old_filename = _config_filename(subsystem='old settings.json') |
1566 |
- with open(old_filename, 'rb') as fileobj: |
|
1566 |
+ with old_filename.open('rb') as fileobj: |
|
1567 | 1567 |
data = json.load(fileobj) |
1568 | 1568 |
if not _types.is_vault_config(data): |
1569 | 1569 |
raise ValueError(_INVALID_VAULT_CONFIG) |
1570 | 1570 |
try: |
1571 |
- os.replace(old_filename, new_filename) |
|
1571 |
+ old_filename.rename(new_filename) |
|
1572 | 1572 |
except OSError as exc: |
1573 | 1573 |
return data, exc |
1574 | 1574 |
else: |
... | ... |
@@ -1591,17 +1591,13 @@ def _save_config(config: _types.VaultConfig, /) -> None: |
1591 | 1591 |
ValueError: |
1592 | 1592 |
The data cannot be stored as a vault(1)-compatible config. |
1593 | 1593 |
|
1594 |
- """ # noqa: DOC501 |
|
1594 |
+ """ |
|
1595 | 1595 |
if not _types.is_vault_config(config): |
1596 | 1596 |
raise ValueError(_INVALID_VAULT_CONFIG) |
1597 | 1597 |
filename = _config_filename(subsystem='vault') |
1598 |
- filedir = os.path.dirname(os.path.abspath(filename)) |
|
1599 |
- try: |
|
1600 |
- os.makedirs(filedir, exist_ok=False) |
|
1601 |
- except FileExistsError: |
|
1602 |
- if not os.path.isdir(filedir): |
|
1603 |
- raise |
|
1604 |
- with open(filename, 'w', encoding='UTF-8') as fileobj: |
|
1598 |
+ filedir = filename.resolve().parent |
|
1599 |
+ filedir.mkdir(parents=True, exist_ok=True) |
|
1600 |
+ with filename.open('w', encoding='UTF-8') as fileobj: |
|
1605 | 1601 |
json.dump(config, fileobj) |
1606 | 1602 |
|
1607 | 1603 |
|
... | ... |
@@ -1622,7 +1618,7 @@ def _load_user_config() -> dict[str, Any]: |
1622 | 1618 |
|
1623 | 1619 |
""" |
1624 | 1620 |
filename = _config_filename(subsystem='user configuration') |
1625 |
- with open(filename, 'rb') as fileobj: |
|
1621 |
+ with filename.open('rb') as fileobj: |
|
1626 | 1622 |
return tomllib.load(fileobj) |
1627 | 1623 |
|
1628 | 1624 |
|
... | ... |
@@ -2489,8 +2485,8 @@ def derivepassphrase_vault( # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915 |
2489 | 2485 |
delete_service_settings: bool = False, |
2490 | 2486 |
delete_globals: bool = False, |
2491 | 2487 |
clear_all_settings: bool = False, |
2492 |
- export_settings: TextIO | pathlib.Path | os.PathLike[str] | None = None, |
|
2493 |
- import_settings: TextIO | pathlib.Path | os.PathLike[str] | None = None, |
|
2488 |
+ export_settings: TextIO | os.PathLike[str] | None = None, |
|
2489 |
+ import_settings: TextIO | os.PathLike[str] | None = None, |
|
2494 | 2490 |
overwrite_config: bool = False, |
2495 | 2491 |
unset_settings: Sequence[str] = (), |
2496 | 2492 |
export_as: Literal['json', 'sh'] = 'json', |
... | ... |
@@ -2676,10 +2672,8 @@ def derivepassphrase_vault( # noqa: C901,PLR0912,PLR0913,PLR0914,PLR0915 |
2676 | 2672 |
backup_config, exc = _migrate_and_load_old_config() |
2677 | 2673 |
except FileNotFoundError: |
2678 | 2674 |
return {'services': {}} |
2679 |
- old_name = os.path.basename( |
|
2680 |
- _config_filename(subsystem='old settings.json') |
|
2681 |
- ) |
|
2682 |
- new_name = os.path.basename(_config_filename(subsystem='vault')) |
|
2675 |
+ old_name = _config_filename(subsystem='old settings.json').name |
|
2676 |
+ new_name = _config_filename(subsystem='vault').name |
|
2683 | 2677 |
deprecation.warning( |
2684 | 2678 |
_msg.TranslatedString( |
2685 | 2679 |
_msg.WarnMsgTemplate.V01_STYLE_CONFIG, |
... | ... |
@@ -8,6 +8,7 @@ from __future__ import annotations |
8 | 8 |
|
9 | 9 |
import importlib |
10 | 10 |
import os |
11 |
+import pathlib |
|
11 | 12 |
from typing import TYPE_CHECKING, Protocol |
12 | 13 |
|
13 | 14 |
import derivepassphrase as dpp |
... | ... |
@@ -34,10 +35,10 @@ class NotAVaultConfigError(ValueError): |
34 | 35 |
|
35 | 36 |
def __init__( |
36 | 37 |
self, |
37 |
- path: str | bytes, |
|
38 |
+ path: str | bytes | os.PathLike, |
|
38 | 39 |
format: str | None = None, # noqa: A002 |
39 | 40 |
) -> None: |
40 |
- self.path = path |
|
41 |
+ self.path = os.fspath(path) |
|
41 | 42 |
self.format = format |
42 | 43 |
|
43 | 44 |
def __str__(self) -> str: # pragma: no cover |
... | ... |
@@ -79,7 +80,7 @@ def get_vault_key() -> bytes: |
79 | 80 |
return username |
80 | 81 |
|
81 | 82 |
|
82 |
-def get_vault_path() -> str | bytes | os.PathLike: |
|
83 |
+def get_vault_path() -> pathlib.Path: |
|
83 | 84 |
"""Automatically determine the vault(1) configuration path. |
84 | 85 |
|
85 | 86 |
Query the `VAULT_PATH` environment variable, or default to |
... | ... |
@@ -96,13 +97,9 @@ def get_vault_path() -> str | bytes | os.PathLike: |
96 | 97 |
manually to the correct value. |
97 | 98 |
|
98 | 99 |
""" |
99 |
- result = os.path.join( |
|
100 |
- os.path.expanduser('~'), os.environ.get('VAULT_PATH', '.vault') |
|
101 |
- ) |
|
102 |
- if result.startswith('~'): |
|
103 |
- msg = 'Cannot determine home directory' |
|
104 |
- raise RuntimeError(msg) |
|
105 |
- return result |
|
100 |
+ return pathlib.Path( |
|
101 |
+ '~', os.environ.get('VAULT_PATH', '.vault') |
|
102 |
+ ).expanduser() |
|
106 | 103 |
|
107 | 104 |
|
108 | 105 |
class ExportVaultConfigDataFunction(Protocol): # pragma: no cover |
... | ... |
@@ -25,12 +25,12 @@ should *not* be used or relied on. |
25 | 25 |
from __future__ import annotations |
26 | 26 |
|
27 | 27 |
import base64 |
28 |
-import fnmatch |
|
29 | 28 |
import importlib |
30 | 29 |
import json |
31 | 30 |
import logging |
32 | 31 |
import os |
33 | 32 |
import os.path |
33 |
+import pathlib |
|
34 | 34 |
import struct |
35 | 35 |
from typing import TYPE_CHECKING, Any |
36 | 36 |
|
... | ... |
@@ -107,6 +107,8 @@ def export_storeroom_data( # noqa: C901,D417,PLR0912,PLR0914,PLR0915 |
107 | 107 |
importlib.import_module('cryptography') |
108 | 108 |
if path is None: |
109 | 109 |
path = exporter.get_vault_path() |
110 |
+ else: |
|
111 |
+ path = pathlib.Path(os.fsdecode(path)) |
|
110 | 112 |
if key is None: |
111 | 113 |
key = exporter.get_vault_key() |
112 | 114 |
if format != 'storeroom': # pragma: no cover |
... | ... |
@@ -115,15 +117,11 @@ def export_storeroom_data( # noqa: C901,D417,PLR0912,PLR0914,PLR0915 |
115 | 117 |
) |
116 | 118 |
raise ValueError(msg) |
117 | 119 |
try: |
118 |
- master_keys_file = open( # noqa: SIM115 |
|
119 |
- os.path.join(os.fsdecode(path), '.keys'), |
|
120 |
+ master_keys_file = pathlib.Path(path, '.keys').open( # noqa: SIM115 |
|
120 | 121 |
encoding='utf-8', |
121 | 122 |
) |
122 | 123 |
except FileNotFoundError as exc: |
123 |
- raise exporter.NotAVaultConfigError( |
|
124 |
- os.fsdecode(path), |
|
125 |
- format='storeroom', |
|
126 |
- ) from exc |
|
124 |
+ raise exporter.NotAVaultConfigError(path, format='storeroom') from exc |
|
127 | 125 |
with master_keys_file: |
128 | 126 |
header = json.loads(master_keys_file.readline()) |
129 | 127 |
if header != {'version': 1}: |
... | ... |
@@ -149,14 +147,7 @@ def export_storeroom_data( # noqa: C901,D417,PLR0912,PLR0914,PLR0915 |
149 | 147 |
|
150 | 148 |
config_structure: dict[str, Any] = {} |
151 | 149 |
json_contents: dict[str, bytes] = {} |
152 |
- # Use glob.glob(..., root_dir=...) here once Python 3.9 becomes |
|
153 |
- # unsupported. |
|
154 |
- storeroom_path_str = os.fsdecode(path) |
|
155 |
- valid_hashdirs = [ |
|
156 |
- hashdir_name |
|
157 |
- for hashdir_name in os.listdir(storeroom_path_str) |
|
158 |
- if fnmatch.fnmatch(hashdir_name, '[01][0-9a-f]') |
|
159 |
- ] |
|
150 |
+ valid_hashdirs = list(path.glob('[01][0-9a-f]')) |
|
160 | 151 |
for file in valid_hashdirs: |
161 | 152 |
logger.info( |
162 | 153 |
_msg.TranslatedString( |
... | ... |
@@ -165,8 +156,7 @@ def export_storeroom_data( # noqa: C901,D417,PLR0912,PLR0914,PLR0915 |
165 | 156 |
) |
166 | 157 |
) |
167 | 158 |
bucket_contents = [ |
168 |
- bytes(item) |
|
169 |
- for item in _decrypt_bucket_file(file, master_keys, root_dir=path) |
|
159 |
+ bytes(item) for item in _decrypt_bucket_file(file, master_keys) |
|
170 | 160 |
] |
171 | 161 |
bucket_index = json.loads(bucket_contents.pop(0)) |
172 | 162 |
for pos, item in enumerate(bucket_index): |
... | ... |
@@ -669,7 +659,7 @@ def _decrypt_bucket_item( |
669 | 659 |
|
670 | 660 |
|
671 | 661 |
def _decrypt_bucket_file( |
672 |
- filename: str, |
|
662 |
+ filename: str | bytes | os.PathLike, |
|
673 | 663 |
master_keys: _types.StoreroomMasterKeys, |
674 | 664 |
*, |
675 | 665 |
root_dir: str | bytes | os.PathLike = '.', |
... | ... |
@@ -705,9 +695,9 @@ def _decrypt_bucket_file( |
705 | 695 |
|
706 | 696 |
""" |
707 | 697 |
master_keys = master_keys.toreadonly() |
708 |
- with open( |
|
709 |
- os.path.join(os.fsdecode(root_dir), filename), 'rb' |
|
710 |
- ) as bucket_file: |
|
698 |
+ root_dir = pathlib.Path(os.fsdecode(root_dir)) |
|
699 |
+ filename = pathlib.Path(os.fsdecode(filename)) |
|
700 |
+ with (root_dir / filename).open('rb') as bucket_file: |
|
711 | 701 |
header_line = bucket_file.readline() |
712 | 702 |
try: |
713 | 703 |
header = json.loads(header_line) |
... | ... |
@@ -31,6 +31,7 @@ import importlib |
31 | 31 |
import json |
32 | 32 |
import logging |
33 | 33 |
import os |
34 |
+import pathlib |
|
34 | 35 |
import warnings |
35 | 36 |
from typing import TYPE_CHECKING |
36 | 37 |
|
... | ... |
@@ -107,7 +108,9 @@ def export_vault_native_data( # noqa: D417 |
107 | 108 |
importlib.import_module('cryptography') |
108 | 109 |
if path is None: |
109 | 110 |
path = exporter.get_vault_path() |
110 |
- with open(path, 'rb') as infile: |
|
111 |
+ else: |
|
112 |
+ path = pathlib.Path(os.fsdecode(path)) |
|
113 |
+ with path.open('rb') as infile: |
|
111 | 114 |
contents = base64.standard_b64decode(infile.read()) |
112 | 115 |
if key is None: |
113 | 116 |
key = exporter.get_vault_key() |
... | ... |
@@ -123,10 +126,7 @@ def export_vault_native_data( # noqa: D417 |
123 | 126 |
try: |
124 | 127 |
return parser_class(contents, key)() |
125 | 128 |
except ValueError as exc: |
126 |
- raise exporter.NotAVaultConfigError( |
|
127 |
- os.fsdecode(path), |
|
128 |
- format=format, |
|
129 |
- ) from exc |
|
129 |
+ raise exporter.NotAVaultConfigError(path, format=format) from exc |
|
130 | 130 |
|
131 | 131 |
|
132 | 132 |
def _h(bs: Buffer) -> str: |
... | ... |
@@ -615,10 +615,7 @@ class VaultNativeV02ConfigParser(VaultNativeConfigParser): |
615 | 615 |
value (if any): |
616 | 616 |
|
617 | 617 |
~~~~ python |
618 |
- |
|
619 |
- data = block_input = b''.join([ |
|
620 |
- previous_block, input_string, salt |
|
621 |
- ]) |
|
618 |
+ data = block_input = b''.join([previous_block, input_string, salt]) |
|
622 | 619 |
for i in range(iteration_count): |
623 | 620 |
data = message_digest(data) |
624 | 621 |
block = data |
... | ... |
@@ -728,7 +724,7 @@ if __name__ == '__main__': |
728 | 724 |
import os |
729 | 725 |
|
730 | 726 |
logging.basicConfig(level=('DEBUG' if os.getenv('DEBUG') else 'WARNING')) |
731 |
- with open(exporter.get_vault_path(), 'rb') as infile: |
|
727 |
+ with exporter.get_vault_path().open('rb') as infile: |
|
732 | 728 |
contents = base64.standard_b64decode(infile.read()) |
733 | 729 |
password = exporter.get_vault_key() |
734 | 730 |
try: |
... | ... |
@@ -12,6 +12,7 @@ import importlib.util |
12 | 12 |
import json |
13 | 13 |
import logging |
14 | 14 |
import os |
15 |
+import pathlib |
|
15 | 16 |
import re |
16 | 17 |
import shlex |
17 | 18 |
import stat |
... | ... |
@@ -1440,18 +1441,16 @@ def isolated_config( |
1440 | 1441 |
stack.enter_context( |
1441 | 1442 |
cli.StandardCLILogging.ensure_standard_warnings_logging() |
1442 | 1443 |
) |
1443 |
- monkeypatch.setenv('HOME', os.getcwd()) |
|
1444 |
- monkeypatch.setenv('USERPROFILE', os.getcwd()) |
|
1444 |
+ cwd = str(pathlib.Path.cwd().resolve()) |
|
1445 |
+ monkeypatch.setenv('HOME', cwd) |
|
1446 |
+ monkeypatch.setenv('USERPROFILE', cwd) |
|
1445 | 1447 |
monkeypatch.delenv(env_name, raising=False) |
1446 | 1448 |
config_dir = cli._config_filename(subsystem=None) |
1447 |
- os.makedirs(config_dir, exist_ok=True) |
|
1449 |
+ config_dir.mkdir(parents=True, exist_ok=True) |
|
1448 | 1450 |
if isinstance(main_config_str, str): |
1449 |
- with open( |
|
1450 |
- cli._config_filename('user configuration'), |
|
1451 |
- 'w', |
|
1452 |
- encoding='UTF-8', |
|
1453 |
- ) as outfile: |
|
1454 |
- outfile.write(main_config_str) |
|
1451 |
+ cli._config_filename('user configuration').write_text( |
|
1452 |
+ main_config_str, encoding='UTF-8' |
|
1453 |
+ ) |
|
1455 | 1454 |
yield |
1456 | 1455 |
|
1457 | 1456 |
|
... | ... |
@@ -1466,7 +1465,7 @@ def isolated_vault_config( |
1466 | 1465 |
monkeypatch=monkeypatch, runner=runner, main_config_str=main_config_str |
1467 | 1466 |
): |
1468 | 1467 |
config_filename = cli._config_filename(subsystem='vault') |
1469 |
- with open(config_filename, 'w', encoding='UTF-8') as outfile: |
|
1468 |
+ with config_filename.open('w', encoding='UTF-8') as outfile: |
|
1470 | 1469 |
json.dump(vault_config, outfile) |
1471 | 1470 |
yield |
1472 | 1471 |
|
... | ... |
@@ -1486,15 +1485,18 @@ def isolated_vault_exporter_config( |
1486 | 1485 |
except AttributeError: |
1487 | 1486 |
|
1488 | 1487 |
@contextlib.contextmanager |
1489 |
- def chdir(newpath: str) -> Iterator[None]: # pragma: no branch |
|
1490 |
- oldpath = os.getcwd() |
|
1488 |
+ def chdir( |
|
1489 |
+ newpath: str | bytes | os.PathLike, |
|
1490 |
+ ) -> Iterator[None]: # pragma: no branch |
|
1491 |
+ oldpath = pathlib.Path.cwd().resolve() |
|
1491 | 1492 |
os.chdir(newpath) |
1492 | 1493 |
yield |
1493 | 1494 |
os.chdir(oldpath) |
1494 | 1495 |
|
1495 | 1496 |
with runner.isolated_filesystem(): |
1496 |
- monkeypatch.setenv('HOME', os.getcwd()) |
|
1497 |
- monkeypatch.setenv('USERPROFILE', os.getcwd()) |
|
1497 |
+ cwd = str(pathlib.Path.cwd().resolve()) |
|
1498 |
+ monkeypatch.setenv('HOME', cwd) |
|
1499 |
+ monkeypatch.setenv('USERPROFILE', cwd) |
|
1498 | 1500 |
monkeypatch.delenv('VAULT_PATH', raising=False) |
1499 | 1501 |
monkeypatch.delenv('VAULT_KEY', raising=False) |
1500 | 1502 |
monkeypatch.delenv('LOGNAME', raising=False) |
... | ... |
@@ -1502,16 +1504,16 @@ def isolated_vault_exporter_config( |
1502 | 1504 |
monkeypatch.delenv('USERNAME', raising=False) |
1503 | 1505 |
if vault_key is not None: |
1504 | 1506 |
monkeypatch.setenv('VAULT_KEY', vault_key) |
1507 |
+ vault_config_path = pathlib.Path('.vault').resolve() |
|
1505 | 1508 |
# Use match/case here once Python 3.9 becomes unsupported. |
1506 | 1509 |
if isinstance(vault_config, str): |
1507 |
- with open('.vault', 'w', encoding='UTF-8') as outfile: |
|
1508 |
- print(vault_config, file=outfile) |
|
1510 |
+ vault_config_path.write_text(f'{vault_config}\n', encoding='UTF-8') |
|
1509 | 1511 |
elif isinstance(vault_config, bytes): |
1510 |
- os.makedirs('.vault', mode=0o700, exist_ok=True) |
|
1512 |
+ vault_config_path.mkdir(parents=True, mode=0o700, exist_ok=True) |
|
1511 | 1513 |
# Use parenthesized context manager expressions here once |
1512 | 1514 |
# Python 3.9 becomes unsupported. |
1513 | 1515 |
with contextlib.ExitStack() as stack: |
1514 |
- stack.enter_context(chdir('.vault')) |
|
1516 |
+ stack.enter_context(chdir(vault_config_path)) |
|
1515 | 1517 |
tmpzipfile = stack.enter_context( |
1516 | 1518 |
tempfile.NamedTemporaryFile(suffix='.zip') |
1517 | 1519 |
) |
... | ... |
@@ -1562,7 +1564,7 @@ def make_file_readonly( |
1562 | 1564 |
and are susceptible to race conditions. |
1563 | 1565 |
|
1564 | 1566 |
""" |
1565 |
- fname: int | str | bytes | os.PathLike[str] |
|
1567 |
+ fname: int | str | bytes | os.PathLike |
|
1566 | 1568 |
if try_race_free_implementation and {os.stat, os.chmod} <= os.supports_fd: |
1567 | 1569 |
fname = os.open( |
1568 | 1570 |
pathname, |
... | ... |
@@ -1573,13 +1575,13 @@ def make_file_readonly( |
1573 | 1575 |
else: |
1574 | 1576 |
fname = pathname |
1575 | 1577 |
try: |
1576 |
- orig_mode = os.stat(fname).st_mode |
|
1578 |
+ orig_mode = os.stat(fname).st_mode # noqa: PTH116 |
|
1577 | 1579 |
new_mode = ( |
1578 | 1580 |
orig_mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH |
1579 | 1581 |
| stat.S_IREAD |
1580 | 1582 |
) |
1581 |
- os.chmod(fname, stat.S_IREAD) |
|
1582 |
- os.chmod(fname, new_mode) |
|
1583 |
+ os.chmod(fname, stat.S_IREAD) # noqa: PTH101 |
|
1584 |
+ os.chmod(fname, new_mode) # noqa: PTH101 |
|
1583 | 1585 |
finally: |
1584 | 1586 |
if isinstance(fname, int): |
1585 | 1587 |
os.close(fname) |
... | ... |
@@ -12,6 +12,7 @@ import io |
12 | 12 |
import json |
13 | 13 |
import logging |
14 | 14 |
import os |
15 |
+import pathlib |
|
15 | 16 |
import shlex |
16 | 17 |
import shutil |
17 | 18 |
import socket |
... | ... |
@@ -926,8 +927,8 @@ class TestCLI: |
926 | 927 |
input=json.dumps(config), |
927 | 928 |
catch_exceptions=False, |
928 | 929 |
) |
929 |
- with open( |
|
930 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
930 |
+ with cli._config_filename(subsystem='vault').open( |
|
931 |
+ encoding='UTF-8' |
|
931 | 932 |
) as infile: |
932 | 933 |
config2 = json.load(infile) |
933 | 934 |
result = tests.ReadableResult.parse(result_) |
... | ... |
@@ -965,8 +966,8 @@ class TestCLI: |
965 | 966 |
input=json.dumps(config), |
966 | 967 |
catch_exceptions=False, |
967 | 968 |
) |
968 |
- with open( |
|
969 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
969 |
+ with cli._config_filename(subsystem='vault').open( |
|
970 |
+ encoding='UTF-8' |
|
970 | 971 |
) as infile: |
971 | 972 |
config3 = json.load(infile) |
972 | 973 |
result = tests.ReadableResult.parse(result_) |
... | ... |
@@ -1020,10 +1021,9 @@ class TestCLI: |
1020 | 1021 |
# configuration file ourselves afterwards, inside the context. |
1021 | 1022 |
# We also might as well use `isolated_config` instead. |
1022 | 1023 |
with tests.isolated_config(monkeypatch=monkeypatch, runner=runner): |
1023 |
- with open( |
|
1024 |
- cli._config_filename(subsystem='vault'), 'w', encoding='UTF-8' |
|
1025 |
- ) as outfile: |
|
1026 |
- print('This string is not valid JSON.', file=outfile) |
|
1024 |
+ cli._config_filename(subsystem='vault').write_text( |
|
1025 |
+ 'This string is not valid JSON.\n', encoding='UTF-8' |
|
1026 |
+ ) |
|
1027 | 1027 |
dname = cli._config_filename(subsystem=None) |
1028 | 1028 |
result_ = runner.invoke( |
1029 | 1029 |
cli.derivepassphrase_vault, |
... | ... |
@@ -1049,8 +1049,7 @@ class TestCLI: |
1049 | 1049 |
) -> None: |
1050 | 1050 |
runner = click.testing.CliRunner(mix_stderr=False) |
1051 | 1051 |
with tests.isolated_config(monkeypatch=monkeypatch, runner=runner): |
1052 |
- with contextlib.suppress(FileNotFoundError): |
|
1053 |
- os.remove(cli._config_filename(subsystem='vault')) |
|
1052 |
+ cli._config_filename(subsystem='vault').unlink(missing_ok=True) |
|
1054 | 1053 |
result_ = runner.invoke( |
1055 | 1054 |
# Test parent context navigation by not calling |
1056 | 1055 |
# `cli.derivepassphrase_vault` directly. Used e.g. in |
... | ... |
@@ -1104,9 +1103,9 @@ class TestCLI: |
1104 | 1103 |
) -> None: |
1105 | 1104 |
runner = click.testing.CliRunner(mix_stderr=False) |
1106 | 1105 |
with tests.isolated_config(monkeypatch=monkeypatch, runner=runner): |
1107 |
- with contextlib.suppress(FileNotFoundError): |
|
1108 |
- os.remove(cli._config_filename(subsystem='vault')) |
|
1109 |
- os.makedirs(cli._config_filename(subsystem='vault')) |
|
1106 |
+ config_file = cli._config_filename(subsystem='vault') |
|
1107 |
+ config_file.unlink(missing_ok=True) |
|
1108 |
+ config_file.mkdir(parents=True, exist_ok=True) |
|
1110 | 1109 |
result_ = runner.invoke( |
1111 | 1110 |
cli.derivepassphrase_vault, |
1112 | 1111 |
['--export', '-', *export_options], |
... | ... |
@@ -1158,10 +1157,10 @@ class TestCLI: |
1158 | 1157 |
) -> None: |
1159 | 1158 |
runner = click.testing.CliRunner(mix_stderr=False) |
1160 | 1159 |
with tests.isolated_config(monkeypatch=monkeypatch, runner=runner): |
1160 |
+ config_dir = cli._config_filename(subsystem=None) |
|
1161 | 1161 |
with contextlib.suppress(FileNotFoundError): |
1162 |
- shutil.rmtree('.derivepassphrase') |
|
1163 |
- with open('.derivepassphrase', 'w', encoding='UTF-8') as outfile: |
|
1164 |
- print('Obstruction!!', file=outfile) |
|
1162 |
+ shutil.rmtree(config_dir) |
|
1163 |
+ config_dir.write_text('Obstruction!!\n') |
|
1165 | 1164 |
result_ = runner.invoke( |
1166 | 1165 |
cli.derivepassphrase_vault, |
1167 | 1166 |
['--export', '-', *export_options], |
... | ... |
@@ -1197,8 +1196,8 @@ contents go here |
1197 | 1196 |
) |
1198 | 1197 |
result = tests.ReadableResult.parse(result_) |
1199 | 1198 |
assert result.clean_exit(empty_stderr=True), 'expected clean exit' |
1200 |
- with open( |
|
1201 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
1199 |
+ with cli._config_filename(subsystem='vault').open( |
|
1200 |
+ encoding='UTF-8' |
|
1202 | 1201 |
) as infile: |
1203 | 1202 |
config = json.load(infile) |
1204 | 1203 |
assert config == { |
... | ... |
@@ -1223,8 +1222,8 @@ contents go here |
1223 | 1222 |
) |
1224 | 1223 |
result = tests.ReadableResult.parse(result_) |
1225 | 1224 |
assert result.clean_exit(empty_stderr=True), 'expected clean exit' |
1226 |
- with open( |
|
1227 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
1225 |
+ with cli._config_filename(subsystem='vault').open( |
|
1226 |
+ encoding='UTF-8' |
|
1228 | 1227 |
) as infile: |
1229 | 1228 |
config = json.load(infile) |
1230 | 1229 |
assert config == {'global': {'phrase': 'abc'}, 'services': {}} |
... | ... |
@@ -1246,8 +1245,8 @@ contents go here |
1246 | 1245 |
) |
1247 | 1246 |
result = tests.ReadableResult.parse(result_) |
1248 | 1247 |
assert result.clean_exit(empty_stderr=True), 'expected clean exit' |
1249 |
- with open( |
|
1250 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
1248 |
+ with cli._config_filename(subsystem='vault').open( |
|
1249 |
+ encoding='UTF-8' |
|
1251 | 1250 |
) as infile: |
1252 | 1251 |
config = json.load(infile) |
1253 | 1252 |
assert config == { |
... | ... |
@@ -1274,8 +1273,8 @@ contents go here |
1274 | 1273 |
assert result.error_exit(error='the user aborted the request'), ( |
1275 | 1274 |
'expected known error message' |
1276 | 1275 |
) |
1277 |
- with open( |
|
1278 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
1276 |
+ with cli._config_filename(subsystem='vault').open( |
|
1277 |
+ encoding='UTF-8' |
|
1279 | 1278 |
) as infile: |
1280 | 1279 |
config = json.load(infile) |
1281 | 1280 |
assert config == {'global': {'phrase': 'abc'}, 'services': {}} |
... | ... |
@@ -1346,8 +1345,8 @@ contents go here |
1346 | 1345 |
) |
1347 | 1346 |
result = tests.ReadableResult.parse(result_) |
1348 | 1347 |
assert result.clean_exit(), 'expected clean exit' |
1349 |
- with open( |
|
1350 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
1348 |
+ with cli._config_filename(subsystem='vault').open( |
|
1349 |
+ encoding='UTF-8' |
|
1351 | 1350 |
) as infile: |
1352 | 1351 |
config = json.load(infile) |
1353 | 1352 |
assert config == result_config, ( |
... | ... |
@@ -1457,7 +1456,8 @@ contents go here |
1457 | 1456 |
runner=runner, |
1458 | 1457 |
vault_config={'global': {'phrase': 'abc'}, 'services': {}}, |
1459 | 1458 |
): |
1460 |
- monkeypatch.setenv('SSH_AUTH_SOCK', os.getcwd()) |
|
1459 |
+ cwd = pathlib.Path.cwd().resolve() |
|
1460 |
+ monkeypatch.setenv('SSH_AUTH_SOCK', str(cwd)) |
|
1461 | 1461 |
result_ = runner.invoke( |
1462 | 1462 |
cli.derivepassphrase_vault, |
1463 | 1463 |
['--key', '--config'], |
... | ... |
@@ -1674,16 +1674,8 @@ contents go here |
1674 | 1674 |
monkeypatch=monkeypatch, |
1675 | 1675 |
runner=runner, |
1676 | 1676 |
): |
1677 |
- shutil.rmtree('.derivepassphrase') |
|
1678 |
- os_makedirs_called = False |
|
1679 |
- real_os_makedirs = os.makedirs |
|
1680 |
- |
|
1681 |
- def makedirs(*args: Any, **kwargs: Any) -> Any: |
|
1682 |
- nonlocal os_makedirs_called |
|
1683 |
- os_makedirs_called = True |
|
1684 |
- return real_os_makedirs(*args, **kwargs) |
|
1685 |
- |
|
1686 |
- monkeypatch.setattr(os, 'makedirs', makedirs) |
|
1677 |
+ with contextlib.suppress(FileNotFoundError): |
|
1678 |
+ shutil.rmtree(cli._config_filename(subsystem=None)) |
|
1687 | 1679 |
result_ = runner.invoke( |
1688 | 1680 |
cli.derivepassphrase_vault, |
1689 | 1681 |
['--config', '-p'], |
... | ... |
@@ -1695,9 +1687,8 @@ contents go here |
1695 | 1687 |
assert result.stderr == 'Passphrase:', ( |
1696 | 1688 |
'program unexpectedly failed?!' |
1697 | 1689 |
) |
1698 |
- assert os_makedirs_called, 'os.makedirs has not been called?!' |
|
1699 |
- with open( |
|
1700 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
1690 |
+ with cli._config_filename(subsystem='vault').open( |
|
1691 |
+ encoding='UTF-8' |
|
1701 | 1692 |
) as infile: |
1702 | 1693 |
config_readback = json.load(infile) |
1703 | 1694 |
assert config_readback == { |
... | ... |
@@ -1717,12 +1708,10 @@ contents go here |
1717 | 1708 |
save_config_ = cli._save_config |
1718 | 1709 |
|
1719 | 1710 |
def obstruct_config_saving(*args: Any, **kwargs: Any) -> Any: |
1711 |
+ config_dir = cli._config_filename(subsystem=None) |
|
1720 | 1712 |
with contextlib.suppress(FileNotFoundError): |
1721 |
- shutil.rmtree('.derivepassphrase') |
|
1722 |
- with open( |
|
1723 |
- '.derivepassphrase', 'w', encoding='UTF-8' |
|
1724 |
- ) as outfile: |
|
1725 |
- print('Obstruction!!', file=outfile) |
|
1713 |
+ shutil.rmtree(config_dir) |
|
1714 |
+ config_dir.write_text('Obstruction!!\n') |
|
1726 | 1715 |
monkeypatch.setattr(cli, '_save_config', save_config_) |
1727 | 1716 |
return save_config_(*args, **kwargs) |
1728 | 1717 |
|
... | ... |
@@ -2089,7 +2078,7 @@ class TestCLIUtils: |
2089 | 2078 |
monkeypatch=monkeypatch, runner=runner, vault_config=config |
2090 | 2079 |
): |
2091 | 2080 |
config_filename = cli._config_filename(subsystem='vault') |
2092 |
- with open(config_filename, encoding='UTF-8') as fileobj: |
|
2081 |
+ with config_filename.open(encoding='UTF-8') as fileobj: |
|
2093 | 2082 |
assert json.load(fileobj) == config |
2094 | 2083 |
assert cli._load_config() == config |
2095 | 2084 |
|
... | ... |
@@ -2600,8 +2589,8 @@ Boo. |
2600 | 2589 |
assert result.clean_exit(empty_stderr=True), ( |
2601 | 2590 |
'expected clean exit' |
2602 | 2591 |
) |
2603 |
- with open( |
|
2604 |
- cli._config_filename(subsystem='vault'), encoding='UTF-8' |
|
2592 |
+ with cli._config_filename(subsystem='vault').open( |
|
2593 |
+ encoding='UTF-8' |
|
2605 | 2594 |
) as infile: |
2606 | 2595 |
config_readback = json.load(infile) |
2607 | 2596 |
assert config_readback == result_config |
... | ... |
@@ -2844,11 +2833,9 @@ class TestCLITransition: |
2844 | 2833 |
) -> None: |
2845 | 2834 |
runner = click.testing.CliRunner() |
2846 | 2835 |
with tests.isolated_config(monkeypatch=monkeypatch, runner=runner): |
2847 |
- config_filename = cli._config_filename( |
|
2848 |
- subsystem='old settings.json' |
|
2836 |
+ cli._config_filename(subsystem='old settings.json').write_text( |
|
2837 |
+ json.dumps(config, indent=2) + '\n', encoding='UTF-8' |
|
2849 | 2838 |
) |
2850 |
- with open(config_filename, 'w', encoding='UTF-8') as fileobj: |
|
2851 |
- print(json.dumps(config, indent=2), file=fileobj) |
|
2852 | 2839 |
assert cli._migrate_and_load_old_config()[0] == config |
2853 | 2840 |
|
2854 | 2841 |
@pytest.mark.parametrize( |
... | ... |
@@ -2875,11 +2862,9 @@ class TestCLITransition: |
2875 | 2862 |
) -> None: |
2876 | 2863 |
runner = click.testing.CliRunner() |
2877 | 2864 |
with tests.isolated_config(monkeypatch=monkeypatch, runner=runner): |
2878 |
- config_filename = cli._config_filename( |
|
2879 |
- subsystem='old settings.json' |
|
2865 |
+ cli._config_filename(subsystem='old settings.json').write_text( |
|
2866 |
+ json.dumps(config, indent=2) + '\n', encoding='UTF-8' |
|
2880 | 2867 |
) |
2881 |
- with open(config_filename, 'w', encoding='UTF-8') as fileobj: |
|
2882 |
- print(json.dumps(config, indent=2), file=fileobj) |
|
2883 | 2868 |
assert cli._migrate_and_load_old_config() == (config, None) |
2884 | 2869 |
|
2885 | 2870 |
@pytest.mark.parametrize( |
... | ... |
@@ -2906,12 +2891,12 @@ class TestCLITransition: |
2906 | 2891 |
) -> None: |
2907 | 2892 |
runner = click.testing.CliRunner() |
2908 | 2893 |
with tests.isolated_config(monkeypatch=monkeypatch, runner=runner): |
2909 |
- config_filename = cli._config_filename( |
|
2910 |
- subsystem='old settings.json' |
|
2894 |
+ cli._config_filename(subsystem='old settings.json').write_text( |
|
2895 |
+ json.dumps(config, indent=2) + '\n', encoding='UTF-8' |
|
2896 |
+ ) |
|
2897 |
+ cli._config_filename(subsystem='vault').mkdir( |
|
2898 |
+ parents=True, exist_ok=True |
|
2911 | 2899 |
) |
2912 |
- with open(config_filename, 'w', encoding='UTF-8') as fileobj: |
|
2913 |
- print(json.dumps(config, indent=2), file=fileobj) |
|
2914 |
- os.mkdir(cli._config_filename(subsystem='vault')) |
|
2915 | 2900 |
config2, err = cli._migrate_and_load_old_config() |
2916 | 2901 |
assert config2 == config |
2917 | 2902 |
assert isinstance(err, OSError) |
... | ... |
@@ -2941,11 +2926,9 @@ class TestCLITransition: |
2941 | 2926 |
) -> None: |
2942 | 2927 |
runner = click.testing.CliRunner() |
2943 | 2928 |
with tests.isolated_config(monkeypatch=monkeypatch, runner=runner): |
2944 |
- config_filename = cli._config_filename( |
|
2945 |
- subsystem='old settings.json' |
|
2929 |
+ cli._config_filename(subsystem='old settings.json').write_text( |
|
2930 |
+ json.dumps(config, indent=2) + '\n', encoding='UTF-8' |
|
2946 | 2931 |
) |
2947 |
- with open(config_filename, 'w', encoding='UTF-8') as fileobj: |
|
2948 |
- print(json.dumps(config, indent=2), file=fileobj) |
|
2949 | 2932 |
with pytest.raises(ValueError, match=cli._INVALID_VAULT_CONFIG): |
2950 | 2933 |
cli._migrate_and_load_old_config() |
2951 | 2934 |
|
... | ... |
@@ -3077,17 +3060,13 @@ class TestCLITransition: |
3077 | 3060 |
monkeypatch=monkeypatch, |
3078 | 3061 |
runner=runner, |
3079 | 3062 |
): |
3080 |
- with open( |
|
3081 |
- cli._config_filename(subsystem='old settings.json'), |
|
3082 |
- 'w', |
|
3083 |
- encoding='UTF-8', |
|
3084 |
- ) as fileobj: |
|
3085 |
- print( |
|
3063 |
+ cli._config_filename(subsystem='old settings.json').write_text( |
|
3086 | 3064 |
json.dumps( |
3087 | 3065 |
{'services': {DUMMY_SERVICE: DUMMY_CONFIG_SETTINGS}}, |
3088 | 3066 |
indent=2, |
3089 |
- ), |
|
3090 |
- file=fileobj, |
|
3067 |
+ ) |
|
3068 |
+ + '\n', |
|
3069 |
+ encoding='UTF-8', |
|
3091 | 3070 |
) |
3092 | 3071 |
result_ = runner.invoke( |
3093 | 3072 |
cli.derivepassphrase_vault, |
... | ... |
@@ -3113,17 +3092,13 @@ class TestCLITransition: |
3113 | 3092 |
monkeypatch=monkeypatch, |
3114 | 3093 |
runner=runner, |
3115 | 3094 |
): |
3116 |
- with open( |
|
3117 |
- cli._config_filename(subsystem='old settings.json'), |
|
3118 |
- 'w', |
|
3119 |
- encoding='UTF-8', |
|
3120 |
- ) as fileobj: |
|
3121 |
- print( |
|
3095 |
+ cli._config_filename(subsystem='old settings.json').write_text( |
|
3122 | 3096 |
json.dumps( |
3123 | 3097 |
{'services': {DUMMY_SERVICE: DUMMY_CONFIG_SETTINGS}}, |
3124 | 3098 |
indent=2, |
3125 |
- ), |
|
3126 |
- file=fileobj, |
|
3099 |
+ ) |
|
3100 |
+ + '\n', |
|
3101 |
+ encoding='UTF-8', |
|
3127 | 3102 |
) |
3128 | 3103 |
|
3129 | 3104 |
def raiser(*_args: Any, **_kwargs: Any) -> None: |
... | ... |
@@ -3134,6 +3109,7 @@ class TestCLITransition: |
3134 | 3109 |
) |
3135 | 3110 |
|
3136 | 3111 |
monkeypatch.setattr(os, 'replace', raiser) |
3112 |
+ monkeypatch.setattr(pathlib.Path, 'rename', raiser) |
|
3137 | 3113 |
result_ = runner.invoke( |
3138 | 3114 |
cli.derivepassphrase_vault, |
3139 | 3115 |
['--export', '-'], |
... | ... |
@@ -3161,9 +3137,8 @@ class TestCLITransition: |
3161 | 3137 |
): |
3162 | 3138 |
old_name = cli._config_filename(subsystem='old settings.json') |
3163 | 3139 |
new_name = cli._config_filename(subsystem='vault') |
3164 |
- with contextlib.suppress(FileNotFoundError): |
|
3165 |
- os.remove(old_name) |
|
3166 |
- os.rename(new_name, old_name) |
|
3140 |
+ old_name.unlink(missing_ok=True) |
|
3141 |
+ new_name.rename(old_name) |
|
3167 | 3142 |
assert cli._shell_complete_service( |
3168 | 3143 |
click.Context(cli.derivepassphrase), |
3169 | 3144 |
click.Argument(['some_parameter']), |
... | ... |
@@ -4183,7 +4158,7 @@ class TestShellCompletion: |
4183 | 4158 |
runner=runner, |
4184 | 4159 |
vault_config={'services': {DUMMY_SERVICE: DUMMY_CONFIG_SETTINGS}}, |
4185 | 4160 |
): |
4186 |
- os.remove(cli._config_filename(subsystem='vault')) |
|
4161 |
+ cli._config_filename(subsystem='vault').unlink(missing_ok=True) |
|
4187 | 4162 |
assert not cli._shell_complete_service( |
4188 | 4163 |
click.Context(cli.derivepassphrase), |
4189 | 4164 |
click.Argument(['some_parameter']), |
... | ... |
@@ -7,7 +7,7 @@ from __future__ import annotations |
7 | 7 |
import base64 |
8 | 8 |
import contextlib |
9 | 9 |
import json |
10 |
-import os |
|
10 |
+import pathlib |
|
11 | 11 |
from typing import TYPE_CHECKING |
12 | 12 |
|
13 | 13 |
import click.testing |
... | ... |
@@ -180,11 +180,12 @@ class TestCLI: |
180 | 180 |
vault_config='', |
181 | 181 |
vault_key=tests.VAULT_MASTER_KEY, |
182 | 182 |
): |
183 |
- os.remove('.vault') |
|
184 |
- os.mkdir('.vault') |
|
183 |
+ p = pathlib.Path('.vault') |
|
184 |
+ p.unlink() |
|
185 |
+ p.mkdir() |
|
185 | 186 |
result_ = runner.invoke( |
186 | 187 |
cli.derivepassphrase_export_vault, |
187 |
- ['.vault'], |
|
188 |
+ [str(p)], |
|
188 | 189 |
) |
189 | 190 |
result = tests.ReadableResult.parse(result_) |
190 | 191 |
assert result.error_exit( |
... | ... |
@@ -322,10 +323,11 @@ class TestStoreroom: |
322 | 323 |
runner=runner, |
323 | 324 |
vault_config=tests.VAULT_STOREROOM_CONFIG_ZIPPED, |
324 | 325 |
): |
325 |
- with open('.vault/20', 'w', encoding='UTF-8') as outfile: |
|
326 |
+ p = pathlib.Path('.vault', '20') |
|
327 |
+ with p.open('w', encoding='UTF-8') as outfile: |
|
326 | 328 |
print(config, file=outfile) |
327 | 329 |
with pytest.raises(ValueError, match='Invalid bucket file: '): |
328 |
- list(storeroom._decrypt_bucket_file('.vault/20', master_keys)) |
|
330 |
+ list(storeroom._decrypt_bucket_file(p, master_keys)) |
|
329 | 331 |
|
330 | 332 |
@pytest.mark.parametrize( |
331 | 333 |
['data', 'err_msg'], |
... | ... |
@@ -356,7 +358,8 @@ class TestStoreroom: |
356 | 358 |
vault_config=tests.VAULT_STOREROOM_CONFIG_ZIPPED, |
357 | 359 |
vault_key=tests.VAULT_MASTER_KEY, |
358 | 360 |
): |
359 |
- with open('.vault/.keys', 'w', encoding='UTF-8') as outfile: |
|
361 |
+ p = pathlib.Path('.vault', '.keys') |
|
362 |
+ with p.open('w', encoding='UTF-8') as outfile: |
|
360 | 363 |
print(data, file=outfile) |
361 | 364 |
with pytest.raises(RuntimeError, match=err_msg): |
362 | 365 |
handler(format='storeroom') |
... | ... |
@@ -5,6 +5,7 @@ |
5 | 5 |
from __future__ import annotations |
6 | 6 |
|
7 | 7 |
import os |
8 |
+import pathlib |
|
8 | 9 |
from typing import TYPE_CHECKING, Any |
9 | 10 |
|
10 | 11 |
import click.testing |
... | ... |
@@ -65,26 +66,29 @@ class Test001ExporterUtils: |
65 | 66 |
@pytest.mark.parametrize( |
66 | 67 |
['expected', 'path'], |
67 | 68 |
[ |
68 |
- ('/tmp', '/tmp'), |
|
69 |
- ('~', os.path.curdir), |
|
70 |
- ('~/.vault', None), |
|
69 |
+ (pathlib.Path('/tmp'), pathlib.Path('/tmp')), |
|
70 |
+ (pathlib.Path('~'), pathlib.Path()), |
|
71 |
+ (pathlib.Path('~/.vault'), None), |
|
71 | 72 |
], |
72 | 73 |
) |
73 | 74 |
def test_210_get_vault_path( |
74 | 75 |
self, |
75 | 76 |
monkeypatch: pytest.MonkeyPatch, |
76 |
- expected: str, |
|
77 |
- path: str | None, |
|
77 |
+ expected: pathlib.Path, |
|
78 |
+ path: str | os.PathLike[str] | None, |
|
78 | 79 |
) -> None: |
79 | 80 |
runner = click.testing.CliRunner(mix_stderr=False) |
80 | 81 |
with tests.isolated_vault_exporter_config( |
81 | 82 |
monkeypatch=monkeypatch, runner=runner |
82 | 83 |
): |
83 | 84 |
if path: |
84 |
- monkeypatch.setenv('VAULT_PATH', path) |
|
85 |
- assert os.fsdecode( |
|
86 |
- os.path.realpath(exporter.get_vault_path()) |
|
87 |
- ) == os.path.realpath(os.path.expanduser(expected)) |
|
85 |
+ monkeypatch.setenv( |
|
86 |
+ 'VAULT_PATH', os.fspath(path) if path is not None else None |
|
87 |
+ ) |
|
88 |
+ assert ( |
|
89 |
+ exporter.get_vault_path().resolve() |
|
90 |
+ == expected.expanduser().resolve() |
|
91 |
+ ) |
|
88 | 92 |
|
89 | 93 |
def test_220_register_export_vault_config_data_handler( |
90 | 94 |
self, monkeypatch: pytest.MonkeyPatch |
... | ... |
@@ -126,7 +130,11 @@ class Test001ExporterUtils: |
126 | 130 |
def test_310_get_vault_path_without_home( |
127 | 131 |
self, monkeypatch: pytest.MonkeyPatch |
128 | 132 |
) -> None: |
129 |
- monkeypatch.setattr(os.path, 'expanduser', lambda x: x) |
|
133 |
+ def raiser(*_args: Any, **_kwargs: Any) -> Any: |
|
134 |
+ raise RuntimeError('Cannot determine home directory.') # noqa: EM101,TRY003 |
|
135 |
+ |
|
136 |
+ monkeypatch.setattr(pathlib.Path, 'expanduser', raiser) |
|
137 |
+ monkeypatch.setattr(os.path, 'expanduser', raiser) |
|
130 | 138 |
with pytest.raises( |
131 | 139 |
RuntimeError, match=r'[Cc]annot determine home directory' |
132 | 140 |
): |
133 | 141 |