nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 1 | # Copyright 2014, Google Inc. |
| 2 | # All rights reserved. |
| 3 | # |
| 4 | # Redistribution and use in source and binary forms, with or without |
| 5 | # modification, are permitted provided that the following conditions are |
| 6 | # met: |
| 7 | # |
| 8 | # * Redistributions of source code must retain the above copyright |
| 9 | # notice, this list of conditions and the following disclaimer. |
| 10 | # * Redistributions in binary form must reproduce the above |
| 11 | # copyright notice, this list of conditions and the following disclaimer |
| 12 | # in the documentation and/or other materials provided with the |
| 13 | # distribution. |
| 14 | # * Neither the name of Google Inc. nor the names of its |
| 15 | # contributors may be used to endorse or promote products derived from |
| 16 | # this software without specific prior written permission. |
| 17 | # |
| 18 | # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 19 | # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 20 | # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 21 | # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 22 | # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 23 | # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 24 | # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 25 | # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 26 | # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 27 | # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 28 | # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 29 | |
| 30 | require 'grpc' |
| 31 | require 'grpc/generic/active_call' |
| 32 | require 'xray/thread_dump_signal_handler' |
| 33 | |
| 34 | module GRPC |
| 35 | |
| 36 | # ClientStub represents an endpoint used to send requests to GRPC servers. |
| 37 | class ClientStub |
nnoble | 0c475f0 | 2014-12-05 15:37:39 -0800 | [diff] [blame] | 38 | include Core::StatusCodes |
nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 39 | |
| 40 | # Default deadline is 5 seconds. |
| 41 | DEFAULT_DEADLINE = 5 |
| 42 | |
| 43 | # Creates a new ClientStub. |
| 44 | # |
| 45 | # Minimally, a stub is created with just the host of the gRPC service |
| 46 | # it wishes to access and a completion queue, e.g., |
| 47 | # |
| 48 | # my_stub = ClientStub.new('example.host.com:50505', queue) |
| 49 | # |
| 50 | # Any arbitrary keyword arguments are treated as channel arguments used to |
| 51 | # configure the RPC connection to the host. |
| 52 | # |
temiola | 6919c75 | 2014-12-10 13:22:00 -0800 | [diff] [blame^] | 53 | # There are some specific keyword args that are not used to configure the |
nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 54 | # channel: |
| 55 | # |
| 56 | # - :channel_override |
| 57 | # when present, this must be a pre-created GRPC::Channel. If it's present |
| 58 | # the host and arbitrary keyword args are ignored, and the RPC connection uses |
| 59 | # this channel. |
| 60 | # |
| 61 | # - :deadline |
| 62 | # when present, this is the default deadline used for calls |
| 63 | # |
temiola | 6919c75 | 2014-12-10 13:22:00 -0800 | [diff] [blame^] | 64 | # - :update_metadata |
| 65 | # when present, this is a func that takes a hash and returns a hash; |
| 66 | # it can be used to update metadata, i.e., to remove, change or |
| 67 | # amend metadata values. |
| 68 | # |
nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 69 | # @param host [String] the host the stub connects to |
nnoble | 0c475f0 | 2014-12-05 15:37:39 -0800 | [diff] [blame] | 70 | # @param q [Core::CompletionQueue] used to wait for events |
| 71 | # @param channel_override [Core::Channel] a pre-created channel |
nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 72 | # @param deadline [Number] the default deadline to use in requests |
nnoble | 0c475f0 | 2014-12-05 15:37:39 -0800 | [diff] [blame] | 73 | # @param creds [Core::Credentials] secures and/or authenticates the channel |
temiola | 6919c75 | 2014-12-10 13:22:00 -0800 | [diff] [blame^] | 74 | # @param update_metadata a func that updates metadata as described above |
| 75 | # @param kw [KeywordArgs]the channel arguments |
def initialize(host, q,
               channel_override: nil,
               deadline: DEFAULT_DEADLINE,
               creds: nil,
               update_metadata: nil,
               **kw)
  # The completion queue is mandatory and is type-checked before anything
  # else is set up.
  unless q.is_a?(Core::CompletionQueue)
    raise ArgumentError, 'not a CompletionQueue'
  end
  @queue = q

  # Choose the channel: a caller-supplied channel takes precedence (host, kw
  # and creds are then not used to build one); otherwise a channel to host
  # is created from the remaining keyword args, with creds when given.
  @ch =
    if channel_override.nil?
      if creds.nil?
        Core::Channel.new(host, kw)
      elsif creds.is_a?(Core::Credentials)
        Core::Channel.new(host, kw, creds)
      else
        raise ArgumentError, 'not a Credentials'
      end
    elsif channel_override.is_a?(Core::Channel)
      channel_override
    else
      raise ArgumentError, 'not a Channel'
    end

  # update_metadata is optional, but when supplied it has to be a Proc.
  @update_metadata =
    case update_metadata
    when nil, Proc
      update_metadata
    else
      raise ArgumentError, 'update_metadata is not a Proc'
    end

  @host = host
  @deadline = deadline
