Craig Harrison | bb33291 | 2012-09-24 17:08:31 -0700 | [diff] [blame] | 1 | #!/usr/bin/python |
| 2 | # |
| 3 | # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| 4 | # Use of this source code is governed by a BSD-style license that can be |
| 5 | # found in the LICENSE file. |
| 6 | |
| 7 | """Module to upload a MySQL dump file to Cloud SQL. |
| 8 | |
| 9 | Usage: |
| 10 | dump_to_cloudsql.py [-h] [--resume NUM] [--user USER] [--passwd PASSWD] FILE |
| 11 | [REMOTE] |
| 12 | |
| 13 | Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional |
| 14 | arguments will connect to localhost as root with an empty password. |
| 15 | |
| 16 | positional arguments: |
| 17 | FILE text dump file containing MySQL commands |
| 18 | REMOTE Cloud SQL instance name or MySQL hostname |
| 19 | |
| 20 | optional arguments: |
| 21 | -h, --help show this help message and exit |
| 22 | --resume NUM resume dump at command NUM |
| 23 | --user USER user (ignored for Cloud SQL) |
| 24 | --passwd PASSWD passwd (ignored for Cloud SQL) |
| 25 | """ |
| 26 | |
| 27 | from __future__ import division |
| 28 | import argparse |
| 29 | import collections |
| 30 | import datetime |
| 31 | import os |
| 32 | import re |
| 33 | import sys |
| 34 | import time |
| 35 | |
| 36 | |
# Number of bytes in 2**30 bytes ("GB" as reported in the progress line).
BYTES_PER_GB = 1 << 30
| 38 | |
| 39 | |
class MySQLConnectionManager(object):
    """Manages connections to a MySQL database.

    Lines handed to write() are buffered until a complete command (one that
    ends in ';') has been assembled; the command is then run on the cursor of
    the current connection.

    Vars:
      factory: A *ConnectionFactory. Its connect() must return a connection
          object that provides cursor() and close().
      connected: Whether we currently hold a live DB connection.
      cmd_num: The number of commands executed.
    """
    def __init__(self, connection_factory):
        self.factory = connection_factory
        self.connected = False
        self.cmd_num = 0
        # Defined up front so the attributes exist before the first connect().
        self._db = None
        self._cursor = None
        self._cmd = ''

    def write(self, data, execute_cmd=True, increment_cmd=False):
        """Buffers writes to command boundaries.

        Blank lines and lines starting with '--' (SQL comments) are ignored.
        Other lines are appended to the pending command with their line
        terminator removed; no separator is inserted between lines.

        Args:
          data: A line of data from the MySQL dump.
          execute_cmd: Whether to execute the command, defaults to True.
          increment_cmd: Whether to increment cmd_num, defaults to False.
        """
        if not data or not data.strip() or data[:2] == '--':
            return
        # Strip '\n' as well as '\r\n' so that dumps with CRLF line endings
        # still expose the trailing ';' to the terminator check below.
        self._cmd += data.rstrip('\r\n')
        if self._cmd[-1] != ';':
            return
        # Execute command.
        if execute_cmd:
            cmd = self._cmd
            # Byte strings (what Python 2 reads from the dump) are decoded
            # as UTF-8; text strings are passed through unchanged.
            if isinstance(cmd, bytes):
                cmd = cmd.decode('utf-8')
            self._cursor.execute(cmd)
        # Only reached when execute() did not raise: a failed command stays
        # buffered and is not counted.
        self._cmd = ''
        if increment_cmd:
            self.cmd_num += 1

    def disconnect(self):
        """Closes the current database connection, if there is one."""
        if not self.connected:
            return
        self.connected = False
        try:
            self._cursor.close()
        finally:
            # Release the connection even if closing the cursor failed.
            self._db.close()

    def connect(self):
        """Creates a new database connection.

        Any existing connection is closed first and the pending command
        buffer is discarded.
        """
        self.disconnect()
        self._db = self.factory.connect()
        self.connected = True
        self._cursor = self._db.cursor()
        self._cmd = ''
| 87 | |
| 88 | |
class CloudSQLConnectionFactory(object):
    """Factory for connections to a single Cloud SQL instance."""

    def __init__(self, cloudsql_instance):
        self._instance = cloudsql_instance

    def connect(self):
        """Opens a connection to the configured Cloud SQL instance.

        Exits the program with an explanatory message when the AppEngine SDK
        module rdbms_googleapi cannot be imported.

        Returns:
          A MySQLdb compatible database connection to the Cloud SQL instance.
        """
        print('Connecting to Cloud SQL instance %s.' % self._instance)
        try:
            # Imported lazily so the SDK is only needed for Cloud SQL targets.
            from google.storage.speckle.python.api import (
                rdbms_googleapi as api)
        except ImportError:
            hint = ('Unable to import rdbms_googleapi. Add the AppEngine SDK '
                    'directory to your PYTHONPATH. Download the SDK from: '
                    'https://developers.google.com/appengine/downloads')
            sys.exit(hint)
        return api.connect(None, instance=self._instance)
