#!/usr/bin/env python2.7
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
  """Authenticates with cloud platform and gets a BigQuery service object."""
  creds = GoogleCredentials.get_application_default()
  return discovery.build('bigquery', 'v2', credentials=creds,
                         cache_discovery=False)


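# Usage sketch (illustrative, not part of the original module): building the
# client assumes Application Default Credentials are available, e.g. via the
# GOOGLE_APPLICATION_CREDENTIALS environment variable or
# `gcloud auth application-default login`.
#
#   big_query = create_big_query()
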
def create_dataset(big_query, project_id, dataset_id):
  is_success = True
  body = {
      'datasetReference': {
          'projectId': project_id,
          'datasetId': dataset_id
      }
  }

  try:
    dataset_req = big_query.datasets().insert(projectId=project_id, body=body)
    dataset_req.execute(num_retries=NUM_RETRIES)
  except HttpError as http_error:
    if http_error.resp.status == 409:
      print 'Warning: The dataset %s already exists' % dataset_id
    else:
      # Note: For more debugging info, print "http_error.content"
      print 'Error in creating dataset: %s. Err: %s' % (dataset_id, http_error)
      is_success = False
  return is_success


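# Sketch (hypothetical project and dataset names): a 409 from an existing
# dataset is treated as success, so this is safe to call on every run.
#
#   if not create_dataset(big_query, 'my-project', 'test_results'):
#       raise RuntimeError('could not create dataset')
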
def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
  """table_schema is a list of (column name, column type, description)
  tuples."""
  fields = [{'name': field_name,
             'type': field_type,
             'description': field_description
             } for (field_name, field_type, field_description) in table_schema]
  return create_table2(big_query, project_id, dataset_id, table_id,
                       fields, description)


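# Example schema (illustrative names, standard BigQuery column types), in the
# (column name, column type, description) tuple format consumed above:
#
#   _RESULTS_SCHEMA = [
#       ('build_id', 'INTEGER', 'Build number'),
#       ('timestamp', 'TIMESTAMP', 'When the test ran'),
#       ('result', 'STRING', 'Test result'),
#   ]
#   create_table(big_query, 'my-project', 'test_results', 'results',
#                _RESULTS_SCHEMA, 'gRPC test results')
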
def create_partitioned_table(big_query, project_id, dataset_id, table_id,
                             table_schema, description, partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
  """Creates a partitioned table.

  By default the table is date-partitioned and each partition expires 30 days
  after it was last modified.
  """
  fields = [{'name': field_name,
             'type': field_type,
             'description': field_description
             } for (field_name, field_type, field_description) in table_schema]
  return create_table2(big_query, project_id, dataset_id, table_id,
                       fields, description, partition_type, expiration_ms)


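# Sketch (illustrative names): with the defaults, BigQuery creates one
# partition per day and drops a partition's data once expiration_ms
# (30 days here) has elapsed since the partition was last modified.
#
#   create_partitioned_table(big_query, 'my-project', 'test_results',
#                            'results_by_day', _RESULTS_SCHEMA,
#                            'gRPC test results, partitioned by day')
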
def create_table2(big_query, project_id, dataset_id, table_id, fields_schema,
                  description, partition_type=None, expiration_ms=None):
  is_success = True

  body = {
      'description': description,
      'schema': {
          'fields': fields_schema
      },
      'tableReference': {
          'datasetId': dataset_id,
          'projectId': project_id,
          'tableId': table_id
      }
  }

  if partition_type and expiration_ms:
    body['timePartitioning'] = {
        'type': partition_type,
        'expirationMs': expiration_ms
    }

  try:
    table_req = big_query.tables().insert(projectId=project_id,
                                          datasetId=dataset_id,
                                          body=body)
    res = table_req.execute(num_retries=NUM_RETRIES)
    print 'Successfully created %s "%s"' % (res['kind'], res['id'])
  except HttpError as http_error:
    if http_error.resp.status == 409:
      print 'Warning: Table %s already exists' % table_id
    else:
      print 'Error in creating table: %s. Err: %s' % (table_id, http_error)
      is_success = False
  return is_success


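# The request body above follows the BigQuery v2 tables.insert REST
# representation; when partitioning is requested, the timePartitioning
# stanza expands to, e.g.:
#
#   {'type': 'DAY', 'expirationMs': 2592000000}  # 30 days in milliseconds
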
def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
  is_success = True

  body = {
      'schema': {
          'fields': fields_schema
      },
      'tableReference': {
          'datasetId': dataset_id,
          'projectId': project_id,
          'tableId': table_id
      }
  }

  try:
    table_req = big_query.tables().patch(projectId=project_id,
                                         datasetId=dataset_id,
                                         tableId=table_id,
                                         body=body)
    res = table_req.execute(num_retries=NUM_RETRIES)
    print 'Successfully patched %s "%s"' % (res['kind'], res['id'])
  except HttpError as http_error:
    print 'Error in patching table: %s. Err: %s' % (table_id, http_error)
    is_success = False
  return is_success


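# Typical use is widening an existing schema, since tables().patch accepts
# newly added NULLABLE columns. Sketch (hypothetical field list):
#
#   new_fields = [{'name': 'build_id', 'type': 'INTEGER'},
#                 {'name': 'flake_rate', 'type': 'FLOAT'}]
#   patch_table(big_query, 'my-project', 'test_results', 'results', new_fields)
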
def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
  is_success = True
  body = {'rows': rows_list}
  try:
    insert_req = big_query.tabledata().insertAll(projectId=project_id,
                                                 datasetId=dataset_id,
                                                 tableId=table_id,
                                                 body=body)
    res = insert_req.execute(num_retries=NUM_RETRIES)
    if res.get('insertErrors', None):
      print 'Error inserting rows! Response: %s' % res
      is_success = False
  except HttpError as http_error:
    print 'Error inserting rows to the table %s. Err: %s' % (table_id,
                                                             http_error)
    is_success = False

  return is_success


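# Rows for insertAll are built with make_row() below; BigQuery uses the
# insertId for best-effort de-duplication of streaming inserts. Sketch
# (hypothetical names and values):
#
#   rows = [make_row(str(uuid.uuid4()), {'build_id': 42, 'result': 'PASSED'})]
#   insert_rows(big_query, 'my-project', 'test_results', 'results', rows)
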
def sync_query_job(big_query, project_id, query, timeout=5000):
  query_data = {'query': query, 'timeoutMs': timeout}
  query_job = None
  try:
    query_job = big_query.jobs().query(
        projectId=project_id,
        body=query_data).execute(num_retries=NUM_RETRIES)
  except HttpError as http_error:
    print 'Query execute job failed with error: %s' % http_error
    print http_error.content
  return query_job


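# Sketch (illustrative, legacy-SQL table syntax of this API version): check
# 'jobComplete' before reading rows, since the query may outlive timeoutMs.
#
#   job = sync_query_job(big_query, 'my-project',
#                        'SELECT COUNT(*) FROM [my-project:test_results.results]')
#   if job and job.get('jobComplete'):
#       print job['rows']
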
def make_row(unique_row_id, row_values_dict):
  """row_values_dict is a dictionary of column name and column value."""
  return {'insertId': unique_row_id, 'json': row_values_dict}
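

if __name__ == '__main__':
  # A minimal end-to-end sketch, not part of the original module: it assumes
  # Application Default Credentials and a writable project, and every name
  # below ('my-project', 'test_results', 'results') is hypothetical.
  parser = argparse.ArgumentParser(description='Smoke-test BigQuery helpers')
  parser.add_argument('--project', default='my-project')
  args = parser.parse_args()

  bq = create_big_query()
  schema = [('build_id', 'INTEGER', 'Build number'),
            ('result', 'STRING', 'Test result')]
  if create_dataset(bq, args.project, 'test_results'):
    create_table(bq, args.project, 'test_results', 'results', schema,
                 'gRPC test results (sketch)')
    rows = [make_row(str(uuid.uuid4()), {'build_id': 1, 'result': 'PASSED'})]
    insert_rows(bq, args.project, 'test_results', 'results', rows)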