end
| 112 | |
| 113 | # request_response sends a request to a GRPC server, and returns the |
| 114 | # response. |
| 115 | # |
| 116 | # == Flow Control == |
| 117 | # This is a blocking call. |
| 118 | # |
| 119 | # * it does not return until a response is received. |
| 120 | # |
| 121 | # * the request is sent only when GRPC core's flow control allows it to |
| 122 | # be sent. |
| 123 | # |
| 124 | # == Errors == |
| 125 | # An RuntimeError is raised if |
| 126 | # |
| 127 | # * the server responds with a non-OK status |
| 128 | # |
| 129 | # * the deadline is exceeded |
| 130 | # |
| 131 | # == Return Value == |
| 132 | # |
| 133 | # If return_op is false, the call returns the response |
| 134 | # |
| 135 | # If return_op is true, the call returns an Operation, calling execute |
| 136 | # on the Operation returns the response. |
| 137 | # |
temiola | 6919c75 | 2014-12-10 13:22:00 -0800 | [diff] [blame^] | 138 | # == Keyword Args == |
| 139 | # |
| 140 | # Unspecified keyword arguments are treated as metadata to be sent to the |
| 141 | # server. |
| 142 | # |
nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 143 | # @param method [String] the RPC method to call on the GRPC server |
| 144 | # @param req [Object] the request sent to the server |
| 145 | # @param marshal [Function] f(obj)->string that marshals requests |
| 146 | # @param unmarshal [Function] f(string)->obj that unmarshals responses |
| 147 | # @param deadline [Numeric] (optional) the max completion time in seconds |
| 148 | # @param return_op [true|false] (default false) return an Operation if true |
| 149 | # @return [Object] the response received from the server |
def request_response(method, req, marshal, unmarshal, deadline = nil,
                     return_op: false, **kw)
  active_call = new_active_call(method, marshal, unmarshal,
                                deadline || @deadline)

  # The metadata is the caller's extra keyword args, rewritten by the
  # update_metadata proc (which is handed a copy) when one is configured.
  metadata = kw
  metadata = @update_metadata.call(kw.clone) unless @update_metadata.nil?

  if return_op
    # Hand back the operation view of the call, with #execute defined on
    # that one instance so the RPC only runs when the caller asks for it.
    operation = active_call.operation
    operation.define_singleton_method(:execute) do
      active_call.request_response(req, **metadata)
    end
    operation
  else
    active_call.request_response(req, **metadata)
  end
