"""Channel notifications support.

Classes and functions to support channel subscriptions and notifications
on those channels.

Notes:
  - This code is based on experimental APIs and is subject to change.
  - Notifications are not deduplicated by notification id; that's up to
    the receiver.
  - Storing the Channel between calls is up to the caller.


Example setting up a channel:

  # Create a new channel that gets notifications via webhook.
  channel = new_webhook_channel("https://example.com/my_web_hook")

  # Store the channel, keyed by 'channel.id'. Store it before calling the
  # watch method because notifications may start arriving before the watch
  # method returns.
  ...

  resp = service.objects().watchAll(
    bucket="some_bucket_id", body=channel.body()).execute()
  channel.update(resp)

  # Store the channel, keyed by 'channel.id'. Store it after being updated
  # since the resource_id value will now be correct, and that's needed to
  # stop a subscription.
  ...


An example Webhook implementation using webapp2. Note that webapp2 puts
headers in a case insensitive dictionary, as headers aren't guaranteed to
always be upper case.

  id = self.request.headers[X_GOOG_CHANNEL_ID]

  # Retrieve the channel by id.
  channel = ...

  # Parse notification from the headers, including validating the id.
  n = notification_from_headers(channel, self.request.headers)

  # Do app specific stuff with the notification here. The resource state
  # from the X-GOOG-RESOURCE-STATE header is exposed as 'n.state'.
  if n.state == 'sync':
    # Code to handle sync state.
  elif n.state == 'exists':
    # Code to handle the exists state.
  elif n.state == 'not_exists':
    # Code to handle the not exists state.


Example of unsubscribing.

  service.channels().stop(body=channel.body()).execute()
"""
INADA Naokie4ea1a92015-03-04 03:45:42 +090058from __future__ import absolute_import
John Asmuth864311d2014-04-24 15:46:08 -040059
60import datetime
61import uuid
62
63from googleapiclient import errors
64from oauth2client import util
INADA Naokie4ea1a92015-03-04 03:45:42 +090065import six
John Asmuth864311d2014-04-24 15:46:08 -040066
Jon Wayne Parrott6755f612016-08-15 10:52:26 -070067# Oauth2client < 3 has the positional helper in 'util', >= 3 has it
68# in '_helpers'.
69try:
70 from oauth2client import util
71except ImportError:
72 from oauth2client import _helpers as util
73
# The unix time epoch starts at midnight, 1 January 1970 (UTC). It is kept as
# a naive datetime. It is built directly instead of through
# datetime.utcfromtimestamp(0), which produces the same value but is
# deprecated as of Python 3.12.
EPOCH = datetime.datetime(1970, 1, 1)

# Map the names of the parameters in the JSON channel description to
# the parameter names we use in the Channel class.
CHANNEL_PARAMS = {
    'address': 'address',
    'id': 'id',
    'expiration': 'expiration',
    'params': 'params',
    'resourceId': 'resource_id',
    'resourceUri': 'resource_uri',
    'type': 'type',
    'token': 'token',
}

# Names of the webhook request headers that describe a notification, spelled
# in upper case (header keys are upper-cased before they are looked up).
X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID'
X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI'
X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID'
96
97
def _upper_header_keys(headers):
    """Return a copy of the headers with every key converted to upper case.

    Args:
      headers: dict, A dictionary like object mapping header names to values.

    Returns:
      A new plain dict holding the same values under upper-cased keys. The
      object passed in is left untouched.
    """
    return {name.upper(): value for name, value in headers.items()}
103
104
class Notification(object):
    """A Notification from a Channel.

    Notifications are not usually constructed directly, but are returned
    from functions like notification_from_headers().

    Attributes:
      message_number: int, The unique id number of this notification.
      state: str, The state of the resource being monitored.
      resource_uri: str, The address of the resource being monitored.
      resource_id: str, The identifier of the watched resource.
    """

    @util.positional(5)
    def __init__(self, message_number, state, resource_uri, resource_id):
        """Notification constructor.

        Args:
          message_number: int, The unique id number of this notification.
          state: str, The state of the resource being monitored. Can be one
            of "exists", "not_exists", or "sync".
          resource_uri: str, The address of the resource being monitored.
          resource_id: str, The identifier of the watched resource.
        """
        # Each argument is stored unchanged under the attribute of the same
        # name; no validation or conversion happens here.
        self.message_number = message_number
        self.state = state
        self.resource_uri = resource_uri
        self.resource_id = resource_id
