blob: 0fdb080fe729e3782c73b9e09c507c33db0ac8cc [file] [log] [blame]
"""Channel notifications support.

Classes and functions to support channel subscriptions and notifications
on those channels.

Notes:
  - This code is based on experimental APIs and is subject to change.
  - Notification does not do deduplication of notification ids, that's up to
    the receiver.
  - Storing the Channel between calls is up to the caller.


Example setting up a channel:

  # Create a new channel that gets notifications via webhook.
  channel = new_webhook_channel("https://example.com/my_web_hook")

  # Store the channel, keyed by 'channel.id'. Store it before calling the
  # watch method because notifications may start arriving before the watch
  # method returns.
  ...

  resp = service.objects().watchAll(
    bucket="some_bucket_id", body=channel.body()).execute()
  channel.update(resp)

  # Store the channel, keyed by 'channel.id'. Store it after being updated
  # since the resource_id value will now be correct, and that's needed to
  # stop a subscription.
  ...


An example Webhook implementation using webapp2. Note that webapp2 puts
headers in a case insensitive dictionary, as headers aren't guaranteed to
always be upper case.

  id = self.request.headers[X_GOOG_CHANNEL_ID]

  # Retrieve the channel by id.
  channel = ...

  # Parse notification from the headers, including validating the id.
  n = notification_from_headers(channel, self.request.headers)

  # Do app specific stuff with the notification here.
  if n.state == 'sync':
    # Code to handle sync state.
  elif n.state == 'exists':
    # Code to handle the exists state.
  elif n.state == 'not_exists':
    # Code to handle the not exists state.


Example of unsubscribing.

  service.channels().stop(body=channel.body()).execute()
"""
INADA Naokie4ea1a92015-03-04 03:45:42 +090058from __future__ import absolute_import
John Asmuth864311d2014-04-24 15:46:08 -040059
60import datetime
61import uuid
62
63from googleapiclient import errors
Helen Koikede13e3b2018-04-26 16:05:16 -030064from googleapiclient import _helpers as util
INADA Naokie4ea1a92015-03-04 03:45:42 +090065import six
John Asmuth864311d2014-04-24 15:46:08 -040066
67
# The unix time epoch starts at midnight, January 1st 1970 (UTC). It is kept
# as a naive datetime so it can be subtracted from naive UTC datetimes.
# Built directly instead of via datetime.utcfromtimestamp(0), which is
# deprecated since Python 3.12 and yields the same naive value.
EPOCH = datetime.datetime(1970, 1, 1)
70
# Translation table from the field names used in the JSON channel
# description (keys) to the attribute names used on the Channel class
# (values). Only the resource fields differ: camelCase vs. snake_case.
CHANNEL_PARAMS = {
    "address": "address",
    "id": "id",
    "expiration": "expiration",
    "params": "params",
    "resourceId": "resource_id",
    "resourceUri": "resource_uri",
    "type": "type",
    "token": "token",
}
83
# Names of the webhook request headers that carry notification data. They
# are spelled in upper case because notification_from_headers() upper-cases
# the incoming header names before looking these up.
X_GOOG_CHANNEL_ID = "X-GOOG-CHANNEL-ID"
X_GOOG_MESSAGE_NUMBER = "X-GOOG-MESSAGE-NUMBER"
X_GOOG_RESOURCE_STATE = "X-GOOG-RESOURCE-STATE"
X_GOOG_RESOURCE_URI = "X-GOOG-RESOURCE-URI"
X_GOOG_RESOURCE_ID = "X-GOOG-RESOURCE-ID"
89
90
91def _upper_header_keys(headers):
92 new_headers = {}
INADA Naokie4ea1a92015-03-04 03:45:42 +090093 for k, v in six.iteritems(headers):
John Asmuth864311d2014-04-24 15:46:08 -040094 new_headers[k.upper()] = v
95 return new_headers
96
97
class Notification(object):
    """A single notification delivered over a Channel.

    Instances are normally produced by helpers such as
    notification_from_headers() rather than being built by hand.

    Attributes:
      message_number: int, The unique id number of this notification.
      state: str, The state of the resource being monitored.
      resource_uri: str, The address of the resource being monitored.
      resource_id: str, The identifier of the watched resource.
    """

    @util.positional(5)
    def __init__(self, message_number, state, resource_uri, resource_id):
        """Store the fields of a notification.

        Args:
          message_number: int, The unique id number of this notification.
          state: str, The state of the resource being monitored. Can be one
            of "exists", "not_exists", or "sync".
          resource_uri: str, The address of the resource being monitored.
          resource_id: str, The identifier of the watched resource.
        """
        # Plain value holder: every argument is stored under its own name.
        self.resource_id = resource_id
        self.resource_uri = resource_uri
        self.state = state
        self.message_number = message_number
