blob: efff0f657f35f7f41aebdf109dbecaa0abf4695d [file] [log] [blame]
Chris McDonough781de4c2018-07-26 12:33:30 -04001# Copyright 2014 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
John Asmuth864311d2014-04-24 15:46:08 -040015"""Channel notifications support.
16
17Classes and functions to support channel subscriptions and notifications
18on those channels.
19
20Notes:
21 - This code is based on experimental APIs and is subject to change.
22 - Notification does not do deduplication of notification ids, that's up to
23 the receiver.
24 - Storing the Channel between calls is up to the caller.
25
26
27Example setting up a channel:
28
29 # Create a new channel that gets notifications via webhook.
30 channel = new_webhook_channel("https://example.com/my_web_hook")
31
32 # Store the channel, keyed by 'channel.id'. Store it before calling the
33 # watch method because notifications may start arriving before the watch
34 # method returns.
35 ...
36
37 resp = service.objects().watchAll(
38 bucket="some_bucket_id", body=channel.body()).execute()
39 channel.update(resp)
40
41 # Store the channel, keyed by 'channel.id'. Store it after being updated
42 # since the resource_id value will now be correct, and that's needed to
43 # stop a subscription.
44 ...
45
46
47An example Webhook implementation using webapp2. Note that webapp2 puts
48headers in a case insensitive dictionary, as headers aren't guaranteed to
49always be upper case.
50
51 id = self.request.headers[X_GOOG_CHANNEL_ID]
52
53 # Retrieve the channel by id.
54 channel = ...
55
56 # Parse notification from the headers, including validating the id.
57 n = notification_from_headers(channel, self.request.headers)
58
59 # Do app specific stuff with the notification here.
60 if n.resource_state == 'sync':
61 # Code to handle sync state.
62 elif n.resource_state == 'exists':
63 # Code to handle the exists state.
64 elif n.resource_state == 'not_exists':
65 # Code to handle the not exists state.
66
67
68Example of unsubscribing.
69
Corey Schafb1b16fd2018-12-12 14:13:23 -050070 service.channels().stop(channel.body()).execute()
John Asmuth864311d2014-04-24 15:46:08 -040071"""
INADA Naokie4ea1a92015-03-04 03:45:42 +090072from __future__ import absolute_import
John Asmuth864311d2014-04-24 15:46:08 -040073
74import datetime
75import uuid
76
77from googleapiclient import errors
Helen Koikede13e3b2018-04-26 16:05:16 -030078from googleapiclient import _helpers as util
INADA Naokie4ea1a92015-03-04 03:45:42 +090079import six
John Asmuth864311d2014-04-24 15:46:08 -040080
81
82# The unix time epoch starts at midnight 1970.
83EPOCH = datetime.datetime.utcfromtimestamp(0)
84
85# Map the names of the parameters in the JSON channel description to
86# the parameter names we use in the Channel class.
87CHANNEL_PARAMS = {
Bu Sun Kim66bb32c2019-10-30 10:11:58 -070088 "address": "address",
89 "id": "id",
90 "expiration": "expiration",
91 "params": "params",
92 "resourceId": "resource_id",
93 "resourceUri": "resource_uri",
94 "type": "type",
95 "token": "token",
96}
John Asmuth864311d2014-04-24 15:46:08 -040097
Bu Sun Kim66bb32c2019-10-30 10:11:58 -070098X_GOOG_CHANNEL_ID = "X-GOOG-CHANNEL-ID"
99X_GOOG_MESSAGE_NUMBER = "X-GOOG-MESSAGE-NUMBER"
100X_GOOG_RESOURCE_STATE = "X-GOOG-RESOURCE-STATE"
101X_GOOG_RESOURCE_URI = "X-GOOG-RESOURCE-URI"
102X_GOOG_RESOURCE_ID = "X-GOOG-RESOURCE-ID"
John Asmuth864311d2014-04-24 15:46:08 -0400103
104
105def _upper_header_keys(headers):
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700106 new_headers = {}
107 for k, v in six.iteritems(headers):
108 new_headers[k.upper()] = v
109 return new_headers
John Asmuth864311d2014-04-24 15:46:08 -0400110
111
112class Notification(object):
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700113 """A Notification from a Channel.
John Asmuth864311d2014-04-24 15:46:08 -0400114
115 Notifications are not usually constructed directly, but are returned
116 from functions like notification_from_headers().
117
118 Attributes:
119 message_number: int, The unique id number of this notification.
120 state: str, The state of the resource being monitored.
121 uri: str, The address of the resource being monitored.
122 resource_id: str, The unique identifier of the version of the resource at
123 this event.
124 """
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700125
126 @util.positional(5)
127 def __init__(self, message_number, state, resource_uri, resource_id):
128 """Notification constructor.
John Asmuth864311d2014-04-24 15:46:08 -0400129
130 Args:
131 message_number: int, The unique id number of this notification.
132 state: str, The state of the resource being monitored. Can be one
133 of "exists", "not_exists", or "sync".
134 resource_uri: str, The address of the resource being monitored.
135 resource_id: str, The identifier of the watched resource.
136 """
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700137 self.message_number = message_number
138 self.state = state
139 self.resource_uri = resource_uri
140 self.resource_id = resource_id
John Asmuth864311d2014-04-24 15:46:08 -0400141
142
143class Channel(object):
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700144 """A Channel for notifications.
John Asmuth864311d2014-04-24 15:46:08 -0400145
146 Usually not constructed directly, instead it is returned from helper
147 functions like new_webhook_channel().
148
149 Attributes:
150 type: str, The type of delivery mechanism used by this channel. For
151 example, 'web_hook'.
152 id: str, A UUID for the channel.
153 token: str, An arbitrary string associated with the channel that
154 is delivered to the target address with each event delivered
155 over this channel.
156 address: str, The address of the receiving entity where events are
157 delivered. Specific to the channel type.
158 expiration: int, The time, in milliseconds from the epoch, when this
159 channel will expire.
160 params: dict, A dictionary of string to string, with additional parameters
161 controlling delivery channel behavior.
162 resource_id: str, An opaque id that identifies the resource that is
163 being watched. Stable across different API versions.
164 resource_uri: str, The canonicalized ID of the watched resource.
165 """
166
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700167 @util.positional(5)
168 def __init__(
169 self,
170 type,
171 id,
172 token,
173 address,
174 expiration=None,
175 params=None,
176 resource_id="",
177 resource_uri="",
178 ):
179 """Create a new Channel.
John Asmuth864311d2014-04-24 15:46:08 -0400180
181 In user code, this Channel constructor will not typically be called
182 manually since there are functions for creating channels for each specific
183 type with a more customized set of arguments to pass.
184
185 Args:
186 type: str, The type of delivery mechanism used by this channel. For
187 example, 'web_hook'.
188 id: str, A UUID for the channel.
189 token: str, An arbitrary string associated with the channel that
190 is delivered to the target address with each event delivered
191 over this channel.
192 address: str, The address of the receiving entity where events are
193 delivered. Specific to the channel type.
194 expiration: int, The time, in milliseconds from the epoch, when this
195 channel will expire.
196 params: dict, A dictionary of string to string, with additional parameters
197 controlling delivery channel behavior.
198 resource_id: str, An opaque id that identifies the resource that is
199 being watched. Stable across different API versions.
200 resource_uri: str, The canonicalized ID of the watched resource.
201 """
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700202 self.type = type
203 self.id = id
204 self.token = token
205 self.address = address
206 self.expiration = expiration
207 self.params = params
208 self.resource_id = resource_id
209 self.resource_uri = resource_uri
John Asmuth864311d2014-04-24 15:46:08 -0400210
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700211 def body(self):
212 """Build a body from the Channel.
John Asmuth864311d2014-04-24 15:46:08 -0400213
214 Constructs a dictionary that's appropriate for passing into watch()
215 methods as the value of body argument.
216
217 Returns:
218 A dictionary representation of the channel.
219 """
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700220 result = {
221 "id": self.id,
222 "token": self.token,
223 "type": self.type,
224 "address": self.address,
John Asmuth864311d2014-04-24 15:46:08 -0400225 }
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700226 if self.params:
227 result["params"] = self.params
228 if self.resource_id:
229 result["resourceId"] = self.resource_id
230 if self.resource_uri:
231 result["resourceUri"] = self.resource_uri
232 if self.expiration:
233 result["expiration"] = self.expiration
John Asmuth864311d2014-04-24 15:46:08 -0400234
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700235 return result
John Asmuth864311d2014-04-24 15:46:08 -0400236
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700237 def update(self, resp):
238 """Update a channel with information from the response of watch().
John Asmuth864311d2014-04-24 15:46:08 -0400239
240 When a request is sent to watch() a resource, the response returned
241 from the watch() request is a dictionary with updated channel information,
242 such as the resource_id, which is needed when stopping a subscription.
243
244 Args:
245 resp: dict, The response from a watch() method.
246 """
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700247 for json_name, param_name in six.iteritems(CHANNEL_PARAMS):
248 value = resp.get(json_name)
249 if value is not None:
250 setattr(self, param_name, value)
John Asmuth864311d2014-04-24 15:46:08 -0400251
252
253def notification_from_headers(channel, headers):
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700254 """Parse a notification from the webhook request headers, validate
John Asmuth864311d2014-04-24 15:46:08 -0400255 the notification, and return a Notification object.
256
257 Args:
258 channel: Channel, The channel that the notification is associated with.
259 headers: dict, A dictionary like object that contains the request headers
260 from the webhook HTTP request.
261
262 Returns:
263 A Notification object.
264
265 Raises:
266 errors.InvalidNotificationError if the notification is invalid.
267 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
268 """
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700269 headers = _upper_header_keys(headers)
270 channel_id = headers[X_GOOG_CHANNEL_ID]
271 if channel.id != channel_id:
272 raise errors.InvalidNotificationError(
273 "Channel id mismatch: %s != %s" % (channel.id, channel_id)
274 )
275 else:
276 message_number = int(headers[X_GOOG_MESSAGE_NUMBER])
277 state = headers[X_GOOG_RESOURCE_STATE]
278 resource_uri = headers[X_GOOG_RESOURCE_URI]
279 resource_id = headers[X_GOOG_RESOURCE_ID]
280 return Notification(message_number, state, resource_uri, resource_id)
John Asmuth864311d2014-04-24 15:46:08 -0400281
282
283@util.positional(2)
284def new_webhook_channel(url, token=None, expiration=None, params=None):
285 """Create a new webhook Channel.
286
287 Args:
288 url: str, URL to post notifications to.
289 token: str, An arbitrary string associated with the channel that
290 is delivered to the target address with each notification delivered
291 over this channel.
292 expiration: datetime.datetime, A time in the future when the channel
293 should expire. Can also be None if the subscription should use the
294 default expiration. Note that different services may have different
295 limits on how long a subscription lasts. Check the response from the
296 watch() method to see the value the service has set for an expiration
297 time.
298 params: dict, Extra parameters to pass on channel creation. Currently
299 not used for webhook channels.
300 """
301 expiration_ms = 0
302 if expiration:
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700303 delta = expiration - EPOCH
304 expiration_ms = (
305 delta.microseconds / 1000 + (delta.seconds + delta.days * 24 * 3600) * 1000
306 )
307 if expiration_ms < 0:
308 expiration_ms = 0
John Asmuth864311d2014-04-24 15:46:08 -0400309
Bu Sun Kim66bb32c2019-10-30 10:11:58 -0700310 return Channel(
311 "web_hook",
312 str(uuid.uuid4()),
313 token,
314 url,
315 expiration=expiration_ms,
316 params=params,
317 )