blob: 1592dcca0be790e09a5eb467adf696068ce6b91e [file] [log] [blame]
Prathmesh Prabhu13b26cc2018-04-26 12:37:15 -07001"""A simple script to backfill tko_task_references table with throttling."""
2
3from __future__ import absolute_import
4from __future__ import division
5from __future__ import print_function
6
7import argparse
8import collections
9import contextlib
10import logging
11import time
12
13import MySQLdb
14
15
class BackfillException(Exception):
    """Raised when the backfill cannot start or an insert fails verification."""
18
19
def _parse_args():
    """Parse the command line and apply the dryrun iteration cap.

    In dryrun mode, when --num-iterations is unset (or 0), the number of
    iterations is limited to 5.

    Returns:
        The argparse.Namespace with the attributes host, user, password,
        dryrun, num_iterations, batch_size and sleep_seconds.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--host', required=True, help='mysql server host')
    parser.add_argument('--user', required=True, help='mysql server user')
    parser.add_argument(
        '--password', required=True, help='mysql server password')
    parser.add_argument('--dryrun', action='store_true', default=False)
    parser.add_argument(
        '--num-iterations',
        default=None,
        type=int,
        help='If set, total number of iterations. Default is no limit.',
    )
    parser.add_argument(
        '--batch-size',
        # Parsed as an integer like the other numeric flags, so that a
        # non-numeric value is rejected here rather than being passed on
        # as an arbitrary string.
        type=int,
        default=1000,
        help='Number of tko_jobs rows to read in one iteration',
    )
    parser.add_argument(
        '--sleep-seconds',
        type=int,
        default=1,
        help='Time to sleep between iterations',
    )

    args = parser.parse_args()
    if args.dryrun and not args.num_iterations:
        logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.')
        args.num_iterations = 5
    return args
51
52
53
@contextlib.contextmanager
def _mysql_connection(args):
    """Yield a MySQL connection with chromeos_autotest_db selected.

    Args:
        args: Parsed arguments providing the user, host and password
            attributes used to connect.

    Yields:
        The open MySQLdb connection. It is closed when the context exits.
    """
    conn = MySQLdb.connect(
        user=args.user, host=args.host, passwd=args.password)
    # Everything after connect() runs inside the try block so that the
    # connection is closed even if selecting the database fails.
    try:
        with _mysql_cursor(conn) as c:
            c.execute('USE chromeos_autotest_db;')
        yield conn
    finally:
        conn.close()
63
64
@contextlib.contextmanager
def _autocommit(conn):
    """Commit conn if the with-body succeeds; roll back and re-raise if not.

    Args:
        conn: A connection object providing commit() and rollback().

    Yields:
        The same conn object.
    """
    try:
        yield conn
    except BaseException:
        conn.rollback()
        # Re-raise: a generator-based context manager that catches the
        # exception without re-raising suppresses it, which would hide the
        # failure from the caller.
        raise
    else:
        conn.commit()
73
74
@contextlib.contextmanager
def _mysql_cursor(conn):
    """Yield a new cursor from conn and close it when the context exits.

    Args:
        conn: A connection object providing cursor().

    Yields:
        The cursor returned by conn.cursor().
    """
    with contextlib.closing(conn.cursor()) as cursor:
        yield cursor
82
83
def _latest_unfilled_job_idx(conn):
    """Return the tko_job_idx from which the backfill should start.

    If tko_task_references has rows, the result is its smallest
    tko_job_idx minus one. Otherwise it is the largest job_idx in tko_jobs.

    Args:
        conn: An open database connection.

    Returns:
        The index as a decimal string, or None when both tables are empty.
    """
    with _mysql_cursor(conn) as c:
        c.execute("""
SELECT tko_job_idx
FROM tko_task_references
ORDER BY tko_job_idx
LIMIT 1
;""")
        r = c.fetchall()
    if r:
        # int() instead of long(): long does not exist on Python 3, and
        # Python 2's int() already promotes to long when needed.
        return str(int(r[0][0]) - 1)

    logging.debug('tko_task_references is empty.'
                  ' Grabbing the latest tko_job_idx to fill.')
    with _mysql_cursor(conn) as c:
        c.execute("""