133
134
class Channel(object):
    """A Channel for notifications.

    Usually not constructed directly, instead it is returned from helper
    functions like new_webhook_channel().

    Attributes:
      type: str, The type of delivery mechanism used by this channel. For
        example, 'web_hook'.
      id: str, A UUID for the channel.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each event delivered
        over this channel.
      address: str, The address of the receiving entity where events are
        delivered. Specific to the channel type.
      expiration: int, The time, in milliseconds from the epoch, when this
        channel will expire.
      params: dict, A dictionary of string to string, with additional
        parameters controlling delivery channel behavior.
      resource_id: str, An opaque id that identifies the resource that is
        being watched. Stable across different API versions.
      resource_uri: str, The canonicalized ID of the watched resource.
    """

    @util.positional(5)
    def __init__(self, type, id, token, address, expiration=None,
                 params=None, resource_id="", resource_uri=""):
        """Create a new Channel.

        User code rarely calls this constructor itself; the per-type factory
        functions (such as new_webhook_channel()) take a more convenient set
        of arguments and build the Channel for you.

        Args:
          type: str, The type of delivery mechanism used by this channel. For
            example, 'web_hook'.
          id: str, A UUID for the channel.
          token: str, An arbitrary string associated with the channel that
            is delivered to the target address with each event delivered
            over this channel.
          address: str, The address of the receiving entity where events are
            delivered. Specific to the channel type.
          expiration: int, The time, in milliseconds from the epoch, when this
            channel will expire.
          params: dict, A dictionary of string to string, with additional
            parameters controlling delivery channel behavior.
          resource_id: str, An opaque id that identifies the resource that is
            being watched. Stable across different API versions.
          resource_uri: str, The canonicalized ID of the watched resource.
        """
        self.type = type
        self.id = id
        self.token = token
        self.address = address
        self.expiration = expiration
        self.params = params
        self.resource_id = resource_id
        self.resource_uri = resource_uri

    def body(self):
        """Build a request body from the Channel.

        The dictionary produced is suitable for passing to watch() methods
        as the value of the body argument.

        Returns:
          A dictionary representation of the channel. 'id', 'token', 'type'
          and 'address' are always present; 'params', 'resourceId',
          'resourceUri' and 'expiration' are included only when the
          corresponding attribute is truthy.
        """
        description = {
            'id': self.id,
            'token': self.token,
            'type': self.type,
            'address': self.address
        }
        # Optional entries, in the order they are added to the body.
        optional_entries = (
            ('params', self.params),
            ('resourceId', self.resource_id),
            ('resourceUri', self.resource_uri),
            ('expiration', self.expiration),
        )
        for json_name, value in optional_entries:
            if value:
                description[json_name] = value
        return description

    def update(self, resp):
        """Update a channel with information from the response of watch().

        The response to a watch() request is a dictionary carrying updated
        channel information, such as the resource_id, which is needed when
        stopping a subscription.

        Args:
          resp: dict, The response from a watch() method. Every key listed in
            CHANNEL_PARAMS whose value is not None overwrites the matching
            attribute of this channel; absent or None entries leave the
            attribute as it was.
        """
        for json_name, attr_name in CHANNEL_PARAMS.items():
            new_value = resp.get(json_name)
            if new_value is None:
                continue
            setattr(self, attr_name, new_value)
234
235
def notification_from_headers(channel, headers):
    """Build a validated Notification from webhook request headers.

    Header names are matched case-insensitively: the keys are upper-cased
    before any lookup. The channel id carried by the request is checked
    against the given channel before the remaining headers are read.

    Args:
      channel: Channel, The channel that the notification is associated with.
      headers: dict, A dictionary like object that contains the request headers
        from the webhook HTTP request.

    Returns:
      A Notification object.

    Raises:
      errors.InvalidNotificationError if the channel id in the headers does
        not match channel.id.
      ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
      KeyError if one of the expected X-GOOG-* headers is missing.
    """
    normalized = _upper_header_keys(headers)

    received_id = normalized[X_GOOG_CHANNEL_ID]
    if channel.id != received_id:
        raise errors.InvalidNotificationError(
            'Channel id mismatch: %s != %s' % (channel.id, received_id))

    # Read the remaining headers only after the channel id has been accepted.
    number = int(normalized[X_GOOG_MESSAGE_NUMBER])
    resource_state = normalized[X_GOOG_RESOURCE_STATE]
    uri = normalized[X_GOOG_RESOURCE_URI]
    watched_id = normalized[X_GOOG_RESOURCE_ID]
    return Notification(number, resource_state, uri, watched_id)
263
264
@util.positional(2)
def new_webhook_channel(url, token=None, expiration=None, params=None):
    """Create a new webhook Channel.

    Args:
      url: str, URL to post notifications to.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each notification delivered
        over this channel.
      expiration: datetime.datetime, A time in the future when the channel
        should expire. Can also be None if the subscription should use the
        default expiration. A naive datetime is taken to be in UTC; a
        timezone-aware datetime is converted to UTC first. Note that
        different services may have different limits on how long a
        subscription lasts. Check the response from the watch() method to
        see the value the service has set for an expiration time.
      params: dict, Extra parameters to pass on channel creation. Currently
        not used for webhook channels.

    Returns:
      A Channel of type 'web_hook' with a freshly generated UUID as its id.
      Its expiration is an integer number of milliseconds since the epoch,
      or 0 when no expiration was given or the given time lies before the
      epoch.
    """
    expiration_ms = 0
    if expiration:
        # An aware datetime cannot be subtracted from the naive EPOCH (it
        # raises TypeError), so reduce it to the equivalent naive UTC value.
        utc_offset = expiration.utcoffset()
        if utc_offset is not None:
            expiration = expiration.replace(tzinfo=None) - utc_offset

        delta = expiration - EPOCH
        # Floor division keeps the result an int on Python 3, where '/' would
        # yield a float; on Python 2 the value is the same as before.
        expiration_ms = delta.microseconds // 1000 + (
            delta.seconds + delta.days * 24 * 3600) * 1000
        # Times before the epoch are clamped to 0.
        if expiration_ms < 0:
            expiration_ms = 0

    return Channel('web_hook', str(uuid.uuid4()),
                   token, url, expiration=expiration_ms,
                   params=params)
294