end
| 164 | |
| 165 | # client_streamer sends a stream of requests to a GRPC server, and |
| 166 | # returns a single response. |
| 167 | # |
| 168 | # requests provides an 'iterable' of Requests. I.e. it follows Ruby's |
| 169 | # #each enumeration protocol. In the simplest case, requests will be an |
| 170 | # array of marshallable objects; in typical case it will be an Enumerable |
| 171 | # that allows dynamic construction of the marshallable objects. |
| 172 | # |
| 173 | # == Flow Control == |
| 174 | # This is a blocking call. |
| 175 | # |
| 176 | # * it does not return until a response is received. |
| 177 | # |
| 178 | # * each requests is sent only when GRPC core's flow control allows it to |
| 179 | # be sent. |
| 180 | # |
| 181 | # == Errors == |
| 182 | # An RuntimeError is raised if |
| 183 | # |
| 184 | # * the server responds with a non-OK status |
| 185 | # |
| 186 | # * the deadline is exceeded |
| 187 | # |
| 188 | # == Return Value == |
| 189 | # |
| 190 | # If return_op is false, the call consumes the requests and returns |
| 191 | # the response. |
| 192 | # |
| 193 | # If return_op is true, the call returns an Operation whose #execute returns the response. |
| 194 | # |
temiola | 6919c75 | 2014-12-10 13:22:00 -0800 | [diff] [blame^] | 195 | # == Keyword Args == |
| 196 | # |
| 197 | # Unspecified keyword arguments are treated as metadata to be sent to the |
| 198 | # server. |
| 199 | # |
nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 200 | # @param method [String] the RPC method to call on the GRPC server |
| 201 | # @param requests [Object] an Enumerable of requests to send |
| 202 | # @param marshal [Function] f(obj)->string that marshals requests |
| 203 | # @param unmarshal [Function] f(string)->obj that unmarshals responses |
| 204 | # @param deadline [Numeric] the max completion time in seconds |
| 205 | # @param return_op [true|false] (default false) return an Operation if true |
| 206 | # @return [Object|Operation] the response received from the server |
def client_streamer(method, requests, marshal, unmarshal, deadline = nil,
                    return_op: false, **kw)
  active_call = new_active_call(method, marshal, unmarshal,
                                deadline || @deadline)

  # The metadata is the caller's extra keyword args, rewritten by the
  # update_metadata proc (which is handed a copy) when one is configured.
  metadata = kw
  metadata = @update_metadata.call(kw.clone) unless @update_metadata.nil?

  if return_op
    # Hand back the operation view of the call, with #execute defined on
    # that one instance so the requests are only sent on demand.
    operation = active_call.operation
    operation.define_singleton_method(:execute) do
      active_call.client_streamer(requests, **metadata)
    end
    operation
  else
    active_call.client_streamer(requests, **metadata)
  end
