Browse code

Option -b for a local memory buffer

Lorenz Hüdepohl authored on01/08/2019 17:59:00
Showing1 changed files
... ...
@@ -39,7 +39,10 @@ parser.add_argument('-c', '--cache-dir', metavar="DIR",
39 39
                          'a partially sent file without starting over')
40 40
 
41 41
 parser.add_argument('-m', '--min-cache-size',
42
-                    help='Minimum (estimated) size of stream to use the cache-dir funcitonality, default 50M')
42
+                    help='Minimum (estimated) size of stream to use the cache-dir functionality, default 50M')
43
+
44
+parser.add_argument('-b', '--buffer-size',
45
+                    help='Use a local memory buffer (helps when using the same spinning disk pool as source and destination)')
43 46
 
44 47
 parser.add_argument('-z', '--compression', action="store_true",
45 48
                     help='Filter network streams through gzip/gunzip')
... ...
@@ -64,6 +67,9 @@ args.verbose = sum(args.verbose) if args.verbose is not None else 0
64 67
 if not args.cache_dir and args.min_cache_size is not None:
65 68
     print("Cannot specify -m/--min-cache-size without -c/--cache-dir", file=sys.stderr)
66 69
     raise SystemExit(1)
70
+elif args.buffer_size is not None and args.cache_dir:
71
+    print("Cannot specify both -b and -c", file=sys.stderr)
72
+    raise SystemExit(1)
67 73
 elif args.cache_dir and args.min_cache_size is None:
68 74
     args.min_cache_size = "50M"
69 75
 
... ...
@@ -317,6 +323,8 @@ for fs, snapname in origin_snapshots:
317 323
 
318 324
     if args.compression:
319 325
         send_cmd += ["|", "gzip"]
326
+    if args.buffer_size:
327
+        send_cmd += ["|", "mbuffer", "-m", args.buffer_size]
320 328
     if use_cache_dir:
321 329
         send_cmd += [">", orig_cache_filename]
322 330
 
Browse code

Add option -w/--raw to pass to zfs send

backup authored on01/08/2019 17:55:56 • Lorenz Hüdepohl committed on01/08/2019 17:58:32
Showing1 changed files
... ...
@@ -44,6 +44,9 @@ parser.add_argument('-m', '--min-cache-size',
44 44
 parser.add_argument('-z', '--compression', action="store_true",
45 45
                     help='Filter network streams through gzip/gunzip')
46 46
 
47
+parser.add_argument('-w', '--raw', action="store_true",
48
+                    help='Pass -w/--raw to zfs send')
49
+
47 50
 parser.add_argument('-v', '--verbose', action="append_const", const=1,
48 51
                     help='Echo the commands that are issued. Two -v pass a -v along to the zfs send/receive commands')
49 52
 
... ...
@@ -140,6 +143,11 @@ if args.force:
140 143
 else:
141 144
     force = []
142 145
 
146
+if args.raw:
147
+    raw = ["--raw"]
148
+else:
149
+    raw = []
150
+
143 151
 with Popen(ssh_dest + ["zfs", "list", "-t", "snapshot", "-r", destination, "-H"], stdout=PIPE, stderr=PIPE) as proc:
144 152
     # Split into chunks for each fs
145 153
     destinations = {}
... ...
@@ -256,9 +264,9 @@ for fs, snapname in origin_snapshots:
256 264
                 last = old_snapshot
257 265
 
258 266
     if last:
259
-        send_cmd = ssh_orig + ["zfs", "send"] + verbose + ["-i", "@{0}".format(last), "{0}{1}@{2}".format(origin, fs, snapname)]
267
+        send_cmd = ssh_orig + ["zfs", "send"] + verbose + raw + ["-i", "@{0}".format(last), "{0}{1}@{2}".format(origin, fs, snapname)]
260 268
     else:
261
-        send_cmd = ssh_orig + ["zfs", "send"] + verbose + ["{0}{1}@{2}".format(origin, fs, snapname)]
269
+        send_cmd = ssh_orig + ["zfs", "send"] + verbose + raw + ["{0}{1}@{2}".format(origin, fs, snapname)]
262 270
 
263 271
     # Check free space
264 272
     if args.cache_dir:
Browse code

Option to ignore certain snapshot names

Lorenz Hüdepohl authored on05/12/2017 21:48:01
Showing1 changed files
... ...
@@ -29,6 +29,10 @@ parser.add_argument('--snapname', metavar='TAGNAME', type=str, action='append',
29 29
                     help='Only consider snapshot names starting with TAGNAME. '
30 30
                          'Can be specified more than once')
31 31
 
32
+parser.add_argument('--ignore-snapname', metavar='TAGNAME', type=str, action='append',
33
+                    help='Do not consider snapshot names starting with TAGNAME. '
34
+                         'Can be specified more than once')
35
+
32 36
 parser.add_argument('-c', '--cache-dir', metavar="DIR",
33 37
                     help='First create a temporary file in directory DIR with the `zfs send` stream, '
34 38
                          'then rsync that to the remote host. With that, one can resume '
... ...
@@ -78,6 +82,14 @@ def select_snapshots(proc, prefix):
78 82
                    "expected a string starting with \"{1}\"").format(fs, prefix), file=sys.stderr)
79 83
             raise SystemExit(1)
80 84
         fs = fs[len(prefix):]
85
+        ignore = False
86
+        if args.ignore_snapname:
87
+            for ignore_snapname in args.ignore_snapname:
88
+                if snapname.startswith(ignore_snapname):
89
+                    ignore = True
90
+                    break
91
+        if ignore:
92
+            continue
81 93
         if args.snapname:
82 94
             for allowed_snapname in args.snapname:
83 95
                 if snapname.startswith(allowed_snapname):
Browse code

Style checker corrections

Lorenz Hüdepohl authored on05/12/2017 21:46:55
Showing1 changed files
... ...
@@ -2,9 +2,7 @@
2 2
 import os
3 3
 import sys
4 4
 import locale
5
-import socket
6 5
 import argparse
7
-from shutil import which
8 6
 from subprocess import Popen, PIPE, run
9 7
 from collections import namedtuple
10 8
 
... ...
@@ -62,6 +60,7 @@ if not args.cache_dir and args.min_cache_size is not None:
62 60
 elif args.cache_dir and args.min_cache_size is None:
63 61
     args.min_cache_size = "50M"
64 62
 
63
+
65 64
 def check_returncode(proc, verbose=True):
66 65
     if not proc.returncode == 0:
67 66
         if verbose:
... ...
@@ -104,6 +103,7 @@ def prefix_ssh(location):
104 103
         ssh = []
105 104
     return ssh, host, location
106 105
 
106
+
107 107
 ssh_dest, dest_host, destination = prefix_ssh(args.destination)
108 108
 ssh_orig, orig_host, origin = prefix_ssh(args.origin)
