Browse code

Refactor of 'dry-run' and 'verbose' options

Lorenz Hüdepohl authored on 13/09/2017 23:05:19
Showing 1 changed files
... ...
@@ -6,10 +6,10 @@ import socket
6 6
 import argparse
7 7
 from shutil import which
8 8
 from subprocess import Popen, PIPE, run
9
+from contextlib import contextmanager
10
+from collections import namedtuple
9 11
 
10
-import sys
11 12
 sys.tracebacklimit = 0
12
-
13 13
 encoding = locale.getdefaultlocale()[1]
14 14
 
15 15
 parser = argparse.ArgumentParser(
... ...
@@ -31,7 +31,7 @@ parser.add_argument('destination', type=str,
31 31
 parser.add_argument('--snapname', metavar='TAGNAME', type=str,
32 32
                     help='Only consider snapshots starting with TAGNAME')
33 33
 
34
-parser.add_argument('-c', '--cache-dir', nargs='?', metavar="DIR",
34
+parser.add_argument('-c', '--cache-dir', metavar="DIR",
35 35
                     help='First create a temporary file in directory DIR with the `zfs send` stream, '
36 36
                          'then rsync that to the remote host. With that, one can resume '
37 37
                          'a partially sent file without starting over')
... ...
@@ -39,15 +39,21 @@ parser.add_argument('-c', '--cache-dir', nargs='?', metavar="DIR",
39 39
 parser.add_argument('-m', '--min-cache-size', default="1M",
40 40
                     help='Minimum (estimated) size of stream to use the cache-dir funcitonality')
41 41
 
42
-parser.add_argument('-v', '--verbose', action="store_true",
43
-                    help='Echo the commands that are issued.')
42
+parser.add_argument('-v', '--verbose', action="append_const", const=1,
43
+                    help='Echo the commands that are issued. Two -v pass a -v along to the zfs send/recv commands')
44 44
 
45 45
 parser.add_argument('-n', '--dry-run', action="store_true",
46 46
                     help='Only echo the transfer commands that would be issued,\n'
47 47
                          'do not actually send anything.')
48 48
 
49 49
 args = parser.parse_args()
50
+args.verbose = sum(args.verbose) if args.verbose is not None else 0
50 51
 
52
+no_defaults_parser = argparse.ArgumentParser(parents=(parser,), argument_default=None, add_help=False)
53
+no_defaults_args = no_defaults_parser.parse_args()
54
+if not args.cache_dir and no_defaults_args.min_cache_size is not None:
55
+    print("Cannot specify -m/--min-cache-size without -c/--cache-dir", file=sys.stderr)
56
+    raise SystemExit(1)
51 57
 
52 58
 def check_returncode(proc):
53 59
     if not proc.returncode == 0:
... ...
@@ -94,9 +100,9 @@ ssh_orig, orig_host, origin = prefix_ssh(args.origin)
94 100
 compression = ssh_orig or ssh_dest
95 101
 
96 102
 if args.cache_dir:
97
-    try:
103
+    if ":" in args.cache_dir:
98 104
         orig_cache_dir, dest_cache_dir = args.cache_dir.split(":")
99
-    except:
105
+    else:
100 106
         orig_cache_dir, dest_cache_dir = (args.cache_dir,) * 2
101 107
     if not (bool(ssh_orig) != bool(ssh_dest)):
102 108
         print("-c/--cache-dir is not supported for two local or two remote filesystems", file=sys.stderr)
... ...
@@ -104,7 +110,7 @@ if args.cache_dir:
104 110
 
105 111
     min_cache_size = int(args.min_cache_size.replace("k", "000").replace("M", "000000").replace("G", "000000000"))
106 112
 
107
-if args.verbose:
113
+if args.verbose > 1:
108 114
     verbose = ["-v"]
109 115
 else:
110 116
     verbose = []
... ...
@@ -128,10 +134,15 @@ for fs, snapname in origin_snapshots:
128 134
 
129 135
 
130 136
 def rsync(orig_url, dest_url):
131
-    rsync = run(["rsync", "--partial", "--inplace", orig_url, dest_url])
132
-    check_returncode(rsync)
137
+    if args.verbose > 1:
138
+        progress = ["--info=progress2"]
139
+    else:
140
+        progress = []
141
+    with echoPopen(["rsync", "--partial"] + progress + ["--inplace", orig_url, dest_url]) as rsync:
142
+        rsync.wait()
143
+        check_returncode(rsync)
133 144
 
134
-def command_on_url(*commands, abort=True):
145
+def command_on_url(*commands, abort=True, dry_run=False, echo=False):
135 146
     *commands, url = commands
136 147
     if ":" in url:
137 148
         host, filename = url.split(":")
... ...
@@ -139,20 +150,57 @@ def command_on_url(*commands, abort=True):
139 150
     else:
140 151
         filename = url
141 152
         ssh = []
142
-    cmd = run(ssh + commands + [filename])
143
-    if abort:
144
-        check_returncode(cmd)
145
-    else:
146
-        return cmd.returncode == 0
153
+
154
+    if echo:
155
+        if ssh:
156
+            print(*ssh, '"' + " ".join(commands + [filename]) + '"')
157
+        else:
158
+            print(*commands, filename)
159
+        return True
160
+    elif not dry_run:
161
+        cmd = run(ssh + commands + [filename])
162
+        if abort:
163
+            check_returncode(cmd)
164
+        else:
165
+            return cmd.returncode == 0
147 166
 
148 167
 def exists(url):
149
-    return command_on_url("test", "-f", url, abort=False)
168
+    return command_on_url("test", "-f", url, abort=False, dry_run=False, echo=False)
150 169
 
151 170
 def touch(url):
152
-    return command_on_url("touch", url)
171
+    return command_on_url("touch", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
153 172
 
154 173
 def rm(url):
155
-    return command_on_url("rm", url)
174
+    return command_on_url("rm", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
175
+
176
+MockStdout = namedtuple("MockStdout", ["cmd_string"])
177
+MockPopen = namedtuple("MockPopen", ["stdout", "returncode", "wait"])
178
+def mockWait():
179
+    pass
180
+
181
+@contextmanager
182
+def echoPopen(commands, stdin=None, stdout=None, **kwargs):
183
+    if args.verbose or args.dry_run:
184
+        if stdin is not None:
185
+            in_pipe = stdin.cmd_string + " | "
186
+        else:
187
+            in_pipe = ""
188
+        if commands[0] == "ssh":
189
+            cmd = " ".join(commands[0:2]) + ' "' + " ".join(commands[2:]) + '"'
190
+        else:
191
+            cmd = " ".join(commands)
192
+        cmd_string = in_pipe + cmd
193
+        if stdout is None:
194
+            print(cmd_string)
195
+
196
+    if not args.dry_run:
197
+        with Popen(commands, stdin=stdin, stdout=stdout, **kwargs) as proc:
198
+            if stdout is not None:
199
+                proc.stdout.cmd_string = cmd_string
200
+            yield proc
201
+    else:
202
+        yield MockPopen(MockStdout(cmd_string), 0, mockWait)
203
+
156 204
 
157 205
 for fs, snapname in origin_snapshots:
158 206
     if fs in destinations and snapname in destinations[fs]:
... ...
@@ -186,6 +234,8 @@ for fs, snapname in origin_snapshots:
186 234
             print("# Using rsync cache for this transfer")
187 235
         elif args.verbose:
188 236
             print("# Not using rsync cache for this transfer")
237
+    else:
238
+        use_cache_dir = False
189 239
 
190 240
     if use_cache_dir:
191 241
         def check_space(ssh_prefix, where, cache_dir):
... ...
@@ -232,34 +282,6 @@ for fs, snapname in origin_snapshots:
232 282
 
233 283
     recv_cmd = ssh_dest + pre_pipe + ["zfs", "recv", "-u", destination + fs]
234 284
 
235
-    if args.verbose or args.dry_run:
236
-        if send_cmd[0] == "ssh":
237
-            print_send_cmd = send_cmd[0:2] + ['"'] + send_cmd[2:] + ['"']
238
-            if use_cache_dir:
239
-                print_rm_orig_cmd = ssh_orig + ['"', "rm", orig_cache_filename, '"']
240
-        else:
241
-            print_send_cmd = send_cmd
242
-            if use_cache_dir:
243
-                print_rm_orig_cmd = ["rm", orig_cache_filename]
244
-
245
-        if recv_cmd[0] == "ssh":
246
-            print_recv_cmd = recv_cmd[0:2] + ['"'] + recv_cmd[2:] + ['"']
247
-            if use_cache_dir:
248
-                print_rm_dest_cmd = ssh_dest + ['"', "rm", dest_cache_filename, '"']
249
-        else:
250
-            print_recv_cmd = recv_cmd
251
-            if use_cache_dir:
252
-                print_rm_dest_cmd = ["rm", dest_cache_filename]
253
-
254
-        if use_cache_dir:
255
-            print(*print_send_cmd)
256
-            print("rsync", "--partial", "--inplace", orig_url, dest_url)
257
-            print(*print_rm_orig_cmd)
258
-            print(*print_recv_cmd)
259
-            print(*print_rm_dest_cmd)
260
-        else:
261
-            print(" ".join(print_send_cmd), "|", " ".join(print_recv_cmd))
262
-
263 285
     send_shell = not ssh_orig
264 286
     if send_shell:
265 287
         send_cmd = [" ".join(send_cmd)]
... ...
@@ -268,43 +290,41 @@ for fs, snapname in origin_snapshots:
268 290
     if recv_shell:
269 291
         recv_cmd = [" ".join(recv_cmd)]
270 292
 
271
-    if not args.dry_run:
272
-
273
-        # send
274
-        if use_cache_dir:
275
-            # Via rsync'ed stream files
276
-            if not exists(orig_url + ".total"):
277
-                # Create stream file
278
-                with Popen(send_cmd, shell=send_shell) as sender:
279
-                    sender.wait()
280
-                    check_returncode(sender)
281
-                touch(orig_url + ".total")
282
-            else:
283
-                print("# Resuming upload of partial stream file")
284
-
285
-            rsync(orig_url, dest_url)
286
-            recv_stdin = None
287
-        else:
288
-            # direct `zfs send | zfs recv` pipe
289
-            sender = Popen(send_cmd, shell=send_shell)
290
-            recv_stdin = sender.stdout
291
-
292
-        # recv
293
-        with Popen(recv_cmd, stdin=recv_stdin, shell=recv_shell) as receiver:
294
-            receiver.wait()
295
-            check_returncode(receiver)
296
-
297
-        # Cleanup
298
-        if not use_cache_dir:
299
-            sender.wait()
300
-            check_returncode(sender)
293
+    # send
294
+    if use_cache_dir:
295
+        # Via rsync'ed stream files
296
+        if not exists(orig_url + ".total"):
297
+            # Create stream file
298
+            with echoPopen(send_cmd, shell=send_shell) as sender:
299
+                sender.wait()
300
+                check_returncode(sender)
301
+            touch(orig_url + ".total")
301 302
         else:
302
-            rm(orig_url + ".total")
303
-            rm(orig_url)
304
-            rm(dest_url)
303
+            print("# Resuming upload of partial stream file")
304
+
305
+        rsync(orig_url, dest_url)
306
+        recv_stdin = None
307
+    else:
308
+        # direct `zfs send | zfs recv` pipe
309
+        sender = echoPopen(send_cmd, shell=send_shell)
310
+        recv_stdin = sender.stdout
311
+
312
+    # recv
313
+    with echoPopen(recv_cmd, stdin=recv_stdin, shell=recv_shell) as receiver:
314
+        receiver.wait()
315
+        check_returncode(receiver)
316
+
317
+    # Cleanup
318
+    if not use_cache_dir:
319
+        sender.wait()
320
+        check_returncode(sender)
321
+    else:
322
+        rm(orig_url + ".total")
323
+        rm(orig_url)
324
+        rm(dest_url)
305 325
 
306 326
     destinations[fs].append(snapname)
307
-    if args.verbose:
327
+    if args.verbose or args.dry_run:
308 328
         print()
309 329
 
310 330
 # test