SELECT job_idx
FROM tko_jobs
ORDER BY job_idx DESC
LIMIT 1
;""")
        r = c.fetchall()
    if r:
        # Normalized to a string so both branches return the same type.
        return str(int(r[0][0]))
    return None
108
109
# One row read from tko_jobs, in the shape that is written to
# tko_task_references.
_TKOTaskReference = collections.namedtuple(
    '_TKOTaskReference',
    'tko_job_idx task_reference parent_task_reference',
)

# Reads a batch of tko_jobs rows at or below latest_job_idx, highest first.
_SQL_SELECT_TASK_REFERENCES = """
SELECT job_idx, afe_job_id, afe_parent_job_id
FROM tko_jobs
WHERE job_idx <= %(latest_job_idx)s
ORDER BY job_idx DESC
LIMIT %(batch_size)s
;"""

# Inserts pre-formatted value tuples; `values` is spliced in verbatim.
_SQL_INSERT_TASK_REFERENCES = """
INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id)
VALUES %(values)s
;"""

# Looks up a single tko_task_references row by tko_job_idx.
_SQL_SELECT_TASK_REFERENCE = """
SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s
;"""
129
130
def _compute_task_references(conn, latest_job_idx, batch_size):
    """Read one batch of task references from tko_jobs.

    Args:
        conn: An open database connection.
        latest_job_idx: Highest job_idx to include; an integer or a
            decimal string.
        batch_size: Maximum number of rows to read; an integer or a
            decimal string.

    Returns:
        A list of _TKOTaskReference, ordered by descending job_idx. Empty
        when no rows match.

    Raises:
        ValueError: If latest_job_idx or batch_size is not an integer.
    """
    # The query is assembled with plain string formatting, so both values
    # are forced to integers before they are spliced into the SQL text.
    sql = _SQL_SELECT_TASK_REFERENCES % {
        'latest_job_idx': int(latest_job_idx),
        'batch_size': int(batch_size),
    }
    with _mysql_cursor(conn) as c:
        c.execute(sql)
        rows = c.fetchall()
    if not rows:
        return []
    return [_TKOTaskReference(r[0], r[1], r[2]) for r in rows]
143
144
def _insert_task_references(conn, task_references, dryrun):
    """Insert a batch of task references into tko_task_references.

    Args:
        conn: An open database connection.
        task_references: Sequence of _TKOTaskReference rows to insert.
        dryrun: If true, only log the (possibly abbreviated) statement and
            do not touch the database.
    """
    if not task_references:
        # An empty VALUES list is not valid SQL, and there is nothing to do.
        return

    # NOTE(review): values are formatted with "%s", so a None
    # parent_task_reference is written as the literal string "None".
    # Confirm this is the intended representation.
    values = ', '.join([
        '("afe", %s, "%s", "%s")' %
        (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference)
        for tr in task_references
    ])
    sql = _SQL_INSERT_TASK_REFERENCES % {'values': values}

    if dryrun:
        if len(sql) < 200:
            sql_log = sql
        else:
            sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
        logging.debug('Would have run: %s', sql_log)
        # A dry run must stop here; without this return the INSERT below
        # would still execute.
        return

    with _autocommit(conn) as conn:
        with _mysql_cursor(conn) as c:
            c.execute(sql)
161
162
def _verify_task_references(conn, task_references):
    """Check that the last task reference of a batch exists in the table.

    Only the final element of task_references is looked up. An empty batch
    is accepted without querying.

    Args:
        conn: An open database connection.
        task_references: Sequence of _TKOTaskReference rows just inserted.

    Raises:
        BackfillException: If no row with the last tko_job_idx is found.
    """
    if task_references:
        expected_idx = task_references[-1].tko_job_idx
        query = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': expected_idx}
        with _mysql_cursor(conn) as cursor:
            cursor.execute(query)
            rows = cursor.fetchall()
        if not rows or rows[0][0] != expected_idx:
            raise BackfillException(
                'Failed to insert task reference for tko_job_id %s'
                % expected_idx)
175
176
def _next_job_idx(task_references):
    """Return the tko_job_idx just below the last entry of the batch.

    Args:
        task_references: Non-empty sequence of objects with a tko_job_idx
            attribute.

    Returns:
        The last element's tko_job_idx minus one, as a decimal string.
    """
    # int() instead of long(): long does not exist on Python 3, and
    # Python 2's int() already promotes to long when needed.
    return str(int(task_references[-1].tko_job_idx) - 1)
179
def main():
    """Backfill tko_task_references batch by batch, sleeping in between.

    Each iteration reads a batch from tko_jobs, inserts it, verifies the
    insert (unless in dryrun mode) and then moves to the next lower
    tko_job_idx. The loop ends when a batch comes back empty or the
    iteration limit is used up.

    Raises:
        BackfillException: If no starting tko_job_idx can be determined.
    """
    logging.basicConfig(level=logging.DEBUG)
    args = _parse_args()

    with _mysql_connection(args) as db:
        tko_job_idx = _latest_unfilled_job_idx(db)
    if tko_job_idx is None:
        raise BackfillException('Failed to get last unfilled tko_job_idx')
    logging.info('First tko_job_idx to fill: %s', tko_job_idx)

    while True:
        logging.info('####################################')
        logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx)

        # Every step uses its own short-lived connection.
        with _mysql_connection(args) as db:
            batch = _compute_task_references(db, tko_job_idx, args.batch_size)
        if not batch:
            logging.info('No more unfilled task references. All done!')
            break

        first_idx = batch[0].tko_job_idx
        last_idx = batch[-1].tko_job_idx
        logging.info(
            'Inserting %d task references. tko_job_ids: %d...%d',
            len(batch), first_idx, last_idx)
        with _mysql_connection(args) as db:
            _insert_task_references(db, batch, args.dryrun)
        if not args.dryrun:
            with _mysql_connection(args) as db:
                _verify_task_references(db, batch)

        tko_job_idx = _next_job_idx(batch)

        if args.num_iterations is not None:
            remaining = args.num_iterations - 1
            args.num_iterations = remaining
            if remaining <= 0:
                break
            logging.info('%d more iterations left', remaining)
        logging.info(
            'Iteration done. Sleeping for %d seconds', args.sleep_seconds)
        time.sleep(args.sleep_seconds)


if __name__ == '__main__':
    main()