Prathmesh Prabhu | 13b26cc | 2018-04-26 12:37:15 -0700 | [diff] [blame] | 1 | """A simple script to backfill tko_task_references table with throttling.""" |
| 2 | |
| 3 | from __future__ import absolute_import |
| 4 | from __future__ import division |
| 5 | from __future__ import print_function |
| 6 | |
| 7 | import argparse |
| 8 | import collections |
| 9 | import contextlib |
| 10 | import logging |
| 11 | import time |
| 12 | |
| 13 | import MySQLdb |
| 14 | |
| 15 | |
class BackfillException(Exception):
    """Raised when the backfill cannot start or an inserted batch is missing."""
| 18 | |
| 19 | |
| 20 | def _parse_args(): |
| 21 | parser = argparse.ArgumentParser( |
| 22 | description=__doc__) |
| 23 | parser.add_argument('--host', required=True, help='mysql server host') |
| 24 | parser.add_argument('--user', required=True, help='mysql server user') |
| 25 | parser.add_argument('--password', required=True, help='mysql server password') |
| 26 | parser.add_argument('--dryrun', action='store_true', default=False) |
| 27 | parser.add_argument( |
| 28 | '--num-iterations', |
| 29 | default=None, |
| 30 | type=int, |
| 31 | help='If set, total number of iterations. Default is no limit.', |
| 32 | ) |
| 33 | parser.add_argument( |
| 34 | '--batch-size', |
| 35 | default=1000, |
| 36 | help='Number of tko_jobs rows to read in one iteration', |
| 37 | ) |
| 38 | parser.add_argument( |
| 39 | '--sleep-seconds', |
| 40 | type=int, |
| 41 | default=1, |
| 42 | help='Time to sleep between iterations', |
| 43 | ) |
| 44 | |
| 45 | args = parser.parse_args() |
| 46 | if args.dryrun: |
| 47 | if not args.num_iterations: |
| 48 | logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.') |
| 49 | args.num_iterations = 5 |
| 50 | return args |
| 51 | |
| 52 | |
| 53 | |
@contextlib.contextmanager
def _mysql_connection(args):
    """Open a MySQL connection with chromeos_autotest_db selected.

    The connection is always closed when the managed block exits.

    Args:
      args: parsed command-line namespace; its user, host and password
          attributes are passed to MySQLdb.connect.

    Yields:
      The open MySQLdb connection.
    """
    conn = MySQLdb.connect(user=args.user, host=args.host, passwd=args.password)
    try:
        # Selecting the database inside the try block guarantees the
        # connection is closed even if the USE statement fails.
        with _mysql_cursor(conn) as c:
            c.execute('USE chromeos_autotest_db;')
        yield conn
    finally:
        conn.close()
| 63 | |
| 64 | |
| 65 | @contextlib.contextmanager |
| 66 | def _autocommit(conn): |
| 67 | try: |
| 68 | yield conn |
| 69 | except: |
| 70 | conn.rollback() |
| 71 | else: |
| 72 | conn.commit() |
| 73 | |
| 74 | |
| 75 | @contextlib.contextmanager |
| 76 | def _mysql_cursor(conn): |
| 77 | c = conn.cursor() |
| 78 | try: |
| 79 | yield c |
| 80 | finally: |
| 81 | c.close() |
| 82 | |
| 83 | |
def _latest_unfilled_job_idx(conn):
    """Return the highest tko_job_idx that still needs a task reference.

    If tko_task_references already has rows, the result is one less than the
    smallest tko_job_idx stored there, since the backfill walks job indices
    downward. If tko_task_references is empty, the largest job_idx in
    tko_jobs is returned instead.

    Returns:
      The job index as a str in both cases, or None if tko_jobs is empty.
    """
    with _mysql_cursor(conn) as c:
        c.execute("""
SELECT tko_job_idx
FROM tko_task_references
ORDER BY tko_job_idx
LIMIT 1
;""")
        r = c.fetchall()
    if r:
        # int() instead of the Python 2-only long(); on Python 2 int()
        # promotes to long automatically when needed.
        return str(int(r[0][0]) - 1)
    logging.debug('tko_task_references is empty.'
                  ' Grabbing the latest tko_job_idx to fill.')
    with _mysql_cursor(conn) as c:
        c.execute("""
SELECT job_idx
FROM tko_jobs
ORDER BY job_idx DESC
LIMIT 1
;""")
        r = c.fetchall()
    if r:
        # Return a str here too, so the type matches the branch above.
        return str(r[0][0])
    return None
