Skip to content
Snippets Groups Projects
Commit ddd46917 authored by Daniele Nicolodi's avatar Daniele Nicolodi
Browse files

datasync: New tool to synchronize data files to archive directory

parent 85da643b
No related branches found
No related tags found
No related merge requests found
import click
import contextlib
import datetime
import os
import re
import subprocess
import time
from ptblab import scheduler, terminal
# One row of robocopy's job summary table: a label followed by six numeric
# columns, of which only the first two (total and copied) are used.
_SUMMARY_ROW = re.compile(r'^\s+(Files|Bytes) :\s+([0-9]+)\s+([0-9]+)(?:\s+[0-9]+){4}$')


def _parse_summary(output):
    """Return ``{'Files': (total, copied), 'Bytes': (total, copied)}``.

    A key is missing when the corresponding summary row is not present
    in OUTPUT, for example when robocopy aborted before printing it.
    """
    rows = {}
    for line in output.splitlines():
        m = _SUMMARY_ROW.match(line)
        if m:
            rows[m.group(1)] = int(m.group(2)), int(m.group(3))
    return rows


def sync(src, dst, *include):
    """Use robocopy to copy modified files from SRC to DST.

    Args:
        src: Source directory.
        dst: Destination directory.
        *include: File name patterns handed to robocopy to select the
            files to copy.

    A one-line status report is printed to standard output. The line
    is terminated with a carriage return so that the next report
    overwrites it.
    """

    # See robocopy documentation
    # https://learn.microsoft.com/en-us/windows-server/administration/windows-commands/robocopy
    # https://static.spiceworks.com/attachments/post/0016/6429/robocopy.pdf

    # Measure execution time.
    t1 = time.time()

    # robocopy uses the FindFirstFileW() and FindNextFileW() functions to
    # enumerate the files to copy. On NTFS filesystems these functions report
    # cached file metadata stored in the directory inode. The cached file
    # metadata is not updated when appending data to a file without closing
    # the file handle.
    #
    # This is the update pattern used by most data acquisition programs. This
    # results in robocopy seeing stale size and modification times and not
    # mirroring the data files. Executing GetFileInformationByHandle() forces
    # the metadata cache to be updated.
    #
    # https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-findfirstfilea
    # https://devblogs.microsoft.com/oldnewthing/20111226-00/?p=8813
    for entry in os.listdir(src):
        try:
            # Only the side effect matters, the result is discarded.
            os.stat(os.path.join(src, entry))
        except OSError:
            # The entry vanished or became inaccessible after it has been
            # listed. This must not abort the whole synchronization cycle.
            continue

    r = subprocess.run([
        'robocopy',
        src,
        dst,
        *include,
        '/xx', # exclude files present in DST but not in SRC
        '/ns', # do not log file sizes
        '/nc', # do not log file classes
        '/ndl', # do not log directory names
        '/np', # do not display percentage copied
        '/bytes', # sizes as bytes
    ], stdout=subprocess.PIPE, text=True)

    summary = _parse_summary(r.stdout)

    t2 = time.time()
    s = datetime.datetime.fromtimestamp(t2).isoformat(' ', 'milliseconds')

    if 'Files' in summary and 'Bytes' in summary:
        n_total, n_copied = summary['Files']
        mb_total, mb_copied = (n / 1_000_000 for n in summary['Bytes'])
        status = f'copied {n_copied}/{n_total} files {mb_copied:.2f}/{mb_total:.2f} MB in {t2 - t1:.3f} s'
    else:
        # Without the summary table there is nothing to report on. Show
        # the exit code instead of failing on undefined counters.
        status = f'no robocopy summary found (exit code {r.returncode})'

    print(f'{t2:.3f} {s} {status}', end='\033[0K\r')
@click.command()
@click.argument('src')
@click.argument('dst')
@click.option('--include', metavar='GLOB', default=('*.txt',), multiple=True, help='Glob pattern for the files to include, *.txt by default.')
@click.option('--interval', type=int, metavar='SEC', default=3, help='Time interval between copy operations.')
def main(src, dst, include, interval):
    """Copy modified files from SRC to DST running "robocopy" in a loop.

    A hack is implemented to invalidate the NTFS directory listing
    cache and have current metadata returned for files which have been
    written to but not closed by the process writing them. This
    enables "robocopy" to work with the update pattern used by most data
    acquisition programs.

    """

    # NOTE(review): presumably prepares the console for the ANSI escape
    # sequences used in the status line -- confirm in ptblab.terminal.
    terminal.setup()

    # Run sync() every INTERVAL seconds, starting immediately.
    s = scheduler.Scheduler()
    s.periodic(interval, 1, sync, (src, dst, *include))

    # Ctrl-C is the expected way to terminate the program: exit quietly.
    with contextlib.suppress(KeyboardInterrupt):
        s.run()

    # The status line printed by sync() ends with a carriage return.
    # Move to the next line so that it is not overwritten on exit.
    print()
# Run the command line interface only when executed as a script, not
# when the module is imported.
if __name__ == "__main__":
    main()
import sched
import time
class Scheduler(sched.scheduler):
    """Scheduler with periodic events support."""

    def __init__(self, timefunc=time.time, delayfunc=time.sleep):
        """Initialize the scheduler.

        Args:
            timefunc: Callable returning the current time.
            delayfunc: Callable taking a delay and waiting that long.
        """
        super().__init__(timefunc, delayfunc)

    def periodic(self, interval, priority, action, argument=(), kwargs=None):
        """Schedule ACTION to run now and then every INTERVAL time units.

        Args:
            interval: Time between the scheduled start of consecutive runs.
            priority: Priority of the events, as for ``enterabs()``.
            action: Callable to execute.
            argument: Positional arguments for ACTION.
            kwargs: Keyword arguments for ACTION, none by default.

        Returns:
            The event for the first execution. Each following execution
            is a new event that is scheduled only after the previous
            one completed.
        """
        if kwargs is None:
            kwargs = {}

        def callback(t):
            action(*argument, **kwargs)
            # Advance from the scheduled time rather than from the
            # current time so that the duration of ACTION does not
            # accumulate as drift.
            t += interval
            self.enterabs(t, priority, callback, (t,))

        t = self.timefunc()
        return self.enterabs(t, priority, callback, (t,))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment