| #!/usr/bin/env python3 |
| |
| # Copyright 2022 Fairphone B.V. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| |
| """Minimal Google Drive service account client. |
| |
| Command line client for uploading files and folder to a Google Drive folder via |
| a Google service account. The client supports both regular personal drives ("My |
| Drive") as well as team drives. It is designed for scripting and automation, |
| thus working with a service account only. It is not intended to work as OAuth |
| client. |
| |
| To use this client: |
| * Set up a Google Service account and give it access to required Drives. |
| * Create a service account key and download in JSON format. |
| * Point environment variable GOOGLE_APPLICATION_CREDENTIALS to this key file. |
| |
| Refer to the Google Drive (Python) API reference for details: |
| * https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/index.html # noqa: E501 # pylint: disable=line-too-long |
| """ |
| |
| import argparse |
| from io import FileIO |
| import logging |
| import os |
| from pathlib import Path |
| import sys |
| from typing import Any, Dict, List, Optional, Union |
| |
| import google.auth # type: ignore |
| from googleapiclient.discovery import build, Resource # type: ignore |
| from googleapiclient.errors import HttpError # type: ignore |
| from googleapiclient.http import ( # type: ignore |
| MediaUpload, |
| MediaFileUpload, |
| MediaIoBaseDownload, |
| ) |
| |
| import log |
| |
# MIME type this client uses to create Drive folders and to filter for (or
# exclude) folders in search queries.
# See also https://developers.google.com/drive/api/guides/mime-types
MIMETYPE_GOOGLE_DRIVE_FOLDER = "application/vnd.google-apps.folder"

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
| |
| |
| # https://stackoverflow.com/a/1094933 |
def sizeof_fmt(num: float, suffix: str = "B") -> str:
    """Render a byte count as a human-readable string with binary prefixes.

    Arguments:
        num: Size to format. The sign is preserved for negative values.
        suffix: Unit name appended after the binary prefix.

    Returns:
        The size scaled to the first prefix for which its magnitude is below
        1024, with one decimal place, e.g. "1.5KiB". Values beyond the "Zi"
        range are expressed in "Yi".
    """
    prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    value = num
    for prefix in prefixes:
        if -1024.0 < value < 1024.0:
            return f"{value:3.1f}{prefix}{suffix}"
        value = value / 1024.0
    # Anything that is still too large after "Zi" is reported in "Yi".
    return f"{value:.1f}Yi{suffix}"
| |
| |
def list_drives(service: Resource) -> List[Dict[str, Any]]:
    """Collect the drives returned by the Drive API "drives.list" call.

    All result pages are fetched and concatenated.

    Arguments:
        service: Google Drive service resource object

    Returns:
        List of drives, each a dictionary holding the requested "id" and
        "name" attributes.
    """
    collected: List[Dict[str, Any]] = []
    next_token = None
    has_more = True
    while has_more:
        request = service.drives().list(
            fields="nextPageToken,drives(id,name)", pageToken=next_token
        )
        page = request.execute()
        collected += page.get("drives", [])
        # A missing or empty token means this was the last page.
        next_token = page.get("nextPageToken", None)
        has_more = bool(next_token)

    return collected
| |
| |
def list_files(
    service: Resource,
    drive_id: Optional[str] = None,
    query: Optional[str] = None,
    max_entries: Optional[int] = None,
    order_by: str = "recency",
) -> List[Dict[str, str]]:
    """Query a Google Drive for files, following result pagination.

    Arguments:
        service: Google Drive service resource object
        drive_id: Optional. ID of the Google drive to query. When unset, no
            drive-specific parameters are sent with the request.
        query: Optional. Search query passed to the API as "q". See Drive API
            reference for details.
        max_entries: Optional. Upper bound for the number of returned files.
            None, zero and negative values mean "no limit".
        order_by: Ordering passed to the API as "orderBy".

    Returns:
        List of files, each a mapping with the requested fields id, name and
        mimeType.
    """
    # Only a strictly positive value restricts the number of results.
    has_limit = max_entries is not None and max_entries > 0

    scope_kwargs: Dict[str, Any] = {}
    if drive_id:
        # "driveId" requires the other three parameters to be set as well;
        # corpora="drive" lists the files contained in driveId.
        scope_kwargs.update(
            driveId=drive_id,
            corpora="drive",
            includeItemsFromAllDrives=True,
            supportsAllDrives=True,
        )

    results: List[Dict[str, Any]] = []
    next_token = None
    batch_size = 100
    while True:
        if has_limit:
            still_wanted = max_entries - len(results)
            if still_wanted <= 0:
                break
            # Never request more entries than are still needed.
            batch_size = min(batch_size, still_wanted)
        listing = service.files().list(
            orderBy=order_by,
            pageSize=batch_size,
            q=query,
            fields="nextPageToken,files(id,name,mimeType)",
            pageToken=next_token,
            **scope_kwargs,
        )
        page = listing.execute()
        results.extend(page.get("files", []))
        next_token = page.get("nextPageToken", None)
        if next_token is None:
            break
    return results
| |
| |
def list_files_strict(  # pylint: disable=too-many-arguments
    service: Resource,
    filename_match: Union[str, List[str]],
    drive_id: Optional[str] = None,
    query: Optional[str] = None,
    max_entries: Optional[int] = None,
    order_by: str = "recency",
) -> List[Dict[str, str]]:
    """List files, strictly filtered after fuzzy Google Drive search.

    The files are fetched with list_files() and then filtered locally by
    name, because the Drive search itself may return inexact matches.

    Arguments:
        service: Google Drive service resource object
        filename_match: Either of two: a string holding the full filename
            that result files must match exactly; or a list of filename
            components that must all be contained in each result file name.
        drive_id: Optional. Passed through to list_files().
        query: Optional. Passed through to list_files().
        max_entries: Optional. Passed through to list_files(); the limit is
            applied before the strict filtering.
        order_by: Passed through to list_files().

    Returns:
        Same results as list_files(), except for applying stricter filename
        matching.

    Raises:
        TypeError: If filename_match is neither a string nor a list.
    """
    # Validate up front with a real exception: an assert would be stripped
    # under "python -O" and would only fire after the network request.
    exact_match = isinstance(filename_match, str)
    if not exact_match and not isinstance(filename_match, list):
        raise TypeError(
            "filename_match must be a str or a list of str, got "
            f"{type(filename_match).__name__}"
        )

    files = list_files(
        service=service,
        drive_id=drive_id,
        query=query,
        max_entries=max_entries,
        order_by=order_by,
    )

    filtered_files = []
    for file_entry in files:
        name = file_entry["name"]
        if exact_match:
            matched = name == filename_match
        else:
            # Every component must be included in the file name.
            matched = all(part in name for part in filename_match)
        if matched:
            filtered_files.append(file_entry)
        else:
            logger.debug(
                "Skipping mismatching fuzzy file match %s", file_entry
            )

    return filtered_files
| |
| |
def create_file(
    service: Resource,
    name: str,
    mime_type: Optional[str] = None,
    parent: Optional[str] = None,
    media_body: Optional[Union[str, MediaUpload]] = None,
) -> str:
    """Create a new entry (file or folder) in a Google Drive.

    Arguments:
        service: Google Drive service resource object
        name: User-visible name of the new Drive file
        mime_type: Optional. MIME type of the created file. Only sent when
            set; otherwise Google Drive will guess the type, see API
            reference for details.
        parent: Optional. ID of the parent folder. Only sent when set;
            otherwise the new file will be put into the root folder of the
            account's "My Drive".
        media_body: Optional. The filename of the media request body, or an
            instance of a MediaUpload object. If this is None, an empty file
            will be created. See API reference for details.

    Returns:
        ID of the created file in the Google Drive.
    """
    body: Dict[str, Any] = {"name": name}
    if mime_type:
        body.update(mimeType=mime_type)
    if parent:
        body.update(parents=[parent])

    request = service.files().create(
        body=body,
        media_body=media_body,
        fields="id",  # the ID is the only attribute needed from the response
        supportsAllDrives=True,
    )
    created: Dict[str, str] = request.execute()
    return created["id"]
| |
| |
def create_folder(
    service: Resource, name: str, parent: Optional[str] = None
) -> str:
    """Create an empty folder in a Google Drive.

    Delegates to create_file() with the Google Drive folder MIME type and no
    media body.

    Arguments:
        service: Google Drive service resource object
        name: User-visible name of the new Drive folder
        parent: Optional. ID of the parent folder. By default, the new folder
            will be put into the root folder of the account's "My Drive".

    Returns:
        ID of the created folder in the Google Drive.
    """
    folder_id = create_file(
        service=service,
        name=name,
        parent=parent,
        mime_type=MIMETYPE_GOOGLE_DRIVE_FOLDER,
    )
    return folder_id