126
127
class Channel(object):
    """Description of a notification channel.

    Callers usually obtain a Channel from a helper such as
    new_webhook_channel() instead of invoking the constructor themselves.

    Attributes:
      type: str, The type of delivery mechanism used by this channel. For
        example, 'web_hook'.
      id: str, A UUID for the channel.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each event delivered
        over this channel.
      address: str, The address of the receiving entity where events are
        delivered. Specific to the channel type.
      expiration: int, The time, in milliseconds from the epoch, when this
        channel will expire.
      params: dict, A dictionary of string to string, with additional
        parameters controlling delivery channel behavior.
      resource_id: str, An opaque id that identifies the resource that is
        being watched. Stable across different API versions.
      resource_uri: str, The canonicalized ID of the watched resource.
    """

    @util.positional(5)
    def __init__(self, type, id, token, address, expiration=None,
                 params=None, resource_id="", resource_uri=""):
        """Initialise a Channel from its individual fields.

        User code rarely needs to call this directly; the per-type factory
        functions take a more convenient set of arguments.

        Args:
          type: str, The type of delivery mechanism used by this channel.
            For example, 'web_hook'.
          id: str, A UUID for the channel.
          token: str, An arbitrary string associated with the channel that
            is delivered to the target address with each event delivered
            over this channel.
          address: str, The address of the receiving entity where events
            are delivered. Specific to the channel type.
          expiration: int, The time, in milliseconds from the epoch, when
            this channel will expire.
          params: dict, A dictionary of string to string, with additional
            parameters controlling delivery channel behavior.
          resource_id: str, An opaque id that identifies the resource that
            is being watched. Stable across different API versions.
          resource_uri: str, The canonicalized ID of the watched resource.
        """
        # Required fields.
        self.type = type
        self.id = id
        self.token = token
        self.address = address
        # Optional fields; falsy values are left out of body().
        self.expiration = expiration
        self.params = params
        self.resource_id = resource_id
        self.resource_uri = resource_uri

    def body(self):
        """Serialise the channel into a request body.

        The returned dictionary is suitable as the 'body' argument of
        watch() methods.

        Returns:
          A dictionary representation of the channel. The 'id', 'token',
          'type' and 'address' entries are always present; 'params',
          'resourceId', 'resourceUri' and 'expiration' are added only when
          the corresponding attribute is truthy.
        """
        payload = {
            'id': self.id,
            'token': self.token,
            'type': self.type,
            'address': self.address,
        }
        optional_fields = (
            ('params', self.params),
            ('resourceId', self.resource_id),
            ('resourceUri', self.resource_uri),
            ('expiration', self.expiration),
        )
        for json_key, field_value in optional_fields:
            if field_value:
                payload[json_key] = field_value
        return payload

    def update(self, resp):
        """Refresh this channel from the response of a watch() call.

        The response to a watch() request is a dictionary with updated
        channel information, such as the resource_id, which is needed when
        stopping a subscription.

        Args:
          resp: dict, The response from a watch() method.
        """
        for json_name, attr_name in CHANNEL_PARAMS.items():
            new_value = resp.get(json_name)
            if new_value is None:
                # Field absent from the response: keep the current value.
                continue
            setattr(self, attr_name, new_value)
227
228
def notification_from_headers(channel, headers):
    """Build a validated Notification from webhook request headers.

    Header names are upper-cased before lookup, so the caller may pass
    headers in any case.

    Args:
      channel: Channel, The channel that the notification is associated with.
      headers: dict, A dictionary like object that contains the request
        headers from the webhook HTTP request.

    Returns:
      A Notification object.

    Raises:
      errors.InvalidNotificationError if the channel id in the headers does
        not match channel.id.
      ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int.
      KeyError if one of the expected X-GOOG-* headers is missing.
    """
    upper = _upper_header_keys(headers)

    # Reject notifications that were addressed to a different channel.
    received_id = upper[X_GOOG_CHANNEL_ID]
    if channel.id != received_id:
        raise errors.InvalidNotificationError(
            'Channel id mismatch: %s != %s' % (channel.id, received_id))

    return Notification(
        int(upper[X_GOOG_MESSAGE_NUMBER]),
        upper[X_GOOG_RESOURCE_STATE],
        upper[X_GOOG_RESOURCE_URI],
        upper[X_GOOG_RESOURCE_ID])
256
257
@util.positional(2)
def new_webhook_channel(url, token=None, expiration=None, params=None):
    """Create a new webhook Channel.

    Args:
      url: str, URL to post notifications to.
      token: str, An arbitrary string associated with the channel that
        is delivered to the target address with each notification delivered
        over this channel.
      expiration: datetime.datetime, A time in the future when the channel
        should expire. Can also be None if the subscription should use the
        default expiration. Note that different services may have different
        limits on how long a subscription lasts. Check the response from the
        watch() method to see the value the service has set for an expiration
        time.
      params: dict, Extra parameters to pass on channel creation. Currently
        not used for webhook channels.

    Returns:
      A Channel of type 'web_hook' with a freshly generated UUID as its id.
      Its expiration is an integer number of milliseconds since EPOCH, or 0
      when no expiration was given or the given time lies before EPOCH.
    """
    expiration_ms = 0
    if expiration:
        delta = expiration - EPOCH
        whole_seconds = delta.days * 24 * 3600 + delta.seconds
        # Floor division keeps the result an integer on Python 3 as well;
        # plain '/' would turn the expiration into a float there.
        expiration_ms = whole_seconds * 1000 + delta.microseconds // 1000
        # Times before the epoch are treated as "no expiration".
        if expiration_ms < 0:
            expiration_ms = 0

    return Channel('web_hook', str(uuid.uuid4()),
                   token, url, expiration=expiration_ms,
                   params=params)
287