| 108 | |
| 109 | |
| 110 | _TKOTaskReference = collections.namedtuple( |
| 111 | '_TKOTaskReference', |
| 112 | ['tko_job_idx', 'task_reference', 'parent_task_reference'], |
| 113 | ) |
| 114 | |
| 115 | _SQL_SELECT_TASK_REFERENCES = """ |
| 116 | SELECT job_idx, afe_job_id, afe_parent_job_id |
| 117 | FROM tko_jobs |
| 118 | WHERE job_idx <= %(latest_job_idx)s |
| 119 | ORDER BY job_idx DESC |
| 120 | LIMIT %(batch_size)s |
| 121 | ;""" |
| 122 | _SQL_INSERT_TASK_REFERENCES = """ |
| 123 | INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id) |
| 124 | VALUES %(values)s |
| 125 | ;""" |
| 126 | _SQL_SELECT_TASK_REFERENCE = """ |
| 127 | SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s |
| 128 | ;""" |
| 129 | |
| 130 | |
def _compute_task_references(conn, latest_job_idx, batch_size):
    """Load up to `batch_size` tko_jobs rows with job_idx <= `latest_job_idx`.

    Rows come back in descending job_idx order.

    Returns:
      A list of _TKOTaskReference built from (job_idx, afe_job_id,
      afe_parent_job_id); empty when no rows match.
    """
    query = _SQL_SELECT_TASK_REFERENCES % dict(
        latest_job_idx=latest_job_idx,
        batch_size=batch_size,
    )
    with _mysql_cursor(conn) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
    if rows is None:
        return []
    return [_TKOTaskReference(*row[:3]) for row in rows]
| 143 | |
| 144 | |
def _insert_task_references(conn, task_references, dryrun):
    """Insert one tko_task_references row per entry in `task_references`.

    All rows are written with a single multi-row INSERT, committed through
    _autocommit.

    Args:
      conn: an open MySQLdb connection.
      task_references: sequence of _TKOTaskReference to insert.
      dryrun: if True, only log the SQL that would run (truncated when long)
          and leave the database untouched. Previously the INSERT still
          executed and committed after the log line.
    """
    if not task_references:
        # An empty batch would render as 'VALUES ;', which is invalid SQL.
        return
    # NOTE(review): values are interpolated as text, so a None afe_job_id or
    # afe_parent_job_id is stored as the string "None" rather than NULL.
    # Confirm whether that is intended.
    values = ', '.join([
        '("afe", %s, "%s", "%s")' %
        (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference)
        for tr in task_references
    ])
    sql = _SQL_INSERT_TASK_REFERENCES % {'values': values}
    if dryrun:
        if len(sql) < 200:
            sql_log = sql
        else:
            sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
        logging.debug('Would have run: %s', sql_log)
        return
    with _autocommit(conn):
        with _mysql_cursor(conn) as c:
            c.execute(sql)
| 161 | |
| 162 | |
def _verify_task_references(conn, task_references):
    """Spot-check that the last entry of `task_references` was inserted.

    Only the final reference is looked up. An empty sequence is accepted
    without any query.

    Raises:
      BackfillException: if tko_task_references has no row for that entry's
          tko_job_idx.
    """
    if not task_references:
        return
    expected_idx = task_references[-1].tko_job_idx
    query = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': expected_idx}
    with _mysql_cursor(conn) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
    found = bool(rows) and rows[0][0] == expected_idx
    if not found:
        raise BackfillException(
            'Failed to insert task reference for tko_job_id %s' % expected_idx)
| 175 | |
| 176 | |
| 177 | def _next_job_idx(task_references): |
| 178 | return str(long(task_references[-1].tko_job_idx) - 1) |
| 179 | |
def main():
    """Backfill tko_task_references in batches, walking tko_jobs downward.

    Each iteration reads a batch of tko_jobs rows, inserts the matching task
    references, verifies the insert (skipped in --dryrun), and then sleeps
    --sleep-seconds before the next batch. Every database step opens its own
    connection.

    Raises:
      BackfillException: if the starting tko_job_idx cannot be determined,
          or a batch fails verification.
    """
    logging.basicConfig(level=logging.DEBUG)
    args = _parse_args()

    with _mysql_connection(args) as conn:
        tko_job_idx = _latest_unfilled_job_idx(conn)
    if tko_job_idx is None:
        raise BackfillException('Failed to get last unfilled tko_job_idx')
    logging.info('First tko_job_idx to fill: %s', tko_job_idx)

    while True:
        logging.info('####################################')
        logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx)

        with _mysql_connection(args) as conn:
            batch = _compute_task_references(conn, tko_job_idx, args.batch_size)
        if not batch:
            logging.info('No more unfilled task references. All done!')
            break

        first_idx, last_idx = batch[0].tko_job_idx, batch[-1].tko_job_idx
        logging.info('Inserting %d task references. tko_job_ids: %d...%d',
                     len(batch), first_idx, last_idx)
        with _mysql_connection(args) as conn:
            _insert_task_references(conn, batch, args.dryrun)
        if not args.dryrun:
            with _mysql_connection(args) as conn:
                _verify_task_references(conn, batch)

        tko_job_idx = _next_job_idx(batch)

        if args.num_iterations is not None:
            args.num_iterations -= 1
            if args.num_iterations <= 0:
                break
            logging.info('%d more iterations left', args.num_iterations)
        logging.info('Iteration done. Sleeping for %d seconds', args.sleep_seconds)
        time.sleep(args.sleep_seconds)
| 222 | |
| 223 | |
if __name__ == '__main__':
    # Run the backfill only when executed as a script, not when imported.
    main()