| |
| |
def upload_file(
    service: Resource,
    file_path: Path,
    upload_name: Optional[str] = None,
    parent: Optional[str] = None,
) -> str:
    """Upload a local file into a Google Drive folder.

    Arguments:
        service: Google Drive service resource object
        file_path: Path of the local source file to upload.
        upload_name: Optional. User-visible name of the uploaded file. Defaults
            to the name of the source file.
        parent: Optional. ID of the parent folder. By default, the new file
            will be put into the root folder of the account's "My Drive".

    Returns:
        ID of the created file in the Google Drive.

    Raises:
        RuntimeError: If file_path does not exist or is not a regular file.
    """
    source = str(file_path)
    if not file_path.exists():
        raise RuntimeError(f"Source file does not exist: {source}")
    if not file_path.is_file():
        raise RuntimeError(f"Source is not a file: {source}")

    media = MediaFileUpload(
        filename=source,
        mimetype=None,  # let the lib or GDrive guess the mime type
        chunksize=-1,  # upload in one go if possible
        resumable=True,  # resume might be needed for big files
    )
    size_text = sizeof_fmt(file_path.stat().st_size)
    logger.info("Starting upload of %s (%s)", file_path, size_text)

    # Fall back to the local file name when no explicit name was given.
    drive_name = upload_name or file_path.name
    uploaded_id = create_file(
        service=service,
        name=drive_name,
        parent=parent,
        media_body=media,
    )

    logger.info("Uploaded file %s as fileId=%s", source, uploaded_id)
    return uploaded_id
| |
| |
def upload_folder_recursively(
    service: Resource, folder_path: Path, parent: Optional[str] = None
) -> str:
    """Mirror a local folder tree into a Google Drive parent folder.

    A Drive folder is created for folder_path itself, then all regular files
    directly inside it are uploaded, and finally each sub-directory is
    processed the same way. Symbolic links and special files inside the
    folder are skipped with a warning.

    Arguments:
        service: Google Drive service resource object
        folder_path: Local folder path to upload.
        parent: Parent Google drive folder id (fileID) to upload into. If None,
            upload to My Drive's root folder.

    Returns:
        fileId of root of the uploaded folder structure.

    Raises:
        RuntimeError: If folder_path is not a directory.
    """
    shown_path = str(folder_path)
    if not folder_path.is_dir():
        raise RuntimeError(
            f"upload_folder_recursively must be called on directories only. "
            f'Called on "{shown_path}"',
        )

    logger.info("Uploading folder recursively: %s", shown_path)

    # Drive uses the term "fileId" for folders as well. The path is made
    # absolute first because the name of "." would otherwise be empty.
    drive_folder_id = create_folder(
        service=service,
        name=folder_path.absolute().name,
        parent=parent,
    )

    # First pass: classify the directory entries, skipping anything odd.
    plain_files: List[Path] = []
    nested_dirs: List[Path] = []
    for child in folder_path.iterdir():
        if child.is_symlink():
            logger.warning("Ignoring symbolic link: %s", str(child))
            continue
        if child.is_dir():
            nested_dirs.append(child)
            continue
        if child.is_file():
            plain_files.append(child)
            continue
        logger.warning(
            "Ignoring special file: %s (Could be device, FIFO, etc.)", child
        )

    # Second pass: files of this level first, then descend into sub-folders.
    for plain_file in plain_files:
        upload_file(
            service=service, file_path=plain_file, parent=drive_folder_id
        )
    for nested_dir in nested_dirs:
        upload_folder_recursively(
            service=service, folder_path=nested_dir, parent=drive_folder_id
        )

    logger.info("Folder upload done: %s to id=%s", shown_path, drive_folder_id)

    return drive_folder_id
