blob: cf54eb056401c00e4a72cbbcc594835f828c78cf [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2022-2023 Fairphone B.V.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Minimal Google Drive service account client.
Command line client for uploading files and folder to a Google Drive folder via
a Google service account. The client supports both regular personal drives ("My
Drive") as well as team drives. It is designed for scripting and automation,
thus working with a service account only. It is not intended to work as OAuth
client.
To use this client:
* Set up a Google Service account and give it access to required Drives.
* Create a service account key and download in JSON format.
* Point environment variable GOOGLE_APPLICATION_CREDENTIALS to this key file.
Refer to the Google Drive (Python) API reference for details:
* https://developers.google.com/resources/api-libraries/documentation/drive/v3/python/latest/index.html # noqa: E501 # pylint: disable=line-too-long
"""
import argparse
from io import FileIO
import json
import logging
import os
from pathlib import Path
import sys
from typing import Any, Dict, List, Optional, Union
import google.auth # type: ignore
from googleapiclient.discovery import build, Resource # type: ignore
from googleapiclient.errors import HttpError # type: ignore
from googleapiclient.http import ( # type: ignore
MediaUpload,
MediaFileUpload,
MediaIoBaseDownload,
)
import log
# MIME type Google Drive uses to mark an entry as a folder. Used in this
# module both when creating folders and when filtering search queries.
# See also https://developers.google.com/drive/api/guides/mime-types
MIMETYPE_GOOGLE_DRIVE_FOLDER = "application/vnd.google-apps.folder"
# Module-level logger used by all functions in this file.
logger = logging.getLogger(__name__)
# https://stackoverflow.com/a/1094933
def sizeof_fmt(num: float, suffix: str = "B") -> str:
    """Render a size as a short human-readable string.

    The value is divided by 1024 until its magnitude drops below 1024 and is
    then printed with one decimal and the matching binary prefix.

    Arguments:
        num: Size to format, e.g. a number of bytes.
        suffix: Unit appended after the binary prefix.
    Returns:
        Formatted size, e.g. "1.5KiB". Values too large for the "Zi" prefix
        are expressed with the "Yi" prefix.
    """
    prefixes = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi")
    scaled = num
    for prefix in prefixes:
        if abs(scaled) < 1024.0:
            return f"{scaled:3.1f}{prefix}{suffix}"
        scaled = scaled / 1024.0
    # Ran out of prefixes: whatever is left is expressed in "Yi".
    return f"{scaled:.1f}Yi{suffix}"
def list_drives(service: Resource) -> List[Dict[str, Any]]:
    """Collect all drives reported by the Drive API "drives.list" call.

    Pages through the result set until the API stops returning a
    "nextPageToken".

    Arguments:
        service: Google Drive service resource object
    Returns:
        List of drives, each a dictionary with the requested attributes
        ("id" and "name").
    """
    found: List[Dict[str, Any]] = []
    token = None
    while True:
        request = service.drives().list(
            fields="nextPageToken,drives(id,name)", pageToken=token
        )
        page = request.execute()
        found.extend(page.get("drives", []))
        token = page.get("nextPageToken", None)
        if not token:
            # No further page available: listing is complete.
            return found
def list_files(
    service: Resource,
    drive_id: Optional[str] = None,
    query: Optional[str] = None,
    max_entries: Optional[int] = None,
    order_by: str = "recency",
) -> List[Dict[str, str]]:
    """Query files from a Google Drive, following result pagination.

    Arguments:
        service: Google Drive service resource object
        drive_id: Optional. ID of the Google drive to query. If unset, no
            drive-specific parameters are sent and the API default applies
            (the account's own "My Drive").
        query: Optional. A query for filtering the file results. See Drive
            API reference for details.
        max_entries: Optional. Upper bound for the number of returned
            entries. None, zero and negative values all disable the limit.
        order_by: Sort order passed on to the API as "orderBy".
    Returns:
        List of files, each a mapping with the keys "id", "name" and
        "mimeType".
    """
    if drive_id:
        extra_params: Dict[str, Any] = {
            "driveId": drive_id,
            # Below three MUST be set when using parameter 'driveId'
            "corpora": "drive",  # list files contained in driveId
            "includeItemsFromAllDrives": True,
            "supportsAllDrives": True,
        }
    else:
        extra_params = {}

    limited = bool(max_entries) and max_entries >= 0
    collected: List[Dict[str, Any]] = []
    batch_size = 100
    token = None
    while True:
        if limited:
            still_wanted = max_entries - len(collected)
            if still_wanted <= 0:
                break
            # Never request more entries than are still missing.
            batch_size = min(batch_size, still_wanted)
        request = service.files().list(
            orderBy=order_by,
            pageSize=batch_size,
            q=query,
            fields="nextPageToken,files(id,name,mimeType)",
            pageToken=token,
            **extra_params,
        )
        page = request.execute()
        collected.extend(page.get("files", []))
        token = page.get("nextPageToken", None)
        if token is None:
            break
    return collected
def list_files_strict(  # pylint: disable=too-many-arguments
    service: Resource,
    filename_match: Union[str, List[str]],
    drive_id: Optional[str] = None,
    query: Optional[str] = None,
    max_entries: Optional[int] = None,
    order_by: str = "recency",
) -> List[Dict[str, str]]:
    """List files, then re-check the file names locally.

    Runs list_files() with the given arguments and drops every result whose
    name does not satisfy filename_match. Dropped entries are logged at
    debug level.

    Arguments:
        (All arguments of list_files())
        filename_match: Either a string, in which case the file name must be
            exactly equal to it; or a list of strings, in which case every
            list element must be contained in the file name.
    Returns:
        Same results as list_files(), except for applying stricter filename
        matching.
    """
    candidates = list_files(
        service=service,
        drive_id=drive_id,
        query=query,
        max_entries=max_entries,
        order_by=order_by,
    )

    if isinstance(filename_match, str):

        def name_matches(name: str) -> bool:
            # Match full filenames
            return filename_match == name

    else:
        assert isinstance(filename_match, list)

        def name_matches(name: str) -> bool:
            # Ensure all components are included in the filename.
            return all(part in name for part in filename_match)

    accepted = []
    for candidate in candidates:
        if name_matches(candidate["name"]):
            accepted.append(candidate)
            continue
        logger.debug("Skipping mismatching fuzzy file match %s", candidate)
    return accepted
def create_file(
    service: Resource,
    name: str,
    mime_type: Optional[str] = None,
    parent: Optional[str] = None,
    media_body: Optional[Union[str, MediaUpload]] = None,
) -> str:
    """Create a file entry in a Google Drive.

    Arguments:
        service: Google Drive service resource object
        name: User-visible name of the new Drive file
        mime_type: Optional. MIME type of the created file. Only sent to the
            API when set; see API reference for how the type is chosen
            otherwise.
        parent: Optional. ID of the parent folder. Only sent to the API when
            set; by default, the new file will be put into the root folder
            of the account's "My Drive".
        media_body: Optional. The filename of the media request body, or an
            instance of a MediaUpload object. Passed through unchanged to
            the API call. See API reference for details.
    Returns:
        ID of the created file in the Google Drive.
    """
    metadata: Dict[str, Any] = {"name": name}
    # Optional attributes are only included when actually provided.
    if mime_type:
        metadata["mimeType"] = mime_type
    if parent:
        metadata["parents"] = [parent]

    request = service.files().create(
        body=metadata,
        media_body=media_body,
        fields="id",
        supportsAllDrives=True,
    )
    response: Dict[str, str] = request.execute()
    return response["id"]
def create_folder(
    service: Resource, name: str, parent: Optional[str] = None
) -> str:
    """Create an empty folder in a Google Drive.

    Thin wrapper around create_file() that sets the Drive folder MIME type.

    Arguments:
        service: Google Drive service resource object
        name: User-visible name of the new Drive folder
        parent: Optional. ID of the parent folder. By default, the new folder
            will be put into the root folder of the account's "My Drive".
    Returns:
        ID of the created folder in the Google Drive.
    """
    folder_id = create_file(
        service,
        name=name,
        parent=parent,
        mime_type=MIMETYPE_GOOGLE_DRIVE_FOLDER,
    )
    return folder_id
def upload_file(
    service: Resource,
    file_path: Path,
    upload_name: Optional[str] = None,
    parent: Optional[str] = None,
) -> str:
    """Upload a local file into a Google Drive folder.

    Arguments:
        service: Google Drive service resource object
        file_path: Path of the local source file to upload.
        upload_name: Optional. User-visible name of the uploaded file.
            Defaults to the name of the source file.
        parent: Optional. ID of the parent folder. By default, the new file
            will be put into the root folder of the account's "My Drive".
    Returns:
        ID of the created file in the Google Drive.
    Raises:
        RuntimeError: If file_path does not exist or is not a regular file.
    """
    source = str(file_path)
    if not file_path.exists():
        raise RuntimeError(f"Source file does not exist: {source}")
    if not file_path.is_file():
        raise RuntimeError(f"Source is not a file: {source}")

    upload_body = MediaFileUpload(
        filename=source,
        mimetype=None,  # let the lib or GDrive guess the mime type
        chunksize=-1,  # upload in one go if possible
        resumable=True,  # resume might be needed for big files
    )
    size_text = sizeof_fmt(file_path.stat().st_size)
    logger.info("Starting upload of %s (%s)", file_path, size_text)

    new_id = create_file(
        service=service,
        name=upload_name or file_path.name,
        parent=parent,
        media_body=upload_body,
    )
    logger.info("Uploaded file %s as fileId=%s", source, new_id)
    return new_id
def upload_folder_recursively(
    service: Resource, folder_path: Path, parent: Optional[str] = None
) -> str:
    """Mirror a local folder tree into a Google Drive parent folder.

    Creates a Drive folder named after folder_path, uploads all regular
    files directly inside it, then descends into the sub-directories.
    Symbolic links and special files are skipped with a warning.

    Arguments:
        service: Google Drive service resource object
        folder_path: Local folder path to upload.
        parent: Parent Google drive folder id (fileID) to upload into. If
            None, upload to My Drive's root folder.
    Returns:
        fileId of root of the uploaded folder structure.
    Raises:
        RuntimeError: If folder_path is not a directory.
    """
    if not folder_path.is_dir():
        raise RuntimeError(
            f"upload_folder_recursively must be called on directories only. "
            f'Called on "{str(folder_path)}"',
        )
    logger.info("Uploading folder recursively: %s", str(folder_path))

    # Google Drive calls the identifier "fileId" no matter if it's actually
    # a directory or file, so stick with it.
    # For ".", name would be empty without resolving absolute path first.
    folder_name = folder_path.absolute().name
    folder_file_id = create_folder(
        service=service, name=folder_name, parent=parent
    )

    # Classify the directory entries first; uploads happen afterwards so
    # that all files of a folder are sent before any of its sub-folders.
    regular_files: List[Path] = []
    directories: List[Path] = []
    for child in folder_path.iterdir():
        if child.is_symlink():
            logger.warning("Ignoring symbolic link: %s", str(child))
            continue
        if child.is_dir():
            directories.append(child)
            continue
        if child.is_file():
            regular_files.append(child)
            continue
        logger.warning(
            "Ignoring special file: %s (Could be device, FIFO, etc.)", child
        )

    for regular_file in regular_files:
        upload_file(
            service=service, file_path=regular_file, parent=folder_file_id
        )
    for directory in directories:
        upload_folder_recursively(
            service=service, folder_path=directory, parent=folder_file_id
        )

    logger.info(
        "Folder upload done: %s to id=%s", str(folder_path), folder_file_id
    )
    return folder_file_id
def debug_list_all_drive_contents(
    service: Resource,
    max_entries: int = 10,
    folders_only: bool = False,
    order_by: str = "recency",
) -> None:
    """Print a short overview of accessible Drives and their files.

    Prints one section for "My Drive" followed by one section per drive
    returned by list_drives(). Each section lists non-trashed files found
    via list_files(). This is not meant as a complete content listing, but
    just to give a quick overview of what an account has access to.

    Arguments:
        service: Google Drive service resource object
        max_entries: Maximum number of files to list per Drive. Values of
            zero or below print the plain "Files:" header and are passed
            through to list_files() unchanged.
        folders_only: List folders only, skip files.
        order_by: Set ordering for files in Drives.
    """
    # "My Drive" needs special handling: it is addressed without a drive
    # id. All other (team) Drives are appended behind it.
    overview: List[Dict[str, Any]] = [{"id": None, "name": "(My Drive)"}]
    overview.extend(list_drives(service))

    for entry in overview:
        name, identifier = entry["name"], entry["id"]
        print(f'Drive: "{name}" (id: {identifier})')
        header = "Files:"
        if max_entries > 0:
            header = f"Files (maximum {max_entries} most recent):"
        print(header)

        # Search includes trashed files by default.
        conditions = "trashed = false"
        if folders_only:
            conditions += f" and mimeType='{MIMETYPE_GOOGLE_DRIVE_FOLDER}'"

        listing = list_files(
            service,
            identifier,
            max_entries=max_entries,
            query=conditions,
            order_by=order_by,
        )
        for item in listing:
            item_name = item["name"]
            item_id = item["id"]
            item_mime = item["mimeType"]
            if not folders_only:
                print(f"* {item_name} (mime: {item_mime} id: {item_id})")
            else:
                print(f"* {item_name} (id: {item_id})")
        print()
def cmd_overview_drives(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "overview_drives".

    Forwards the parsed command line options to
    debug_list_all_drive_contents().
    """
    options = {
        "max_entries": args.max_entries,
        "folders_only": args.folders_only,
        "order_by": args.order_by,
    }
    debug_list_all_drive_contents(service, **options)
def cmd_upload_file(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "upload_file".

    Uploads the single file given on the command line and prints the
    fileId of the upload to stdout.
    """
    local_file = Path(args.file)
    print(
        upload_file(
            service=service,
            file_path=local_file,
            upload_name=args.name,
            parent=args.parent,
        )
    )
def cmd_create_folder(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "create_folder".

    Creates the folder and prints its fileId to stdout.
    """
    folder_name, parent_id = args.name, args.parent
    new_folder_id = create_folder(
        service=service, name=folder_name, parent=parent_id
    )
    logger.debug("Created folder %s with parent %s", folder_name, parent_id)
    print(new_folder_id)
def cmd_upload(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "upload".

    Uploads every path in args.source_paths into the Drive folder
    args.parent: directories via upload_folder_recursively(), everything
    else via upload_file().

    Without "--recursive", all source paths are validated before anything
    is uploaded. A validation failure is reported through
    args.parser.error(), which prints the usage message and exits the
    process with status 2.

    Arguments:
        args: Parsed command line. Reads "recursive", "source_paths" (list
            of Path), "parent" and "parser".
        service: Google Drive service resource object
    """
    # Validate args before uploading anything.
    if not args.recursive:
        for source in args.source_paths:
            # NOTE: ArgumentParser.error() accepts one ready-made message
            # only; it does not do logging-style "%s" interpolation.
            if source.is_dir():
                args.parser.error(
                    'Source path is a directory but "--recursive" argument '
                    f"was not set: {source}"
                )
            if source.is_symlink():
                args.parser.error(
                    f"Source path is a symbolic link, not supported: {source}"
                )
            if not source.exists():
                args.parser.error(f"Source path does not exist: {source}")
            if not source.is_file():
                args.parser.error(
                    "Source is a special file, not supported (Could be "
                    f"device, FIFO, etc): {source}"
                )

    for source in args.source_paths:
        if source.is_dir():
            upload_folder_recursively(
                service=service, folder_path=source, parent=args.parent
            )
        else:
            upload_file(service=service, file_path=source, parent=args.parent)
def _escape_query_value(value: str) -> str:
    """Escape a string for use inside a single-quoted Drive query literal."""
    # Backslashes first, so the ones added for the quotes are not doubled.
    return value.replace("\\", "\\\\").replace("'", "\\'")


def _find_files_by_name(
    args: argparse.Namespace, service: Resource
) -> List[Dict[str, str]]:
    """Search non-trashed, non-folder files by the name options in args.

    Uses args.filename for an exact name search if set, otherwise requires
    all entries of args.filename_contains to be part of the name. The
    search is restricted to args.drive_id.
    """
    # Search includes trashed files by default.
    query_base = (
        f"mimeType != '{MIMETYPE_GOOGLE_DRIVE_FOLDER}' and trashed = false"
    )
    if args.filename:
        query_filename = f"name = '{_escape_query_value(args.filename)}'"
    else:
        query_filename = " and ".join(
            f"name contains '{_escape_query_value(part)}'"
            for part in args.filename_contains
        )
    query = f"{query_base} and {query_filename}"
    # Google Drive search is fuzzy. Filter by actual search queries.
    return list_files_strict(
        service=service,
        drive_id=args.drive_id,
        query=query,
        filename_match=args.filename
        if args.filename
        else args.filename_contains,
    )


def cmd_download_file_by_name(
    args: argparse.Namespace, service: Resource
) -> None:
    """Command entry point for "download_file_by_name".

    Looks up exactly one file by name and downloads its content to
    args.out_filepath. Exits the process with status 1 if no file or more
    than one file matches.
    """
    files = _find_files_by_name(args, service)
    if not files:
        logger.error("No matching file found.")
        sys.exit(1)
    if len(files) != 1:
        logger.error("Multiple matching files found: %s", files)
        sys.exit(1)
    file_name = files[0]["name"]
    file_id = files[0]["id"]
    logger.info(
        'Downloading file "%s" with id=%s to %s',
        file_name,
        file_id,
        args.out_filepath,
    )
    request = service.files().get_media(fileId=file_id)
    # The context manager guarantees the output file is flushed and closed,
    # also when a chunk download raises.
    with FileIO(args.out_filepath, mode="wb") as file:
        downloader = MediaIoBaseDownload(file, request)
        done = False
        while done is False:
            status, done = downloader.next_chunk()
            logger.debug("Downloaded %s%%.", int(status.progress() * 100))
    logger.info("Download done.")


def cmd_find_file(args: argparse.Namespace, service: Resource) -> None:
    """Command entry point for "find".

    Prints the attributes of all matching files as a JSON list to stdout.
    """
    files = _find_files_by_name(args, service)
    print(json.dumps(files, ensure_ascii=False, indent=2))
def _add_file_search_arguments(subparser: argparse.ArgumentParser) -> None:
    """Add the file name and drive id options shared by search commands."""
    # Exactly one way of specifying the file name must be used.
    filename_args = subparser.add_mutually_exclusive_group(required=True)
    filename_args.add_argument(
        "-n",
        "--filename",
        help="Find files by unique filename.",
        type=str,
    )
    filename_args.add_argument(
        "-c",
        "--filename-contains",
        help=(
            "Find file with containing a string. If set multiple times, all of "
            'the "contains" parameters must match.'
        ),
        type=str,
        action="append",
    )
    subparser.add_argument(
        "-d",
        "--drive-id",
        help=(
            'ID of the Google Drive to search within. Defaults to "My Drive".'
        ),
        type=str,
    )


def parse_cmdline_arguments() -> argparse.Namespace:
    """Parse the command line arguments.

    Defines one sub-command per cmd_*() entry point and stores the entry
    point as "func" in the result. Also configures logging according to the
    parsed logging options.

    Returns:
        argparse Namespace containing the parsed command line arguments of
        this script. In addition, "parser" holds the ArgumentParser itself
        (for error reporting in sub-commands) and "command" the name of the
        selected sub-command.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    log.add_logging_arguments(parser)
    # "dest" must be set together with "required": without it, some Python 3
    # versions raise a TypeError instead of printing a usage error when the
    # sub-command is missing.
    subparsers = parser.add_subparsers(dest="command", required=True)

    parser_overview_drives = subparsers.add_parser(
        "overview_drives",
        help="Print brief overview of accessible Google Drives.",
    )
    parser_overview_drives.set_defaults(func=cmd_overview_drives)
    parser_overview_drives.add_argument(
        "-m",
        "--max-entries",
        help=(
            "Maximum number of entries to query. Defaults to 10. Set to "
            "negative number to list without limit."
        ),
        default=10,
        type=int,
    )
    parser_overview_drives.add_argument(
        "--order-by",
        help='Choose ordering of the outputs. Defaults to "recency"',
        default="recency",
        choices=[
            "createdTime",
            "folder",
            "modifiedByMeTime",
            "modifiedTime",
            "name",
            "name_natural",
            "quotaBytesUsed",
            "recency",
            "sharedWithMeTime",
            "starred",
            "viewedByMeTime",
        ],
    )
    parser_overview_drives.add_argument(
        "--folders-only",
        help=f'List mime type "{MIMETYPE_GOOGLE_DRIVE_FOLDER}" only.',
        default=False,
        action="store_true",
    )

    parser_upload_file = subparsers.add_parser(
        "upload_file",
        help="Upload a single file and output uploaded fileId.",
    )
    parser_upload_file.set_defaults(func=cmd_upload_file)
    parser_upload_file.add_argument(
        "-f",
        "--file",
        help="Local file to upload to Google Drive.",
        type=str,
        required=True,
    )
    parser_upload_file.add_argument(
        "-p",
        "--parent",
        help=(
            "Parent folder to put the file into. Leave empty to put to My "
            "Drive's root."
        ),
        type=str,
    )
    parser_upload_file.add_argument(
        "-n",
        "--name",
        help=(
            "Name of the uploaded file in Google Drive. Leave empty to take "
            "over source file name."
        ),
        type=str,
    )

    parser_upload = subparsers.add_parser(
        "upload",
        help="Upload files and folders to a Google Drive folder.",
    )
    parser_upload.set_defaults(func=cmd_upload)
    parser_upload.add_argument(
        "source_paths",
        metavar="SOURCE_PATH",
        help=(
            "Local file or folder to upload to Google Drive. Multiple files "
            "and folders can be uploaded in one run. When uploading folders, "
            "-r must be set as well. Symbolic links and special files are not "
            "supported."
        ),
        type=Path,
        nargs="+",
    )
    parser_upload.add_argument(
        "-r",
        "--recursive",
        help="Upload directories recursively.",
        default=False,
        action="store_true",
    )
    parser_upload.add_argument(
        "-p",
        "--parent",
        help=(
            "Parent folder to upload into. Leave empty to put to My Drive's "
            "root folder."
        ),
        type=str,
    )

    parser_create_folder = subparsers.add_parser(
        "create_folder",
        help="Create a new empty folder and print its fileId.",
    )
    parser_create_folder.set_defaults(func=cmd_create_folder)
    parser_create_folder.add_argument(
        "-n",
        "--name",
        help="Name of the folder to create.",
        type=str,
        required=True,
    )
    parser_create_folder.add_argument(
        "-p",
        "--parent",
        help=(
            "Parent folder to put the new folder into. Leave empty to put to "
            "My Drive's root."
        ),
        type=str,
    )

    parser_download_file_by_name = subparsers.add_parser(
        "download_file_by_name",
        help="Download a file by name, assuming it is unique under a parent.",
    )
    parser_download_file_by_name.set_defaults(func=cmd_download_file_by_name)
    _add_file_search_arguments(parser_download_file_by_name)
    parser_download_file_by_name.add_argument(
        "-o",
        "--out-filepath",
        help="Output filename or path to download the file to.",
        type=str,
        required=True,
    )

    parser_find_file = subparsers.add_parser(
        "find_file",
        help=(
            "Find files by name on a Drive. Print attributes of matching files "
            "as json."
        ),
    )
    parser_find_file.set_defaults(func=cmd_find_file)
    _add_file_search_arguments(parser_find_file)

    args = parser.parse_args()
    # Pass-through the parser object to sub commands for error handling.
    args.parser = parser
    log.configure_logger("", args.log, args.verbose, args.quiet)
    return args
def main() -> None:
    """Entry point into this command line tool.

    Parses the command line, loads the service account credentials from the
    key file named by the credentials environment variable, builds the
    Drive v3 client and runs the selected sub-command.

    Raises:
        HttpError: Re-raised after logging if a Drive API request fails.
    """
    args = parse_cmdline_arguments()

    credentials_env = google.auth.environment_vars.CREDENTIALS
    credentials_file = os.environ.get(credentials_env)
    if not credentials_file:
        # Report as a usage error instead of failing with a bare KeyError
        # traceback; parser.error() exits the process with status 2.
        args.parser.error(
            f"Environment variable {credentials_env} is not set. Point it "
            "to the service account key file (JSON)."
        )
    creds, _ = google.auth.load_credentials_from_file(
        filename=credentials_file
    )

    try:
        # create drive api client
        service = build(
            "drive",
            "v3",
            credentials=creds,
            cache_discovery=False,  # can't work for file credentials
        )
        args.func(args, service)
    except HttpError as error:
        logger.error(error)
        raise


if __name__ == "__main__":
    main()