| 108 | |
| 109 | |
class LocalSQLConnectionFactory(object):
    """Creates connections to a MySQL server reached by hostname.

    Args:
      host: Hostname of the MySQL server; a falsy value means 'localhost'.
      user: User name to connect as, defaults to 'root'.
      passwd: Password for the user, defaults to the empty string.
    """
    def __init__(self, host=None, user='root', passwd=''):
        self._host = host or 'localhost'
        self._user = user
        self._passwd = passwd

    def connect(self):
        """Connects to the MySQL database and returns the connection.

        Exits the program with an explanatory message when the MySQLdb
        module cannot be imported.

        Returns:
          A MySQLdb database connection to the configured host.
        """
        # Report the host actually used rather than always saying localhost.
        print('Connecting to mysql at %s as %s.' % (self._host, self._user))
        try:
            # Imported lazily so MySQLdb is only needed for non-Cloud targets.
            import MySQLdb
        except ImportError:
            sys.exit('Unable to import MySQLdb. To install on Ubuntu: '
                     'apt-get install python-mysqldb')
        return MySQLdb.connect(host=self._host, user=self._user,
                               passwd=self._passwd)
| 133 | |
| 134 | |
class MySQLState(object):
    """Maintains the MySQL global state.

    This is a hack that keeps record of all MySQL lines that set global state.
    These are needed to reconstruct the MySQL state on resume. Three kinds of
    lines are remembered: the most recent USE line, SET lines, and the
    LOCK TABLES / ALTER TABLE ... DISABLE KEYS lines seen since the last
    UNLOCK TABLES.
    """
    # Raw string so that \S and \s reach the regex engine as written instead
    # of being treated as (invalid) string escapes.
    _set_regex = re.compile(r'\S*\s*SET(.*)[\s=]')

    def __init__(self):
        self._db_line = ''
        self._table_lock = []
        # Ordered so that SET lines are replayed in the order first seen.
        self._sets = collections.OrderedDict()

    def process(self, line):
        """Check and save lines that affect the global state.

        Args:
          line: A line from the MySQL dump file.
        """
        # Most recent USE line.
        if line.startswith('USE'):
            self._db_line = line
        # SET variables, keyed by the stripped text captured after SET. A
        # repeated key keeps its original position and gets the newest line.
        m = self._set_regex.match(line)
        if m:
            self._sets[m.group(1).strip()] = line
        # Maintain LOCK TABLES
        if (line.startswith('LOCK TABLES') or
                ('ALTER TABLE' in line and 'DISABLE KEYS' in line)):
            self._table_lock.append(line)
        if line.startswith('UNLOCK TABLES;'):
            self._table_lock = []

    def write(self, out):
        """Print lines to recreate the saved state.

        Emits the USE line first, then the SET lines, then the pending table
        lock lines.

        Args:
          out: A File-like object to write out saved state.
        """
        out.write(self._db_line)
        # values() instead of itervalues() so this runs on Python 2 and 3.
        for set_line in self._sets.values():
            out.write(set_line)
        for lock_line in self._table_lock:
            out.write(lock_line)

    def breakpoint(self, line):
        """Returns true if we can handle breaking after this line.

        Args:
          line: A line from the MySQL dump file.

        Returns:
          Boolean indicating whether we can break after |line|.
        """
        return line.startswith(('-- Table structure for table',
                                'INSERT INTO'))