| |
| |
def debug_list_all_drive_contents(
    service: Resource,
    max_entries: int = 10,
    folders_only: bool = False,
    order_by: str = "recency",
) -> None:
    """Print a short summary of every accessible Drive and some of its files.

    "My Drive" is listed first, followed by all drives returned by
    list_drives(). For each of them up to max_entries non-trashed files are
    printed. This is meant as a quick overview of what an account has access
    to, not as a complete content listing.

    Arguments:
        service: Google Drive service resource object
        max_entries: Maximum number of files to list per Drive. Values that
            are not positive list without limit.
        folders_only: List folders only, skip files.
        order_by: Set ordering for files in Drives.
    """
    # "My Drive" has no drive ID and therefore gets a hand-made entry in
    # front of the other (team) Drives.
    all_drives = [{"id": None, "name": "(My Drive)"}, *list_drives(service)]

    if max_entries > 0:
        files_header = f"Files (maximum {max_entries} most recent):"
    else:
        files_header = "Files:"

    # Search includes trashed files by default, so exclude them explicitly.
    conditions = ["trashed = false"]
    if folders_only:
        conditions.append(f"mimeType='{MIMETYPE_GOOGLE_DRIVE_FOLDER}'")
    search_query = " and ".join(conditions)

    for drive in all_drives:
        name_of_drive = drive["name"]
        id_of_drive = drive["id"]
        print(f'Drive: "{name_of_drive}" (id: {id_of_drive})')
        print(files_header)
        entries = list_files(
            service,
            drive_id=id_of_drive,
            query=search_query,
            max_entries=max_entries,
            order_by=order_by,
        )
        for entry in entries:
            entry_name = entry["name"]
            entry_id = entry["id"]
            entry_mime = entry["mimeType"]
            if folders_only:
                line = f"* {entry_name} (id: {entry_id})"
            else:
                line = f"* {entry_name} (mime: {entry_mime} id: {entry_id})"
            print(line)
        print()