109 109
 
... ...
@@ -155,6 +155,7 @@ def rsync(orig_url, dest_url):
155 155
         rsync.wait()
156 156
         check_returncode(rsync)
157 157
 
158
+
158 159
 def command_on_url(*commands, abort=True, dry_run=False, echo=False):
159 160
     *commands, url = commands
160 161
     if ":" in url:
... ...
@@ -179,17 +180,22 @@ def command_on_url(*commands, abort=True, dry_run=False, echo=False):
179 180
 
180 181
     return True
181 182
 
183
+
182 184
 def exists(url):
183 185
     return command_on_url("test", "-f", url, abort=False, dry_run=False, echo=False)
184 186
 
187
+
185 188
 def touch(url):
186 189
     return command_on_url("touch", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
187 190
 
191
+
188 192
 def rm(url):
189 193
     return command_on_url("rm", "-v", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
190 194
 
195
+
191 196
 MockStdout = namedtuple("MockStdout", ["cmd_string"])
192 197
 
198
+
193 199
 class echoPopen(Popen):
194 200
     def __init__(self, commands, stdin=None, stdout=None, **kwargs):
195 201
         if stdin is not None:
... ...
@@ -222,6 +228,7 @@ class echoPopen(Popen):
222 228
         if not args.dry_run:
223 229
             return super().__exit__(type, value, traceback)
224 230
 
231
+
225 232
 for fs, snapname in origin_snapshots:
226 233
     if fs in destinations and snapname in destinations[fs]:
227 234
         continue
... ...
@@ -269,7 +276,7 @@ for fs, snapname in origin_snapshots:
269 276
             if estimated_size * 1.25 > free_space:
270 277
                 print("Cannot store intermediate stream at {0}, would consume "
271 278
                       "about {1} MB free space in {2}, but there is just {3} MB available".format(
272
-                    where, estimated_size // 1000000, cache_dir, free_space // 1000000), file=sys.stderr)
279
+                          where, estimated_size // 1000000, cache_dir, free_space // 1000000), file=sys.stderr)
273 280
                 raise SystemExit(1)
274 281
 
275 282
         if not args.dry_run:
Browse code

Return SSH return code, be silent for SSH errors if not a terminal

Lorenz Hüdepohl authored on05/12/2017 19:55:42
Showing1 changed files
... ...
@@ -50,8 +50,8 @@ parser.add_argument('-n', '--dry-run', action="store_true",
50 50
                          'do not actually send anything.')
51 51
 
52 52
 parser.add_argument('-F', '--force', action="store_true",
53
-                    help='Pass `-F` to `zfs receive` to Only echo the transfer commands that would be issued,\n'
54
-                         'do not actually send anything.')
53
+                    help='Pass `-F` to `zfs receive` to overwrite other '
54
+                         'snapshots or diverged changes on the remote side\n')
55 55
 
56 56
 args = parser.parse_args()
57 57
 args.verbose = sum(args.verbose) if args.verbose is not None else 0
... ...
@@ -62,10 +62,11 @@ if not args.cache_dir and args.min_cache_size is not None:
62 62
 elif args.cache_dir and args.min_cache_size is None:
63 63
     args.min_cache_size = "50M"
64 64
 
65
-def check_returncode(proc):
65
+def check_returncode(proc, verbose=True):
66 66
     if not proc.returncode == 0:
67
-        print("\nCommand \"{0}\" returned error {1}, aborting...".format(" ".join(proc.args), proc.returncode), file=sys.stderr)
68
-        raise SystemExit(1)
67
+        if verbose:
68
+            print("\nCommand \"{0}\" returned error {1}, aborting...".format(" ".join(proc.args), proc.returncode), file=sys.stderr)
69
+        raise SystemExit(proc.returncode)
69 70
 
70 71
 
71 72
 def select_snapshots(proc, prefix):
... ...
@@ -89,7 +90,7 @@ def select_snapshots(proc, prefix):
89 90
     if proc.returncode == 1:
90 91
         if proc.stderr.read().endswith(b"dataset does not exist\n"):
91 92
             return []
92
-    check_returncode(proc)
93
+    check_returncode(proc, verbose=os.isatty(2))
93 94
     return res
94 95
 
95 96
 
Browse code

New option "-F" to force rollback to last snapshot

Lorenz Hüdepohl authored on30/11/2017 19:48:28
Showing1 changed files
... ...
@@ -49,6 +49,10 @@ parser.add_argument('-n', '--dry-run', action="store_true",
49 49
                     help='Only echo the transfer commands that would be issued,\n'
50 50
                          'do not actually send anything.')
51 51
 
52
+parser.add_argument('-F', '--force', action="store_true",
53
+                    help='Pass `-F` to `zfs receive` to Only echo the transfer commands that would be issued,\n'
54
+                         'do not actually send anything.')
55
+
52 56
 args = parser.parse_args()
53 57
 args.verbose = sum(args.verbose) if args.verbose is not None else 0
54 58
 
... ...
@@ -118,6 +122,11 @@ if args.verbose > 1:
118 122
 else:
119 123
     verbose = []
120 124
 
125
+if args.force:
126
+    force = ["-F"]
127
+else:
128
+    force = []
129
+
121 130
 with Popen(ssh_dest + ["zfs", "list", "-t", "snapshot", "-r", destination, "-H"], stdout=PIPE, stderr=PIPE) as proc:
122 131
     # Split into chunks for each fs
123 132
     destinations = {}
... ...
@@ -290,7 +299,7 @@ for fs, snapname in origin_snapshots:
290 299
     if use_cache_dir:
291 300
         pre_pipe = ["cat", dest_cache_filename, "|"] + pre_pipe
292 301
 
293
-    receive_cmd = ssh_dest + pre_pipe + ["zfs", "receive"] + verbose + ["-u", destination + fs]
302
+    receive_cmd = ssh_dest + pre_pipe + ["zfs", "receive"] + force + verbose + ["-u", destination + fs]
294 303
 
295 304
     send_shell = not ssh_orig
296 305
     if send_shell:
Browse code

Use "receive" instead of "recv" subcommand

Lorenz Hüdepohl authored on30/11/2017 19:48:00
Showing1 changed files
... ...
@@ -43,7 +43,7 @@ parser.add_argument('-z', '--compression', action="store_true",
43 43
                     help='Filter network streams through gzip/gunzip')
44 44
 
45 45
 parser.add_argument('-v', '--verbose', action="append_const", const=1,
46
-                    help='Echo the commands that are issued. Two -v pass a -v along to the zfs send/recv commands')
46
+                    help='Echo the commands that are issued. Two -v pass a -v along to the zfs send/receive commands')
47 47
 
48 48
 parser.add_argument('-n', '--dry-run', action="store_true",
49 49
                     help='Only echo the transfer commands that would be issued,\n'
... ...
@@ -290,15 +290,15 @@ for fs, snapname in origin_snapshots:
290 290
     if use_cache_dir:
291 291
         pre_pipe = ["cat", dest_cache_filename, "|"] + pre_pipe
292 292
 
293
-    recv_cmd = ssh_dest + pre_pipe + ["zfs", "recv"] + verbose + ["-u", destination + fs]
293
+    receive_cmd = ssh_dest + pre_pipe + ["zfs", "receive"] + verbose + ["-u", destination + fs]
294 294
 
295 295
     send_shell = not ssh_orig
296 296
     if send_shell:
297 297
         send_cmd = [" ".join(send_cmd)]
298 298
 
299
-    recv_shell = not ssh_dest
300
-    if recv_shell:
301
-        recv_cmd = [" ".join(recv_cmd)]
299
+    receive_shell = not ssh_dest
300
+    if receive_shell:
301
+        receive_cmd = [" ".join(receive_cmd)]
302 302
 
303 303
     # send
304 304
     if use_cache_dir:
... ...
@@ -313,14 +313,14 @@ for fs, snapname in origin_snapshots:
313 313
             print("# Resuming upload of partial stream file")
314 314
 
315 315
         rsync(orig_url, dest_url)
316
-        recv_stdin = None
316
+        receive_stdin = None
317 317
     else:
318
-        # direct `zfs send | zfs recv` pipe
318
+        # direct `zfs send | zfs receive` pipe
319 319
         sender = echoPopen(send_cmd, stdout=PIPE, shell=send_shell)
320
-        recv_stdin = sender.stdout
320
+        receive_stdin = sender.stdout
321 321
 
322
-    # recv
323
-    with echoPopen(recv_cmd, stdin=recv_stdin, shell=recv_shell) as receiver:
322
+    # receive
323
+    with echoPopen(receive_cmd, stdin=receive_stdin, shell=receive_shell) as receiver:
324 324
         receiver.wait()
325 325
         check_returncode(receiver)
326 326
 
Browse code

Allow multiple --snapname arguments

Lorenz Hüdepohl authored on25/11/2017 19:34:19
Showing1 changed files
... ...
@@ -27,8 +27,9 @@ parser.add_argument('destination', type=str,
27 27
                     help='Destination prefix, e.g. backup_pool/this_host. The ZFS snapshots from "origin" '
28 28
                          'are then stored below there. As with "origin", can be an SSH remote path')
29 29
 
30
-parser.add_argument('--snapname', metavar='TAGNAME', type=str,
31
-                    help='Only consider snapshots starting with TAGNAME')
30
+parser.add_argument('--snapname', metavar='TAGNAME', type=str, action='append',
31
+                    help='Only consider snapshot names starting with TAGNAME. '
32
+                         'Can be specified more than once')
32 33
 
33 34
 parser.add_argument('-c', '--cache-dir', metavar="DIR",
34 35
                     help='First create a temporary file in directory DIR with the `zfs send` stream, '
... ...
@@ -74,8 +75,10 @@ def select_snapshots(proc, prefix):
74 75
             raise SystemExit(1)
75 76
         fs = fs[len(prefix):]
76 77
         if args.snapname:
77
-            if snapname.startswith(args.snapname):
78
-                res.append((fs, snapname))
78
+            for allowed_snapname in args.snapname:
79
+                if snapname.startswith(allowed_snapname):
80
+                    res.append((fs, snapname))
81
+                    break
79 82
         else:
80 83
             res.append((fs, snapname))
81 84
     proc.wait()
Browse code

Merge branch 'master' of git.schokokeks.org:zfs-sync

Lorenz Hüdepohl authored on10/11/2017 00:32:11
Showing0 changed files
Browse code

Enable/disable compression via command line flag

Lorenz Hüdepohl authored on10/11/2017 00:30:54
Showing1 changed files
... ...
@@ -38,6 +38,9 @@ parser.add_argument('-c', '--cache-dir', metavar="DIR",
38 38
 parser.add_argument('-m', '--min-cache-size',
39 39
                     help='Minimum (estimated) size of stream to use the cache-dir funcitonality, default 50M')
40 40
 
41
+parser.add_argument('-z', '--compression', action="store_true",
42
+                    help='Filter network streams through gzip/gunzip')
43
+
41 44
 parser.add_argument('-v', '--verbose', action="append_const", const=1,
42 45
                     help='Echo the commands that are issued. Two -v pass a -v along to the zfs send/recv commands')
43 46
 
... ...
@@ -96,8 +99,6 @@ def prefix_ssh(location):
96 99
 ssh_dest, dest_host, destination = prefix_ssh(args.destination)
97 100
 ssh_orig, orig_host, origin = prefix_ssh(args.origin)
98 101
 
99
-compression = ssh_orig or ssh_dest
100
-
101 102
 if args.cache_dir:
102 103
     if ":" in args.cache_dir:
103 104
         orig_cache_dir, dest_cache_dir = args.cache_dir.split(":")
... ...
@@ -272,12 +273,12 @@ for fs, snapname in origin_snapshots:
272 273
         orig_url = ((orig_host + ":") if orig_host else "") + orig_cache_filename
273 274
         dest_url = ((dest_host + ":") if dest_host else "") + dest_cache_filename
274 275
 
275
-    if compression:
276
+    if args.compression:
276 277
         send_cmd += ["|", "gzip"]
277 278
     if use_cache_dir:
278 279
         send_cmd += [">", orig_cache_filename]
279 280
 
280
-    if compression:
281
+    if args.compression:
281 282
         pre_pipe = ["gunzip", "|"]
282 283
     else:
283 284
         pre_pipe = []
Browse code

Fix logic error in command_on_url

This prevented the deletion of cached streams on the remote side

Lorenz Hüdepohl authored on08/11/2017 18:14:56
Showing1 changed files
... ...
@@ -155,14 +155,16 @@ def command_on_url(*commands, abort=True, dry_run=False, echo=False):
155 155
             print(*ssh, '"' + " ".join(commands + [filename]) + '"')
156 156
         else:
157 157
             print(*commands, filename)
158
-        return True
159
-    elif not dry_run:
158
+
159
+    if not dry_run:
160 160
         cmd = run(ssh + commands + [filename])
161 161
         if abort:
162 162
             check_returncode(cmd)
163 163
         else:
164 164
             return cmd.returncode == 0
165 165
 
166
+    return True
167
+
166 168
 def exists(url):
167 169
     return command_on_url("test", "-f", url, abort=False, dry_run=False, echo=False)
168 170
 
Browse code

Fix usage without "-v"

Lorenz Hüdepohl authored on03/11/2017 15:41:56
Showing1 changed files
... ...
@@ -176,16 +176,17 @@ MockStdout = namedtuple("MockStdout", ["cmd_string"])
176 176
 
177 177
 class echoPopen(Popen):
178 178
     def __init__(self, commands, stdin=None, stdout=None, **kwargs):
179
+        if stdin is not None:
180
+            in_pipe = stdin.cmd_string + " | "
181
+        else:
182
+            in_pipe = ""
183
+        if commands[0] == "ssh":
184
+            cmd = " ".join(commands[0:2]) + ' "' + " ".join(commands[2:]) + '"'
185
+        else:
186
+            cmd = " ".join(commands)
187
+        cmd_string = in_pipe + cmd
188
+
179 189
         if args.verbose or args.dry_run:
180
-            if stdin is not None:
181
-                in_pipe = stdin.cmd_string + " | "
182
-            else:
183
-                in_pipe = ""
184
-            if commands[0] == "ssh":
185
-                cmd = " ".join(commands[0:2]) + ' "' + " ".join(commands[2:]) + '"'
186
-            else:
187
-                cmd = " ".join(commands)
188
-            cmd_string = in_pipe + cmd
189 190
             if stdout is None:
190 191
                 print(cmd_string)
191 192
 
Browse code

Fix error in "-m" argument handling when "-c" is not given

Lorenz Hüdepohl authored on03/11/2017 15:20:35
Showing1 changed files
... ...
@@ -35,8 +35,8 @@ parser.add_argument('-c', '--cache-dir', metavar="DIR",
35 35
                          'then rsync that to the remote host. With that, one can resume '
36 36
                          'a partially sent file without starting over')
37 37
 
38
-parser.add_argument('-m', '--min-cache-size', default="50M",
39
-                    help='Minimum (estimated) size of stream to use the cache-dir funcitonality')
38
+parser.add_argument('-m', '--min-cache-size',
39
+                    help='Minimum (estimated) size of stream to use the cache-dir funcitonality, default 50M')
40 40
 
41 41
 parser.add_argument('-v', '--verbose', action="append_const", const=1,
42 42
                     help='Echo the commands that are issued. Two -v pass a -v along to the zfs send/recv commands')
... ...
@@ -48,11 +48,11 @@ parser.add_argument('-n', '--dry-run', action="store_true",
48 48
 args = parser.parse_args()
49 49
 args.verbose = sum(args.verbose) if args.verbose is not None else 0
50 50
 
51
-no_defaults_parser = argparse.ArgumentParser(parents=(parser,), argument_default=None, add_help=False)
52
-no_defaults_args = no_defaults_parser.parse_args()
53
-if not args.cache_dir and no_defaults_args.min_cache_size is not None:
51
+if not args.cache_dir and args.min_cache_size is not None:
54 52
     print("Cannot specify -m/--min-cache-size without -c/--cache-dir", file=sys.stderr)
55 53
     raise SystemExit(1)
54
+elif args.cache_dir and args.min_cache_size is None:
55
+    args.min_cache_size = "50M"
56 56
 
57 57
 def check_returncode(proc):
58 58
     if not proc.returncode == 0:
Browse code

Small bugfixes and cleanup

Lorenz Hüdepohl authored on03/11/2017 14:17:55
Showing1 changed files
... ...
@@ -6,7 +6,6 @@ import socket
6 6
 import argparse
7 7
 from shutil import which
8 8
 from subprocess import Popen, PIPE, run
9
-from contextlib import contextmanager
10 9
 from collections import namedtuple
11 10
 
12 11
 sys.tracebacklimit = 0
... ...
@@ -36,7 +35,7 @@ parser.add_argument('-c', '--cache-dir', metavar="DIR",
36 35
                          'then rsync that to the remote host. With that, one can resume '
37 36
                          'a partially sent file without starting over')
38 37
 
39
-parser.add_argument('-m', '--min-cache-size', default="1M",
38
+parser.add_argument('-m', '--min-cache-size', default="50M",
40 39
                     help='Minimum (estimated) size of stream to use the cache-dir funcitonality')
41 40
 
42 41
 parser.add_argument('-v', '--verbose', action="append_const", const=1,
... ...
@@ -138,7 +137,7 @@ def rsync(orig_url, dest_url):
138 137
         progress = ["--info=progress2"]
139 138
     else:
140 139
         progress = []
141
-    with echoPopen(["rsync", "--partial"] + progress + ["--inplace", orig_url, dest_url]) as rsync:
140
+    with echoPopen(["rsync", "--append"] + progress + ["--inplace", orig_url, dest_url]) as rsync:
142 141
         rsync.wait()
143 142
         check_returncode(rsync)
144 143
 
... ...
@@ -171,36 +170,40 @@ def touch(url):
171 170
     return command_on_url("touch", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
172 171
 
173 172
 def rm(url):
174
-    return command_on_url("rm", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
173
+    return command_on_url("rm", "-v", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
175 174
 
176 175
 MockStdout = namedtuple("MockStdout", ["cmd_string"])
177
-MockPopen = namedtuple("MockPopen", ["stdout", "returncode", "wait"])
178
-def mockWait():
179
-    pass
180 176
 
181
-@contextmanager
182
-def echoPopen(commands, stdin=None, stdout=None, **kwargs):
183
-    if args.verbose or args.dry_run:
184
-        if stdin is not None:
185
-            in_pipe = stdin.cmd_string + " | "
186
-        else:
187
-            in_pipe = ""
188
-        if commands[0] == "ssh":
189
-            cmd = " ".join(commands[0:2]) + ' "' + " ".join(commands[2:]) + '"'
190
-        else:
191
-            cmd = " ".join(commands)
192
-        cmd_string = in_pipe + cmd
193
-        if stdout is None:
194
-            print(cmd_string)
177
+class echoPopen(Popen):
178
+    def __init__(self, commands, stdin=None, stdout=None, **kwargs):
179
+        if args.verbose or args.dry_run:
180
+            if stdin is not None:
181
+                in_pipe = stdin.cmd_string + " | "
182
+            else:
183
+                in_pipe = ""
184
+            if commands[0] == "ssh":
185
+                cmd = " ".join(commands[0:2]) + ' "' + " ".join(commands[2:]) + '"'
186
+            else:
187
+                cmd = " ".join(commands)
188
+            cmd_string = in_pipe + cmd
189
+            if stdout is None:
190
+                print(cmd_string)
195 191
 
196
-    if not args.dry_run:
197
-        with Popen(commands, stdin=stdin, stdout=stdout, **kwargs) as proc:
192
+        if not args.dry_run:
193
+            super().__init__(commands, stdin=stdin, stdout=stdout, **kwargs)
198 194
             if stdout is not None:
199
-                proc.stdout.cmd_string = cmd_string
200
-            yield proc
201
-    else:
202
-        yield MockPopen(MockStdout(cmd_string), 0, mockWait)
195
+                self.stdout.cmd_string = cmd_string
196
+        else:
197
+            self.stdout = MockStdout(cmd_string)
198
+            self.returncode = 0
199
+
200
+    def wait(self):
201
+        if not args.dry_run:
202
+            super().wait()
203 203
 
204
+    def __exit__(self, type, value, traceback):
205
+        if not args.dry_run:
206
+            return super().__exit__(type, value, traceback)
204 207
 
205 208
 for fs, snapname in origin_snapshots:
206 209
     if fs in destinations and snapname in destinations[fs]:
... ...
@@ -280,7 +283,7 @@ for fs, snapname in origin_snapshots:
280 283
     if use_cache_dir:
281 284
         pre_pipe = ["cat", dest_cache_filename, "|"] + pre_pipe
282 285
 
283
-    recv_cmd = ssh_dest + pre_pipe + ["zfs", "recv", "-u", destination + fs]
286
+    recv_cmd = ssh_dest + pre_pipe + ["zfs", "recv"] + verbose + ["-u", destination + fs]
284 287
 
285 288
     send_shell = not ssh_orig
286 289
     if send_shell:
... ...
@@ -306,7 +309,7 @@ for fs, snapname in origin_snapshots:
306 309
         recv_stdin = None
307 310
     else:
308 311
         # direct `zfs send | zfs recv` pipe
309
-        sender = echoPopen(send_cmd, shell=send_shell)
312
+        sender = echoPopen(send_cmd, stdout=PIPE, shell=send_shell)
310 313
         recv_stdin = sender.stdout
311 314
 
312 315
     # recv
Browse code

Refactor of 'dry-run' and 'verbose' options

Lorenz Hüdepohl authored on13/09/2017 23:05:19
Showing1 changed files
... ...
@@ -6,10 +6,10 @@ import socket
6 6
 import argparse
7 7
 from shutil import which
8 8
 from subprocess import Popen, PIPE, run
9
+from contextlib import contextmanager
10
+from collections import namedtuple
9 11
 
10
-import sys
11 12
 sys.tracebacklimit = 0
12
-
13 13
 encoding = locale.getdefaultlocale()[1]
14 14
 
15 15
 parser = argparse.ArgumentParser(
... ...
@@ -31,7 +31,7 @@ parser.add_argument('destination', type=str,
31 31
 parser.add_argument('--snapname', metavar='TAGNAME', type=str,
32 32
                     help='Only consider snapshots starting with TAGNAME')
33 33
 
34
-parser.add_argument('-c', '--cache-dir', nargs='?', metavar="DIR",
34
+parser.add_argument('-c', '--cache-dir', metavar="DIR",
35 35
                     help='First create a temporary file in directory DIR with the `zfs send` stream, '
36 36
                          'then rsync that to the remote host. With that, one can resume '
37 37
                          'a partially sent file without starting over')
... ...
@@ -39,15 +39,21 @@ parser.add_argument('-c', '--cache-dir', nargs='?', metavar="DIR",
39 39
 parser.add_argument('-m', '--min-cache-size', default="1M",
40 40
                     help='Minimum (estimated) size of stream to use the cache-dir funcitonality')
41 41
 
42
-parser.add_argument('-v', '--verbose', action="store_true",
43
-                    help='Echo the commands that are issued.')
42
+parser.add_argument('-v', '--verbose', action="append_const", const=1,
43
+                    help='Echo the commands that are issued. Two -v pass a -v along to the zfs send/recv commands')
44 44
 
45 45
 parser.add_argument('-n', '--dry-run', action="store_true",
46 46
                     help='Only echo the transfer commands that would be issued,\n'
47 47
                          'do not actually send anything.')
48 48
 
49 49
 args = parser.parse_args()
50
+args.verbose = sum(args.verbose) if args.verbose is not None else 0
50 51
 
52
+no_defaults_parser = argparse.ArgumentParser(parents=(parser,), argument_default=None, add_help=False)
53
+no_defaults_args = no_defaults_parser.parse_args()
54
+if not args.cache_dir and no_defaults_args.min_cache_size is not None:
55
+    print("Cannot specify -m/--min-cache-size without -c/--cache-dir", file=sys.stderr)
56
+    raise SystemExit(1)
51 57
 
52 58
 def check_returncode(proc):
53 59
     if not proc.returncode == 0:
... ...
@@ -94,9 +100,9 @@ ssh_orig, orig_host, origin = prefix_ssh(args.origin)
94 100
 compression = ssh_orig or ssh_dest
95 101
 
96 102
 if args.cache_dir:
97
-    try:
103
+    if ":" in args.cache_dir:
98 104
         orig_cache_dir, dest_cache_dir = args.cache_dir.split(":")
99
-    except:
105
+    else:
100 106
         orig_cache_dir, dest_cache_dir = (args.cache_dir,) * 2
101 107
     if not (bool(ssh_orig) != bool(ssh_dest)):
102 108
         print("-c/--cache-dir is not supported for two local or two remote filesystems", file=sys.stderr)
... ...
@@ -104,7 +110,7 @@ if args.cache_dir:
104 110
 
105 111
     min_cache_size = int(args.min_cache_size.replace("k", "000").replace("M", "000000").replace("G", "000000000"))
106 112
 
107
-if args.verbose:
113
+if args.verbose > 1:
108 114
     verbose = ["-v"]
109 115
 else:
110 116
     verbose = []
... ...
@@ -128,10 +134,15 @@ for fs, snapname in origin_snapshots:
128 134
 
129 135
 
130 136
 def rsync(orig_url, dest_url):
131
-    rsync = run(["rsync", "--partial", "--inplace", orig_url, dest_url])
132
-    check_returncode(rsync)
137
+    if args.verbose > 1:
138
+        progress = ["--info=progress2"]
139
+    else:
140
+        progress = []
141
+    with echoPopen(["rsync", "--partial"] + progress + ["--inplace", orig_url, dest_url]) as rsync:
142
+        rsync.wait()
143
+        check_returncode(rsync)
133 144
 
134
-def command_on_url(*commands, abort=True):
145
+def command_on_url(*commands, abort=True, dry_run=False, echo=False):
135 146
     *commands, url = commands
136 147
     if ":" in url:
137 148
         host, filename = url.split(":")
... ...
@@ -139,20 +150,57 @@ def command_on_url(*commands, abort=True):
139 150
     else:
140 151
         filename = url
141 152
         ssh = []
142
-    cmd = run(ssh + commands + [filename])
143
-    if abort:
144
-        check_returncode(cmd)
145
-    else:
146
-        return cmd.returncode == 0
153
+
154
+    if echo:
155
+        if ssh:
156
+            print(*ssh, '"' + " ".join(commands + [filename]) + '"')
157
+        else:
158
+            print(*commands, filename)
159
+        return True
160
+    elif not dry_run:
161
+        cmd = run(ssh + commands + [filename])
162
+        if abort:
163
+            check_returncode(cmd)
164
+        else:
165
+            return cmd.returncode == 0
147 166
 
148 167
 def exists(url):
149
-    return command_on_url("test", "-f", url, abort=False)
168
+    return command_on_url("test", "-f", url, abort=False, dry_run=False, echo=False)
150 169
 
151 170
 def touch(url):
152
-    return command_on_url("touch", url)
171
+    return command_on_url("touch", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
153 172
 
154 173
 def rm(url):
155
-    return command_on_url("rm", url)
174
+    return command_on_url("rm", url, dry_run=args.dry_run, echo=args.dry_run or args.verbose)
175
+
176
+MockStdout = namedtuple("MockStdout", ["cmd_string"])
177
+MockPopen = namedtuple("MockPopen", ["stdout", "returncode", "wait"])
178
+def mockWait():
179
+    pass
180
+
181
+@contextmanager
182
+def echoPopen(commands, stdin=None, stdout=None, **kwargs):
183
+    if args.verbose or args.dry_run:
184
+        if stdin is not None:
185
+            in_pipe = stdin.cmd_string + " | "
186
+        else:
187
+            in_pipe = ""
188
+        if commands[0] == "ssh":
189
+            cmd = " ".join(commands[0:2]) + ' "' + " ".join(commands[2:]) + '"'
190
+        else:
191
+            cmd = " ".join(commands)
192
+        cmd_string = in_pipe + cmd
193
+        if stdout is None:
194
+            print(cmd_string)
195
+
196
+    if not args.dry_run:
197
+        with Popen(commands, stdin=stdin, stdout=stdout, **kwargs) as proc:
198
+            if stdout is not None:
199
+                proc.stdout.cmd_string = cmd_string
200
+            yield proc
201
+    else:
202
+        yield MockPopen(MockStdout(cmd_string), 0, mockWait)
203
+
156 204
 
157 205
 for fs, snapname in origin_snapshots:
158 206
     if fs in destinations and snapname in destinations[fs]:
... ...
@@ -186,6 +234,8 @@ for fs, snapname in origin_snapshots:
186 234
             print("# Using rsync cache for this transfer")
187 235
         elif args.verbose:
188 236
             print("# Not using rsync cache for this transfer")
237
+    else:
238
+        use_cache_dir = False
189 239
 
190 240
     if use_cache_dir:
191 241
         def check_space(ssh_prefix, where, cache_dir):
... ...
@@ -232,34 +282,6 @@ for fs, snapname in origin_snapshots:
232 282
 
233 283
     recv_cmd = ssh_dest + pre_pipe + ["zfs", "recv", "-u", destination + fs]
234 284
 
235
-    if args.verbose or args.dry_run:
236
-        if send_cmd[0] == "ssh":
237
-            print_send_cmd = send_cmd[0:2] + ['"'] + send_cmd[2:] + ['"']
238
-            if use_cache_dir:
239
-                print_rm_orig_cmd = ssh_orig + ['"', "rm", orig_cache_filename, '"']
240
-        else:
241
-            print_send_cmd = send_cmd
242
-            if use_cache_dir:
243
-                print_rm_orig_cmd = ["rm", orig_cache_filename]
244
-
245
-        if recv_cmd[0] == "ssh":
246
-            print_recv_cmd = recv_cmd[0:2] + ['"'] + recv_cmd[2:] + ['"']
247
-            if use_cache_dir:
248
-                print_rm_dest_cmd = ssh_dest + ['"', "rm", dest_cache_filename, '"']
249
-        else:
250
-            print_recv_cmd = recv_cmd
251
-            if use_cache_dir:
252
-                print_rm_dest_cmd = ["rm", dest_cache_filename]
253
-
254
-        if use_cache_dir:
255
-            print(*print_send_cmd)
256
-            print("rsync", "--partial", "--inplace", orig_url, dest_url)
257
-            print(*print_rm_orig_cmd)
258
-            print(*print_recv_cmd)
259
-            print(*print_rm_dest_cmd)
260
-        else:
261
-            print(" ".join(print_send_cmd), "|", " ".join(print_recv_cmd))
262
-
263 285
     send_shell = not ssh_orig
264 286
     if send_shell:
265 287
         send_cmd = [" ".join(send_cmd)]
... ...
@@ -268,43 +290,41 @@ for fs, snapname in origin_snapshots:
268 290
     if recv_shell:
269 291
         recv_cmd = [" ".join(recv_cmd)]
270 292
 
271
-    if not args.dry_run:
272
-
273
-        # send
274
-        if use_cache_dir:
275
-            # Via rsync'ed stream files
276
-            if not exists(orig_url + ".total"):
277
-                # Create stream file
278
-                with Popen(send_cmd, shell=send_shell) as sender:
279
-                    sender.wait()
280
-                    check_returncode(sender)
281
-                touch(orig_url + ".total")
282
-            else:
283
-                print("# Resuming upload of partial stream file")
284
-
285
-            rsync(orig_url, dest_url)
286
-            recv_stdin = None
287
-        else:
288
-            # direct `zfs send | zfs recv` pipe
289
-            sender = Popen(send_cmd, shell=send_shell)
290
-            recv_stdin = sender.stdout
291
-
292
-        # recv
293
-        with Popen(recv_cmd, stdin=recv_stdin, shell=recv_shell) as receiver:
294
-            receiver.wait()
295
-            check_returncode(receiver)
296
-
297
-        # Cleanup
298
-        if not use_cache_dir:
299
-            sender.wait()
300
-            check_returncode(sender)
293
+    # send
294
+    if use_cache_dir:
295
+        # Via rsync'ed stream files
296
+        if not exists(orig_url + ".total"):
297
+            # Create stream file
298
+            with echoPopen(send_cmd, shell=send_shell) as sender:
299
+                sender.wait()
300
+                check_returncode(sender)
301
+            touch(orig_url + ".total")
301 302
         else:
302
-            rm(orig_url + ".total")
303
-            rm(orig_url)
304
-            rm(dest_url)
303
+            print("# Resuming upload of partial stream file")
304
+
305
+        rsync(orig_url, dest_url)
306
+        recv_stdin = None
307
+    else:
308
+        # direct `zfs send | zfs recv` pipe
309
+        sender = echoPopen(send_cmd, shell=send_shell)
310
+        recv_stdin = sender.stdout
311
+
312
+    # recv
313
+    with echoPopen(recv_cmd, stdin=recv_stdin, shell=recv_shell) as receiver:
314
+        receiver.wait()
315
+        check_returncode(receiver)
316
+
317
+    # Cleanup
318
+    if not use_cache_dir:
319
+        sender.wait()
320
+        check_returncode(sender)
321
+    else:
322
+        rm(orig_url + ".total")
323
+        rm(orig_url)
324
+        rm(dest_url)
305 325
 
306 326
     destinations[fs].append(snapname)
307
-    if args.verbose:
327
+    if args.verbose or args.dry_run:
308 328
         print()
309 329
 
310 330
 # test
Browse code

Rename into zfs-sync

Lorenz Hüdepohl authored on13/09/2017 23:05:13
Showing1 changed files
1 1
new file mode 100755
... ...
@@ -0,0 +1,313 @@
1
+#!/usr/bin/python3
2
+import os
3
+import sys
4
+import locale
5
+import socket
6
+import argparse
7
+from shutil import which
8
+from subprocess import Popen, PIPE, run
9
+
10
+import sys
11
+sys.tracebacklimit = 0
12
+
13
+encoding = locale.getdefaultlocale()[1]
14
+
15
+parser = argparse.ArgumentParser(
16
+    description='This utility uses zfs send/receive to transfer snapshots from '
17
+                'one zfs pool/filesytem to another. If preexisting snapshots with the same '
18
+                'name exist on both sides they are assumed to hold identical state and only '
19
+                'incremental send and receives are done to reduce the amount of data transferred.',
20
+    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
21
+    fromfile_prefix_chars='@'
22
+)
23
+
24
+parser.add_argument('origin', type=str,
25
+                    help='Origin zfs filesystem, could be an SSH remote path, e.g. "user@host:pool/fs"')
26
+
27
+parser.add_argument('destination', type=str,
28
+                    help='Destination prefix, e.g. backup_pool/this_host. The ZFS snapshots from "origin" '
29
+                         'are then stored below there. As with "origin", can be an SSH remote path')
30
+
31
+parser.add_argument('--snapname', metavar='TAGNAME', type=str,
32
+                    help='Only consider snapshots starting with TAGNAME')
33
+
34
+parser.add_argument('-c', '--cache-dir', nargs='?', metavar="DIR",
35
+                    help='First create a temporary file in directory DIR with the `zfs send` stream, '
36
+                         'then rsync that to the remote host. With that, one can resume '
37
+                         'a partially sent file without starting over')
38
+
39
+parser.add_argument('-m', '--min-cache-size', default="1M",
40
+                    help='Minimum (estimated) size of stream to use the cache-dir funcitonality')
41
+
42
+parser.add_argument('-v', '--verbose', action="store_true",
43
+                    help='Echo the commands that are issued.')
44
+
45
+parser.add_argument('-n', '--dry-run', action="store_true",
46
+                    help='Only echo the transfer commands that would be issued,\n'
47
+                         'do not actually send anything.')
48
+
49
+args = parser.parse_args()
50
+
51
+
52
+def check_returncode(proc):
53
+    if not proc.returncode == 0:
54
+        print("\nCommand \"{0}\" returned error {1}, aborting...".format(" ".join(proc.args), proc.returncode), file=sys.stderr)
55
+        raise SystemExit(1)
56
+
57
+
58
+def select_snapshots(proc, prefix):
59
+    res = []
60
+    for line in proc.stdout:
61
+        snapshot, *_ = line.decode(encoding).split("\t")
62
+        fs, snapname = snapshot.split("@", 2)
63
+        if not fs.startswith(prefix):
64
+            print(("Unexpexted filesystem: \"{0}\", "
65
+                   "expected a string starting with \"{1}\"").format(fs, prefix), file=sys.stderr)
66
+            raise SystemExit(1)
67
+        fs = fs[len(prefix):]
68
+        if args.snapname:
69
+            if snapname.startswith(args.snapname):
70
+                res.append((fs, snapname))
71
+        else:
72
+            res.append((fs, snapname))
73
+    proc.wait()
74
+    if proc.returncode == 1:
75
+        if proc.stderr.read().endswith(b"dataset does not exist\n"):
76
+            return []
77
+    check_returncode(proc)
78
+    return res
79
+
80
+
81
+def prefix_ssh(location):
82
+    if ":" in location:
83
+        host, location = location.split(":")
84
+        ssh = ["ssh", host]
85
+    else:
86
+        location = location
87
+        host = ""
88
+        ssh = []
89
+    return ssh, host, location
90
+
91
+ssh_dest, dest_host, destination = prefix_ssh(args.destination)
92
+ssh_orig, orig_host, origin = prefix_ssh(args.origin)
93
+
94
+compression = ssh_orig or ssh_dest
95
+
96
+if args.cache_dir:
97
+    try:
98
+        orig_cache_dir, dest_cache_dir = args.cache_dir.split(":")
99
+    except:
100
+        orig_cache_dir, dest_cache_dir = (args.cache_dir,) * 2
101
+    if not (bool(ssh_orig) != bool(ssh_dest)):
102
+        print("-c/--cache-dir is not supported for two local or two remote filesystems", file=sys.stderr)
103
+        raise SystemExit(1)
104
+
105
+    min_cache_size = int(args.min_cache_size.replace("k", "000").replace("M", "000000").replace("G", "000000000"))
106
+
107
+if args.verbose:
108
+    verbose = ["-v"]
109
+else:
110
+    verbose = []
111
+
112
+with Popen(ssh_dest + ["zfs", "list", "-t", "snapshot", "-r", destination, "-H"], stdout=PIPE, stderr=PIPE) as proc:
113
+    # Split into chunks for each fs
114
+    destinations = {}
115
+    for fs, snapname in select_snapshots(proc, destination):
116
+        if fs not in destinations:
117
+            destinations[fs] = []
118
+        destinations[fs].append(snapname)
119
+
120
+with Popen(ssh_orig + ["zfs", "list", "-t", "snapshot", "-r", origin, "-H"], stdout=PIPE, stderr=PIPE) as proc:
121
+    origin_snapshots = select_snapshots(proc, origin)
122
+
123
+# Make a dictionary of all origin snapshots
124
+# to quickly test if they exist
125
+origin_dict = {}
126
+for fs, snapname in origin_snapshots:
127
+    origin_dict[fs, snapname] = True
128
+
129
+
130
+def rsync(orig_url, dest_url):
131
+    rsync = run(["rsync", "--partial", "--inplace", orig_url, dest_url])
132
+    check_returncode(rsync)
133
+
134
+def command_on_url(*commands, abort=True):
135
+    *commands, url = commands
136
+    if ":" in url:
137
+        host, filename = url.split(":")
138
+        ssh = ["ssh", host]
139
+    else:
140
+        filename = url
141
+        ssh = []
142
+    cmd = run(ssh + commands + [filename])
143
+    if abort:
144
+        check_returncode(cmd)
145
+    else:
146
+        return cmd.returncode == 0
147
+
148
+def exists(url):
149
+    return command_on_url("test", "-f", url, abort=False)
150
+
151
+def touch(url):
152
+    return command_on_url("touch", url)
153
+
154
+def rm(url):
155
+    return command_on_url("rm", url)
156
+
157
+for fs, snapname in origin_snapshots:
158
+    if fs in destinations and snapname in destinations[fs]:
159
+        continue
160
+    if fs not in destinations:
161
+        # Send full
162
+        last = None
163
+        destinations[fs] = []
164
+    else:
165
+        # Find older common snapshot
166
+        last = None
167
+        for old_snapshot in destinations[fs]:
168
+            if (fs, old_snapshot) in origin_dict:
169
+                last = old_snapshot
170
+
171
+    if last:
172
+        send_cmd = ssh_orig + ["zfs", "send"] + verbose + ["-i", "@{0}".format(last), "{0}{1}@{2}".format(origin, fs, snapname)]
173
+    else:
174
+        send_cmd = ssh_orig + ["zfs", "send"] + verbose + ["{0}{1}@{2}".format(origin, fs, snapname)]
175
+
176
+    # Check free space
177
+    if args.cache_dir:
178
+        with Popen(send_cmd + ["-nP"], stdout=PIPE) as proc:
179
+            for line in proc.stdout:
180
+                line = line.decode()
181
+                if line.startswith("size"):
182
+                    estimated_size = int(line.split()[1])
183
+
184
+        use_cache_dir = estimated_size > min_cache_size
185
+        if use_cache_dir and args.verbose:
186
+            print("# Using rsync cache for this transfer")
187
+        elif args.verbose:
188
+            print("# Not using rsync cache for this transfer")
189
+
190
+    if use_cache_dir:
191
+        def check_space(ssh_prefix, where, cache_dir):
192
+            with Popen(ssh_prefix + ["df", "-B1", cache_dir], stdout=PIPE) as df:
193
+                for line in df.stdout:
194
+                    line = line.decode()
195
+                df.wait()
196
+                check_returncode(df)
197
+                free_space = int(line.split()[3])
198
+
199
+            if estimated_size * 1.25 > free_space:
200
+                print("Cannot store intermediate stream at {0}, would consume "
201
+                      "about {1} MB free space in {2}, but there is just {3} MB available".format(
202
+                    where, estimated_size // 1000000, cache_dir, free_space // 1000000), file=sys.stderr)
203
+                raise SystemExit(1)
204
+
205
+        if not args.dry_run:
206
+            check_space(ssh_orig, "origin", orig_cache_dir)
207
+            check_space(ssh_dest, "destination", dest_cache_dir)
208
+        cache_filename = origin + fs
209
+        if last:
210
+            cache_filename += "@" + last
211
+        cache_filename += "@" + snapname
212
+
213
+        # replace "/" and ":"
214
+        cache_filename = cache_filename.replace("/", "_").replace(":", "_")
215
+
216
+        dest_cache_filename = os.path.join(dest_cache_dir, cache_filename)
217
+        orig_cache_filename = os.path.join(orig_cache_dir, cache_filename)
218
+        orig_url = ((orig_host + ":") if orig_host else "") + orig_cache_filename
219
+        dest_url = ((dest_host + ":") if dest_host else "") + dest_cache_filename
220
+
221
+    if compression:
222
+        send_cmd += ["|", "gzip"]
223
+    if use_cache_dir:
224
+        send_cmd += [">", orig_cache_filename]
225
+
226
+    if compression:
227
+        pre_pipe = ["gunzip", "|"]
228
+    else:
229
+        pre_pipe = []
230
+    if use_cache_dir:
231
+        pre_pipe = ["cat", dest_cache_filename, "|"] + pre_pipe
232
+
233
+    recv_cmd = ssh_dest + pre_pipe + ["zfs", "recv", "-u", destination + fs]
234
+
235
+    if args.verbose or args.dry_run:
236
+        if send_cmd[0] == "ssh":
237
+            print_send_cmd = send_cmd[0:2] + ['"'] + send_cmd[2:] + ['"']
238
+            if use_cache_dir:
239
+                print_rm_orig_cmd = ssh_orig + ['"', "rm", orig_cache_filename, '"']
240
+        else:
241
+            print_send_cmd = send_cmd
242
+            if use_cache_dir:
243
+                print_rm_orig_cmd = ["rm", orig_cache_filename]
244
+
245
+        if recv_cmd[0] == "ssh":
246
+            print_recv_cmd = recv_cmd[0:2] + ['"'] + recv_cmd[2:] + ['"']
247
+            if use_cache_dir:
248
+                print_rm_dest_cmd = ssh_dest + ['"', "rm", dest_cache_filename, '"']
249
+        else:
250
+            print_recv_cmd = recv_cmd
251
+            if use_cache_dir:
252
+                print_rm_dest_cmd = ["rm", dest_cache_filename]
253
+
254
+        if use_cache_dir:
255
+            print(*print_send_cmd)
256
+            print("rsync", "--partial", "--inplace", orig_url, dest_url)
257
+            print(*print_rm_orig_cmd)
258
+            print(*print_recv_cmd)
259
+            print(*print_rm_dest_cmd)
260
+        else:
261
+            print(" ".join(print_send_cmd), "|", " ".join(print_recv_cmd))
262
+
263
+    send_shell = not ssh_orig
264
+    if send_shell:
265
+        send_cmd = [" ".join(send_cmd)]
266
+
267
+    recv_shell = not ssh_dest
268
+    if recv_shell:
269
+        recv_cmd = [" ".join(recv_cmd)]
270
+
271
+    if not args.dry_run:
272
+
273
+        # send
274
+        if use_cache_dir:
275
+            # Via rsync'ed stream files
276
+            if not exists(orig_url + ".total"):
277
+                # Create stream file
278
+                with Popen(send_cmd, shell=send_shell) as sender:
279
+                    sender.wait()
280
+                    check_returncode(sender)
281
+                touch(orig_url + ".total")
282
+            else:
283
+                print("# Resuming upload of partial stream file")
284
+
285
+            rsync(orig_url, dest_url)
286
+            recv_stdin = None
287
+        else:
288
+            # direct `zfs send | zfs recv` pipe
289
+            sender = Popen(send_cmd, shell=send_shell)
290
+            recv_stdin = sender.stdout
291
+
292
+        # recv
293
+        with Popen(recv_cmd, stdin=recv_stdin, shell=recv_shell) as receiver:
294
+            receiver.wait()
295
+            check_returncode(receiver)
296
+
297
+        # Cleanup
298
+        if not use_cache_dir:
299
+            sender.wait()
300
+            check_returncode(sender)
301
+        else:
302
+            rm(orig_url + ".total")
303
+            rm(orig_url)
304
+            rm(dest_url)
305
+
306
+    destinations[fs].append(snapname)
307
+    if args.verbose:
308
+        print()
309
+
310
+# test
311
+for fs, snapname in origin_snapshots:
312
+    assert(fs in destinations)
313
+    assert(snapname in destinations[fs])