Joe Gregorio | 1a5e30e | 2013-06-25 15:35:47 -0400 | [diff] [blame^] | 1 | """Channel notifications support. |
| 2 | |
| 3 | Classes and functions to support channel subscriptions and notifications |
| 4 | on those channels. |
| 5 | |
| 6 | Notes: |
| 7 | - This code is based on experimental APIs and is subject to change. |
| 8 | - Notification does not do deduplication of notification ids, that's up to |
| 9 | the receiver. |
| 10 | - Storing the Channel between calls is up to the caller. |
| 11 | |
| 12 | |
| 13 | Example setting up a channel: |
| 14 | |
| 15 | # Create a new channel that gets notifications via webhook. |
| 16 | channel = new_webhook_channel("https://example.com/my_web_hook") |
| 17 | |
| 18 | # Store the channel, keyed by 'channel.id'. Store it before calling the |
| 19 | # watch method because notifications may start arriving before the watch |
| 20 | # method returns. |
| 21 | ... |
| 22 | |
| 23 | resp = service.objects().watchAll( |
| 24 | bucket="some_bucket_id", body=channel.body()).execute() |
| 25 | channel.update(resp) |
| 26 | |
| 27 | # Store the channel, keyed by 'channel.id'. Store it after being updated |
| 28 | # since the resource_id value will now be correct, and that's needed to |
| 29 | # stop a subscription. |
| 30 | ... |
| 31 | |
| 32 | |
| 33 | An example Webhook implementation using webapp2. Note that webapp2 puts |
| 34 | headers in a case insensitive dictionary, as headers aren't guaranteed to |
| 35 | always be upper case. |
| 36 | |
| 37 | id = self.request.headers[X_GOOG_CHANNEL_ID] |
| 38 | |
| 39 | # Retrieve the channel by id. |
| 40 | channel = ... |
| 41 | |
| 42 | # Parse notification from the headers, including validating the id. |
| 43 | n = notification_from_headers(channel, self.request.headers) |
| 44 | |
| 45 | # Do app specific stuff with the notification here. |
| 46 | if n.resource_state == 'sync': |
| 47 | # Code to handle sync state. |
| 48 | elif n.resource_state == 'exists': |
| 49 | # Code to handle the exists state. |
| 50 | elif n.resource_state == 'not_exists': |
| 51 | # Code to handle the not exists state. |
| 52 | |
| 53 | |
| 54 | Example of unsubscribing. |
| 55 | |
| 56 | service.channels().stop(channel.body()) |
| 57 | """ |
| 58 | |
| 59 | import datetime |
| 60 | import uuid |
| 61 | |
| 62 | from apiclient import errors |
| 63 | from oauth2client import util |
| 64 | |
| 65 | |
| 66 | # The unix time epoch starts at midnight 1970. |
| 67 | EPOCH = datetime.datetime.utcfromtimestamp(0) |
| 68 | |
| 69 | # Map the names of the parameters in the JSON channel description to |
| 70 | # the parameter names we use in the Channel class. |
| 71 | CHANNEL_PARAMS = { |
| 72 | 'address': 'address', |
| 73 | 'id': 'id', |
| 74 | 'expiration': 'expiration', |
| 75 | 'params': 'params', |
| 76 | 'resourceId': 'resource_id', |
| 77 | 'resourceUri': 'resource_uri', |
| 78 | 'type': 'type', |
| 79 | 'token': 'token', |
| 80 | } |
| 81 | |
| 82 | X_GOOG_CHANNEL_ID = 'X-GOOG-CHANNEL-ID' |
| 83 | X_GOOG_MESSAGE_NUMBER = 'X-GOOG-MESSAGE-NUMBER' |
| 84 | X_GOOG_RESOURCE_STATE = 'X-GOOG-RESOURCE-STATE' |
| 85 | X_GOOG_RESOURCE_URI = 'X-GOOG-RESOURCE-URI' |
| 86 | X_GOOG_RESOURCE_ID = 'X-GOOG-RESOURCE-ID' |
| 87 | |
| 88 | |
| 89 | def _upper_header_keys(headers): |
| 90 | new_headers = {} |
| 91 | for k, v in headers.iteritems(): |
| 92 | new_headers[k.upper()] = v |
| 93 | return new_headers |
| 94 | |
| 95 | |
| 96 | class Notification(object): |
| 97 | """A Notification from a Channel. |
| 98 | |
| 99 | Notifications are not usually constructed directly, but are returned |
| 100 | from functions like notification_from_headers(). |
| 101 | |
| 102 | Attributes: |
| 103 | message_number: int, The unique id number of this notification. |
| 104 | state: str, The state of the resource being monitored. |
| 105 | uri: str, The address of the resource being monitored. |
| 106 | resource_id: str, The unique identifier of the version of the resource at |
| 107 | this event. |
| 108 | """ |
| 109 | @util.positional(5) |
| 110 | def __init__(self, message_number, state, resource_uri, resource_id): |
| 111 | """Notification constructor. |
| 112 | |
| 113 | Args: |
| 114 | message_number: int, The unique id number of this notification. |
| 115 | state: str, The state of the resource being monitored. Can be one |
| 116 | of "exists", "not_exists", or "sync". |
| 117 | resource_uri: str, The address of the resource being monitored. |
| 118 | resource_id: str, The identifier of the watched resource. |
| 119 | """ |
| 120 | self.message_number = message_number |
| 121 | self.state = state |
| 122 | self.resource_uri = resource_uri |
| 123 | self.resource_id = resource_id |
| 124 | |
| 125 | |
| 126 | class Channel(object): |
| 127 | """A Channel for notifications. |
| 128 | |
| 129 | Usually not constructed directly, instead it is returned from helper |
| 130 | functions like new_webhook_channel(). |
| 131 | |
| 132 | Attributes: |
| 133 | type: str, The type of delivery mechanism used by this channel. For |
| 134 | example, 'web_hook'. |
| 135 | id: str, A UUID for the channel. |
| 136 | token: str, An arbitrary string associated with the channel that |
| 137 | is delivered to the target address with each event delivered |
| 138 | over this channel. |
| 139 | address: str, The address of the receiving entity where events are |
| 140 | delivered. Specific to the channel type. |
| 141 | expiration: int, The time, in milliseconds from the epoch, when this |
| 142 | channel will expire. |
| 143 | params: dict, A dictionary of string to string, with additional parameters |
| 144 | controlling delivery channel behavior. |
| 145 | resource_id: str, An opaque id that identifies the resource that is |
| 146 | being watched. Stable across different API versions. |
| 147 | resource_uri: str, The canonicalized ID of the watched resource. |
| 148 | """ |
| 149 | |
| 150 | @util.positional(5) |
| 151 | def __init__(self, type, id, token, address, expiration=None, |
| 152 | params=None, resource_id="", resource_uri=""): |
| 153 | """Create a new Channel. |
| 154 | |
| 155 | In user code, this Channel constructor will not typically be called |
| 156 | manually since there are functions for creating channels for each specific |
| 157 | type with a more customized set of arguments to pass. |
| 158 | |
| 159 | Args: |
| 160 | type: str, The type of delivery mechanism used by this channel. For |
| 161 | example, 'web_hook'. |
| 162 | id: str, A UUID for the channel. |
| 163 | token: str, An arbitrary string associated with the channel that |
| 164 | is delivered to the target address with each event delivered |
| 165 | over this channel. |
| 166 | address: str, The address of the receiving entity where events are |
| 167 | delivered. Specific to the channel type. |
| 168 | expiration: int, The time, in milliseconds from the epoch, when this |
| 169 | channel will expire. |
| 170 | params: dict, A dictionary of string to string, with additional parameters |
| 171 | controlling delivery channel behavior. |
| 172 | resource_id: str, An opaque id that identifies the resource that is |
| 173 | being watched. Stable across different API versions. |
| 174 | resource_uri: str, The canonicalized ID of the watched resource. |
| 175 | """ |
| 176 | self.type = type |
| 177 | self.id = id |
| 178 | self.token = token |
| 179 | self.address = address |
| 180 | self.expiration = expiration |
| 181 | self.params = params |
| 182 | self.resource_id = resource_id |
| 183 | self.resource_uri = resource_uri |
| 184 | |
| 185 | def body(self): |
| 186 | """Build a body from the Channel. |
| 187 | |
| 188 | Constructs a dictionary that's appropriate for passing into watch() |
| 189 | methods as the value of body argument. |
| 190 | |
| 191 | Returns: |
| 192 | A dictionary representation of the channel. |
| 193 | """ |
| 194 | result = { |
| 195 | 'id': self.id, |
| 196 | 'token': self.token, |
| 197 | 'type': self.type, |
| 198 | 'address': self.address |
| 199 | } |
| 200 | if self.params: |
| 201 | result['params'] = self.params |
| 202 | if self.resource_id: |
| 203 | result['resourceId'] = self.resource_id |
| 204 | if self.resource_uri: |
| 205 | result['resourceUri'] = self.resource_uri |
| 206 | if self.expiration: |
| 207 | result['expiration'] = self.expiration |
| 208 | |
| 209 | return result |
| 210 | |
| 211 | def update(self, resp): |
| 212 | """Update a channel with information from the response of watch(). |
| 213 | |
| 214 | When a request is sent to watch() a resource, the response returned |
| 215 | from the watch() request is a dictionary with updated channel information, |
| 216 | such as the resource_id, which is needed when stopping a subscription. |
| 217 | |
| 218 | Args: |
| 219 | resp: dict, The response from a watch() method. |
| 220 | """ |
| 221 | for json_name, param_name in CHANNEL_PARAMS.iteritems(): |
| 222 | value = resp.get(json_name) |
| 223 | if value is not None: |
| 224 | setattr(self, param_name, value) |
| 225 | |
| 226 | |
| 227 | def notification_from_headers(channel, headers): |
| 228 | """Parse a notification from the webhook request headers, validate |
| 229 | the notification, and return a Notification object. |
| 230 | |
| 231 | Args: |
| 232 | channel: Channel, The channel that the notification is associated with. |
| 233 | headers: dict, A dictionary like object that contains the request headers |
| 234 | from the webhook HTTP request. |
| 235 | |
| 236 | Returns: |
| 237 | A Notification object. |
| 238 | |
| 239 | Raises: |
| 240 | errors.InvalidNotificationError if the notification is invalid. |
| 241 | ValueError if the X-GOOG-MESSAGE-NUMBER can't be converted to an int. |
| 242 | """ |
| 243 | headers = _upper_header_keys(headers) |
| 244 | channel_id = headers[X_GOOG_CHANNEL_ID] |
| 245 | if channel.id != channel_id: |
| 246 | raise errors.InvalidNotificationError( |
| 247 | 'Channel id mismatch: %s != %s' % (channel.id, channel_id)) |
| 248 | else: |
| 249 | message_number = int(headers[X_GOOG_MESSAGE_NUMBER]) |
| 250 | state = headers[X_GOOG_RESOURCE_STATE] |
| 251 | resource_uri = headers[X_GOOG_RESOURCE_URI] |
| 252 | resource_id = headers[X_GOOG_RESOURCE_ID] |
| 253 | return Notification(message_number, state, resource_uri, resource_id) |
| 254 | |
| 255 | |
| 256 | @util.positional(2) |
| 257 | def new_webhook_channel(url, token=None, expiration=None, params=None): |
| 258 | """Create a new webhook Channel. |
| 259 | |
| 260 | Args: |
| 261 | url: str, URL to post notifications to. |
| 262 | token: str, An arbitrary string associated with the channel that |
| 263 | is delivered to the target address with each notification delivered |
| 264 | over this channel. |
| 265 | expiration: datetime.datetime, A time in the future when the channel |
| 266 | should expire. Can also be None if the subscription should use the |
| 267 | default expiration. Note that different services may have different |
| 268 | limits on how long a subscription lasts. Check the response from the |
| 269 | watch() method to see the value the service has set for an expiration |
| 270 | time. |
| 271 | params: dict, Extra parameters to pass on channel creation. Currently |
| 272 | not used for webhook channels. |
| 273 | """ |
| 274 | expiration_ms = 0 |
| 275 | if expiration: |
| 276 | delta = expiration - EPOCH |
| 277 | expiration_ms = delta.microseconds/1000 + ( |
| 278 | delta.seconds + delta.days*24*3600)*1000 |
| 279 | if expiration_ms < 0: |
| 280 | expiration_ms = 0 |
| 281 | |
| 282 | return Channel('web_hook', str(uuid.uuid4()), |
| 283 | token, url, expiration=expiration_ms, |
| 284 | params=params) |
| 285 | |