blob: 3caee13ae77e7f71e06ccb34bd899831b9f6a539 [file] [log] [blame]
Chris McDonough781de4c2018-07-26 12:33:30 -04001# Copyright 2014 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
John Asmuth864311d2014-04-24 15:46:08 -040015"""Channel notifications support.
16
17Classes and functions to support channel subscriptions and notifications
18on those channels.
19
20Notes:
21 - This code is based on experimental APIs and is subject to change.
22 - Notification does not do deduplication of notification ids, that's up to
23 the receiver.
24 - Storing the Channel between calls is up to the caller.
25
26
27Example setting up a channel:
28
29 # Create a new channel that gets notifications via webhook.
30 channel = new_webhook_channel("https://example.com/my_web_hook")
31
32 # Store the channel, keyed by 'channel.id'. Store it before calling the
33 # watch method because notifications may start arriving before the watch
34 # method returns.
35 ...
36
37 resp = service.objects().watchAll(
38 bucket="some_bucket_id", body=channel.body()).execute()
39 channel.update(resp)
40
41 # Store the channel, keyed by 'channel.id'. Store it after being updated
42 # since the resource_id value will now be correct, and that's needed to
43 # stop a subscription.
44 ...
45
46
47An example Webhook implementation using webapp2. Note that webapp2 puts
48headers in a case insensitive dictionary, as headers aren't guaranteed to
49always be upper case.
50
51 id = self.request.headers[X_GOOG_CHANNEL_ID]
52
53 # Retrieve the channel by id.
54 channel = ...
55
56 # Parse notification from the headers, including validating the id.
57 n = notification_from_headers(channel, self.request.headers)
58
59 # Do app specific stuff with the notification here.
60 if n.resource_state == 'sync':
61 # Code to handle sync state.
62 elif n.resource_state == 'exists':
63 # Code to handle the exists state.
64 elif n.resource_state == 'not_exists':
65 # Code to handle the not exists state.
66
67
68Example of unsubscribing.
69
Corey Schafb1b16fd2018-12-12 14:13:23 -050070 service.channels().stop(channel.body()).execute()
John Asmuth864311d2014-04-24 15:46:08 -040071"""
INADA Naokie4ea1a92015-03-04 03:45:42 +090072from __future__ import absolute_import
John Asmuth864311d2014-04-24 15:46:08 -040073
74import datetime
75import uuid
76
77from googleapiclient import errors
Helen Koikede13e3b2018-04-26 16:05:16 -030078from googleapiclient import _helpers as util
INADA Naokie4ea1a92015-03-04 03:45:42 +090079import six
John Asmuth864311d2014-04-24 15:46:08 -040080
81
82# The unix time epoch starts at midnight 1970.
83EPOCH = datetime.datetime.utcfromtimestamp(0)
84
85# Map the names of the parameters in the JSON channel description to
86# the parameter names we use in the Channel class.
87CHANNEL_PARAMS = {
88 'address': 'address',
89 'id': 'id',
90 'expiration': 'expiration',
91 'params': 'params',
92 'resourceId': 'resource_id',
93 'resourceUri': 'resource_uri',
94 'type': 'type',
95 'token': 'token',
96 }
97
98X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID'
99X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
100X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
101X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI'
102X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID'
103
104
105def _upper_header_keys(headers):
106 new_headers = {}
INADA Naokie4ea1a92015-03-04 03:45:42 +0900107 for k, v in six.iteritems(headers):
John Asmuth864311d2014-04-24 15:46:08 -0400108 new_headers[k.upper()] = v
109 return new_headers
110
111
112class Notification(object):
113 """A Notification from a Channel.
114
115 Notifications are not usually constructed directly, but are returned
116 from functions like notification_from_headers().
117
118 Attributes:
119 message_number: int, The unique id number of this notification.
120 state: str, The state of the resource being monitored.
121 uri: str, The address of the resource being monitored.
122 resource_id: str, The unique identifier of the version of the resource at
123 this event.
124 """
125 @util.positional(5)
126 def __init__(self, message_number, state, resource_uri, resource_id):
127 """Notification constructor.
128
129 Args:
130 message_number: int, The unique id number of this notification.
131 state: str, The state of the resource being monitored. Can be one
132 of "exists", "not_exists", or "sync".
133 resource_uri: str, The address of the resource being monitored.
134 resource_id: str, The identifier of the watched resource.
135 """
136 self.message_number = message_number
137 self.state = state
138 self.resource_uri = resource_uri
139 self.resource_id = resource_id
140
141
142class Channel(object):
143 """A Channel for notifications.
144
145 Usually not constructed directly, instead it is returned from helper
146 functions like new_webhook_channel().
147
148 Attributes:
149 type: str, The type of delivery mechanism used by this channel. For
150 example, 'web_hook'.
151 id: str, A UUID for the channel.
152 token: str, An arbitrary string associated with the channel that
153 is delivered to the target address with each event delivered
154 over this channel.
155 address: str, The address of the receiving entity where events are
156 delivered. Specific to the channel type.
157 expiration: int, The time, in milliseconds from the epoch, when this
158 channel will expire.
159 params: dict, A dictionary of string to string, with additional parameters
160 controlling delivery channel behavior.
161 resource_id: str, An opaque id that identifies the resource that is
162 being watched. Stable across different API versions.
163 resource_uri: str, The canonicalized ID of the watched resource.
164 """
165
166 @util.positional(5)
167 def __init__(self, type, id, token, address, expiration=None,
168 params=None, resource_id="", resource_uri=""):
169 """Create a new Channel.
170
171 In user code, this Channel constructor will not typically be called
172 manually since there are functions for creating channels for each specific
173 type with a more customized set of arguments to pass.
174
175 Args:
176 type: str, The type of delivery mechanism used by this channel. For
177 example, 'web_hook'.
178 id: str, A UUID for the channel.
179 token: str, An arbitrary string associated with the channel that
180 is delivered to the target address with each event delivered
181 over this channel.
182 address: str, The address of the receiving entity where events are
183 delivered. Specific to the channel type.
184 expiration: int, The time, in milliseconds from the epoch, when this
185 channel will expire.
186 params: dict, A dictionary of string to string, with additional parameters
187 controlling delivery channel behavior.
188 resource_id: str, An opaque id that identifies the resource that is
189 being watched. Stable across different API versions.
190 resource_uri: str, The canonicalized ID of the watched resource.
191 """
192 self.type = type
193 self.id = id
194 self.token = token
195 self.address = address
196 self.expiration = expiration
197 self.params = params
198 self.resource_id = resource_id
199 self.resource_uri = resource_uri
200
201 def body(self):
202 """Build a body from the Channel.
203
204 Constructs a dictionary that's appropriate for passing into watch()
205 methods as the value of body argument.
206
207 Returns:
208 A dictionary representation of the channel.
209 """
210 result = {
211 'id': self.id,
212 'token': self.token,
213 'type': self.type,
214 'address': self.address
215 }
216 if self.params:
217 result['params'] = self.params
218 if self.resource_id:
219 result['resourceId'] = self.resource_id
220 if self.resource_uri:
221 result['resourceUri'] = self.resource_uri
222 if self.expiration:
223 result['expiration'] = self.expiration
224
225 return result
226
227 def update(self, resp):
228 """Update a channel with information from the response of watch().
229
230 When a request is sent to watch() a resource, the response returned
231 from the watch() request is a dictionary with updated channel information,
232 such as the resource_id, which is needed when stopping a subscription.
233
234 Args:
235 resp: dict, The response from a watch() method.
236 """
INADA Naokie4ea1a92015-03-04 03:45:42 +0900237 for json_name, param_name in six.iteritems(CHANNEL_PARAMS):
John Asmuth864311d2014-04-24 15:46:08 -0400238 value = resp.get(json_name)
239 if value is not None:
240 setattr(self, param_name, value)
241
242
243def notification_from_headers(channel, headers):
244 """Parse a notification from the webhook request headers, validate
245 the notification, and return a Notification object.
246
247 Args:
248 channel: Channel, The channel that the notification is associated with.
249 headers: dict, A dictionary like object that contains the request headers
250 from the webhook HTTP request.
251
252 Returns:
253 A Notification object.
254
255 Raises:
256 errors.InvalidNotificationError if the notification is invalid.
257 ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
258 """
259 headers = _upper_header_keys(headers)
260 channel_id = headers[X_GOOG_CHANNEL_ID]
261 if channel.id != channel_id:
262 raise errors.InvalidNotificationError(
263 'Channel id mismatch: %s != %s' % (channel.id, channel_id))
264 else:
265 message_number = int(headers[X_GOOG_MESSAGE_NUMBER])
266 state = headers[X_GOOG_RESOURCE_STATE]
267 resource_uri = headers[X_GOOG_RESOURCE_URI]
268 resource_id = headers[X_GOOG_RESOURCE_ID]
269 return Notification(message_number, state, resource_uri, resource_id)
270
271
272@util.positional(2)
273def new_webhook_channel(url, token=None, expiration=None, params=None):
274 """Create a new webhook Channel.
275
276 Args:
277 url: str, URL to post notifications to.
278 token: str, An arbitrary string associated with the channel that
279 is delivered to the target address with each notification delivered
280 over this channel.
281 expiration: datetime.datetime, A time in the future when the channel
282 should expire. Can also be None if the subscription should use the
283 default expiration. Note that different services may have different
284 limits on how long a subscription lasts. Check the response from the
285 watch() method to see the value the service has set for an expiration
286 time.
287 params: dict, Extra parameters to pass on channel creation. Currently
288 not used for webhook channels.
289 """
290 expiration_ms = 0
291 if expiration:
292 delta = expiration - EPOCH
293 expiration_ms = delta.microseconds/1000 + (
294 delta.seconds + delta.days*24*3600)*1000
295 if expiration_ms < 0:
296 expiration_ms = 0
297
298 return Channel('web_hook', str(uuid.uuid4()),
299 token, url, expiration=expiration_ms,
300 params=params)
301