blob: b0e72262ffaeb565b3e2e6633a1c4e56d623e001 [file] [log] [blame]
nnoble097ef9b2014-12-01 17:06:10 -08001# Copyright 2014, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14# * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30require 'grpc'
31require 'grpc/generic/active_call'
32require 'xray/thread_dump_signal_handler'
33
34module GRPC
35
36 # ClientStub represents an endpoint used to send requests to GRPC servers.
37 class ClientStub
nnoble0c475f02014-12-05 15:37:39 -080038 include Core::StatusCodes
nnoble097ef9b2014-12-01 17:06:10 -080039
40 # Default deadline is 5 seconds.
41 DEFAULT_DEADLINE = 5
42
43 # Creates a new ClientStub.
44 #
45 # Minimally, a stub is created with the just the host of the gRPC service
46 # it wishes to access, e.g.,
47 #
48 # my_stub = ClientStub.new(example.host.com:50505)
49 #
50 # Any arbitrary keyword arguments are treated as channel arguments used to
51 # configure the RPC connection to the host.
52 #
temiola6919c752014-12-10 13:22:00 -080053 # There are some specific keyword args that are not used to configure the
nnoble097ef9b2014-12-01 17:06:10 -080054 # channel:
55 #
56 # - :channel_override
57 # when present, this must be a pre-created GRPC::Channel. If it's present
58 # the host and arbitrary keyword arg areignored, and the RPC connection uses
59 # this channel.
60 #
61 # - :deadline
62 # when present, this is the default deadline used for calls
63 #
temiola6919c752014-12-10 13:22:00 -080064 # - :update_metadata
65 # when present, this a func that takes a hash and returns a hash
66 # it can be used to update metadata, i.e, remove, change or update
67 # amend metadata values.
68 #
nnoble097ef9b2014-12-01 17:06:10 -080069 # @param host [String] the host the stub connects to
nnoble0c475f02014-12-05 15:37:39 -080070 # @param q [Core::CompletionQueue] used to wait for events
71 # @param channel_override [Core::Channel] a pre-created channel
nnoble097ef9b2014-12-01 17:06:10 -080072 # @param deadline [Number] the default deadline to use in requests
nnoble0c475f02014-12-05 15:37:39 -080073 # @param creds [Core::Credentials] secures and/or authenticates the channel
temiola6919c752014-12-10 13:22:00 -080074 # @param update_metadata a func that updates metadata as described above
75 # @param kw [KeywordArgs]the channel arguments
nnoble097ef9b2014-12-01 17:06:10 -080076 def initialize(host, q,
77 channel_override:nil,
temiola6919c752014-12-10 13:22:00 -080078 deadline: DEFAULT_DEADLINE,
79 creds: nil,
80 update_metadata: nil,
nnoble097ef9b2014-12-01 17:06:10 -080081 **kw)
nnoble0c475f02014-12-05 15:37:39 -080082 if !q.is_a?Core::CompletionQueue
nnoble097ef9b2014-12-01 17:06:10 -080083 raise ArgumentError.new('not a CompletionQueue')
84 end
temiola6919c752014-12-10 13:22:00 -080085 @queue = q
86
87 # set the channel instance
nnoble097ef9b2014-12-01 17:06:10 -080088 if !channel_override.nil?
89 ch = channel_override
nnoble0c475f02014-12-05 15:37:39 -080090 raise ArgumentError.new('not a Channel') unless ch.is_a?(Core::Channel)
91 elsif creds.nil?
92 ch = Core::Channel.new(host, kw)
93 elsif !creds.is_a?(Core::Credentials)
94 raise ArgumentError.new('not a Credentials')
nnoble097ef9b2014-12-01 17:06:10 -080095 else
nnoble0c475f02014-12-05 15:37:39 -080096 ch = Core::Channel.new(host, kw, creds)
nnoble097ef9b2014-12-01 17:06:10 -080097 end
nnoble097ef9b2014-12-01 17:06:10 -080098 @ch = ch
temiola6919c752014-12-10 13:22:00 -080099
100 @update_metadata = nil
101 if !update_metadata.nil?
102 if !update_metadata.is_a?(Proc)
103 raise ArgumentError.new('update_metadata is not a Proc')
104 end
105 @update_metadata = update_metadata
106 end
107
108
109 @host = host
110 @deadline = deadline
nnoble097ef9b2014-12-01 17:06:10 -0800111 end
112
113 # request_response sends a request to a GRPC server, and returns the
114 # response.
115 #
116 # == Flow Control ==
117 # This is a blocking call.
118 #
119 # * it does not return until a response is received.
120 #
121 # * the requests is sent only when GRPC core's flow control allows it to
122 # be sent.
123 #
124 # == Errors ==
125 # An RuntimeError is raised if
126 #
127 # * the server responds with a non-OK status
128 #
129 # * the deadline is exceeded
130 #
131 # == Return Value ==
132 #
133 # If return_op is false, the call returns the response
134 #
135 # If return_op is true, the call returns an Operation, calling execute
136 # on the Operation returns the response.
137 #
temiola6919c752014-12-10 13:22:00 -0800138 # == Keyword Args ==
139 #
140 # Unspecified keyword arguments are treated as metadata to be sent to the
141 # server.
142 #
nnoble097ef9b2014-12-01 17:06:10 -0800143 # @param method [String] the RPC method to call on the GRPC server
144 # @param req [Object] the request sent to the server
145 # @param marshal [Function] f(obj)->string that marshals requests
146 # @param unmarshal [Function] f(string)->obj that unmarshals responses
147 # @param deadline [Numeric] (optional) the max completion time in seconds
148 # @param return_op [true|false] (default false) return an Operation if true
149 # @return [Object] the response received from the server
150 def request_response(method, req, marshal, unmarshal, deadline=nil,
temiola6919c752014-12-10 13:22:00 -0800151 return_op:false, **kw)
nnoble097ef9b2014-12-01 17:06:10 -0800152 c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
temiola6919c752014-12-10 13:22:00 -0800153 md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
154 return c.request_response(req, **md) unless return_op
nnoble097ef9b2014-12-01 17:06:10 -0800155
156 # return the operation view of the active_call; define #execute as a
157 # new method for this instance that invokes #request_response.
158 op = c.operation
159 op.define_singleton_method(:execute) do
temiola6919c752014-12-10 13:22:00 -0800160 c.request_response(req, **md)
nnoble097ef9b2014-12-01 17:06:10 -0800161 end
162 op
163 end
164
165 # client_streamer sends a stream of requests to a GRPC server, and
166 # returns a single response.
167 #
168 # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
169 # #each enumeration protocol. In the simplest case, requests will be an
170 # array of marshallable objects; in typical case it will be an Enumerable
171 # that allows dynamic construction of the marshallable objects.
172 #
173 # == Flow Control ==
174 # This is a blocking call.
175 #
176 # * it does not return until a response is received.
177 #
178 # * each requests is sent only when GRPC core's flow control allows it to
179 # be sent.
180 #
181 # == Errors ==
182 # An RuntimeError is raised if
183 #
184 # * the server responds with a non-OK status
185 #
186 # * the deadline is exceeded
187 #
188 # == Return Value ==
189 #
190 # If return_op is false, the call consumes the requests and returns
191 # the response.
192 #
193 # If return_op is true, the call returns the response.
194 #
temiola6919c752014-12-10 13:22:00 -0800195 # == Keyword Args ==
196 #
197 # Unspecified keyword arguments are treated as metadata to be sent to the
198 # server.
199 #
nnoble097ef9b2014-12-01 17:06:10 -0800200 # @param method [String] the RPC method to call on the GRPC server
201 # @param requests [Object] an Enumerable of requests to send
202 # @param marshal [Function] f(obj)->string that marshals requests
203 # @param unmarshal [Function] f(string)->obj that unmarshals responses
204 # @param deadline [Numeric] the max completion time in seconds
205 # @param return_op [true|false] (default false) return an Operation if true
206 # @return [Object|Operation] the response received from the server
207 def client_streamer(method, requests, marshal, unmarshal, deadline=nil,
temiola6919c752014-12-10 13:22:00 -0800208 return_op:false, **kw)
nnoble097ef9b2014-12-01 17:06:10 -0800209 c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
temiola6919c752014-12-10 13:22:00 -0800210 md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
211 return c.client_streamer(requests, **md) unless return_op
nnoble097ef9b2014-12-01 17:06:10 -0800212
213 # return the operation view of the active_call; define #execute as a
214 # new method for this instance that invokes #client_streamer.
215 op = c.operation
216 op.define_singleton_method(:execute) do
temiola6919c752014-12-10 13:22:00 -0800217 c.client_streamer(requests, **md)
nnoble097ef9b2014-12-01 17:06:10 -0800218 end
219 op
220 end
221
222 # server_streamer sends one request to the GRPC server, which yields a
223 # stream of responses.
224 #
225 # responses provides an enumerator over the streamed responses, i.e. it
226 # follows Ruby's #each iteration protocol. The enumerator blocks while
227 # waiting for each response, stops when the server signals that no
228 # further responses will be supplied. If the implicit block is provided,
229 # it is executed with each response as the argument and no result is
230 # returned.
231 #
232 # == Flow Control ==
233 # This is a blocking call.
234 #
235 # * the request is sent only when GRPC core's flow control allows it to
236 # be sent.
237 #
238 # * the request will not complete until the server sends the final response
239 # followed by a status message.
240 #
241 # == Errors ==
242 # An RuntimeError is raised if
243 #
244 # * the server responds with a non-OK status when any response is
245 # * retrieved
246 #
247 # * the deadline is exceeded
248 #
249 # == Return Value ==
250 #
251 # if the return_op is false, the return value is an Enumerator of the
252 # results, unless a block is provided, in which case the block is
253 # executed with each response.
254 #
255 # if return_op is true, the function returns an Operation whose #execute
256 # method runs server streamer call. Again, Operation#execute either
257 # calls the given block with each response or returns an Enumerator of the
258 # responses.
259 #
temiola6919c752014-12-10 13:22:00 -0800260 # == Keyword Args ==
261 #
262 # Unspecified keyword arguments are treated as metadata to be sent to the
263 # server.
264 #
nnoble097ef9b2014-12-01 17:06:10 -0800265 # @param method [String] the RPC method to call on the GRPC server
266 # @param req [Object] the request sent to the server
267 # @param marshal [Function] f(obj)->string that marshals requests
268 # @param unmarshal [Function] f(string)->obj that unmarshals responses
269 # @param deadline [Numeric] the max completion time in seconds
270 # @param return_op [true|false] (default false) return an Operation if true
271 # @param blk [Block] when provided, is executed for each response
272 # @return [Enumerator|Operation|nil] as discussed above
273 def server_streamer(method, req, marshal, unmarshal, deadline=nil,
temiola6919c752014-12-10 13:22:00 -0800274 return_op:false, **kw, &blk)
nnoble097ef9b2014-12-01 17:06:10 -0800275 c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
temiola6919c752014-12-10 13:22:00 -0800276 md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
277 return c.server_streamer(req, **md, &blk) unless return_op
nnoble097ef9b2014-12-01 17:06:10 -0800278
279 # return the operation view of the active_call; define #execute
280 # as a new method for this instance that invokes #server_streamer
281 op = c.operation
282 op.define_singleton_method(:execute) do
temiola6919c752014-12-10 13:22:00 -0800283 c.server_streamer(req, **md, &blk)
nnoble097ef9b2014-12-01 17:06:10 -0800284 end
285 op
286 end
287
288 # bidi_streamer sends a stream of requests to the GRPC server, and yields
289 # a stream of responses.
290 #
291 # This method takes an Enumerable of requests, and returns and enumerable
292 # of responses.
293 #
294 # == requests ==
295 #
296 # requests provides an 'iterable' of Requests. I.e. it follows Ruby's #each
297 # enumeration protocol. In the simplest case, requests will be an array of
298 # marshallable objects; in typical case it will be an Enumerable that
299 # allows dynamic construction of the marshallable objects.
300 #
301 # == responses ==
302 #
303 # This is an enumerator of responses. I.e, its #next method blocks
304 # waiting for the next response. Also, if at any point the block needs
305 # to consume all the remaining responses, this can be done using #each or
306 # #collect. Calling #each or #collect should only be done if
307 # the_call#writes_done has been called, otherwise the block will loop
308 # forever.
309 #
310 # == Flow Control ==
311 # This is a blocking call.
312 #
313 # * the call completes when the next call to provided block returns
314 # * [False]
315 #
316 # * the execution block parameters are two objects for sending and
317 # receiving responses, each of which blocks waiting for flow control.
318 # E.g, calles to bidi_call#remote_send will wait until flow control
319 # allows another write before returning; and obviously calls to
320 # responses#next block until the next response is available.
321 #
322 # == Termination ==
323 #
324 # As well as sending and receiving messages, the block passed to the
325 # function is also responsible for:
326 #
327 # * calling bidi_call#writes_done to indicate no further reqs will be
328 # sent.
329 #
330 # * returning false if once the bidi stream is functionally completed.
331 #
332 # Note that response#next will indicate that there are no further
333 # responses by throwing StopIteration, but can only happen either
334 # if bidi_call#writes_done is called.
335 #
336 # To terminate the RPC correctly the block:
337 #
338 # * must call bidi#writes_done and then
339 #
340 # * either return false as soon as there is no need for other responses
341 #
342 # * loop on responses#next until no further responses are available
343 #
344 # == Errors ==
345 # An RuntimeError is raised if
346 #
347 # * the server responds with a non-OK status when any response is
348 # * retrieved
349 #
350 # * the deadline is exceeded
351 #
temiola6919c752014-12-10 13:22:00 -0800352 #
353 # == Keyword Args ==
354 #
355 # Unspecified keyword arguments are treated as metadata to be sent to the
356 # server.
357 #
nnoble097ef9b2014-12-01 17:06:10 -0800358 # == Return Value ==
359 #
360 # if the return_op is false, the return value is an Enumerator of the
361 # results, unless a block is provided, in which case the block is
362 # executed with each response.
363 #
364 # if return_op is true, the function returns an Operation whose #execute
365 # method runs the Bidi call. Again, Operation#execute either calls a
366 # given block with each response or returns an Enumerator of the responses.
367 #
368 # @param method [String] the RPC method to call on the GRPC server
369 # @param requests [Object] an Enumerable of requests to send
370 # @param marshal [Function] f(obj)->string that marshals requests
371 # @param unmarshal [Function] f(string)->obj that unmarshals responses
372 # @param deadline [Numeric] (optional) the max completion time in seconds
373 # @param blk [Block] when provided, is executed for each response
374 # @param return_op [true|false] (default false) return an Operation if true
375 # @return [Enumerator|nil|Operation] as discussed above
376 def bidi_streamer(method, requests, marshal, unmarshal, deadline=nil,
temiola6919c752014-12-10 13:22:00 -0800377 return_op:false, **kw, &blk)
nnoble097ef9b2014-12-01 17:06:10 -0800378 c = new_active_call(method, marshal, unmarshal, deadline || @deadline)
temiola6919c752014-12-10 13:22:00 -0800379 md = @update_metadata.nil? ? kw : @update_metadata.call(kw.clone)
380 return c.bidi_streamer(requests, **md, &blk) unless return_op
nnoble097ef9b2014-12-01 17:06:10 -0800381
382 # return the operation view of the active_call; define #execute
383 # as a new method for this instance that invokes #bidi_streamer
384 op = c.operation
385 op.define_singleton_method(:execute) do
temiola6919c752014-12-10 13:22:00 -0800386 c.bidi_streamer(requests, **md, &blk)
nnoble097ef9b2014-12-01 17:06:10 -0800387 end
388 op
389 end
390
391 private
392 # Creates a new active stub
393 #
394 # @param ch [GRPC::Channel] the channel used to create the stub.
395 # @param marshal [Function] f(obj)->string that marshals requests
396 # @param unmarshal [Function] f(string)->obj that unmarshals responses
397 # @param deadline [TimeConst]
398 def new_active_call(ch, marshal, unmarshal, deadline=nil)
nnoble0c475f02014-12-05 15:37:39 -0800399 absolute_deadline = Core::TimeConsts.from_relative_time(deadline)
nnoble097ef9b2014-12-01 17:06:10 -0800400 call = @ch.create_call(ch, @host, absolute_deadline)
401 ActiveCall.new(call, @queue, marshal, unmarshal, absolute_deadline,
402 started:false)
403 end
404
405 end
406
407end