blob: a38b4ffbcf7723495cf5120d651fed1cbed2fdbc [file] [log] [blame]
John Asmuth864311d2014-04-24 15:46:08 -04001"""Channel notifications support.
2
3Classes and functions to support channel subscriptions and notifications
4on those channels.
5
6Notes:
7 - This code is based on experimental APIs and is subject to change.
8 - Notification does not do deduplication of notification ids, that's up to
9 the receiver.
10 - Storing the Channel between calls is up to the caller.
11
12
13Example setting up a channel:
14
15 # Create a new channel that gets notifications via webhook.
16 channel = new_webhook_channel("https://example.com/my_web_hook")
17
18 # Store the channel, keyed by 'channel.id'. Store it before calling the
19 # watch method because notifications may start arriving before the watch
20 # method returns.
21 ...
22
23 resp = service.objects().watchAll(
24 bucket="some_bucket_id", body=channel.body()).execute()
25 channel.update(resp)
26
27 # Store the channel, keyed by 'channel.id'. Store it after being updated
28 # since the resource_id value will now be correct, and that's needed to
29 # stop a subscription.
30 ...
31
32
33An example Webhook implementation using webapp2. Note that webapp2 puts
34headers in a case insensitive dictionary, as headers aren't guaranteed to
35always be upper case.
36
37 id = self.request.headers[X_GOOG_CHANNEL_ID]
38
39 # Retrieve the channel by id.
40 channel = ...
41
42 # Parse notification from the headers, including validating the id.
43 n = notification_from_headers(channel, self.request.headers)
44
45 # Do app specific stuff with the notification here.
46 if n.state == 'sync':
47 # Code to handle sync state.
48 elif n.state == 'exists':
49 # Code to handle the exists state.
50 elif n.state == 'not_exists':
51 # Code to handle the not exists state.
52
53
54Example of unsubscribing.
55
56 service.channels().stop(channel.body())
57"""
INADA Naokie4ea1a92015-03-04 03:45:42 +090058from __future__ import absolute_import
John Asmuth864311d2014-04-24 15:46:08 -040059
60import datetime
61import uuid
62
63from googleapiclient import errors
INADA Naokie4ea1a92015-03-04 03:45:42 +090064import six
John Asmuth864311d2014-04-24 15:46:08 -040065
Jon Wayne Parrott6755f612016-08-15 10:52:26 -070066# Oauth2client < 3 has the positional helper in 'util', >= 3 has it
67# in '_helpers'.
68try:
69 from oauth2client import util
70except ImportError:
71 from oauth2client import _helpers as util
72
John Asmuth864311d2014-04-24 15:46:08 -040073
# The unix time epoch: midnight, 1 January 1970, as a naive datetime that is
# interpreted as UTC. It is spelled out explicitly instead of calling
# datetime.datetime.utcfromtimestamp(0), which is deprecated since Python 3.12
# and would emit a DeprecationWarning at import time; the value is identical.
EPOCH = datetime.datetime(1970, 1, 1)

# Map the names of the parameters in the JSON channel description to
# the parameter names we use in the Channel class.
CHANNEL_PARAMS = {
    'address': 'address',
    'id': 'id',
    'expiration': 'expiration',
    'params': 'params',
    'resourceId': 'resource_id',
    'resourceUri': 'resource_uri',
    'type': 'type',
    'token': 'token',
}

# Names of the notification headers, in the upper-case form used for lookups
# after the incoming header keys have been upper-cased.
X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID'
X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER'
X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE'
X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI'
X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID'
95
96
def _upper_header_keys(headers):
    """Return a new plain dict with every header name upper-cased.

    Args:
      headers: dict, A dictionary like object exposing items(), whose keys
        are strings.

    Returns:
      A new dict mapping key.upper() to the original value. The input is not
      modified. If two keys differ only by case, the one iterated last wins.
    """
    # .items() works on Python 2 and 3 alike, so no compatibility shim is
    # needed for this single pass over the headers.
    return {name.upper(): value for name, value in headers.items()}
102
103
class Notification(object):
    """A single notification received over a Channel.

    Instances are normally produced by helpers such as
    notification_from_headers() rather than being constructed by hand.

    Attributes:
      message_number: int, The unique id number of this notification.
      state: str, The state of the resource being monitored.
      resource_uri: str, The address of the resource being monitored.
      resource_id: str, The unique identifier of the version of the resource
        at this event.
    """

    @util.positional(5)
    def __init__(self, message_number, state, resource_uri, resource_id):
        """Store the four fields that describe a notification.

        Args:
          message_number: int, The unique id number of this notification.
          state: str, The state of the resource being monitored. Can be one
            of "exists", "not_exists", or "sync".
          resource_uri: str, The address of the resource being monitored.
          resource_id: str, The identifier of the watched resource.
        """
        # Targets are assigned left to right, so the attributes are set in
        # the same order as the constructor arguments.
        (self.message_number,
         self.state,
         self.resource_uri,
         self.resource_id) = (message_number, state, resource_uri, resource_id)
