blob: 4aa21398e7057204315b936b09633c986fbfb41a [file] [log] [blame]
Primiano Tucci3d4217d2021-11-05 11:11:51 +00001#!/usr/bin/env python3
2"""
3Script to synchronize (local>remote and viceversa) test data files from/to GCS.
4
//test/data files are not checked in the codebase because they are large binary
files that change frequently. Instead we check in only xxx.sha256 files, which
contain the SHA-256 of the actual binary file, and sync them from a GCS bucket.

Files in the GCS bucket are content-indexed as gs://bucket/file_name-a1b2c3f4 .
10
11Usage:
12./test_data status # Prints the status of new & modified files.
13./test_data download # To sync remote>local (used by install-build-deps).
14./test_data upload # To upload newly created and modified files.
15"""
16
17import argparse
18import logging
19import os
20import sys
21import hashlib
22import subprocess
23
24from multiprocessing.pool import ThreadPool
25from collections import namedtuple, defaultdict
26
# Repository root: two directories above this script.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BUCKET = 'gs://perfetto/test_data'
SUFFIX = '.sha256'  # Extension of the checked-in digest tracker files.

# Possible values for FileStat.status.
FS_MATCH = 'matches'
FS_NEW_FILE = 'needs upload'
FS_MODIFIED = 'modified'
FS_MISSING = 'needs download'

# path: data file path (without the .sha256 suffix).
# status: one of the FS_* constants above.
# actual_digest: SHA-256 of the local file, or None if the file is absent.
# expected_digest: digest read from the .sha256 tracker, or None if absent.
FileStat = namedtuple('FileStat',
                      ['path', 'status', 'actual_digest', 'expected_digest'])

# Parsed command-line arguments; populated by main().
args = None
39
40
def relpath(path):
  """Returns |path| expressed relative to the repository root."""
  rel = os.path.relpath(path, ROOT_DIR)
  return rel
43
44
def download(url, out_file):
  """Fetches |url| into |out_file| via curl (silent, following redirects)."""
  cmd = ['curl', '-L', '-s', '-o', out_file, url]
  subprocess.check_call(cmd)
47
48
def list_files(path, scan_new_files=False):
  """Recursively lists data files under |path|.

  If scan_new_files=False, yields only files with a matching xxx.sha256
  tracker. If scan_new_files=True, yields all files, including untracked
  ones. Paths are yielded without the .sha256 suffix, each at most once.
  """
  emitted = set()
  for cur_dir, _, fnames in os.walk(path):
    for fname in fnames:
      # .swp files are temporaries left around when CTRL-C-ing a download.
      if fname.endswith('.swp') or fname.startswith('.'):
        continue
      fpath = os.path.join(cur_dir, fname)
      if not os.path.isfile(fpath):
        continue
      if fpath.endswith(SUFFIX):
        fpath = fpath[:-len(SUFFIX)]
      elif not scan_new_files:
        continue
      if fpath in emitted:
        continue
      emitted.add(fpath)
      yield fpath
70
71
def hash_file(fpath):
  """Returns the hex SHA-256 digest of the file at |fpath|.

  Reads in 32 KB chunks so arbitrarily large files never need to fit
  in memory at once.
  """
  digest = hashlib.sha256()
  with open(fpath, 'rb') as f:
    while True:
      chunk = f.read(32768)
      if not chunk:
        break
      digest.update(chunk)
  return digest.hexdigest()
78
79
def map_concurrently(fn, files):
  """Runs |fn| over |files| on args.jobs threads, printing progress.

  |files| must be a list (its len() is used for the progress counter);
  |fn| must return a FileStat.
  """
  pool = ThreadPool(args.jobs)
  total = len(files)
  completed = 0
  for fstat in pool.imap_unordered(fn, files):
    assert isinstance(fstat, FileStat)
    completed += 1
    if args.quiet:
      continue
    # \r keeps the progress on one line; the -60 slice caps path length.
    print('[%d/%d] %-60s' % (completed, total, relpath(fstat.path)[-60:]),
          end='\r')
  if not args.quiet:
    print('')
