zfs-sync
a02f4d34
 #!/usr/bin/python3
 import os
 import sys
 import locale
 import socket
 import argparse
 from shutil import which
d10a93cf
 from subprocess import Popen, PIPE, run
0bbb80fa
 from collections import namedtuple
a02f4d34
 
d10a93cf
 # Only print the exception type and message, no traceback, for errors that
 # escape to the top level.
 sys.tracebacklimit = 0
 # Codec used to decode `zfs list` output. locale.getdefaultlocale() (now
 # deprecated) returns (None, None) under the C/POSIX locale, which made every
 # later bytes.decode(encoding) fail; getpreferredencoding() returns the
 # locale's codec name instead.
 encoding = locale.getpreferredencoding(False) or "utf-8"
 
 parser = argparse.ArgumentParser(
     description='This utility uses zfs send/receive to transfer snapshots from '
                 'one zfs pool/filesystem to another. If preexisting snapshots with the same '
                 'name exist on both sides they are assumed to hold identical state and only '
                 'incremental sends and receives are done to reduce the amount of data transferred.',
     formatter_class=argparse.ArgumentDefaultsHelpFormatter,
     # Arguments of the form "@FILE" are read from FILE
     fromfile_prefix_chars='@'
 )
a02f4d34
 
46737daf
 parser.add_argument('origin', type=str,
                     help='Origin zfs filesystem, could be an SSH remote path, e.g. "user@host:pool/fs"')

 parser.add_argument('destination', type=str,
                     help='Destination prefix, e.g. backup_pool/this_host. The ZFS snapshots from "origin" '
                          'are then stored below there. As with "origin", can be an SSH remote path')

 parser.add_argument('--snapname', metavar='TAGNAME', type=str,
                     help='Only consider snapshots starting with TAGNAME')

 parser.add_argument('-c', '--cache-dir', metavar="DIR",
                     help='First create a temporary file in directory DIR with the `zfs send` stream, '
                          'then rsync that to the remote host. With that, one can resume '
                          'a partially sent file without starting over. Use ORIGDIR:DESTDIR to '
                          'choose different directories on the origin and destination side')

 # No default here on purpose: "50M" is filled in after parsing, and only
 # together with -c, so that -m without -c can be rejected.
 parser.add_argument('-m', '--min-cache-size',
                     help='Minimum (estimated) size of stream to use the cache-dir functionality, '
                          'optionally with a k/M/G suffix (powers of 1000), default 50M')

 # append_const stores one 1 per -v; the list is summed into a count after parsing.
 parser.add_argument('-v', '--verbose', action="append_const", const=1,
                     help='Echo the commands that are issued. Two -v pass a -v along to the zfs send/recv commands')

 parser.add_argument('-n', '--dry-run', action="store_true",
                     help='Only echo the transfer commands that would be issued,\n'
                          'do not actually send anything.')
a02f4d34
 
 args = parser.parse_args()
 # -v is collected as a list of 1s (append_const); collapse it into a count.
 args.verbose = 0 if args.verbose is None else sum(args.verbose)

 # -m only makes sense together with -c; when -c is given alone, -m
 # defaults to "50M".
 if args.min_cache_size is not None:
     if not args.cache_dir:
         print("Cannot specify -m/--min-cache-size without -c/--cache-dir", file=sys.stderr)
         raise SystemExit(1)
 elif args.cache_dir:
     args.min_cache_size = "50M"
 
a02f4d34
 def check_returncode(proc):
     """Abort the program if a finished process reported failure.

     :param proc: a waited-for ``Popen`` or a ``CompletedProcess`` -- anything
         with ``returncode`` and ``args`` attributes.
     :raises SystemExit: with status 1 when ``proc.returncode`` is not 0, after
         printing the failing command line to stderr.
     """
     if proc.returncode != 0:
         command = proc.args
         if not isinstance(command, str):
             # args is either one shell string or a sequence of argv words;
             # joining a string would interleave spaces between its characters
             command = " ".join(map(str, command))
         print("\nCommand \"{0}\" returned error {1}, aborting...".format(command, proc.returncode), file=sys.stderr)
         raise SystemExit(1)
 
d10a93cf
 