| 191 | |
| 192 | |
def dump_to_cloudsql(dumpfile, manager, cmd_offset=0):
    """Dumps a MySQL dump file to a database through a MySQLConnectionManager.

    The file is read line by line. Each line is recorded in a MySQLState and
    handed to the manager, which executes a command once it is complete. When
    a command fails on a line after which it is safe to break, the connection
    is dropped and the dump auto-resumes with the next command on a fresh
    connection, after the saved state has been replayed.

    Args:
      dumpfile: Path to a file from which to read the MySQL dump.
      manager: An instance of MySQLConnectionManager.
      cmd_offset: No commands will be executed on the database before this
          count is reached. Used to continue an uncompleted dump. Defaults
          to 0.

    Raises:
      KeyboardInterrupt: Re-raised after reporting the current command.
      Exception: Whatever the failing command raised, when the failure
          happened on a line after which we cannot safely resume.
    """
    state = MySQLState()
    total = os.path.getsize(dumpfile)
    start_time = time.time()
    line_num = 0
    # Progress is tracked by summing line lengths because tell() is not
    # reliable while iterating over the file. Under a text-mode decoder this
    # counts characters, so the percentage is approximate for non-ASCII dumps.
    consumed = 0
    # The saved state must be replayed exactly once per resume, at the first
    # line where the command counter reaches the offset. At that point the
    # manager holds no partial command, so replayed lines cannot be spliced
    # into the middle of a multi-line command.
    restore_pending = cmd_offset != 0
    with open(dumpfile, 'r') as dump:
        for line in dump:
            line_num += 1
            consumed += len(line)
            if not manager.connected:
                manager.connect()
            try:
                # Construct commands from lines and execute them.
                state.process(line)
                if restore_pending and manager.cmd_num == cmd_offset:
                    print('\nRecreating state at line: %d' % line_num)
                    state.write(manager)
                    restore_pending = False
                manager.write(line, manager.cmd_num >= cmd_offset, True)
                # Print status.
                sys.stdout.write(
                    '\rstatus: %.3f%% %0.2f GB %d commands ' %
                    (100 * consumed / total, consumed / BYTES_PER_GB,
                     manager.cmd_num))
                sys.stdout.flush()
            # Handle interrupts and connection failures.
            except KeyboardInterrupt:
                print('\nInterrupted while executing command: %d' %
                      manager.cmd_num)
                raise
            except Exception:
                print('\nFailed while executing command: %d' %
                      manager.cmd_num)
                delta = int(time.time() - start_time)
                print('Total time: %s' %
                      str(datetime.timedelta(seconds=delta)))
                if not state.breakpoint(line):
                    print('Execution may fail to resume correctly from here.')
                    print('Use --resume=%d to attempt to resume the dump.' %
                          manager.cmd_num)
                    raise
                # Attempt to resume.
                print('Execution can resume from here (line = %d)' % line_num)
                # NOTE(review): the counter is advanced past the command that
                # just failed, so that command is not retried -- confirm this
                # is the intended resume semantics.
                manager.cmd_num += 1
                cmd_offset = manager.cmd_num
                print('Will now attempt to auto-resume at command: %d' %
                      cmd_offset)
                # Dropping the connection makes the next iteration reconnect;
                # the state is then replayed on the new connection.
                manager.disconnect()
                restore_pending = True
    print('\nDone.')
| 248 | |
| 249 | |
def parse_args(argv=None):
    """Interprets the command line arguments.

    Args:
      argv: List of argument strings; None makes argparse use sys.argv[1:].

    Returns:
      The argparse namespace with attributes mysqldump, remote, resume, user
      and passwd.
    """
    description = """Uploads MySQL dump file to a MySQL database or Cloud SQL.
                  With no optional arguments will connect to localhost as root
                  with an empty password."""
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('mysqldump', metavar='FILE',
                        help='text dump file containing MySQL commands')
    parser.add_argument('remote', default=None, nargs='?', metavar='REMOTE',
                        help='either a Cloud SQL account:instance or a '
                             'hostname')
    parser.add_argument('--resume', default=0, type=int, metavar='NUM',
                        help='resume dump at command NUM')
    parser.add_argument('--user', default='root', metavar='USER',
                        help='user (ignored for Cloud SQL)')
    parser.add_argument('--passwd', default='', metavar='PASSWD',
                        help='passwd (ignored for Cloud SQL)')
    return parser.parse_args(argv)


def main(argv=None):
    """Imports a MySQL database from a dump file.

    Interprets command line arguments and calls dump_to_cloudsql
    appropriately.

    Args:
      argv: List of argument strings; None makes argparse use sys.argv[1:].
    """
    args = parse_args(argv)
    # A REMOTE containing ':' is treated as a Cloud SQL account:instance name;
    # anything else (including no REMOTE at all) is a MySQL hostname.
    if args.remote and ':' in args.remote:
        connection = CloudSQLConnectionFactory(args.remote)
    else:
        connection = LocalSQLConnectionFactory(args.remote, args.user,
                                               args.passwd)
    if args.resume:
        # The parsed namespace is called args; there is no 'options' object.
        print('Resuming execution at command: %d' % args.resume)
    dump_to_cloudsql(args.mysqldump, MySQLConnectionManager(connection),
                     args.resume)


if __name__ == '__main__':
    main()