| |
| |
def cmd_overview_drives(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "overview_drives".

    Forwards the parsed max_entries, folders_only and order_by options to
    debug_list_all_drive_contents().
    """
    options = {
        "max_entries": args.max_entries,
        "folders_only": args.folders_only,
        "order_by": args.order_by,
    }
    debug_list_all_drive_contents(service, **options)
| |
| |
def cmd_upload_file(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "upload_file".

    Uploads the single file given on the command line and prints the fileId
    of the upload to stdout.
    """
    source_file = Path(args.file)
    print(
        upload_file(
            service=service,
            file_path=source_file,
            upload_name=args.name,
            parent=args.parent,
        )
    )
| |
| |
def cmd_create_folder(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "create_folder".

    Creates the folder named on the command line and prints its fileId to
    stdout.
    """
    folder_name = args.name
    parent_id = args.parent
    new_folder_id = create_folder(
        service=service, name=folder_name, parent=parent_id
    )
    logger.debug("Created folder %s with parent %s", folder_name, parent_id)
    print(new_folder_id)
| |
| |
def cmd_upload(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "upload".

    Validates every source path first and reports problems through the
    argument parser, so that nothing is uploaded when any source is
    unsupported. Afterwards directories are uploaded recursively and all
    other sources as single files, into the folder given by args.parent.

    A source is rejected when it is a directory while "--recursive" is not
    set, a symbolic link, a non-existing path or a special file.
    """
    # Validate args before uploading anything. The symlink / special file
    # checks apply with and without "--recursive".
    for source in args.source_paths:
        # ArgumentParser.error() accepts exactly one, fully formatted
        # message; it does not do logging-style "%s" interpolation.
        if source.is_dir() and not args.recursive:
            args.parser.error(
                'Source path is a directory but "--recursive" argument was '
                f"not set: {source}"
            )
        if source.is_symlink():
            args.parser.error(
                f"Source path is a symbolic link, not supported: {source}"
            )
        if not source.exists():
            args.parser.error(f"Source path does not exist: {source}")
        if not source.is_dir() and not source.is_file():
            args.parser.error(
                "Source is a special file, not supported (Could be device, "
                f" FIFO, etc): {source}"
            )

    for source in args.source_paths:
        if source.is_dir():
            upload_folder_recursively(
                service=service, folder_path=source, parent=args.parent
            )
        else:
            upload_file(service=service, file_path=source, parent=args.parent)
| |
| |
def _escape_query_value(value: str) -> str:
    """Escape a string for use inside a single-quoted Drive query literal."""
    # Backslashes first, so the ones added for the quotes are not doubled.
    return value.replace("\\", "\\\\").replace("'", "\\'")


def cmd_download_file_by_name(
    args: argparse.Namespace, service: Resource
) -> None:
    """Command entry point for "download_file_by_name".

    Searches the selected Drive for non-folder, non-trashed files matching
    either the exact file name (args.filename) or all given name components
    (args.filename_contains), and downloads the file to args.out_filepath.
    The process exits with status 1 when no file or more than one file
    matches.
    """
    # Search includes trashed files by default.
    query_base = (
        f"mimeType != '{MIMETYPE_GOOGLE_DRIVE_FOLDER}' and trashed = false"
    )
    # User-supplied names must be escaped, otherwise a quote or backslash in
    # a file name would produce a malformed search query.
    if args.filename:
        query_filename = f"name = '{_escape_query_value(args.filename)}'"
    else:
        query_filename = " and ".join(
            f"name contains '{_escape_query_value(part)}'"
            for part in args.filename_contains
        )
    query = f"{query_base} and {query_filename}"

    # Google Drive search is fuzzy. Filter by actual search queries.
    files = list_files_strict(
        service=service,
        drive_id=args.drive_id,
        query=query,
        filename_match=args.filename
        if args.filename
        else args.filename_contains,
    )

    if not files:
        logger.error("No matching file found.")
        sys.exit(1)
    if len(files) != 1:
        logger.error("Multiple matching files found: %s", files)
        sys.exit(1)

    file_name = files[0]["name"]
    file_id = files[0]["id"]
    logger.info(
        'Downloading file "%s" with id=%s to %s',
        file_name,
        file_id,
        args.out_filepath,
    )

    request = service.files().get_media(fileId=file_id)
    # The context manager closes the output file even if a chunk download
    # raises.
    with FileIO(args.out_filepath, mode="wb") as out_file:
        downloader = MediaIoBaseDownload(out_file, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            logger.debug("Downloaded %s%%.", int(status.progress() * 100))
    logger.info("Download done.")
| |
| |
def _add_overview_drives_parser(subparsers: Any) -> None:
    """Register the "overview_drives" sub-command and its options."""
    parser_overview_drives = subparsers.add_parser(
        "overview_drives",
        help="Print brief overview of accessible Google Drives.",
    )
    parser_overview_drives.set_defaults(func=cmd_overview_drives)
    parser_overview_drives.add_argument(
        "-m",
        "--max-entries",
        help=(
            "Maximum number of entries to query. Defaults to 10. Set to "
            "negative number to list without limit."
        ),
        default=10,
        type=int,
    )
    parser_overview_drives.add_argument(
        "--order-by",
        help='Choose ordering of the outputs. Defaults to "recency"',
        default="recency",
        choices=[
            "createdTime",
            "folder",
            "modifiedByMeTime",
            "modifiedTime",
            "name",
            "name_natural",
            "quotaBytesUsed",
            "recency",
            "sharedWithMeTime",
            "starred",
            "viewedByMeTime",
        ],
    )
    parser_overview_drives.add_argument(
        "--folders-only",
        help=f'List mime type "{MIMETYPE_GOOGLE_DRIVE_FOLDER}" only.',
        default=False,
        action="store_true",
    )


def _add_upload_file_parser(subparsers: Any) -> None:
    """Register the "upload_file" sub-command and its options."""
    parser_upload_file = subparsers.add_parser(
        "upload_file",
        help="Upload a single file and output uploaded fileId.",
    )
    parser_upload_file.set_defaults(func=cmd_upload_file)
    parser_upload_file.add_argument(
        "-f",
        "--file",
        help="Local file to upload to Google Drive.",
        type=str,
        required=True,
    )
    parser_upload_file.add_argument(
        "-p",
        "--parent",
        help=(
            "Parent folder to put the file into. Leave empty to put to My "
            "Drive's root."
        ),
        type=str,
    )
    parser_upload_file.add_argument(
        "-n",
        "--name",
        help=(
            "Name of the uploaded file in Google Drive. Leave empty to take "
            "over source file name."
        ),
        type=str,
    )


def _add_upload_parser(subparsers: Any) -> None:
    """Register the "upload" sub-command and its options."""
    parser_upload = subparsers.add_parser(
        "upload",
        help="Upload files and folders to a Google Drive folder.",
    )
    parser_upload.set_defaults(func=cmd_upload)
    parser_upload.add_argument(
        "source_paths",
        metavar="SOURCE_PATH",
        help=(
            "Local file or folder to upload to Google Drive. Multiple files "
            "and folders can be uploaded in one run. When uploading folders, "
            "-r must be set as well. Symbolic links and special files are not "
            "supported."
        ),
        type=Path,
        nargs="+",
    )
    parser_upload.add_argument(
        "-r",
        "--recursive",
        help="Upload directories recursively.",
        default=False,
        action="store_true",
    )
    parser_upload.add_argument(
        "-p",
        "--parent",
        help=(
            "Parent folder to upload into. Leave empty to put to My Drive's "
            "root folder."
        ),
        type=str,
    )


def _add_create_folder_parser(subparsers: Any) -> None:
    """Register the "create_folder" sub-command and its options."""
    parser_create_folder = subparsers.add_parser(
        "create_folder",
        help="Create a new empty folder and print its fileId.",
    )
    parser_create_folder.set_defaults(func=cmd_create_folder)
    parser_create_folder.add_argument(
        "-n",
        "--name",
        help="Name of the folder to create.",
        type=str,
        required=True,
    )
    parser_create_folder.add_argument(
        "-p",
        "--parent",
        help=(
            "Parent folder to put the new folder into. Leave empty to put to "
            "My Drive's root."
        ),
        type=str,
    )


def _add_download_file_by_name_parser(subparsers: Any) -> None:
    """Register the "download_file_by_name" sub-command and its options."""
    parser_download_file_by_name = subparsers.add_parser(
        "download_file_by_name",
        help="Download a file by name, assuming it is unique under a parent.",
    )
    parser_download_file_by_name.set_defaults(func=cmd_download_file_by_name)
    # Exactly one of --filename / --filename-contains has to be given.
    parser_download_file_by_name_filename_args = (
        parser_download_file_by_name.add_mutually_exclusive_group(required=True)
    )
    parser_download_file_by_name_filename_args.add_argument(
        "-n",
        "--filename",
        help=("Find files by unique filename."),
        type=str,
    )
    parser_download_file_by_name_filename_args.add_argument(
        "-c",
        "--filename-contains",
        help=(
            "Find file with containing a string. If set multiple times, all of "
            'the "contains" parameters must match.'
        ),
        type=str,
        action="append",
    )
    parser_download_file_by_name.add_argument(
        "-d",
        "--drive-id",
        help=(
            'ID of the Google Drive to search within. Defaults to "My Drive".'
        ),
        type=str,
    )
    parser_download_file_by_name.add_argument(
        "-o",
        "--out-filepath",
        help="Output filename or path to download the file to.",
        type=str,
        required=True,
    )


def parse_cmdline_arguments() -> argparse.Namespace:
    """Parse the command line arguments.

    Builds the top-level parser with the logging options and one sub-parser
    per command, parses the process arguments and configures logging from
    them.

    Returns:
        argparse Namespace containing the parsed command line arguments of this
        script. In addition it carries "func" (the entry point of the selected
        command), "command" (the name of the selected command) and "parser"
        (the top-level parser, for error reporting in sub commands).

    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    log.add_logging_arguments(parser)

    # A required sub-parser group without "dest" makes older Python 3
    # releases fail with a TypeError instead of a usage error when the
    # command is missing, hence the explicit destination.
    subparsers = parser.add_subparsers(dest="command", required=True)

    _add_overview_drives_parser(subparsers)
    _add_upload_file_parser(subparsers)
    _add_upload_parser(subparsers)
    _add_create_folder_parser(subparsers)
    _add_download_file_by_name_parser(subparsers)

    args = parser.parse_args()
    # Pass-through the parser object to sub commands for error handling.
    args.parser = parser
    log.configure_logger("", args.log, args.verbose, args.quiet)
    return args
| |
| |
def main() -> None:
    """Entry point into this command line tool.

    Parses the command line, loads the service account credentials from the
    key file named by the Google credentials environment variable, builds the
    Drive v3 service and runs the selected command. The process exits with
    status 1 when the environment variable is not set. HTTP errors raised by
    the API are logged and re-raised.
    """
    args = parse_cmdline_arguments()

    # Fail with a clear message instead of a bare KeyError traceback when the
    # credentials variable is missing or empty.
    credentials_var = google.auth.environment_vars.CREDENTIALS
    credentials_file = os.environ.get(credentials_var)
    if not credentials_file:
        logger.error(
            "Environment variable %s is not set. It must point to the "
            "service account key file (JSON).",
            credentials_var,
        )
        sys.exit(1)

    creds, _ = google.auth.load_credentials_from_file(
        filename=credentials_file
    )
    try:
        # create drive api client
        service = build(
            "drive",
            "v3",
            credentials=creds,
            cache_discovery=False,  # can't work for file credentials
        )
        args.func(args, service)

    except HttpError as error:
        logger.error(error)
        raise
| |
| |
# Run the command line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()