end
| 221 | |
| 222 | # server_streamer sends one request to the GRPC server, which yields a |
| 223 | # stream of responses. |
| 224 | # |
| 225 | # responses provides an enumerator over the streamed responses, i.e. it |
| 226 | # follows Ruby's #each iteration protocol. The enumerator blocks while |
| 227 | # waiting for each response, stops when the server signals that no |
| 228 | # further responses will be supplied. If the implicit block is provided, |
| 229 | # it is executed with each response as the argument and no result is |
| 230 | # returned. |
| 231 | # |
| 232 | # == Flow Control == |
| 233 | # This is a blocking call. |
| 234 | # |
| 235 | # * the request is sent only when GRPC core's flow control allows it to |
| 236 | # be sent. |
| 237 | # |
| 238 | # * the request will not complete until the server sends the final response |
| 239 | # followed by a status message. |
| 240 | # |
| 241 | # == Errors == |
| 242 | # An RuntimeError is raised if |
| 243 | # |
| 244 | # * the server responds with a non-OK status when any response is |
| 245 | # * retrieved |
| 246 | # |
| 247 | # * the deadline is exceeded |
| 248 | # |
| 249 | # == Return Value == |
| 250 | # |
| 251 | # if the return_op is false, the return value is an Enumerator of the |
| 252 | # results, unless a block is provided, in which case the block is |
| 253 | # executed with each response. |
| 254 | # |
| 255 | # if return_op is true, the function returns an Operation whose #execute |
| 256 | # method runs server streamer call. Again, Operation#execute either |
| 257 | # calls the given block with each response or returns an Enumerator of the |
| 258 | # responses. |
| 259 | # |
temiola | 6919c75 | 2014-12-10 13:22:00 -0800 | [diff] [blame^] | 260 | # == Keyword Args == |
| 261 | # |
| 262 | # Unspecified keyword arguments are treated as metadata to be sent to the |
| 263 | # server. |
| 264 | # |
nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 265 | # @param method [String] the RPC method to call on the GRPC server |
| 266 | # @param req [Object] the request sent to the server |
| 267 | # @param marshal [Function] f(obj)->string that marshals requests |
| 268 | # @param unmarshal [Function] f(string)->obj that unmarshals responses |
| 269 | # @param deadline [Numeric] the max completion time in seconds |
| 270 | # @param return_op [true|false] (default false) return an Operation if true |
| 271 | # @param blk [Block] when provided, is executed for each response |
| 272 | # @return [Enumerator|Operation|nil] as discussed above |
def server_streamer(method, req, marshal, unmarshal, deadline = nil,
                    return_op: false, **kw, &blk)
  active_call = new_active_call(method, marshal, unmarshal,
                                deadline || @deadline)

  # The metadata is the caller's extra keyword args, rewritten by the
  # update_metadata proc (which is handed a copy) when one is configured.
  metadata = kw
  metadata = @update_metadata.call(kw.clone) unless @update_metadata.nil?

  if return_op
    # Hand back the operation view of the call, with #execute defined on
    # that one instance; the caller's block is forwarded when it runs.
    operation = active_call.operation
    operation.define_singleton_method(:execute) do
      active_call.server_streamer(req, **metadata, &blk)
    end
    operation
  else
    active_call.server_streamer(req, **metadata, &blk)
  end