91
92
def get_file_status(fpath):
  """Computes the sync status of |fpath| against its .sha256 tracker.

  Returns a FileStat whose status is one of the FS_* constants.
  Raises Exception if neither the data file nor its tracker exists;
  callers are expected to pass paths produced by list_files(), which
  guarantees at least one of the two is present.
  """
  sha_file = fpath + SUFFIX
  sha_exists = os.path.exists(sha_file)
  file_exists = os.path.exists(fpath)
  actual_digest = None  # SHA-256 of the local file, if present.
  expected_digest = None  # Digest recorded in the tracker, if present.
  if sha_exists:
    with open(sha_file, 'r') as f:
      expected_digest = f.readline().strip()
  if file_exists:
    actual_digest = hash_file(fpath)
  if sha_exists and not file_exists:
    status = FS_MISSING
  elif not sha_exists and file_exists:
    status = FS_NEW_FILE
  elif not sha_exists and not file_exists:
    # Was `raise Exception(fpath)`: include context so the failure is
    # actionable when it surfaces through the thread pool.
    raise Exception('%s: neither the file nor its %s tracker exists' %
                    (fpath, SUFFIX))
  elif expected_digest == actual_digest:
    status = FS_MATCH
  else:
    status = FS_MODIFIED
  return FileStat(fpath, status, actual_digest, expected_digest)
115
116
def cmd_upload(dir):
  """Uploads new and modified files under |dir| to the GCS bucket.

  Each uploaded file's xxx.sha256 tracker is rewritten to the new digest.
  Returns 0 on success (including when there is nothing to upload).
  """
  all_files = list_files(dir, scan_new_files=True)
  files_to_upload = []
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status in (FS_NEW_FILE, FS_MODIFIED):
      files_to_upload.append(fs)
  if len(files_to_upload) == 0:
    if not args.quiet:
      print('No modified or new files require uploading')
    return 0
  # List the candidate files before the dry-run check, so --dry-run shows
  # what would be uploaded (previously it returned without printing anything,
  # unlike cmd_download's dry-run).
  if not args.quiet:
    print('About to upload %d files:' % len(files_to_upload))
    print('\n'.join(relpath(f.path) for f in files_to_upload))
    print('')
  if args.dry_run:
    return 0
  if not args.quiet:
    input('Press a key to continue or CTRL-C to abort')

  def upload_one_file(fs):
    # Files are content-indexed in the bucket as file_name-<sha256>.
    assert fs.actual_digest is not None
    dst_name = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                             fs.actual_digest)
    cmd = ['gsutil', '-q', 'cp', '-a', 'public-read', fs.path, dst_name]
    logging.debug(' '.join(cmd))
    subprocess.check_call(cmd)
    # Update the tracker atomically: write a temp file, then rename over it.
    tmp_sha = fs.path + SUFFIX + '.swp'
    with open(tmp_sha, 'w') as f:
      f.write(fs.actual_digest)
    os.rename(tmp_sha, fs.path + SUFFIX)
    return fs

  map_concurrently(upload_one_file, files_to_upload)
  return 0