a02f4d34
 def select_snapshots(proc, prefix):
     """Collect the snapshots printed by ``zfs list -t snapshot -r PREFIX -H``.

     :param proc: the running ``Popen`` for that command; its stdout and
         stderr must be pipes.
     :param prefix: the dataset that was listed. It is stripped from every
         filesystem name, so ``PREFIX@snap`` yields ``("", "snap")`` and
         ``PREFIX/child@snap`` yields ``("/child", "snap")``.
     :returns: ``(relative_fs, snapname)`` tuples in output order, keeping only
         snapshot names that start with ``--snapname`` when that option is set;
         an empty list when zfs reports that the dataset does not exist.
     :raises SystemExit: on a filesystem outside ``prefix`` or a failed command.
     """
     # communicate() drains stdout and stderr together, so a process that
     # writes a lot to stderr cannot block on a full pipe while we are still
     # waiting for its stdout to end.
     out, err = proc.communicate()
     res = []
     for line in out.decode(encoding).split("\n"):
         if not line:
             continue
         # -H output is tab-separated; the first column is the snapshot name
         snapshot = line.split("\t", 1)[0]
         # The first '@' separates filesystem and snapshot name
         fs, snapname = snapshot.split("@", 1)
         if not fs.startswith(prefix):
             print(("Unexpected filesystem: \"{0}\", "
                    "expected a string starting with \"{1}\"").format(fs, prefix), file=sys.stderr)
             raise SystemExit(1)
         fs = fs[len(prefix):]
         if not args.snapname or snapname.startswith(args.snapname):
             res.append((fs, snapname))
     if proc.returncode == 1 and (err or b"").endswith(b"dataset does not exist\n"):
         return []
     check_returncode(proc)
     return res
 
d10a93cf
 
