blob: a8919a10d9d0d4d899c7fd6dfec490ec0a5feae6 [file] [log] [blame]
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -08001/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 * * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 * * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34#include "src/cpp/client/channel.h"
35
36#include <chrono>
yangg59dfc902014-12-19 14:00:14 -080037#include <memory>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080038
39#include <grpc/grpc.h>
yangg59dfc902014-12-19 14:00:14 -080040#include <grpc/grpc_security.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080041#include <grpc/support/log.h>
42#include <grpc/support/slice.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080043
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080044#include "src/cpp/proto/proto_utils.h"
45#include "src/cpp/stream/stream_context.h"
yangg59dfc902014-12-19 14:00:14 -080046#include <grpc++/channel_arguments.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080047#include <grpc++/client_context.h>
yangg59dfc902014-12-19 14:00:14 -080048#include <grpc++/config.h>
49#include <grpc++/credentials.h>
yangg1b151092015-01-09 15:31:05 -080050#include <grpc++/impl/rpc_method.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080051#include <grpc++/status.h>
yangg59dfc902014-12-19 14:00:14 -080052#include <google/protobuf/message.h>
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080053
54namespace grpc {
55
Craig Tillerecd49342015-01-18 14:36:47 -080056Channel::Channel(const grpc::string &target, const ChannelArguments &args)
yangg59dfc902014-12-19 14:00:14 -080057 : target_(target) {
58 grpc_channel_args channel_args;
59 args.SetChannelArgs(&channel_args);
60 c_channel_ = grpc_channel_create(
61 target_.c_str(), channel_args.num_args > 0 ? &channel_args : nullptr);
62}
63
Craig Tillerecd49342015-01-18 14:36:47 -080064Channel::Channel(const grpc::string &target,
65 const std::unique_ptr<Credentials> &creds,
66 const ChannelArguments &args)
yangg7cebec72014-12-22 12:47:45 -080067 : target_(args.GetSslTargetNameOverride().empty()
68 ? target
69 : args.GetSslTargetNameOverride()) {
yangg59dfc902014-12-19 14:00:14 -080070 grpc_channel_args channel_args;
71 args.SetChannelArgs(&channel_args);
Craig Tillerecd49342015-01-18 14:36:47 -080072 grpc_credentials *c_creds = creds ? creds->GetRawCreds() : nullptr;
yangg59dfc902014-12-19 14:00:14 -080073 c_channel_ = grpc_secure_channel_create(
yangg4105e2b2015-01-09 14:19:44 -080074 c_creds, target.c_str(),
yangg59dfc902014-12-19 14:00:14 -080075 channel_args.num_args > 0 ? &channel_args : nullptr);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080076}
77
78Channel::~Channel() { grpc_channel_destroy(c_channel_); }
79
80namespace {
nnoble0c475f02014-12-05 15:37:39 -080081// Pluck the finished event and set to status when it is not nullptr.
Craig Tillerecd49342015-01-18 14:36:47 -080082void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag,
83 Status *status) {
84 grpc_event *ev =
nnoble0c475f02014-12-05 15:37:39 -080085 grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future);
86 if (status) {
ctiller2845cad2014-12-15 15:14:12 -080087 StatusCode error_code = static_cast<StatusCode>(ev->data.finished.status);
nnoble0c475f02014-12-05 15:37:39 -080088 grpc::string details(ev->data.finished.details ? ev->data.finished.details
89 : "");
90 *status = Status(error_code, details);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080091 }
92 grpc_event_finish(ev);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080093}
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -080094} // namespace
95
96// TODO(yangg) more error handling
Craig Tillerecd49342015-01-18 14:36:47 -080097Status Channel::StartBlockingRpc(const RpcMethod &method,
98 ClientContext *context,
99 const google::protobuf::Message &request,
100 google::protobuf::Message *result) {
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800101 Status status;
Craig Tillerecd49342015-01-18 14:36:47 -0800102 grpc_call *call = grpc_channel_create_call(
yangg246ec3b2014-12-17 16:48:06 -0800103 c_channel_, method.name(), target_.c_str(), context->RawDeadline());
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800104 context->set_call(call);
Craig Tillerecd49342015-01-18 14:36:47 -0800105 grpc_event *ev;
106 void *finished_tag = reinterpret_cast<char *>(call);
107 void *invoke_tag = reinterpret_cast<char *>(call) + 1;
108 void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;
109 void *write_tag = reinterpret_cast<char *>(call) + 3;
110 void *halfclose_tag = reinterpret_cast<char *>(call) + 4;
111 void *read_tag = reinterpret_cast<char *>(call) + 5;
nnoble0c475f02014-12-05 15:37:39 -0800112
Craig Tillerecd49342015-01-18 14:36:47 -0800113 grpc_completion_queue *cq = grpc_completion_queue_create();
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800114 context->set_cq(cq);
115 // add_metadata from context
116 //
117 // invoke
nnoble0c475f02014-12-05 15:37:39 -0800118 GPR_ASSERT(grpc_call_start_invoke(call, cq, invoke_tag, metadata_read_tag,
119 finished_tag,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800120 GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
nnoble0c475f02014-12-05 15:37:39 -0800121 ev = grpc_completion_queue_pluck(cq, invoke_tag, gpr_inf_future);
yangg4105e2b2015-01-09 14:19:44 -0800122 bool success = ev->data.invoke_accepted == GRPC_OP_OK;
nnoble0c475f02014-12-05 15:37:39 -0800123 grpc_event_finish(ev);
yangg4105e2b2015-01-09 14:19:44 -0800124 if (!success) {
125 GetFinalStatus(cq, finished_tag, &status);
126 return status;
127 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800128 // write request
Craig Tillerecd49342015-01-18 14:36:47 -0800129 grpc_byte_buffer *write_buffer = nullptr;
yangg4105e2b2015-01-09 14:19:44 -0800130 success = SerializeProto(request, &write_buffer);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800131 if (!success) {
132 grpc_call_cancel(call);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800133 status =
134 Status(StatusCode::DATA_LOSS, "Failed to serialize request proto.");
nnoble0c475f02014-12-05 15:37:39 -0800135 GetFinalStatus(cq, finished_tag, nullptr);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800136 return status;
137 }
nnoble0c475f02014-12-05 15:37:39 -0800138 GPR_ASSERT(grpc_call_start_write(call, write_buffer, write_tag,
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800139 GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
140 grpc_byte_buffer_destroy(write_buffer);
nnoble0c475f02014-12-05 15:37:39 -0800141 ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future);
142
143 success = ev->data.write_accepted == GRPC_OP_OK;
144 grpc_event_finish(ev);
145 if (!success) {
146 GetFinalStatus(cq, finished_tag, &status);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800147 return status;
148 }
149 // writes done
nnoble0c475f02014-12-05 15:37:39 -0800150 GPR_ASSERT(grpc_call_writes_done(call, halfclose_tag) == GRPC_CALL_OK);
151 ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future);
152 grpc_event_finish(ev);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800153 // start read metadata
154 //
nnoble0c475f02014-12-05 15:37:39 -0800155 ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future);
156 grpc_event_finish(ev);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800157 // start read
nnoble0c475f02014-12-05 15:37:39 -0800158 GPR_ASSERT(grpc_call_start_read(call, read_tag) == GRPC_CALL_OK);
159 ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future);
160 if (ev->data.read) {
161 if (!DeserializeProto(ev->data.read, result)) {
162 grpc_event_finish(ev);
163 status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto.");
164 GetFinalStatus(cq, finished_tag, nullptr);
165 return status;
166 }
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800167 }
nnoble0c475f02014-12-05 15:37:39 -0800168 grpc_event_finish(ev);
169
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800170 // wait status
nnoble0c475f02014-12-05 15:37:39 -0800171 GetFinalStatus(cq, finished_tag, &status);
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800172 return status;
173}
174
Craig Tillerecd49342015-01-18 14:36:47 -0800175StreamContextInterface *Channel::CreateStream(
176 const RpcMethod &method, ClientContext *context,
177 const google::protobuf::Message *request,
178 google::protobuf::Message *result) {
179 grpc_call *call = grpc_channel_create_call(
yangg03df7f62015-01-07 11:41:10 -0800180 c_channel_, method.name(), target_.c_str(), context->RawDeadline());
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800181 context->set_call(call);
Craig Tillerecd49342015-01-18 14:36:47 -0800182 grpc_completion_queue *cq = grpc_completion_queue_create();
Nicolas Nobleb7ebd3b2014-11-26 16:33:03 -0800183 context->set_cq(cq);
184 return new StreamContext(method, context, request, result);
185}
186
187} // namespace grpc