149
150
def cmd_download(dir, overwrite_locally_modified=False):
  """Downloads from GCS the tracked files under |dir| missing locally.

  Locally modified files are skipped (after a warning + confirmation
  prompt) unless overwrite_locally_modified=True. Returns 0 on success.
  Raises Exception if a downloaded file's digest does not match its
  tracker.
  """
  files_to_download = []
  modified = []
  all_files = list_files(dir, scan_new_files=False)
  for fs in ThreadPool(args.jobs).imap_unordered(get_file_status, all_files):
    if fs.status == FS_MISSING:
      files_to_download.append(fs)
    elif fs.status == FS_MODIFIED:
      modified.append(fs)

  if len(modified) > 0 and not overwrite_locally_modified:
    print('WARNING: The following files diverged locally and will NOT be ' +
          'overwritten if you continue')
    print('\n'.join(relpath(f.path) for f in modified))
    print('')
    print('Re run `download --overwrite` to overwrite locally modified files')
    print('or `upload` to sync them on the GCS bucket')
    print('')
    input('Press a key to continue or CTRL-C to abort')
  elif overwrite_locally_modified:
    files_to_download += modified

  if len(files_to_download) == 0:
    if not args.quiet:
      print('Nothing to do, all files are synced')
    return 0

  if not args.quiet:
    print('Downloading %d files in //%s' %
          (len(files_to_download), relpath(args.dir)))
  if args.dry_run:
    # Bug fix: files_to_download holds FileStat namedtuples, and
    # '\n'.join(files_to_download) raised TypeError (join requires str).
    # Print the relative paths instead, and return 0 like all other paths.
    print('\n'.join(relpath(f.path) for f in files_to_download))
    return 0

  def download_one_file(fs):
    assert fs.expected_digest is not None
    # Files are content-indexed in the bucket as file_name-<sha256>.
    uri = '%s/%s-%s' % (args.bucket, os.path.basename(fs.path),
                        fs.expected_digest)
    uri = uri.replace('gs://', 'https://storage.googleapis.com/')
    logging.debug(uri)
    # Download into a .swp temp file and rename only after the digest
    # verifies, so an interrupted transfer never corrupts the data file.
    tmp_path = fs.path + '.swp'
    download(uri, tmp_path)
    digest = hash_file(tmp_path)
    if digest != fs.expected_digest:
      raise Exception('Mismatching digest for %s. expected=%s, actual=%s' %
                      (uri, fs.expected_digest, digest))
    os.rename(tmp_path, fs.path)
    return fs

  map_concurrently(download_one_file, files_to_download)
  return 0
202
203
def cmd_status(dir):
  """Prints out-of-sync files under |dir|.

  Returns 0 when everything is in sync, 1 otherwise.
  """
  tracked = list_files(dir, scan_new_files=True)
  by_status = defaultdict(list)
  total = 0
  for fstat in ThreadPool(args.jobs).imap_unordered(get_file_status, tracked):
    by_status[fstat.status].append(relpath(fstat.path))
    total += 1
  out_of_sync = 0
  for status, rpaths in sorted(by_status.items()):
    if status == FS_MATCH:
      continue
    for rpath in rpaths:
      out_of_sync += 1
      if not args.quiet:
        print('%-15s: %s' % (status, rpath))
  if out_of_sync > 0:
    return 1
  if not args.quiet:
    print('Scanned %d files in //%s, everything in sync.' %
          (total, relpath(dir)))
  return 0
224
225
def main():
  """Parses the command line and dispatches to the requested command."""
  parser = argparse.ArgumentParser()
  parser.add_argument('--dir', default=os.path.join(ROOT_DIR, 'test/data'))
  parser.add_argument('--overwrite', action='store_true')
  parser.add_argument('--bucket', default=BUCKET)
  parser.add_argument('--jobs', '-j', default=10, type=int)
  parser.add_argument('--dry-run', '-n', action='store_true')
  parser.add_argument('--quiet', '-q', action='store_true')
  parser.add_argument('--verbose', '-v', action='store_true')
  parser.add_argument('cmd', choices=['status', 'download', 'upload'])
  global args
  args = parser.parse_args()
  logging.basicConfig(
      format='%(asctime)s %(levelname).1s %(message)s',
      level=logging.DEBUG if args.verbose else logging.INFO,
      datefmt=r'%H:%M:%S')
  # Dispatch table; cmd is already constrained by choices= above, so the
  # fallback branch is defensive only.
  handlers = {
      'status': lambda: cmd_status(args.dir),
      'download':
          lambda: cmd_download(args.dir, overwrite_locally_modified=args.overwrite),
      'upload': lambda: cmd_upload(args.dir),
  }
  handler = handlers.get(args.cmd)
  if handler is None:
    print('Unknown command: %s' % args.cmd)
    return
  return handler()
249
250
if __name__ == '__main__':
  # Propagate the command's integer return value as the process exit code.
  sys.exit(main())