132
133
class Channel(object):
    """Description of a notification channel.

    Rarely instantiated directly; helper functions such as
    new_webhook_channel() build one with the right arguments.

    Attributes:
      type: str, The type of delivery mechanism used by this channel. For
        example, 'web_hook'.
      id: str, A UUID for the channel.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each event delivered
        over this channel.
      address: str, The address of the receiving entity where events are
        delivered. Specific to the channel type.
      expiration: int, The time, in milliseconds from the epoch, when this
        channel will expire.
      params: dict, A dictionary of string to string, with additional
        parameters controlling delivery channel behavior.
      resource_id: str, An opaque id that identifies the resource that is
        being watched. Stable across different API versions.
      resource_uri: str, The canonicalized ID of the watched resource.
    """

    @util.positional(5)
    def __init__(self, type, id, token, address, expiration=None,
                 params=None, resource_id="", resource_uri=""):
        """Initialise a Channel from its individual fields.

        User code normally goes through a type-specific factory function
        instead of calling this constructor.

        Args:
          type: str, The type of delivery mechanism used by this channel. For
            example, 'web_hook'.
          id: str, A UUID for the channel.
          token: str, An arbitrary string associated with the channel that
            is delivered to the target address with each event delivered
            over this channel.
          address: str, The address of the receiving entity where events are
            delivered. Specific to the channel type.
          expiration: int, The time, in milliseconds from the epoch, when this
            channel will expire.
          params: dict, A dictionary of string to string, with additional
            parameters controlling delivery channel behavior.
          resource_id: str, An opaque id that identifies the resource that is
            being watched. Stable across different API versions.
          resource_uri: str, The canonicalized ID of the watched resource.
        """
        # Fields that are always sent in the request body.
        self.type, self.id, self.token, self.address = (
            type, id, token, address)
        # Fields that are only sent when they hold a truthy value.
        self.expiration, self.params = expiration, params
        self.resource_id, self.resource_uri = resource_id, resource_uri

    def body(self):
        """Build the request body describing this channel.

        The result is suitable as the value of the body argument of watch()
        methods.

        Returns:
          A dictionary with 'id', 'token', 'type' and 'address' always
          present, plus 'params', 'resourceId', 'resourceUri' and
          'expiration' for each of those attributes that is truthy.
        """
        payload = {
            'id': self.id,
            'token': self.token,
            'type': self.type,
            'address': self.address,
        }
        optional_fields = (
            ('params', self.params),
            ('resourceId', self.resource_id),
            ('resourceUri', self.resource_uri),
            ('expiration', self.expiration),
        )
        # Empty / zero / None values are left out of the body entirely.
        for json_name, value in optional_fields:
            if value:
                payload[json_name] = value
        return payload

    def update(self, resp):
        """Copy channel fields from a watch() response onto this Channel.

        The response of a watch() request is a dictionary with updated
        channel information, such as the resource_id, which is needed when
        stopping a subscription.

        Args:
          resp: dict, The response from a watch() method.
        """
        for json_name in CHANNEL_PARAMS:
            new_value = resp.get(json_name)
            if new_value is None:
                # Key absent (or explicitly None): keep the current value.
                continue
            setattr(self, CHANNEL_PARAMS[json_name], new_value)
233
234
def notification_from_headers(channel, headers):
    """Build a validated Notification from webhook request headers.

    Header names are matched case-insensitively: the keys are upper-cased
    before any lookup.

    Args:
      channel: Channel, The channel that the notification is associated with.
      headers: dict, A dictionary like object that contains the request headers
        from the webhook HTTP request.

    Returns:
      A Notification object.

    Raises:
      errors.InvalidNotificationError if the channel id in the headers does
        not match channel.id.
      ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
      KeyError if one of the expected X-GOOG-* headers is missing.
    """
    normalized = _upper_header_keys(headers)

    received_id = normalized[X_GOOG_CHANNEL_ID]
    if channel.id != received_id:
        raise errors.InvalidNotificationError(
            'Channel id mismatch: %s != %s' % (channel.id, received_id))

    # The remaining headers are only read once the channel id is verified.
    number = int(normalized[X_GOOG_MESSAGE_NUMBER])
    resource_state = normalized[X_GOOG_RESOURCE_STATE]
    uri = normalized[X_GOOG_RESOURCE_URI]
    res_id = normalized[X_GOOG_RESOURCE_ID]
    return Notification(number, resource_state, uri, res_id)
262
263
@util.positional(2)
def new_webhook_channel(url, token=None, expiration=None, params=None):
    """Create a new webhook Channel.

    Args:
      url: str, URL to post notifications to.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each notification delivered
        over this channel.
      expiration: datetime.datetime, A time in the future when the channel
        should expire. Naive values are interpreted as UTC; timezone-aware
        values are converted to UTC first. Can also be None if the
        subscription should use the default expiration. Note that different
        services may have different limits on how long a subscription lasts.
        Check the response from the watch() method to see the value the
        service has set for an expiration time.
      params: dict, Extra parameters to pass on channel creation. Currently
        not used for webhook channels.

    Returns:
      A Channel of type 'web_hook' with a freshly generated UUID as its id.
      Its expiration is an int number of milliseconds since the epoch, or 0
      when no expiration was given or the given time lies before the epoch.
    """
    expiration_ms = 0
    if expiration:
        # EPOCH is naive, and subtracting it from an aware datetime raises
        # TypeError, so bring aware values to naive UTC first.
        utc_offset = expiration.utcoffset()
        if utc_offset is not None:
            expiration = expiration.replace(tzinfo=None) - utc_offset

        delta = expiration - EPOCH
        # Floor division keeps the result an int on Python 3 as well; true
        # division would turn the whole expression into a float.
        expiration_ms = delta.microseconds // 1000 + (
            delta.seconds + delta.days * 24 * 3600) * 1000
        # A time before the epoch is treated as "no expiration".
        expiration_ms = max(expiration_ms, 0)

    return Channel('web_hook', str(uuid.uuid4()),
                   token, url, expiration=expiration_ms,
                   params=params)
293