| #!/usr/bin/python |
| # |
| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| """Module to upload a MySQL dump file to Cloud SQL. |
| |
| Usage: |
| dump_to_cloudsql.py [-h] [--resume NUM] [--user USER] [--passwd PASSWD] FILE |
| [REMOTE] |
| |
| Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional |
| arguments will connect to localhost as root with an empty password. |
| |
| positional arguments: |
| FILE text dump file containing MySQL commands |
| REMOTE Cloud SQL instance name or MySQL hostname |
| |
| optional arguments: |
| -h, --help show this help message and exit |
| --resume NUM resume dump at command NUM |
| --user USER user (ignored for CloudSQL) |
| --passwd PASSWD passwd (ignored for CloudSQL) |
| """ |
| |
| from __future__ import division |
| import argparse |
| import collections |
| import datetime |
| import os |
| import re |
| import sys |
| import time |
| |
| |
| BYTES_PER_GB = 2**30 |
| |
| |
| class MySQLConnectionManager(object): |
| """Manages connections to a MySQL database. |
| |
| Vars: |
| factory: A *ConnectionFactory. |
| connected: Whether we currently hold a live DB connection. |
| cmd_num: The number of commands executed. |
| """ |
| def __init__(self, connection_factory): |
| self.factory = connection_factory |
| self.connected = False |
| self.cmd_num = 0 |
| |
| def write(self, data, execute_cmd=True, increment_cmd=False): |
| """Buffers writes to command boundaries. |
| |
| Args: |
| data: A line of data from the MySQL dump. |
| execute_cmd: Whether to execute the command, defaults to True. |
| increment_cmd: Whether to increment cmd_num, defaults to False. |
| """ |
| if not data or not data.strip() or data == '\n' or data[:2] == '--': |
| return |
| self._cmd += data[:-1] if data[-1] == '\n' else data |
| if self._cmd[-1] != ';': |
| return |
| # Execute command. |
| if execute_cmd: |
| self._cursor.execute(self._cmd.decode('utf-8')) |
| self._cmd = '' |
| if increment_cmd: |
| self.cmd_num += 1 |
| |
| def disconnect(self): |
| """Closes the current database connection.""" |
| if self.connected: |
| self.connected = False |
| self._cursor.close() |
| self._db.close() |
| |
| def connect(self): |
| """Creates a new database connection.""" |
| self.disconnect() |
| self._db = self.factory.connect() |
| self.connected = True |
| self._cursor = self._db.cursor() |
| self._cmd = '' |
| |
| |
| class CloudSQLConnectionFactory(object): |
| """Creates Cloud SQL database connections.""" |
| def __init__(self, cloudsql_instance): |
| self._instance = cloudsql_instance |
| |
| def connect(self): |
| """Connects to the Cloud SQL database and returns the connection. |
| |
| Returns: |
| A MySQLdb compatible database connection to the Cloud SQL instance. |
| """ |
| print 'Connecting to Cloud SQL instance %s.' % self._instance |
| try: |
| from google.storage.speckle.python.api import rdbms_googleapi |
| except ImportError: |
| sys.exit('Unable to import rdbms_googleapi. Add the AppEngine SDK ' |
| 'directory to your PYTHONPATH. Download the SDK from: ' |
| 'https://developers.google.com/appengine/downloads') |
| return rdbms_googleapi.connect(None, instance=self._instance) |
| |
| |
| class LocalSQLConnectionFactory(object): |
| """Creates local MySQL database connections.""" |
| def __init__(self, host=None, user='root', passwd=''): |
| if not host: |
| host = 'localhost' |
| self._host = host |
| self._user = user |
| self._passwd = passwd |
| |
| def connect(self): |
| """Connects to the local MySQL database and returns the connection. |
| |
| Returns: |
| A MySQLdb database connection to the local MySQL database. |
| """ |
| print 'Connecting to mysql at localhost as %s.' % self._user |
| try: |
| import MySQLdb |
| except ImportError: |
| sys.exit('Unable to import MySQLdb. To install on Ubuntu: ' |
| 'apt-get install python-mysqldb') |
| return MySQLdb.connect(host=self._host, user=self._user, |
| passwd=self._passwd) |
| |
| |
| class MySQLState(object): |
| """Maintains the MySQL global state. |
| |
| This is a hack that keeps record of all MySQL lines that set global state. |
| These are needed to reconstruct the MySQL state on resume. |
| """ |
| _set_regex = re.compile('\S*\s*SET(.*)[\s=]') |
| |
| def __init__(self): |
| self._db_line = '' |
| self._table_lock = [] |
| self._sets = collections.OrderedDict() |
| |
| def process(self, line): |
| """Check and save lines that affect the global state. |
| |
| Args: |
| line: A line from the MySQL dump file. |
| """ |
| # Most recent USE line. |
| if line[:3] == 'USE': |
| self._db_line = line |
| # SET variables. |
| m = self._set_regex.match(line) |
| if m: |
| self._sets[m.group(1).strip()] = line |
| # Maintain LOCK TABLES |
| if (line[:11] == 'LOCK TABLES' or |
| ('ALTER TABLE' in line and 'DISABLE KEYS' in line)): |
| self._table_lock.append(line) |
| if (line[:14] == 'UNLOCK TABLES;'): |
| self._table_lock = [] |
| |
| def write(self, out): |
| """Print lines to recreate the saved state. |
| |
| Args: |
| out: A File-like object to write out saved state. |
| """ |
| out.write(self._db_line) |
| for v in self._sets.itervalues(): |
| out.write(v) |
| for l in self._table_lock: |
| out.write(l) |
| |
| def breakpoint(self, line): |
| """Returns true if we can handle breaking after this line. |
| |
| Args: |
| line: A line from the MySQL dump file. |
| |
| Returns: |
| Boolean indicating whether we can break after |line|. |
| """ |
| return (line[:28] == '-- Table structure for table' or |
| line[:11] == 'INSERT INTO') |
| |
| |
| def dump_to_cloudsql(dumpfile, manager, cmd_offset=0): |
| """Dumps a MySQL dump file to a database through a MySQLConnectionManager. |
| |
| Args: |
| dumpfile: Path to a file from which to read the MySQL dump. |
| manager: An instance of MySQLConnectionManager. |
| cmd_offset: No commands will be executed on the database before this count |
| is reached. Used to continue an uncompleted dump. Defaults to 0. |
| """ |
| state = MySQLState() |
| total = os.path.getsize(dumpfile) |
| start_time = time.time() |
| line_num = 0 |
| with open(dumpfile, 'r') as dump: |
| for line in dump: |
| line_num += 1 |
| if not manager.connected: |
| manager.connect() |
| try: |
| # Construct commands from lines and execute them. |
| state.process(line) |
| if manager.cmd_num == cmd_offset and cmd_offset != 0: |
| print '\nRecreating state at line: %d' % line_num |
| state.write(manager) |
| manager.write(line, manager.cmd_num >= cmd_offset, True) |
| # Print status. |
| sys.stdout.write( |
| '\rstatus: %.3f%% %0.2f GB %d commands ' % |
| (100 * dump.tell() / total, dump.tell() / BYTES_PER_GB, |
| manager.cmd_num)) |
| sys.stdout.flush() |
| # Handle interrupts and connection failures. |
| except KeyboardInterrupt: |
| print ('\nInterrupted while executing command: %d' % |
| manager.cmd_num) |
| raise |
| except: |
| print '\nFailed while executing command: %d' % manager.cmd_num |
| delta = int(time.time() - start_time) |
| print 'Total time: %s' % str(datetime.timedelta(seconds=delta)) |
| if state.breakpoint(line): |
| # Attempt to resume. |
| print ('Execution can resume from here (line = %d)' % |
| line_num) |
| manager.cmd_num += 1 |
| cmd_offset = manager.cmd_num |
| print ('Will now attempt to auto-resume at command: %d' % |
| cmd_offset) |
| manager.disconnect() |
| else: |
| print 'Execution may fail to resume correctly from here.' |
| print ('Use --resume=%d to attempt to resume the dump.' % |
| manager.cmd_num) |
| raise |
| print '\nDone.' |
| |
| |
| if __name__ == '__main__': |
| """Imports a MySQL database from a dump file. |
| |
| Interprets command line arguments and calls dump_to_cloudsql appropriately. |
| """ |
| description = """Uploads MySQL dump file to a MySQL database or Cloud SQL. |
| With no optional arguments will connect to localhost as root |
| with an empty password.""" |
| parser = argparse.ArgumentParser(description=description) |
| parser.add_argument('mysqldump', metavar='FILE', |
| help='text dump file containing MySQL commands') |
| parser.add_argument('remote', default=None, nargs='?', metavar='REMOTE', |
| help='either a Cloud SQL account:instance or a hostname') |
| parser.add_argument('--resume', default=0, type=int, metavar='NUM', |
| help='resume dump at command NUM') |
| parser.add_argument('--user', default='root', metavar='USER', |
| help='user (ignored for Cloud SQL)') |
| parser.add_argument('--passwd', default='', metavar='PASSWD', |
| help='passwd (ignored for Cloud SQL)') |
| args = parser.parse_args() |
| if args.remote and ':' in args.remote: |
| connection = CloudSQLConnectionFactory(args.remote) |
| else: |
| connection = LocalSQLConnectionFactory(args.remote, args.user, |
| args.passwd) |
| if args.resume: |
| print 'Resuming execution at command: %d' % options.resume |
| dump_to_cloudsql(args.mysqldump, MySQLConnectionManager(connection), |
| args.resume) |