end
| 287 | |
| 288 | # bidi_streamer sends a stream of requests to the GRPC server, and yields |
| 289 | # a stream of responses. |
| 290 | # |
| 291 | # This method takes an Enumerable of requests, and returns an enumerable |
| 292 | # of responses. |
| 293 | # |
| 294 | # == requests == |
| 295 | # |
| 296 | # requests provides an 'iterable' of Requests. I.e. it follows Ruby's #each |
| 297 | # enumeration protocol. In the simplest case, requests will be an array of |
| 298 | # marshallable objects; in typical case it will be an Enumerable that |
| 299 | # allows dynamic construction of the marshallable objects. |
| 300 | # |
| 301 | # == responses == |
| 302 | # |
| 303 | # This is an enumerator of responses. I.e, its #next method blocks |
| 304 | # waiting for the next response. Also, if at any point the block needs |
| 305 | # to consume all the remaining responses, this can be done using #each or |
| 306 | # #collect. Calling #each or #collect should only be done if |
| 307 | # the_call#writes_done has been called, otherwise the block will loop |
| 308 | # forever. |
| 309 | # |
| 310 | # == Flow Control == |
| 311 | # This is a blocking call. |
| 312 | # |
| 313 | # * the call completes when the next call to provided block returns |
| 314 | # * [False] |
| 315 | # |
| 316 | # * the execution block parameters are two objects for sending and |
| 317 | # receiving responses, each of which blocks waiting for flow control. |
| 318 | # E.g, calls to bidi_call#remote_send will wait until flow control |
| 319 | # allows another write before returning; and obviously calls to |
| 320 | # responses#next block until the next response is available. |
| 321 | # |
| 322 | # == Termination == |
| 323 | # |
| 324 | # As well as sending and receiving messages, the block passed to the |
| 325 | # function is also responsible for: |
| 326 | # |
| 327 | # * calling bidi_call#writes_done to indicate no further reqs will be |
| 328 | # sent. |
| 329 | # |
| 330 | # * returning false once the bidi stream is functionally completed. |
| 331 | # |
| 332 | # Note that response#next will indicate that there are no further |
| 333 | # responses by throwing StopIteration, but this can only happen |
| 334 | # if bidi_call#writes_done is called. |
| 335 | # |
| 336 | # To terminate the RPC correctly the block: |
| 337 | # |
| 338 | # * must call bidi#writes_done and then |
| 339 | # |
| 340 | # * either return false as soon as there is no need for other responses |
| 341 | # |
| 342 | # * loop on responses#next until no further responses are available |
| 343 | # |
| 344 | # == Errors == |
| 345 | # An RuntimeError is raised if |
| 346 | # |
| 347 | # * the server responds with a non-OK status when any response is |
| 348 | # * retrieved |
| 349 | # |
| 350 | # * the deadline is exceeded |
| 351 | # |
temiola | 6919c75 | 2014-12-10 13:22:00 -0800 | [diff] [blame^] | 352 | # |
| 353 | # == Keyword Args == |
| 354 | # |
| 355 | # Unspecified keyword arguments are treated as metadata to be sent to the |
| 356 | # server. |
| 357 | # |
nnoble | 097ef9b | 2014-12-01 17:06:10 -0800 | [diff] [blame] | 358 | # == Return Value == |
| 359 | # |
| 360 | # if the return_op is false, the return value is an Enumerator of the |
| 361 | # results, unless a block is provided, in which case the block is |
| 362 | # executed with each response. |
| 363 | # |
| 364 | # if return_op is true, the function returns an Operation whose #execute |
| 365 | # method runs the Bidi call. Again, Operation#execute either calls a |
| 366 | # given block with each response or returns an Enumerator of the responses. |
| 367 | # |
| 368 | # @param method [String] the RPC method to call on the GRPC server |
| 369 | # @param requests [Object] an Enumerable of requests to send |
| 370 | # @param marshal [Function] f(obj)->string that marshals requests |
| 371 | # @param unmarshal [Function] f(string)->obj that unmarshals responses |
| 372 | # @param deadline [Numeric] (optional) the max completion time in seconds |
| 373 | # @param blk [Block] when provided, is executed for each response |
| 374 | # @param return_op [true|false] (default false) return an Operation if true |
| 375 | # @return [Enumerator|nil|Operation] as discussed above |
def bidi_streamer(method, requests, marshal, unmarshal, deadline = nil,
                  return_op: false, **kw, &blk)
  active_call = new_active_call(method, marshal, unmarshal,
                                deadline || @deadline)

  # The metadata is the caller's extra keyword args, rewritten by the
  # update_metadata proc (which is handed a copy) when one is configured.
  metadata = kw
  metadata = @update_metadata.call(kw.clone) unless @update_metadata.nil?

  if return_op
    # Hand back the operation view of the call, with #execute defined on
    # that one instance; the caller's block is forwarded when it runs.
    operation = active_call.operation
    operation.define_singleton_method(:execute) do
      active_call.bidi_streamer(requests, **metadata, &blk)
    end
    operation
  else
    active_call.bidi_streamer(requests, **metadata, &blk)
  end
end
| 390 | |
| 391 | private |
# Creates a new ActiveCall for one RPC on this stub's channel.
#
# The call is created on the stub's channel for the stub's host, and the
# ActiveCall wrapping it is constructed with started: false.
#
# @param method [String] the RPC method to call on the GRPC server; it is
#   passed as the first argument to the channel's create_call
# @param marshal [Function] f(obj)->string that marshals requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Numeric] (optional) the deadline as supplied by the
#   public RPC methods; it is converted with
#   Core::TimeConsts.from_relative_time before being used
# @return [ActiveCall] the active call wrapping the newly created call
def new_active_call(method, marshal, unmarshal, deadline = nil)
  # The same converted deadline is given to both the underlying call and
  # the ActiveCall that wraps it.
  absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
  call = @ch.create_call(method, @host, absolute_deadline)
  ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
                 started: false)
end
| 404 | |
| 405 | end |
| 406 | |
| 407 | end |