dc600894
 def prefix_ssh(location):
     """Split a ``[host:]dataset`` location into ssh prefix, host and dataset.

     :param location: a local ZFS dataset name, or ``host:dataset`` (host may
         be ``user@host``) for a dataset reached over ssh.
     :returns: ``(ssh, host, dataset)``; ``ssh`` is the argv prefix
         ``["ssh", host]`` for a remote location, and ``([], "", location)``
         is returned for a local one.
     """
     if ":" not in location:
         return [], "", location
     # Split at the first colon only: ZFS dataset names may contain ':'
     host, location = location.split(":", 1)
     return ["ssh", host], host, location
 
 # Split the "[host:]dataset" arguments into (ssh argv prefix, host, dataset).
 ssh_orig, orig_host, origin = prefix_ssh(args.origin)
 ssh_dest, dest_host, destination = prefix_ssh(args.destination)

 # Truthy (the first non-empty ssh prefix) when at least one side is remote;
 # the stream is then gzip'ed in transit.
 compression = ssh_orig if ssh_orig else ssh_dest
 
 # Decimal (power-of-1000) suffixes accepted by -m/--min-cache-size
 _SIZE_SUFFIXES = {"k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}


 def _parse_size(text):
     """Parse a byte count such as ``"4096"``, ``"50M"`` or ``"1.5G"``.

     :returns: the size in bytes as an int.
     :raises ValueError: unless ``text`` is a non-negative finite number with
         an optional k/M/G/T suffix.
     """
     number = text.strip()
     factor = _SIZE_SUFFIXES.get(number[-1:], 1)
     if factor != 1:
         number = number[:-1]
     try:
         value = int(number)
     except ValueError:
         value = float(number)
     if not 0 <= value < float("inf"):
         raise ValueError(text)
     return int(value * factor)


 if args.cache_dir:
     # "ORIGDIR:DESTDIR" selects a directory per side, a plain DIR is used on
     # both. Split once so that DESTDIR may itself contain ':'.
     if ":" in args.cache_dir:
         orig_cache_dir, dest_cache_dir = args.cache_dir.split(":", 1)
     else:
         orig_cache_dir = dest_cache_dir = args.cache_dir
     # The stream file is rsync'ed from one host to the other, so exactly one
     # of the two sides must be remote.
     if bool(ssh_orig) == bool(ssh_dest):
         print("-c/--cache-dir is not supported for two local or two remote filesystems", file=sys.stderr)
         raise SystemExit(1)

     try:
         min_cache_size = _parse_size(args.min_cache_size)
     except ValueError:
         print("Invalid -m/--min-cache-size \"{0}\", expected e.g. 4096, 100k, 50M or 1.5G".format(
             args.min_cache_size), file=sys.stderr)
         raise SystemExit(1) from None
a02f4d34
 
0bbb80fa
 # With -v -v, zfs send / zfs recv get their own -v flag as well.
 verbose = ["-v"] if args.verbose > 1 else []
 
d10a93cf
 # Snapshots already present on the destination, grouped by filesystem name
 # (relative to `destination`), each list in `zfs list` output order.
 destinations = {}
 with Popen(ssh_dest + ["zfs", "list", "-t", "snapshot", "-r", destination, "-H"], stdout=PIPE, stderr=PIPE) as proc:
     for fs, snapname in select_snapshots(proc, destination):
         destinations.setdefault(fs, []).append(snapname)
 
d10a93cf
 with Popen(ssh_orig + ["zfs", "list", "-t", "snapshot", "-r", origin, "-H"], stdout=PIPE, stderr=PIPE) as proc:
     origin_snapshots = select_snapshots(proc, origin)

 # (fs, snapname) -> True for every origin snapshot, so that "does this
 # snapshot exist on the origin?" is a constant-time lookup.
 origin_dict = {(fs, snapname): True for fs, snapname in origin_snapshots}
a02f4d34
 
d10a93cf
 
 def rsync(orig_url, dest_url):
     """Copy `orig_url` to `dest_url` with rsync, aborting the program on failure.

     ``--append --inplace`` continue a partially transferred file instead of
     starting over; with -v -v rsync additionally reports overall progress.
     """
     extra = ["--info=progress2"] if args.verbose > 1 else []
     argv = ["rsync", "--append", *extra, "--inplace", orig_url, dest_url]
     with echoPopen(argv) as transfer:
         transfer.wait()
         check_returncode(transfer)
d10a93cf
 
0bbb80fa
 def command_on_url(*commands, abort=True, dry_run=False, echo=False):
     """Run a command whose last argument is a ``[host:]path`` URL.

     When the URL has a ``host:`` part, the command runs on that host via
     ``ssh host`` with the bare path as its last argument.

     :param commands: the command words, followed by the URL.
     :param abort: if true, a failing command terminates the program through
         check_returncode(); otherwise the failure is reported as False.
     :param dry_run: if true, nothing is executed.
     :param echo: if true, the command line is printed first. Echoing does not
         suppress execution; only ``dry_run`` does.
     :returns: True if the command succeeded (or was skipped due to
         ``dry_run``), False if it failed and ``abort`` is false.
     """
     *words, url = commands
     if ":" in url:
         # Split at the first colon only; the path part may contain ':'
         host, filename = url.split(":", 1)
         ssh = ["ssh", host]
     else:
         filename = url
         ssh = []

     if echo:
         if ssh:
             print(*ssh, '"' + " ".join(words + [filename]) + '"')
         else:
             print(*words, filename)
     if dry_run:
         return True

     cmd = run(ssh + words + [filename])
     if abort:
         check_returncode(cmd)
     return cmd.returncode == 0
d10a93cf
 
 def exists(url):
     """Return whether ``test -f`` succeeds for the ``[host:]path`` `url`.

     The check is never echoed, never skipped by --dry-run, and a failure does
     not abort the program.
     """
     options = dict(echo=False, dry_run=False, abort=False)
     return command_on_url("test", "-f", url, **options)
d10a93cf
 
 def touch(url):
     """Run ``touch`` on the ``[host:]path`` `url`.

     dry_run follows -n; echo is truthy under -n or -v.
     """
     loud = args.dry_run or args.verbose
     return command_on_url("touch", url, echo=loud, dry_run=args.dry_run)
d10a93cf
 
 def rm(url):
     """Run ``rm -v`` on the ``[host:]path`` `url`.

     dry_run follows -n; echo is truthy under -n or -v.
     """
     loud = args.dry_run or args.verbose
     return command_on_url("rm", "-v", url, echo=loud, dry_run=args.dry_run)
0bbb80fa
 
 # Stand-in for a process's stdout under --dry-run. It only carries the
 # `cmd_string` that a downstream echoPopen prints as "upstream | command".
 MockStdout = namedtuple("MockStdout", "cmd_string")
 
66a8842c
 class echoPopen(Popen):
     """Popen that echoes its command line and honours --dry-run.

     With -v or -n the command is printed before it is started, unless its
     stdout is redirected. In that case the command string is attached to
     ``self.stdout.cmd_string`` instead, so that an echoPopen reading from
     that pipe prints the whole ``upstream | command`` pipeline.
     With -n no process is started: ``stdout`` becomes a MockStdout holding
     the command string and ``returncode`` is 0.
     """

     def __init__(self, commands, stdin=None, stdout=None, **kwargs):
         # Always built: it is attached to a piped stdout below even when
         # nothing is echoed.
         cmd_string = self._describe(commands, stdin)
         if (args.verbose or args.dry_run) and stdout is None:
             print(cmd_string)

         if args.dry_run:
             self.stdout = MockStdout(cmd_string)
             self.returncode = 0
             return

         super().__init__(commands, stdin=stdin, stdout=stdout, **kwargs)
         if self.stdout is not None:
             self.stdout.cmd_string = cmd_string

     @staticmethod
     def _describe(commands, stdin):
         """Render `commands` as a shell-like string, prefixed by the upstream pipeline if any."""
         if commands[0] == "ssh":
             # Quote the part that the remote shell executes
             cmd = " ".join(commands[0:2]) + ' "' + " ".join(commands[2:]) + '"'
         else:
             cmd = " ".join(commands)
         upstream = getattr(stdin, "cmd_string", None)
         if upstream is not None:
             return upstream + " | " + cmd
         return cmd

     def wait(self, timeout=None):
         """Wait for the process; under --dry-run return the fake returncode 0."""
         if args.dry_run:
             return self.returncode
         return super().wait(timeout)

     def __exit__(self, exc_type, value, traceback):
         # Nothing to clean up when no process was started
         if not args.dry_run:
             return super().__exit__(exc_type, value, traceback)
d10a93cf
 
a02f4d34
 def _estimate_size(send_cmd):
     """Return the stream size in bytes that ``zfs send -nP`` reports for `send_cmd`.

     :param send_cmd: the ``zfs send`` argv (optionally ssh-prefixed), before
         any shell pipe or redirection has been appended.
     :raises SystemExit: if the dry-run send fails or prints no "size" line.
     """
     size = None
     with Popen(send_cmd + ["-nP"], stdout=PIPE) as proc:
         for line in proc.stdout:
             line = line.decode()
             if line.startswith("size"):
                 size = int(line.split()[1])
     check_returncode(proc)
     if size is None:
         print("Could not estimate the stream size of \"{0}\", aborting...".format(" ".join(send_cmd)), file=sys.stderr)
         raise SystemExit(1)
     return size


 def _check_space(ssh_prefix, where, cache_dir, needed):
     """Abort unless `cache_dir` has room for a stream of about `needed` bytes.

     Runs ``df -B1`` on the directory (remotely when `ssh_prefix` is an ssh
     prefix) and reads the fourth column of its last output line as the
     available bytes; 25% headroom on top of `needed` is required.
     """
     with Popen(ssh_prefix + ["df", "-B1", cache_dir], stdout=PIPE) as df:
         last_line = ""
         for line in df.stdout:
             last_line = line.decode()
         df.wait()
         check_returncode(df)
     try:
         free_space = int(last_line.split()[3])
     except (IndexError, ValueError):
         print("Could not parse `df` output for {0}: {1!r}".format(cache_dir, last_line), file=sys.stderr)
         raise SystemExit(1) from None

     if needed * 1.25 > free_space:
         print("Cannot store intermediate stream at {0}, would consume "
               "about {1} MB free space in {2}, but there is just {3} MB available".format(
             where, needed // 1000000, cache_dir, free_space // 1000000), file=sys.stderr)
         raise SystemExit(1)


 # Transfer every origin snapshot the destination does not have yet, in the
 # order `zfs list` reported them.
 for fs, snapname in origin_snapshots:
     if fs in destinations and snapname in destinations[fs]:
         continue

     # Incremental base: the last destination snapshot (in listing order) that
     # also exists on the origin. A new filesystem, or one without a common
     # snapshot, is sent in full.
     last = None
     if fs in destinations:
         for old_snapshot in destinations[fs]:
             if (fs, old_snapshot) in origin_dict:
                 last = old_snapshot
     else:
         destinations[fs] = []

     snapshot = "{0}{1}@{2}".format(origin, fs, snapname)
     if last:
         send_cmd = ssh_orig + ["zfs", "send"] + verbose + ["-i", "@{0}".format(last), snapshot]
     else:
         send_cmd = ssh_orig + ["zfs", "send"] + verbose + [snapshot]

     # Only streams larger than --min-cache-size go through the cache dir
     if args.cache_dir:
         estimated_size = _estimate_size(send_cmd)
         use_cache_dir = estimated_size > min_cache_size
         if use_cache_dir and args.verbose:
             print("# Using rsync cache for this transfer")
         elif args.verbose:
             print("# Not using rsync cache for this transfer")
     else:
         use_cache_dir = False

     if use_cache_dir:
         if not args.dry_run:
             _check_space(ssh_orig, "origin", orig_cache_dir, estimated_size)
             _check_space(ssh_dest, "destination", dest_cache_dir, estimated_size)
         cache_filename = origin + fs
         if last:
             cache_filename += "@" + last
         cache_filename += "@" + snapname

         # replace "/" and ":"
         cache_filename = cache_filename.replace("/", "_").replace(":", "_")

         dest_cache_filename = os.path.join(dest_cache_dir, cache_filename)
         orig_cache_filename = os.path.join(orig_cache_dir, cache_filename)
         orig_url = ((orig_host + ":") if orig_host else "") + orig_cache_filename
         dest_url = ((dest_host + ":") if dest_host else "") + dest_cache_filename

     if compression:
         send_cmd += ["|", "gzip"]
     if use_cache_dir:
         send_cmd += [">", orig_cache_filename]

     if compression:
         pre_pipe = ["gunzip", "|"]
     else:
         pre_pipe = []
     if use_cache_dir:
         pre_pipe = ["cat", dest_cache_filename, "|"] + pre_pipe

     recv_cmd = ssh_dest + pre_pipe + ["zfs", "recv"] + verbose + ["-u", destination + fs]

     # Local commands contain "|" / ">" and therefore run through the shell;
     # remote ones are interpreted by the remote shell behind ssh.
     send_shell = not ssh_orig
     if send_shell:
         send_cmd = [" ".join(send_cmd)]

     recv_shell = not ssh_dest
     if recv_shell:
         recv_cmd = [" ".join(recv_cmd)]

     # send
     if use_cache_dir:
         # Via rsync'ed stream files; the ".total" marker records that the
         # stream file on the origin side is complete.
         if not exists(orig_url + ".total"):
             # Create stream file
             with echoPopen(send_cmd, shell=send_shell) as sender:
                 sender.wait()
                 check_returncode(sender)
             touch(orig_url + ".total")
         else:
             print("# Resuming upload of partial stream file")

         rsync(orig_url, dest_url)
         recv_stdin = None
     else:
         # direct `zfs send | zfs recv` pipe
         sender = echoPopen(send_cmd, stdout=PIPE, shell=send_shell)
         recv_stdin = sender.stdout

     # recv
     with echoPopen(recv_cmd, stdin=recv_stdin, shell=recv_shell) as receiver:
         if recv_stdin is not None and not args.dry_run:
             # The receiver holds its own copy now; dropping ours lets the
             # sender get SIGPIPE if the receiver exits early.
             recv_stdin.close()
         receiver.wait()
         check_returncode(receiver)

     # Cleanup
     if not use_cache_dir:
         sender.wait()
         check_returncode(sender)
     else:
         rm(orig_url + ".total")
         rm(orig_url)
         rm(dest_url)

     destinations[fs].append(snapname)
     if args.verbose or args.dry_run:
         print()
a02f4d34
 
 # Sanity check: every origin snapshot must now be recorded in `destinations`.
 # An explicit check instead of `assert`, so that it also runs under
 # `python -O`.
 for fs, snapname in origin_snapshots:
     if fs not in destinations or snapname not in destinations[fs]:
         print("Internal error: snapshot \"{0}{1}@{2}\" was not transferred".format(origin, fs, snapname), file=sys.stderr)
         raise SystemExit(1)