blob: 24b75d16686bd791c626d4c9bc18259f93039879 [file] [log] [blame]
Jan Tattermuscha7fff862015-02-13 11:08:08 -08001#region Copyright notice and license
2
Jan Tattermuschaf77b3d2015-02-13 11:22:21 -08003// Copyright 2015, Google Inc.
Jan Tattermuscha7fff862015-02-13 11:08:08 -08004// All rights reserved.
Jan Tattermusch337a2dd2015-02-13 15:41:41 -08005//
Jan Tattermuscha7fff862015-02-13 11:08:08 -08006// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are
8// met:
Jan Tattermusch337a2dd2015-02-13 15:41:41 -08009//
Jan Tattermuscha7fff862015-02-13 11:08:08 -080010// * Redistributions of source code must retain the above copyright
11// notice, this list of conditions and the following disclaimer.
12// * Redistributions in binary form must reproduce the above
13// copyright notice, this list of conditions and the following disclaimer
14// in the documentation and/or other materials provided with the
15// distribution.
16// * Neither the name of Google Inc. nor the names of its
17// contributors may be used to endorse or promote products derived from
18// this software without specific prior written permission.
Jan Tattermusch337a2dd2015-02-13 15:41:41 -080019//
Jan Tattermuscha7fff862015-02-13 11:08:08 -080020// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32#endregion
33
Jan Tattermuscha7608b02015-02-03 17:54:38 -080034using System;
Jan Tattermuscha7608b02015-02-03 17:54:38 -080035using System.Diagnostics;
Jan Tattermusch30868622015-02-19 09:22:33 -080036using System.Runtime.CompilerServices;
37using System.Runtime.InteropServices;
Jan Tattermuscha7608b02015-02-03 17:54:38 -080038using System.Threading;
39using System.Threading.Tasks;
Jan Tattermusch30868622015-02-19 09:22:33 -080040using Grpc.Core.Internal;
Jan Tattermusch4d703262015-02-23 11:33:44 -080041using Grpc.Core.Utils;
Jan Tattermuscha7608b02015-02-03 17:54:38 -080042
Jan Tattermusch30868622015-02-19 09:22:33 -080043namespace Grpc.Core.Internal
Jan Tattermuscha7608b02015-02-03 17:54:38 -080044{
45 /// <summary>
Jan Tattermuscha5272b62015-04-30 11:56:46 -070046 /// Manages client side native call lifecycle.
Jan Tattermuscha7608b02015-02-03 17:54:38 -080047 /// </summary>
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080048 internal class AsyncCall<TRequest, TResponse> : AsyncCallBase<TRequest, TResponse>
Jan Tattermuscha7608b02015-02-03 17:54:38 -080049 {
Jan Tattermusch04eb89c2015-06-12 13:03:05 -070050 Channel channel;
51
Jan Tattermusch607307d2015-02-18 11:05:45 -080052 // Completion of a pending unary response if not null.
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080053 TaskCompletionSource<TResponse> unaryResponseTcs;
Jan Tattermusch337a2dd2015-02-13 15:41:41 -080054
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080055 // Set after status is received. Only used for streaming response calls.
Jan Tattermusch075dde42015-03-11 18:21:00 -070056 Status? finishedStatus;
Jan Tattermusch607307d2015-02-18 11:05:45 -080057
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080058 bool readObserverCompleted; // True if readObserver has already been completed.
Jan Tattermuscha7608b02015-02-03 17:54:38 -080059
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080060 public AsyncCall(Func<TRequest, byte[]> serializer, Func<byte[], TResponse> deserializer) : base(serializer, deserializer)
Jan Tattermuscha7608b02015-02-03 17:54:38 -080061 {
Jan Tattermuscha7608b02015-02-03 17:54:38 -080062 }
63
Jan Tattermusch075dde42015-03-11 18:21:00 -070064 public void Initialize(Channel channel, CompletionQueueSafeHandle cq, string methodName)
Jan Tattermusch607307d2015-02-18 11:05:45 -080065 {
Jan Tattermusch04eb89c2015-06-12 13:03:05 -070066 this.channel = channel;
67 var call = CallSafeHandle.Create(channel.Handle, channel.CompletionRegistry, cq, methodName, channel.Target, Timespec.InfFuture);
68 channel.Environment.DebugStats.ActiveClientCalls.Increment();
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080069 InitializeInternal(call);
Jan Tattermuscha7608b02015-02-03 17:54:38 -080070 }
71
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080072 // TODO: this method is not Async, so it shouldn't be in AsyncCall class, but
73 // it is reusing fair amount of code in this class, so we are leaving it here.
74 // TODO: for other calls, you need to call Initialize, this methods calls initialize
75 // on its own, so there's a usage inconsistency.
76 /// <summary>
77 /// Blocking unary request - unary response call.
78 /// </summary>
Jan Tattermuschc0b37212015-03-13 08:35:41 -070079 public TResponse UnaryCall(Channel channel, string methodName, TRequest msg, Metadata headers)
Jan Tattermusch50faa8f2015-02-21 17:51:52 -080080 {
Jan Tattermusch075dde42015-03-11 18:21:00 -070081 using (CompletionQueueSafeHandle cq = CompletionQueueSafeHandle.Create())
Jan Tattermusch50faa8f2015-02-21 17:51:52 -080082 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080083 byte[] payload = UnsafeSerialize(msg);
Jan Tattermusch50faa8f2015-02-21 17:51:52 -080084
Jan Tattermuscha29d0f32015-03-04 17:54:56 -080085 unaryResponseTcs = new TaskCompletionSource<TResponse>();
Jan Tattermusch50faa8f2015-02-21 17:51:52 -080086
87 lock (myLock)
88 {
89 Initialize(channel, cq, methodName);
90 started = true;
91 halfcloseRequested = true;
92 readingDone = true;
93 }
Jan Tattermuschc0b37212015-03-13 08:35:41 -070094
95 using (var metadataArray = MetadataArraySafeHandle.Create(headers))
96 {
Jan Tattermuschd3677482015-06-01 19:27:40 -070097 using (var ctx = BatchContextSafeHandle.Create())
98 {
99 call.StartUnary(payload, ctx, metadataArray);
100 var ev = cq.Pluck(ctx.Handle);
101
102 bool success = (ev.success != 0);
103 try
104 {
105 HandleUnaryResponse(success, ctx);
106 }
107 catch (Exception e)
108 {
109 Console.WriteLine("Exception occured while invoking completion delegate: " + e);
110 }
111 }
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700112 }
Jan Tattermusch50faa8f2015-02-21 17:51:52 -0800113
Jan Tattermusch4d703262015-02-23 11:33:44 -0800114 try
115 {
116 // Once the blocking call returns, the result should be available synchronously.
117 return unaryResponseTcs.Task.Result;
118 }
119 catch (AggregateException ae)
120 {
121 throw ExceptionHelper.UnwrapRpcException(ae);
122 }
Jan Tattermusch50faa8f2015-02-21 17:51:52 -0800123 }
124 }
125
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800126 /// <summary>
127 /// Starts a unary request - unary response call.
128 /// </summary>
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700129 public Task<TResponse> UnaryCallAsync(TRequest msg, Metadata headers)
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800130 {
131 lock (myLock)
132 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800133 Preconditions.CheckNotNull(call);
134
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800135 started = true;
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800136 halfcloseRequested = true;
Jan Tattermusch607307d2015-02-18 11:05:45 -0800137 readingDone = true;
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800138
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800139 byte[] payload = UnsafeSerialize(msg);
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800140
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800141 unaryResponseTcs = new TaskCompletionSource<TResponse>();
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700142 using (var metadataArray = MetadataArraySafeHandle.Create(headers))
143 {
Jan Tattermuschd3677482015-06-01 19:27:40 -0700144 call.StartUnary(payload, HandleUnaryResponse, metadataArray);
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700145 }
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800146 return unaryResponseTcs.Task;
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800147 }
148 }
149
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800150 /// <summary>
151 /// Starts a streamed request - unary response call.
152 /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
153 /// </summary>
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700154 public Task<TResponse> ClientStreamingCallAsync(Metadata headers)
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800155 {
156 lock (myLock)
157 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800158 Preconditions.CheckNotNull(call);
159
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800160 started = true;
Jan Tattermusch607307d2015-02-18 11:05:45 -0800161 readingDone = true;
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800162
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800163 unaryResponseTcs = new TaskCompletionSource<TResponse>();
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700164 using (var metadataArray = MetadataArraySafeHandle.Create(headers))
165 {
Jan Tattermuschd3677482015-06-01 19:27:40 -0700166 call.StartClientStreaming(HandleUnaryResponse, metadataArray);
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700167 }
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800168
169 return unaryResponseTcs.Task;
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800170 }
171 }
172
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800173 /// <summary>
174 /// Starts a unary request - streamed response call.
175 /// </summary>
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700176 public void StartServerStreamingCall(TRequest msg, Metadata headers)
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800177 {
178 lock (myLock)
179 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800180 Preconditions.CheckNotNull(call);
181
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800182 started = true;
183 halfcloseRequested = true;
Jan Tattermusch4788e782015-02-24 16:14:28 -0800184 halfclosed = true; // halfclose not confirmed yet, but it will be once finishedHandler is called.
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800185
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800186 byte[] payload = UnsafeSerialize(msg);
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700187
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700188 using (var metadataArray = MetadataArraySafeHandle.Create(headers))
189 {
Jan Tattermuschd3677482015-06-01 19:27:40 -0700190 call.StartServerStreaming(payload, HandleFinished, metadataArray);
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700191 }
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800192 }
193 }
194
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800195 /// <summary>
196 /// Starts a streaming request - streaming response call.
197 /// Use StartSendMessage and StartSendCloseFromClient to stream requests.
198 /// </summary>
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700199 public void StartDuplexStreamingCall(Metadata headers)
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800200 {
201 lock (myLock)
202 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800203 Preconditions.CheckNotNull(call);
204
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800205 started = true;
206
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700207 using (var metadataArray = MetadataArraySafeHandle.Create(headers))
208 {
Jan Tattermuschd3677482015-06-01 19:27:40 -0700209 call.StartDuplexStreaming(HandleFinished, metadataArray);
Jan Tattermuschc0b37212015-03-13 08:35:41 -0700210 }
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800211 }
212 }
213
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800214 /// <summary>
215 /// Sends a streaming request. Only one pending send action is allowed at any given time.
216 /// completionDelegate is called when the operation finishes.
217 /// </summary>
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700218 public void StartSendMessage(TRequest msg, AsyncCompletionDelegate<object> completionDelegate)
Jan Tattermusch607307d2015-02-18 11:05:45 -0800219 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800220 StartSendMessageInternal(msg, completionDelegate);
Jan Tattermusch607307d2015-02-18 11:05:45 -0800221 }
Craig Tiller190d3602015-02-18 09:23:38 -0800222
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800223 /// <summary>
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700224 /// Receives a streaming response. Only one pending read action is allowed at any given time.
225 /// completionDelegate is called when the operation finishes.
226 /// </summary>
227 public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
228 {
229 StartReadMessageInternal(completionDelegate);
230 }
231
232 /// <summary>
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800233 /// Sends halfclose, indicating client is done with streaming requests.
234 /// Only one pending send action is allowed at any given time.
235 /// completionDelegate is called when the operation finishes.
236 /// </summary>
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700237 public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
Jan Tattermusch607307d2015-02-18 11:05:45 -0800238 {
239 lock (myLock)
240 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800241 Preconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
242 CheckSendingAllowed();
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800243
Jan Tattermuschd3677482015-06-01 19:27:40 -0700244 call.StartSendCloseFromClient(HandleHalfclosed);
Jan Tattermusch607307d2015-02-18 11:05:45 -0800245
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800246 halfcloseRequested = true;
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800247 sendCompletionDelegate = completionDelegate;
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800248 }
249 }
250
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800251 /// <summary>
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700252 /// On client-side, we only fire readCompletionDelegate once all messages have been read
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800253 /// and status has been received.
254 /// </summary>
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700255 protected override void ProcessLastRead(AsyncCompletionDelegate<TResponse> completionDelegate)
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800256 {
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700257 if (completionDelegate != null && readingDone && finishedStatus.HasValue)
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800258 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800259 bool shouldComplete;
260 lock (myLock)
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800261 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800262 shouldComplete = !readObserverCompleted;
263 readObserverCompleted = true;
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800264 }
265
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800266 if (shouldComplete)
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800267 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800268 var status = finishedStatus.Value;
269 if (status.StatusCode != StatusCode.OK)
270 {
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700271 FireCompletion(completionDelegate, default(TResponse), new RpcException(status));
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800272 }
273 else
274 {
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700275 FireCompletion(completionDelegate, default(TResponse), null);
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800276 }
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800277 }
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800278 }
279 }
280
Jan Tattermusch1b54fcf2015-05-01 14:30:16 -0700281 protected override void OnReleaseResources()
282 {
Jan Tattermusch04eb89c2015-06-12 13:03:05 -0700283 channel.Environment.DebugStats.ActiveClientCalls.Decrement();
Jan Tattermusch1b54fcf2015-05-01 14:30:16 -0700284 }
285
Jan Tattermusch607307d2015-02-18 11:05:45 -0800286 /// <summary>
287 /// Handler for unary response completion.
288 /// </summary>
Jan Tattermuschd3677482015-06-01 19:27:40 -0700289 private void HandleUnaryResponse(bool success, BatchContextSafeHandle ctx)
Jan Tattermusch607307d2015-02-18 11:05:45 -0800290 {
Jan Tattermusch075dde42015-03-11 18:21:00 -0700291 lock (myLock)
Jan Tattermusch607307d2015-02-18 11:05:45 -0800292 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800293 finished = true;
294 halfclosed = true;
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800295
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800296 ReleaseResourcesIfPossible();
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800297 }
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800298
Jan Tattermusch1215c332015-05-04 19:39:59 -0700299 if (!success)
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800300 {
Jan Tattermusch075dde42015-03-11 18:21:00 -0700301 unaryResponseTcs.SetException(new RpcException(new Status(StatusCode.Internal, "Internal error occured.")));
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800302 return;
303 }
304
305 var status = ctx.GetReceivedStatus();
306 if (status.StatusCode != StatusCode.OK)
307 {
308 unaryResponseTcs.SetException(new RpcException(status));
309 return;
310 }
311
312 // TODO: handle deserialization error
313 TResponse msg;
314 TryDeserialize(ctx.GetReceivedMessage(), out msg);
315
316 unaryResponseTcs.SetResult(msg);
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800317 }
318
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800319 /// <summary>
320 /// Handles receive status completion for calls with streaming response.
321 /// </summary>
Jan Tattermuschd3677482015-06-01 19:27:40 -0700322 private void HandleFinished(bool success, BatchContextSafeHandle ctx)
Jan Tattermusch607307d2015-02-18 11:05:45 -0800323 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800324 var status = ctx.GetReceivedStatus();
325
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700326 AsyncCompletionDelegate<TResponse> origReadCompletionDelegate = null;
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800327 lock (myLock)
Jan Tattermusch607307d2015-02-18 11:05:45 -0800328 {
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800329 finished = true;
330 finishedStatus = status;
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800331
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700332 origReadCompletionDelegate = readCompletionDelegate;
333
Jan Tattermuscha29d0f32015-03-04 17:54:56 -0800334 ReleaseResourcesIfPossible();
Jan Tattermusch607307d2015-02-18 11:05:45 -0800335 }
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800336
Jan Tattermuscha5272b62015-04-30 11:56:46 -0700337 ProcessLastRead(origReadCompletionDelegate);
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800338 }
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800339 }
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800340}