Merge pull request #1067 from yang-g/print
Print out status when it is not ok in interop tests
diff --git a/Makefile b/Makefile
index f63926b..8be6c5f 100644
--- a/Makefile
+++ b/Makefile
@@ -1837,6 +1837,11 @@
$(Q) $(BINDIR)/$(CONFIG)/thread_pool_test || ( echo test thread_pool_test failed ; exit 1 )
+test_python: static_c
+ $(E) "[RUN] Testing python code"
+ $(Q) tools/run_tests/run_tests.py -lpython -c$(CONFIG)
+
+
tools: privatelibs $(BINDIR)/$(CONFIG)/gen_hpack_tables $(BINDIR)/$(CONFIG)/grpc_create_jwt $(BINDIR)/$(CONFIG)/grpc_fetch_oauth2 $(BINDIR)/$(CONFIG)/grpc_print_google_default_creds_token
buildbenchmarks: privatelibs $(BINDIR)/$(CONFIG)/grpc_completion_queue_benchmark $(BINDIR)/$(CONFIG)/low_level_ping_pong_benchmark
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index f741e3c..d742d85 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -34,6 +34,7 @@
#ifndef GRPCXX_COMPLETION_QUEUE_H
#define GRPCXX_COMPLETION_QUEUE_H
+#include <chrono>
#include <grpc++/impl/client_unary_call.h>
struct grpc_completion_queue;
@@ -75,10 +76,21 @@
explicit CompletionQueue(grpc_completion_queue *take);
~CompletionQueue();
- // Blocking read from queue.
- // Returns true if an event was received, false if the queue is ready
- // for destruction.
- bool Next(void **tag, bool *ok);
+ // Tri-state return for AsyncNext: SHUTDOWN, GOT_EVENT, TIMEOUT
+ enum NextStatus {SHUTDOWN, GOT_EVENT, TIMEOUT};
+
+ // Nonblocking (until deadline) read from queue.
+ // Cannot rely on result of tag or ok if return is TIMEOUT
+ NextStatus AsyncNext(void **tag, bool *ok,
+ std::chrono::system_clock::time_point deadline);
+
+ // Blocking (until deadline) read from queue.
+ // Returns false if the queue is ready for destruction, true if event
+ bool Next(void **tag, bool *ok) {
+ return (AsyncNext(tag,ok,
+ std::chrono::system_clock::time_point::max()) !=
+ SHUTDOWN);
+ }
// Shutdown has to be called, and the CompletionQueue can only be
// destructed when false is returned from Next().
diff --git a/include/grpc++/credentials.h b/include/grpc++/credentials.h
index 59ad638..2ac3eec 100644
--- a/include/grpc++/credentials.h
+++ b/include/grpc++/credentials.h
@@ -50,8 +50,8 @@
protected:
friend std::unique_ptr<Credentials> CompositeCredentials(
- const std::unique_ptr<Credentials>& creds1,
- const std::unique_ptr<Credentials>& creds2);
+ const std::unique_ptr<Credentials>& creds1,
+ const std::unique_ptr<Credentials>& creds2);
virtual SecureCredentials* AsSecureCredentials() = 0;
@@ -113,6 +113,12 @@
std::unique_ptr<Credentials> JWTCredentials(
const grpc::string& json_key, std::chrono::seconds token_lifetime);
+// Builds refresh token credentials.
+// json_refresh_token is the JSON string containing the refresh token along
+// with a client_id and client_secret.
+std::unique_ptr<Credentials> RefreshTokenCredentials(
+ const grpc::string& json_refresh_token);
+
// Builds IAM credentials.
std::unique_ptr<Credentials> IAMCredentials(
const grpc::string& authorization_token,
diff --git a/include/grpc/support/atm.h b/include/grpc/support/atm.h
index feca6b3..ba8d7f5 100644
--- a/include/grpc/support/atm.h
+++ b/include/grpc/support/atm.h
@@ -83,7 +83,7 @@
#include <grpc/support/atm_gcc_atomic.h>
#elif defined(GPR_GCC_SYNC)
#include <grpc/support/atm_gcc_sync.h>
-#elif defined(GPR_WIN32)
+#elif defined(GPR_WIN32_ATOMIC)
#include <grpc/support/atm_win32.h>
#else
#error could not determine platform for atm
diff --git a/include/grpc/support/atm_win32.h b/include/grpc/support/atm_win32.h
index 18bf372..8b53224 100644
--- a/include/grpc/support/atm_win32.h
+++ b/include/grpc/support/atm_win32.h
@@ -51,7 +51,7 @@
static __inline gpr_atm gpr_atm_no_barrier_load(const gpr_atm *p) {
/* TODO(dklempner): Can we implement something better here? */
- gpr_atm_acq_load(p);
+ return gpr_atm_acq_load(p);
}
static __inline void gpr_atm_rel_store(gpr_atm *p, gpr_atm value) {
diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h
index f04c2e7..1b613dc 100644
--- a/include/grpc/support/port_platform.h
+++ b/include/grpc/support/port_platform.h
@@ -43,11 +43,21 @@
#define GPR_ARCH_64 1
#define GPR_GETPID_IN_PROCESS_H 1
#define GPR_WINSOCK_SOCKET 1
+#ifdef __GNUC__
+#define GPR_GCC_ATOMIC 1
+#else
+#define GPR_WIN32_ATOMIC 1
+#endif
#elif defined(_WIN32) || defined(WIN32)
#define GPR_ARCH_32 1
#define GPR_WIN32 1
#define GPR_GETPID_IN_PROCESS_H 1
#define GPR_WINSOCK_SOCKET 1
+#ifdef __GNUC__
+#define GPR_GCC_ATOMIC 1
+#else
+#define GPR_WIN32_ATOMIC 1
+#endif
#elif defined(ANDROID) || defined(__ANDROID__)
#define GPR_ANDROID 1
#define GPR_ARCH_32 1
@@ -167,8 +177,8 @@
#endif
/* Validate platform combinations */
-#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + defined(GPR_WIN32) != 1
-#error Must define exactly one of GPR_GCC_ATOMIC, GPR_GCC_SYNC, GPR_WIN32
+#if defined(GPR_GCC_ATOMIC) + defined(GPR_GCC_SYNC) + defined(GPR_WIN32_ATOMIC) != 1
+#error Must define exactly one of GPR_GCC_ATOMIC, GPR_GCC_SYNC, GPR_WIN32_ATOMIC
#endif
#if defined(GPR_ARCH_32) + defined(GPR_ARCH_64) != 1
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index b5022d5..e4f8545 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -229,7 +229,8 @@
return true;
}
-bool PrintServerFactory(const ServiceDescriptor* service, Printer* out) {
+bool PrintServerFactory(const std::string& package_qualified_service_name,
+ const ServiceDescriptor* service, Printer* out) {
out->Print("def early_adopter_create_$Service$_server(servicer, port, "
"root_certificates, key_chain_pairs):\n",
"Service", service->name());
@@ -293,17 +294,18 @@
out->Print("),\n");
}
out->Print("}\n");
- // out->Print("return implementations.insecure_server("
- // "method_service_descriptions, port)\n");
out->Print(
"return implementations.secure_server("
- "method_service_descriptions, port, root_certificates,"
- " key_chain_pairs)\n");
+ "\"$PackageQualifiedServiceName$\","
+ " method_service_descriptions, port, root_certificates,"
+ " key_chain_pairs)\n",
+ "PackageQualifiedServiceName", package_qualified_service_name);
}
return true;
}
-bool PrintStubFactory(const ServiceDescriptor* service, Printer* out) {
+bool PrintStubFactory(const std::string& package_qualified_service_name,
+ const ServiceDescriptor* service, Printer* out) {
map<std::string, std::string> dict = ListToDict({
"Service", service->name(),
});
@@ -369,7 +371,9 @@
out->Print("}\n");
out->Print(
"return implementations.insecure_stub("
- "method_invocation_descriptions, host, port)\n");
+ "\"$PackageQualifiedServiceName$\","
+ " method_invocation_descriptions, host, port)\n",
+ "PackageQualifiedServiceName", package_qualified_service_name);
}
return true;
}
@@ -377,7 +381,7 @@
bool PrintPreamble(const FileDescriptor* file, Printer* out) {
out->Print("import abc\n");
out->Print("from grpc.early_adopter import implementations\n");
- out->Print("from grpc.early_adopter import utilities\n");
+ out->Print("from grpc.framework.alpha import utilities\n");
return true;
}
@@ -392,13 +396,18 @@
if (!PrintPreamble(file, &out)) {
return make_pair(false, "");
}
+ auto package = file->package();
+ if (!package.empty()) {
+ package = package.append(".");
+ }
for (int i = 0; i < file->service_count(); ++i) {
auto service = file->service(i);
+ auto package_qualified_service_name = package + service->name();
if (!(PrintServicer(service, &out) &&
PrintServer(service, &out) &&
PrintStub(service, &out) &&
- PrintServerFactory(service, &out) &&
- PrintStubFactory(service, &out))) {
+ PrintServerFactory(package_qualified_service_name, service, &out) &&
+ PrintStubFactory(package_qualified_service_name, service, &out))) {
return make_pair(false, "");
}
}
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index d1616a3..f565cbf 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -66,6 +66,10 @@
grpc_mdelem *status_ok;
grpc_mdelem *status_not_found;
grpc_mdstr *path_key;
+ grpc_mdstr *authority_key;
+ grpc_mdstr *host_key;
+
+ grpc_mdctx *mdctx;
size_t gettable_count;
gettable *gettables;
@@ -181,6 +185,15 @@
}
calld->path = op->data.metadata;
op->done_cb(op->user_data, GRPC_OP_OK);
+ } else if (op->data.metadata->key == channeld->host_key) {
+ /* translate host to :authority since :authority may be
+ omitted */
+ grpc_mdelem *authority = grpc_mdelem_from_metadata_strings(
+ channeld->mdctx, channeld->authority_key, op->data.metadata->value);
+ grpc_mdelem_unref(op->data.metadata);
+ op->data.metadata = authority;
+ /* pass the event up */
+ grpc_call_next_op(elem, op);
} else {
/* pass the event up */
grpc_call_next_op(elem, op);
@@ -305,9 +318,13 @@
channeld->https_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "https");
channeld->grpc_scheme = grpc_mdelem_from_strings(mdctx, ":scheme", "grpc");
channeld->path_key = grpc_mdstr_from_string(mdctx, ":path");
+ channeld->authority_key = grpc_mdstr_from_string(mdctx, ":authority");
+ channeld->host_key = grpc_mdstr_from_string(mdctx, "host");
channeld->content_type =
grpc_mdelem_from_strings(mdctx, "content-type", "application/grpc");
+ channeld->mdctx = mdctx;
+
/* initialize http download support */
channeld->gettable_count = 0;
channeld->gettables = NULL;
@@ -357,6 +374,8 @@
grpc_mdelem_unref(channeld->grpc_scheme);
grpc_mdelem_unref(channeld->content_type);
grpc_mdstr_unref(channeld->path_key);
+ grpc_mdstr_unref(channeld->authority_key);
+ grpc_mdstr_unref(channeld->host_key);
}
const grpc_channel_filter grpc_http_server_filter = {
diff --git a/src/cpp/client/secure_credentials.cc b/src/cpp/client/secure_credentials.cc
index a2f6a69..e3c6637 100644
--- a/src/cpp/client/secure_credentials.cc
+++ b/src/cpp/client/secure_credentials.cc
@@ -55,7 +55,8 @@
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
args.GetSslTargetNameOverride().empty()
- ? target : args.GetSslTargetNameOverride(),
+ ? target
+ : args.GetSslTargetNameOverride(),
grpc_secure_channel_create(c_creds_, target.c_str(), &channel_args)));
}
@@ -111,7 +112,7 @@
// Builds JWT credentials.
std::unique_ptr<Credentials> JWTCredentials(
- const grpc::string &json_key, std::chrono::seconds token_lifetime) {
+ const grpc::string& json_key, std::chrono::seconds token_lifetime) {
if (token_lifetime.count() <= 0) {
gpr_log(GPR_ERROR,
"Trying to create JWTCredentials with non-positive lifetime");
@@ -122,6 +123,13 @@
grpc_jwt_credentials_create(json_key.c_str(), lifetime));
}
+// Builds refresh token credentials.
+std::unique_ptr<Credentials> RefreshTokenCredentials(
+ const grpc::string& json_refresh_token) {
+ return WrapCredentials(
+ grpc_refresh_token_credentials_create(json_refresh_token.c_str()));
+}
+
// Builds IAM credentials.
std::unique_ptr<Credentials> IAMCredentials(
const grpc::string& authorization_token,
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 414966c..fede2da 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -57,19 +57,26 @@
}
};
-bool CompletionQueue::Next(void** tag, bool* ok) {
+CompletionQueue::NextStatus
+CompletionQueue::AsyncNext(void** tag, bool* ok,
+ std::chrono::system_clock::time_point deadline) {
std::unique_ptr<grpc_event, EventDeleter> ev;
+ gpr_timespec gpr_deadline;
+ Timepoint2Timespec(deadline, &gpr_deadline);
for (;;) {
- ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
+ ev.reset(grpc_completion_queue_next(cq_, gpr_deadline));
+ if (!ev) { /* got a NULL back because deadline passed */
+ return TIMEOUT;
+ }
if (ev->type == GRPC_QUEUE_SHUTDOWN) {
- return false;
+ return SHUTDOWN;
}
auto cq_tag = static_cast<CompletionQueueTag*>(ev->tag);
*ok = ev->data.op_complete == GRPC_OP_OK;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
- return true;
+ return GOT_EVENT;
}
}
}
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index afb6541..8cc3e38 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -75,6 +75,9 @@
NanCallback *Call::constructor;
Persistent<FunctionTemplate> Call::fun_tpl;
+bool EndsWith(const char *str, const char *substr) {
+ return strcmp(str+strlen(str)-strlen(substr), substr) == 0;
+}
bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array,
shared_ptr<Resources> resources) {
@@ -99,14 +102,19 @@
Handle<Value> value = values->Get(j);
grpc_metadata *current = &array->metadata[array->count];
current->key = **utf8_key;
- if (::node::Buffer::HasInstance(value)) {
- current->value = ::node::Buffer::Data(value);
- current->value_length = ::node::Buffer::Length(value);
- Persistent<Value> *handle = new Persistent<Value>();
- NanAssignPersistent(*handle, value);
- resources->handles.push_back(unique_ptr<PersistentHolder>(
- new PersistentHolder(handle)));
- } else if (value->IsString()) {
+ // Only allow binary headers for "-bin" keys
+ if (EndsWith(current->key, "-bin")) {
+ if (::node::Buffer::HasInstance(value)) {
+ current->value = ::node::Buffer::Data(value);
+ current->value_length = ::node::Buffer::Length(value);
+ Persistent<Value> *handle = new Persistent<Value>();
+ NanAssignPersistent(*handle, value);
+ resources->handles.push_back(unique_ptr<PersistentHolder>(
+ new PersistentHolder(handle)));
+ continue;
+ }
+ }
+ if (value->IsString()) {
Handle<String> string_value = value->ToString();
NanUtf8String *utf8_value = new NanUtf8String(string_value);
resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value));
@@ -146,9 +154,13 @@
array = NanNew<Array>(size_map[elem->key]);
metadata_object->Set(key_string, array);
}
- array->Set(index_map[elem->key],
- MakeFastBuffer(
- NanNewBufferHandle(elem->value, elem->value_length)));
+ if (EndsWith(elem->key, "-bin")) {
+ array->Set(index_map[elem->key],
+ MakeFastBuffer(
+ NanNewBufferHandle(elem->value, elem->value_length)));
+ } else {
+ array->Set(index_map[elem->key], NanNew(elem->value));
+ }
index_map[elem->key] += 1;
}
return NanEscapeScope(metadata_object);
diff --git a/src/node/package.json b/src/node/package.json
index 1d0aa0e..9f52f8c 100644
--- a/src/node/package.json
+++ b/src/node/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc",
- "version": "0.5.5",
+ "version": "0.6.0",
"author": "Google Inc.",
"description": "gRPC Library for Node",
"homepage": "http://www.grpc.io/",
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index 7b2b36a..98158ff 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -142,8 +142,8 @@
assert.doesNotThrow(function() {
var batch = {};
batch[grpc.opType.SEND_INITIAL_METADATA] = {
- 'key1': [new Buffer('value1')],
- 'key2': [new Buffer('value2')]
+ 'key1-bin': [new Buffer('value1')],
+ 'key2-bin': [new Buffer('value2')]
};
call.startBatch(batch, function(err, resp) {
assert.ifError(err);
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index c39364d..60e9861 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -138,21 +138,21 @@
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
assert.ifError(err);
- assert(response['send metadata']);
- assert(response['client close']);
- assert(response.hasOwnProperty('metadata'));
- assert.strictEqual(response.metadata.server_key[0].toString(),
- 'server_value');
- assert.deepEqual(response.status, {'code': grpc.status.OK,
- 'details': status_text,
- 'metadata': {}});
+ assert.deepEqual(response,{
+ 'send metadata': true,
+ 'client close': true,
+ metadata: {server_key: ['server_value']},
+ status: {'code': grpc.status.OK,
+ 'details': status_text,
+ 'metadata': {}}
+ });
done();
});
server.requestCall(function(err, call_details) {
var new_call = call_details['new call'];
assert.notEqual(new_call, null);
- assert.strictEqual(new_call.metadata.client_key[0].toString(),
+ assert.strictEqual(new_call.metadata.client_key[0],
'client_value');
var server_call = new_call.call;
assert.notEqual(server_call, null);
diff --git a/src/python/interop/interop/_insecure_interop_test.py b/src/python/interop/interop/_insecure_interop_test.py
index 1fa6b8b..e4ddff1 100644
--- a/src/python/interop/interop/_insecure_interop_test.py
+++ b/src/python/interop/interop/_insecure_interop_test.py
@@ -42,11 +42,12 @@
unittest.TestCase):
def setUp(self):
- self.server = implementations.insecure_server(methods.SERVER_METHODS, 0)
+ self.server = implementations.insecure_server(
+ methods.SERVICE_NAME, methods.SERVER_METHODS, 0)
self.server.start()
port = self.server.port()
self.stub = implementations.insecure_stub(
- methods.CLIENT_METHODS, 'localhost', port)
+ methods.SERVICE_NAME, methods.CLIENT_METHODS, 'localhost', port)
def tearDown(self):
self.server.stop()
diff --git a/src/python/interop/interop/_secure_interop_test.py b/src/python/interop/interop/_secure_interop_test.py
index cc9e938..214212d 100644
--- a/src/python/interop/interop/_secure_interop_test.py
+++ b/src/python/interop/interop/_secure_interop_test.py
@@ -46,12 +46,12 @@
def setUp(self):
self.server = implementations.secure_server(
- methods.SERVER_METHODS, 0, resources.private_key(),
- resources.certificate_chain())
+ methods.SERVICE_NAME, methods.SERVER_METHODS, 0,
+ resources.private_key(), resources.certificate_chain())
self.server.start()
port = self.server.port()
self.stub = implementations.secure_stub(
- methods.CLIENT_METHODS, 'localhost', port,
+ methods.SERVICE_NAME, methods.CLIENT_METHODS, 'localhost', port,
resources.test_root_certificates(), None, None,
server_host_override=_SERVER_HOST_OVERRIDE)
diff --git a/src/python/interop/interop/client.py b/src/python/interop/interop/client.py
index b674a64..fb7dfb5 100644
--- a/src/python/interop/interop/client.py
+++ b/src/python/interop/interop/client.py
@@ -67,12 +67,13 @@
root_certificates = resources.prod_root_certificates()
stub = implementations.secure_stub(
- methods.CLIENT_METHODS, args.server_host, args.server_port,
- root_certificates, None, None,
+ methods.SERVICE_NAME, methods.CLIENT_METHODS, args.server_host,
+ args.server_port, root_certificates, None, None,
server_host_override=args.server_host_override)
else:
stub = implementations.insecure_stub(
- methods.CLIENT_METHODS, args.server_host, args.server_port)
+ methods.SERVICE_NAME, methods.CLIENT_METHODS, args.server_host,
+ args.server_port)
return stub
diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py
index 2e15fac..79550a3 100644
--- a/src/python/interop/interop/methods.py
+++ b/src/python/interop/interop/methods.py
@@ -32,7 +32,7 @@
import enum
import threading
-from grpc.early_adopter import utilities
+from grpc.framework.alpha import utilities
from interop import empty_pb2
from interop import messages_pb2
@@ -122,31 +122,31 @@
messages_pb2.StreamingOutputCallResponse.SerializeToString)
-_SERVICE_NAME = '/grpc.testing.TestService'
+SERVICE_NAME = 'grpc.testing.TestService'
-EMPTY_CALL_METHOD_NAME = _SERVICE_NAME + '/EmptyCall'
-UNARY_CALL_METHOD_NAME = _SERVICE_NAME + '/UnaryCall'
-STREAMING_OUTPUT_CALL_METHOD_NAME = _SERVICE_NAME + '/StreamingOutputCall'
-STREAMING_INPUT_CALL_METHOD_NAME = _SERVICE_NAME + '/StreamingInputCall'
-FULL_DUPLEX_CALL_METHOD_NAME = _SERVICE_NAME + '/FullDuplexCall'
-HALF_DUPLEX_CALL_METHOD_NAME = _SERVICE_NAME + '/HalfDuplexCall'
+_EMPTY_CALL_METHOD_NAME = 'EmptyCall'
+_UNARY_CALL_METHOD_NAME = 'UnaryCall'
+_STREAMING_OUTPUT_CALL_METHOD_NAME = 'StreamingOutputCall'
+_STREAMING_INPUT_CALL_METHOD_NAME = 'StreamingInputCall'
+_FULL_DUPLEX_CALL_METHOD_NAME = 'FullDuplexCall'
+_HALF_DUPLEX_CALL_METHOD_NAME = 'HalfDuplexCall'
CLIENT_METHODS = {
- EMPTY_CALL_METHOD_NAME: _CLIENT_EMPTY_CALL,
- UNARY_CALL_METHOD_NAME: _CLIENT_UNARY_CALL,
- STREAMING_OUTPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_OUTPUT_CALL,
- STREAMING_INPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_INPUT_CALL,
- FULL_DUPLEX_CALL_METHOD_NAME: _CLIENT_FULL_DUPLEX_CALL,
- HALF_DUPLEX_CALL_METHOD_NAME: _CLIENT_HALF_DUPLEX_CALL,
+ _EMPTY_CALL_METHOD_NAME: _CLIENT_EMPTY_CALL,
+ _UNARY_CALL_METHOD_NAME: _CLIENT_UNARY_CALL,
+ _STREAMING_OUTPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_OUTPUT_CALL,
+ _STREAMING_INPUT_CALL_METHOD_NAME: _CLIENT_STREAMING_INPUT_CALL,
+ _FULL_DUPLEX_CALL_METHOD_NAME: _CLIENT_FULL_DUPLEX_CALL,
+ _HALF_DUPLEX_CALL_METHOD_NAME: _CLIENT_HALF_DUPLEX_CALL,
}
SERVER_METHODS = {
- EMPTY_CALL_METHOD_NAME: _SERVER_EMPTY_CALL,
- UNARY_CALL_METHOD_NAME: _SERVER_UNARY_CALL,
- STREAMING_OUTPUT_CALL_METHOD_NAME: _SERVER_STREAMING_OUTPUT_CALL,
- STREAMING_INPUT_CALL_METHOD_NAME: _SERVER_STREAMING_INPUT_CALL,
- FULL_DUPLEX_CALL_METHOD_NAME: _SERVER_FULL_DUPLEX_CALL,
- HALF_DUPLEX_CALL_METHOD_NAME: _SERVER_HALF_DUPLEX_CALL,
+ _EMPTY_CALL_METHOD_NAME: _SERVER_EMPTY_CALL,
+ _UNARY_CALL_METHOD_NAME: _SERVER_UNARY_CALL,
+ _STREAMING_OUTPUT_CALL_METHOD_NAME: _SERVER_STREAMING_OUTPUT_CALL,
+ _STREAMING_INPUT_CALL_METHOD_NAME: _SERVER_STREAMING_INPUT_CALL,
+ _FULL_DUPLEX_CALL_METHOD_NAME: _SERVER_FULL_DUPLEX_CALL,
+ _HALF_DUPLEX_CALL_METHOD_NAME: _SERVER_HALF_DUPLEX_CALL,
}
diff --git a/src/python/interop/interop/server.py b/src/python/interop/interop/server.py
index 4e4b127..5791203 100644
--- a/src/python/interop/interop/server.py
+++ b/src/python/interop/interop/server.py
@@ -54,10 +54,11 @@
private_key = resources.private_key()
certificate_chain = resources.certificate_chain()
server = implementations.secure_server(
- methods.SERVER_METHODS, args.port, private_key, certificate_chain)
+ methods.SERVICE_NAME, methods.SERVER_METHODS, args.port, private_key,
+ certificate_chain)
else:
server = implementations.insecure_server(
- methods.SERVER_METHODS, args.port)
+ methods.SERVICE_NAME, methods.SERVER_METHODS, args.port)
server.start()
logging.info('Server serving.')
diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py
index 2542eb6..923e889 100644
--- a/src/python/src/grpc/_adapter/_face_test_case.py
+++ b/src/python/src/grpc/_adapter/_face_test_case.py
@@ -34,7 +34,7 @@
from grpc._adapter import fore
from grpc._adapter import rear
from grpc.framework.base import util
-from grpc.framework.base.packets import implementations as tickets_implementations
+from grpc.framework.base import implementations as base_implementations
from grpc.framework.face import implementations as face_implementations
from grpc.framework.face.testing import coverage
from grpc.framework.face.testing import serial
@@ -69,8 +69,8 @@
serialization.request_serializers,
serialization.response_deserializers, False, None, None, None)
rear_link.start()
- front = tickets_implementations.front(pool, pool, pool)
- back = tickets_implementations.back(
+ front = base_implementations.front_link(pool, pool, pool)
+ back = base_implementations.back_link(
servicer, pool, pool, pool, _TIMEOUT, _MAXIMUM_TIMEOUT)
fore_link.join_rear_link(back)
back.join_fore_link(fore_link)
diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py
index 49fd1f7..cfdcc2c 100644
--- a/src/python/src/grpc/_adapter/_links_test.py
+++ b/src/python/src/grpc/_adapter/_links_test.py
@@ -37,7 +37,6 @@
from grpc._adapter import fore
from grpc._adapter import rear
from grpc.framework.base import interfaces
-from grpc.framework.base.packets import packets as tickets
from grpc.framework.foundation import logging_pool
_IDENTITY = lambda x: x
@@ -60,11 +59,11 @@
test_fore_link = _test_links.ForeLink(None, None)
def rear_action(front_to_back_ticket, fore_link):
if front_to_back_ticket.kind in (
- tickets.FrontToBackPacket.Kind.COMPLETION,
- tickets.FrontToBackPacket.Kind.ENTIRE):
- back_to_front_ticket = tickets.BackToFrontPacket(
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE):
+ back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, 0,
- tickets.BackToFrontPacket.Kind.COMPLETION, None)
+ interfaces.BackToFrontTicket.Kind.COMPLETION, None)
fore_link.accept_back_to_front_ticket(back_to_front_ticket)
test_rear_link = _test_links.RearLink(rear_action, None)
@@ -82,8 +81,8 @@
test_fore_link.join_rear_link(rear_link)
rear_link.start()
- front_to_back_ticket = tickets.FrontToBackPacket(
- test_operation_id, 0, tickets.FrontToBackPacket.Kind.ENTIRE,
+ front_to_back_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
test_method, interfaces.ServicedSubscription.Kind.FULL, None, None,
_TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
@@ -91,7 +90,7 @@
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is
- tickets.BackToFrontPacket.Kind.CONTINUATION):
+ interfaces.BackToFrontTicket.Kind.CONTINUATION):
test_fore_link.condition.wait()
rear_link.stop()
@@ -100,7 +99,7 @@
with test_fore_link.condition:
self.assertIs(
test_fore_link.tickets[-1].kind,
- tickets.BackToFrontPacket.Kind.COMPLETION)
+ interfaces.BackToFrontTicket.Kind.COMPLETION)
def testEntireRoundTrip(self):
test_operation_id = object()
@@ -115,14 +114,14 @@
else:
payload = test_back_to_front_datum
terminal = front_to_back_ticket.kind in (
- tickets.FrontToBackPacket.Kind.COMPLETION,
- tickets.FrontToBackPacket.Kind.ENTIRE)
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE)
if payload is not None or terminal:
if terminal:
- kind = tickets.BackToFrontPacket.Kind.COMPLETION
+ kind = interfaces.BackToFrontTicket.Kind.COMPLETION
else:
- kind = tickets.BackToFrontPacket.Kind.CONTINUATION
- back_to_front_ticket = tickets.BackToFrontPacket(
+ kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
+ back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
payload)
rear_sequence_number[0] += 1
@@ -144,8 +143,8 @@
test_fore_link.join_rear_link(rear_link)
rear_link.start()
- front_to_back_ticket = tickets.FrontToBackPacket(
- test_operation_id, 0, tickets.FrontToBackPacket.Kind.ENTIRE,
+ front_to_back_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0, interfaces.FrontToBackTicket.Kind.ENTIRE,
test_method, interfaces.ServicedSubscription.Kind.FULL, None,
test_front_to_back_datum, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
@@ -153,7 +152,7 @@
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not
- tickets.BackToFrontPacket.Kind.COMPLETION):
+ interfaces.BackToFrontTicket.Kind.COMPLETION):
test_fore_link.condition.wait()
rear_link.stop()
@@ -183,14 +182,14 @@
else:
response = None
terminal = front_to_back_ticket.kind in (
- tickets.FrontToBackPacket.Kind.COMPLETION,
- tickets.FrontToBackPacket.Kind.ENTIRE)
+ interfaces.FrontToBackTicket.Kind.COMPLETION,
+ interfaces.FrontToBackTicket.Kind.ENTIRE)
if response is not None or terminal:
if terminal:
- kind = tickets.BackToFrontPacket.Kind.COMPLETION
+ kind = interfaces.BackToFrontTicket.Kind.COMPLETION
else:
- kind = tickets.BackToFrontPacket.Kind.CONTINUATION
- back_to_front_ticket = tickets.BackToFrontPacket(
+ kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
+ back_to_front_ticket = interfaces.BackToFrontTicket(
front_to_back_ticket.operation_id, rear_sequence_number[0], kind,
response)
rear_sequence_number[0] += 1
@@ -213,22 +212,23 @@
test_fore_link.join_rear_link(rear_link)
rear_link.start()
- commencement_ticket = tickets.FrontToBackPacket(
- test_operation_id, 0, tickets.FrontToBackPacket.Kind.COMMENCEMENT,
- test_method, interfaces.ServicedSubscription.Kind.FULL, None, None,
+ commencement_ticket = interfaces.FrontToBackTicket(
+ test_operation_id, 0,
+ interfaces.FrontToBackTicket.Kind.COMMENCEMENT, test_method,
+ interfaces.ServicedSubscription.Kind.FULL, None, None,
_TIMEOUT)
fore_sequence_number = 1
rear_link.accept_front_to_back_ticket(commencement_ticket)
for request in scenario.requests():
- continuation_ticket = tickets.FrontToBackPacket(
+ continuation_ticket = interfaces.FrontToBackTicket(
test_operation_id, fore_sequence_number,
- tickets.FrontToBackPacket.Kind.CONTINUATION, None, None, None,
+ interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, None,
request, None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(continuation_ticket)
- completion_ticket = tickets.FrontToBackPacket(
+ completion_ticket = interfaces.FrontToBackTicket(
test_operation_id, fore_sequence_number,
- tickets.FrontToBackPacket.Kind.COMPLETION, None, None, None, None,
+ interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, None,
None)
fore_sequence_number += 1
rear_link.accept_front_to_back_ticket(completion_ticket)
@@ -236,7 +236,7 @@
with test_fore_link.condition:
while (not test_fore_link.tickets or
test_fore_link.tickets[-1].kind is not
- tickets.BackToFrontPacket.Kind.COMPLETION):
+ interfaces.BackToFrontTicket.Kind.COMPLETION):
test_fore_link.condition.wait()
rear_link.stop()
diff --git a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
index ead0b9e..25799d6 100644
--- a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
+++ b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
@@ -34,7 +34,6 @@
from grpc._adapter import _test_links
from grpc._adapter import rear
from grpc.framework.base import interfaces
-from grpc.framework.base.packets import packets
from grpc.framework.foundation import logging_pool
_IDENTITY = lambda x: x
@@ -68,7 +67,7 @@
rear_link.join_fore_link(fore_link)
rear_link.start()
- front_to_back_ticket = packets.FrontToBackPacket(
+ front_to_back_ticket = interfaces.FrontToBackTicket(
test_operation_id, 0, front_to_back_ticket_kind, test_method,
interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
@@ -77,7 +76,7 @@
while True:
if (fore_link.tickets and
fore_link.tickets[-1].kind is not
- packets.BackToFrontPacket.Kind.CONTINUATION):
+ interfaces.BackToFrontTicket.Kind.CONTINUATION):
break
fore_link.condition.wait()
@@ -86,15 +85,15 @@
with fore_link.condition:
self.assertIsNot(
fore_link.tickets[-1].kind,
- packets.BackToFrontPacket.Kind.COMPLETION)
+ interfaces.BackToFrontTicket.Kind.COMPLETION)
- def testLonelyClientCommencementPacket(self):
+ def testLonelyClientCommencementTicket(self):
self._perform_lonely_client_test_with_ticket_kind(
- packets.FrontToBackPacket.Kind.COMMENCEMENT)
+ interfaces.FrontToBackTicket.Kind.COMMENCEMENT)
- def testLonelyClientEntirePacket(self):
+ def testLonelyClientEntireTicket(self):
self._perform_lonely_client_test_with_ticket_kind(
- packets.FrontToBackPacket.Kind.ENTIRE)
+ interfaces.FrontToBackTicket.Kind.ENTIRE)
if __name__ == '__main__':
diff --git a/src/python/src/grpc/_adapter/_test_links.py b/src/python/src/grpc/_adapter/_test_links.py
index ac0d6e2..86c7e61 100644
--- a/src/python/src/grpc/_adapter/_test_links.py
+++ b/src/python/src/grpc/_adapter/_test_links.py
@@ -31,7 +31,7 @@
import threading
-from grpc.framework.base.packets import interfaces
+from grpc.framework.base import interfaces
class ForeLink(interfaces.ForeLink):
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py
index 16e5a20..05016cd 100644
--- a/src/python/src/grpc/_adapter/fore.py
+++ b/src/python/src/grpc/_adapter/fore.py
@@ -36,10 +36,8 @@
from grpc._adapter import _common
from grpc._adapter import _low
-from grpc.framework.base import interfaces
-from grpc.framework.base.packets import interfaces as ticket_interfaces
-from grpc.framework.base.packets import null
-from grpc.framework.base.packets import packets as tickets
+from grpc.framework.base import interfaces as base_interfaces
+from grpc.framework.base import null
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
@@ -69,7 +67,7 @@
rpc_state.write.low = _LowWrite.CLOSED
-class ForeLink(ticket_interfaces.ForeLink, activated.Activated):
+class ForeLink(base_interfaces.ForeLink, activated.Activated):
"""A service-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
@@ -127,9 +125,9 @@
self._request_deserializers[method],
self._response_serializers[method])
- ticket = tickets.FrontToBackPacket(
- call, 0, tickets.FrontToBackPacket.Kind.COMMENCEMENT, method,
- interfaces.ServicedSubscription.Kind.FULL, None, None,
+ ticket = base_interfaces.FrontToBackTicket(
+ call, 0, base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT, method,
+ base_interfaces.ServicedSubscription.Kind.FULL, None, None,
service_acceptance.deadline - time.time())
self._rear_link.accept_front_to_back_ticket(ticket)
@@ -145,14 +143,16 @@
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
if event.bytes is None:
- ticket = tickets.FrontToBackPacket(
- call, sequence_number, tickets.FrontToBackPacket.Kind.COMPLETION,
- None, None, None, None, None)
+ ticket = base_interfaces.FrontToBackTicket(
+ call, sequence_number,
+ base_interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None,
+ None, None)
else:
call.read(call)
- ticket = tickets.FrontToBackPacket(
- call, sequence_number, tickets.FrontToBackPacket.Kind.CONTINUATION,
- None, None, None, rpc_state.deserializer(event.bytes), None)
+ ticket = base_interfaces.FrontToBackTicket(
+ call, sequence_number,
+ base_interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None,
+ None, rpc_state.deserializer(event.bytes), None)
self._rear_link.accept_front_to_back_ticket(ticket)
@@ -180,10 +180,10 @@
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
- ticket = tickets.FrontToBackPacket(
+ ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
- tickets.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, None, None,
- None, None, None)
+ base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None,
+ None, None, None, None)
self._rear_link.accept_front_to_back_ticket(ticket)
def _on_finish_event(self, event):
@@ -200,19 +200,21 @@
sequence_number = rpc_state.sequence_number
rpc_state.sequence_number += 1
if code is _low.Code.CANCELLED:
- ticket = tickets.FrontToBackPacket(
- call, sequence_number, tickets.FrontToBackPacket.Kind.CANCELLATION,
- None, None, None, None, None)
+ ticket = base_interfaces.FrontToBackTicket(
+ call, sequence_number,
+ base_interfaces.FrontToBackTicket.Kind.CANCELLATION, None, None,
+ None, None, None)
elif code is _low.Code.EXPIRED:
- ticket = tickets.FrontToBackPacket(
- call, sequence_number, tickets.FrontToBackPacket.Kind.EXPIRATION,
- None, None, None, None, None)
+ ticket = base_interfaces.FrontToBackTicket(
+ call, sequence_number,
+ base_interfaces.FrontToBackTicket.Kind.EXPIRATION, None, None, None,
+ None, None)
else:
# TODO(nathaniel): Better mapping of codes to ticket-categories
- ticket = tickets.FrontToBackPacket(
+ ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
- tickets.FrontToBackPacket.Kind.TRANSMISSION_FAILURE, None, None,
- None, None, None)
+ base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None,
+ None, None, None, None)
self._rear_link.accept_front_to_back_ticket(ticket)
def _spin(self, completion_queue, server):
@@ -268,7 +270,7 @@
self._rpc_states.pop(call, None)
def join_rear_link(self, rear_link):
- """See ticket_interfaces.ForeLink.join_rear_link for specification."""
+ """See base_interfaces.ForeLink.join_rear_link for specification."""
self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link
def _start(self):
@@ -348,14 +350,14 @@
return self._port
def accept_back_to_front_ticket(self, ticket):
- """See ticket_interfaces.ForeLink.accept_back_to_front_ticket for spec."""
+ """See base_interfaces.ForeLink.accept_back_to_front_ticket for spec."""
with self._condition:
if self._server is None:
return
- if ticket.kind is tickets.BackToFrontPacket.Kind.CONTINUATION:
+ if ticket.kind is base_interfaces.BackToFrontTicket.Kind.CONTINUATION:
self._continue(ticket.operation_id, ticket.payload)
- elif ticket.kind is tickets.BackToFrontPacket.Kind.COMPLETION:
+ elif ticket.kind is base_interfaces.BackToFrontTicket.Kind.COMPLETION:
self._complete(ticket.operation_id, ticket.payload)
else:
self._cancel(ticket.operation_id)
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index eee008e..f19321c 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -36,9 +36,8 @@
from grpc._adapter import _common
from grpc._adapter import _low
-from grpc.framework.base.packets import interfaces as ticket_interfaces
-from grpc.framework.base.packets import null
-from grpc.framework.base.packets import packets as tickets
+from grpc.framework.base import interfaces as base_interfaces
+from grpc.framework.base import null
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
@@ -88,7 +87,7 @@
raise ValueError('Write attempted after writes completed!')
-class RearLink(ticket_interfaces.RearLink, activated.Activated):
+class RearLink(base_interfaces.RearLink, activated.Activated):
"""An invocation-side bridge between RPC Framework and the C-ish _low code."""
def __init__(
@@ -152,9 +151,9 @@
else:
logging.error('RPC write not accepted! Event: %s', (event,))
rpc_state.active = False
- ticket = tickets.BackToFrontPacket(
+ ticket = base_interfaces.BackToFrontTicket(
operation_id, rpc_state.common.sequence_number,
- tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, None)
+ base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
@@ -163,9 +162,9 @@
rpc_state.call.read(operation_id)
rpc_state.outstanding.add(_low.Event.Kind.READ_ACCEPTED)
- ticket = tickets.BackToFrontPacket(
+ ticket = base_interfaces.BackToFrontTicket(
operation_id, rpc_state.common.sequence_number,
- tickets.BackToFrontPacket.Kind.CONTINUATION,
+ base_interfaces.BackToFrontTicket.Kind.CONTINUATION,
rpc_state.common.deserializer(event.bytes))
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
@@ -174,9 +173,9 @@
if not event.complete_accepted:
logging.error('RPC complete not accepted! Event: %s', (event,))
rpc_state.active = False
- ticket = tickets.BackToFrontPacket(
+ ticket = base_interfaces.BackToFrontTicket(
operation_id, rpc_state.common.sequence_number,
- tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE, None)
+ base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
@@ -189,14 +188,14 @@
"""Handle termination of an RPC."""
# TODO(nathaniel): Cover all statuses.
if event.status.code is _low.Code.OK:
- kind = tickets.BackToFrontPacket.Kind.COMPLETION
+ kind = base_interfaces.BackToFrontTicket.Kind.COMPLETION
elif event.status.code is _low.Code.CANCELLED:
- kind = tickets.BackToFrontPacket.Kind.CANCELLATION
+ kind = base_interfaces.BackToFrontTicket.Kind.CANCELLATION
elif event.status.code is _low.Code.EXPIRED:
- kind = tickets.BackToFrontPacket.Kind.EXPIRATION
+ kind = base_interfaces.BackToFrontTicket.Kind.EXPIRATION
else:
- kind = tickets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE
- ticket = tickets.BackToFrontPacket(
+ kind = base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE
+ ticket = base_interfaces.BackToFrontTicket(
operation_id, rpc_state.common.sequence_number, kind, None)
rpc_state.common.sequence_number += 1
self._fore_link.accept_back_to_front_ticket(ticket)
@@ -317,7 +316,7 @@
rpc_state.active = False
def join_fore_link(self, fore_link):
- """See ticket_interfaces.RearLink.join_fore_link for specification."""
+ """See base_interfaces.RearLink.join_fore_link for specification."""
with self._condition:
self._fore_link = null.NULL_FORE_LINK if fore_link is None else fore_link
@@ -366,22 +365,22 @@
self._stop()
def accept_front_to_back_ticket(self, ticket):
- """See ticket_interfaces.RearLink.accept_front_to_back_ticket for spec."""
+ """See base_interfaces.RearLink.accept_front_to_back_ticket for spec."""
with self._condition:
if self._completion_queue is None:
return
- if ticket.kind is tickets.FrontToBackPacket.Kind.COMMENCEMENT:
+ if ticket.kind is base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT:
self._commence(
ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
- elif ticket.kind is tickets.FrontToBackPacket.Kind.CONTINUATION:
+ elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.CONTINUATION:
self._continue(ticket.operation_id, ticket.payload)
- elif ticket.kind is tickets.FrontToBackPacket.Kind.COMPLETION:
+ elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.COMPLETION:
self._complete(ticket.operation_id, ticket.payload)
- elif ticket.kind is tickets.FrontToBackPacket.Kind.ENTIRE:
+ elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.ENTIRE:
self._entire(
ticket.operation_id, ticket.name, ticket.payload, ticket.timeout)
- elif ticket.kind is tickets.FrontToBackPacket.Kind.CANCELLATION:
+ elif ticket.kind is base_interfaces.FrontToBackTicket.Kind.CANCELLATION:
self._cancel(ticket.operation_id)
else:
# NOTE(nathaniel): All other categories are treated as cancellation.
diff --git a/src/python/src/grpc/early_adopter/_face_utilities.py b/src/python/src/grpc/early_adopter/_face_utilities.py
deleted file mode 100644
index 2cf5760..0000000
--- a/src/python/src/grpc/early_adopter/_face_utilities.py
+++ /dev/null
@@ -1,153 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import abc
-import collections
-
-# face_interfaces is referenced from specification in this module.
-from grpc.framework.common import cardinality
-from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import
-from grpc.framework.face import utilities as face_utilities
-from grpc.early_adopter import _reexport
-from grpc.early_adopter import interfaces
-
-
-class InvocationBreakdown(object):
- """An intermediate representation of invocation-side views of RPC methods.
-
- Attributes:
- cardinalities: A dictionary from RPC method name to interfaces.Cardinality
- value.
- request_serializers: A dictionary from RPC method name to callable
- behavior to be used serializing request values for the RPC.
- response_deserializers: A dictionary from RPC method name to callable
- behavior to be used deserializing response values for the RPC.
- """
- __metaclass__ = abc.ABCMeta
-
-
-class _EasyInvocationBreakdown(
- InvocationBreakdown,
- collections.namedtuple(
- '_EasyInvocationBreakdown',
- ('cardinalities', 'request_serializers', 'response_deserializers'))):
- pass
-
-
-class ServiceBreakdown(object):
- """An intermediate representation of service-side views of RPC methods.
-
- Attributes:
- implementations: A dictionary from RPC method name to
- face_interfaces.MethodImplementation implementing the RPC method.
- request_deserializers: A dictionary from RPC method name to callable
- behavior to be used deserializing request values for the RPC.
- response_serializers: A dictionary from RPC method name to callable
- behavior to be used serializing response values for the RPC.
- """
- __metaclass__ = abc.ABCMeta
-
-
-class _EasyServiceBreakdown(
- ServiceBreakdown,
- collections.namedtuple(
- '_EasyServiceBreakdown',
- ('implementations', 'request_deserializers', 'response_serializers'))):
- pass
-
-
-def break_down_invocation(method_descriptions):
- """Derives an InvocationBreakdown from several RPC method descriptions.
-
- Args:
- method_descriptions: A dictionary from RPC method name to
- interfaces.RpcMethodInvocationDescription describing the RPCs.
-
- Returns:
- An InvocationBreakdown corresponding to the given method descriptions.
- """
- cardinalities = {}
- request_serializers = {}
- response_deserializers = {}
- for name, method_description in method_descriptions.iteritems():
- cardinalities[name] = method_description.cardinality()
- request_serializers[name] = method_description.serialize_request
- response_deserializers[name] = method_description.deserialize_response
- return _EasyInvocationBreakdown(
- cardinalities, request_serializers, response_deserializers)
-
-
-def break_down_service(method_descriptions):
- """Derives a ServiceBreakdown from several RPC method descriptions.
-
- Args:
- method_descriptions: A dictionary from RPC method name to
- interfaces.RpcMethodServiceDescription describing the RPCs.
-
- Returns:
- A ServiceBreakdown corresponding to the given method descriptions.
- """
- implementations = {}
- request_deserializers = {}
- response_serializers = {}
- for name, method_description in method_descriptions.iteritems():
- cardinality = method_description.cardinality()
- if cardinality is interfaces.Cardinality.UNARY_UNARY:
- def service(
- request, face_rpc_context,
- service_behavior=method_description.service_unary_unary):
- return service_behavior(
- request, _reexport.rpc_context(face_rpc_context))
- implementations[name] = face_utilities.unary_unary_inline(service)
- elif cardinality is interfaces.Cardinality.UNARY_STREAM:
- def service(
- request, face_rpc_context,
- service_behavior=method_description.service_unary_stream):
- return service_behavior(
- request, _reexport.rpc_context(face_rpc_context))
- implementations[name] = face_utilities.unary_stream_inline(service)
- elif cardinality is interfaces.Cardinality.STREAM_UNARY:
- def service(
- request_iterator, face_rpc_context,
- service_behavior=method_description.service_stream_unary):
- return service_behavior(
- request_iterator, _reexport.rpc_context(face_rpc_context))
- implementations[name] = face_utilities.stream_unary_inline(service)
- elif cardinality is interfaces.Cardinality.STREAM_STREAM:
- def service(
- request_iterator, face_rpc_context,
- service_behavior=method_description.service_stream_stream):
- return service_behavior(
- request_iterator, _reexport.rpc_context(face_rpc_context))
- implementations[name] = face_utilities.stream_stream_inline(service)
- request_deserializers[name] = method_description.deserialize_request
- response_serializers[name] = method_description.serialize_response
-
- return _EasyServiceBreakdown(
- implementations, request_deserializers, response_serializers)
diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py
index 6fe9059..cc0b8ec 100644
--- a/src/python/src/grpc/early_adopter/implementations.py
+++ b/src/python/src/grpc/early_adopter/implementations.py
@@ -33,11 +33,11 @@
from grpc._adapter import fore as _fore
from grpc._adapter import rear as _rear
-from grpc.early_adopter import _face_utilities
-from grpc.early_adopter import _reexport
-from grpc.early_adopter import interfaces
+from grpc.framework.alpha import _face_utilities
+from grpc.framework.alpha import _reexport
+from grpc.framework.alpha import interfaces
+from grpc.framework.base import implementations as _base_implementations
from grpc.framework.base import util as _base_utilities
-from grpc.framework.base.packets import implementations as _tickets_implementations
from grpc.framework.face import implementations as _face_implementations
from grpc.framework.foundation import logging_pool
@@ -66,7 +66,7 @@
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
servicer = _face_implementations.servicer(
self._pool, self._breakdown.implementations, None)
- self._back = _tickets_implementations.back(
+ self._back = _base_implementations.back_link(
servicer, self._pool, self._pool, self._pool, _ONE_DAY_IN_SECONDS,
_ONE_DAY_IN_SECONDS)
self._fore_link = _fore.ForeLink(
@@ -134,7 +134,7 @@
with self._lock:
if self._pool is None:
self._pool = logging_pool.pool(_THREAD_POOL_SIZE)
- self._front = _tickets_implementations.front(
+ self._front = _base_implementations.front_link(
self._pool, self._pool, self._pool)
self._rear_link = _rear.RearLink(
self._host, self._port, self._pool,
@@ -146,8 +146,7 @@
self._rear_link.join_fore_link(self._front)
self._rear_link.start()
self._understub = _face_implementations.dynamic_stub(
- _reexport.common_cardinalities(self._breakdown.cardinalities),
- self._front, self._pool, '')
+ self._breakdown.face_cardinalities, self._front, self._pool, '')
else:
raise ValueError('Tried to __enter__ already-__enter__ed Stub!')
return self
@@ -171,17 +170,9 @@
if self._pool is None:
raise ValueError('Tried to __getattr__ non-__enter__ed Stub!')
else:
- underlying_attr = getattr(self._understub, attr, None)
method_cardinality = self._breakdown.cardinalities.get(attr)
- # TODO(nathaniel): Eliminate this trick.
- if underlying_attr is None:
- for method_name, method_cardinality in self._breakdown.cardinalities.iteritems():
- last_slash_index = method_name.rfind('/')
- if 0 <= last_slash_index and method_name[last_slash_index + 1:] == attr:
- underlying_attr = getattr(self._understub, method_name)
- break
- else:
- raise AttributeError(attr)
+ underlying_attr = getattr(
+ self._understub, self._breakdown.qualified_names.get(attr), None)
if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
return _reexport.unary_unary_sync_async(underlying_attr)
elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
@@ -198,44 +189,49 @@
def _build_stub(
- methods, host, port, secure, root_certificates, private_key,
+ service_name, methods, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=None):
- breakdown = _face_utilities.break_down_invocation(methods)
+ breakdown = _face_utilities.break_down_invocation(service_name, methods)
return _Stub(
breakdown, host, port, secure, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override)
-def _build_server(methods, port, private_key, certificate_chain):
- breakdown = _face_utilities.break_down_service(methods)
+def _build_server(service_name, methods, port, private_key, certificate_chain):
+ breakdown = _face_utilities.break_down_service(service_name, methods)
return _Server(breakdown, port, private_key, certificate_chain)
-def insecure_stub(methods, host, port):
+def insecure_stub(service_name, methods, host, port):
"""Constructs an insecure interfaces.Stub.
Args:
+ service_name: The package-qualified full name of the service.
methods: A dictionary from RPC method name to
interfaces.RpcMethodInvocationDescription describing the RPCs to be
- supported by the created stub.
+ supported by the created stub. The RPC method names in the dictionary are
+ not qualified by the service name or decorated in any other way.
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
Returns:
An interfaces.Stub affording RPC invocation.
"""
- return _build_stub(methods, host, port, False, None, None, None)
+ return _build_stub(
+ service_name, methods, host, port, False, None, None, None)
def secure_stub(
- methods, host, port, root_certificates, private_key, certificate_chain,
- server_host_override=None):
+ service_name, methods, host, port, root_certificates, private_key,
+ certificate_chain, server_host_override=None):
"""Constructs an insecure interfaces.Stub.
Args:
+ service_name: The package-qualified full name of the service.
methods: A dictionary from RPC method name to
interfaces.RpcMethodInvocationDescription describing the RPCs to be
- supported by the created stub.
+ supported by the created stub. The RPC method names in the dictionary are
+ not qualified by the service name or decorated in any other way.
host: The host to which to connect for RPC service.
port: The port to which to connect for RPC service.
root_certificates: The PEM-encoded root certificates or None to ask for
@@ -251,17 +247,19 @@
An interfaces.Stub affording RPC invocation.
"""
return _build_stub(
- methods, host, port, True, root_certificates, private_key,
+ service_name, methods, host, port, True, root_certificates, private_key,
certificate_chain, server_host_override=server_host_override)
-def insecure_server(methods, port):
+def insecure_server(service_name, methods, port):
"""Constructs an insecure interfaces.Server.
Args:
+ service_name: The package-qualified full name of the service.
methods: A dictionary from RPC method name to
interfaces.RpcMethodServiceDescription describing the RPCs to
- be serviced by the created server.
+ be serviced by the created server. The RPC method names in the dictionary
+ are not qualified by the service name or decorated in any other way.
port: The desired port on which to serve or zero to ask for a port to
be automatically selected.
@@ -269,16 +267,18 @@
An interfaces.Server that will run with no security and
service unsecured raw requests.
"""
- return _build_server(methods, port, None, None)
+ return _build_server(service_name, methods, port, None, None)
-def secure_server(methods, port, private_key, certificate_chain):
+def secure_server(service_name, methods, port, private_key, certificate_chain):
"""Constructs a secure interfaces.Server.
Args:
+ service_name: The package-qualified full name of the service.
methods: A dictionary from RPC method name to
interfaces.RpcMethodServiceDescription describing the RPCs to
- be serviced by the created server.
+ be serviced by the created server. The RPC method names in the dictionary
+ are not qualified by the service name or decorated in any other way.
port: The port on which to serve or zero to ask for a port to be
automatically selected.
private_key: A pem-encoded private key.
@@ -287,4 +287,5 @@
Returns:
An interfaces.Server that will serve secure traffic.
"""
- return _build_server(methods, port, private_key, certificate_chain)
+ return _build_server(
+ service_name, methods, port, private_key, certificate_chain)
diff --git a/src/python/src/grpc/early_adopter/implementations_test.py b/src/python/src/grpc/early_adopter/implementations_test.py
index 9ef06c3..ae4adad 100644
--- a/src/python/src/grpc/early_adopter/implementations_test.py
+++ b/src/python/src/grpc/early_adopter/implementations_test.py
@@ -34,9 +34,11 @@
import unittest
from grpc.early_adopter import implementations
-from grpc.early_adopter import utilities
+from grpc.framework.alpha import utilities
from grpc._junkdrawer import math_pb2
+SERVICE_NAME = 'math.Math'
+
DIV = 'Div'
DIV_MANY = 'DivMany'
FIB = 'Fib'
@@ -104,10 +106,12 @@
class EarlyAdopterImplementationsTest(unittest.TestCase):
def setUp(self):
- self.server = implementations.insecure_server(_SERVICE_DESCRIPTIONS, 0)
+ self.server = implementations.insecure_server(
+ SERVICE_NAME, _SERVICE_DESCRIPTIONS, 0)
self.server.start()
port = self.server.port()
- self.stub = implementations.insecure_stub(_INVOCATION_DESCRIPTIONS, 'localhost', port)
+ self.stub = implementations.insecure_stub(
+ SERVICE_NAME, _INVOCATION_DESCRIPTIONS, 'localhost', port)
def tearDown(self):
self.server.stop()
diff --git a/src/python/src/grpc/framework/base/packets/__init__.py b/src/python/src/grpc/framework/alpha/__init__.py
similarity index 99%
rename from src/python/src/grpc/framework/base/packets/__init__.py
rename to src/python/src/grpc/framework/alpha/__init__.py
index 7086519..b893988 100644
--- a/src/python/src/grpc/framework/base/packets/__init__.py
+++ b/src/python/src/grpc/framework/alpha/__init__.py
@@ -26,5 +26,3 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
diff --git a/src/python/src/grpc/framework/alpha/_face_utilities.py b/src/python/src/grpc/framework/alpha/_face_utilities.py
new file mode 100644
index 0000000..fb0cfe4
--- /dev/null
+++ b/src/python/src/grpc/framework/alpha/_face_utilities.py
@@ -0,0 +1,183 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import abc
+import collections
+
+# face_interfaces is referenced from specification in this module.
+from grpc.framework.common import cardinality
+from grpc.framework.face import interfaces as face_interfaces # pylint: disable=unused-import
+from grpc.framework.face import utilities as face_utilities
+from grpc.framework.alpha import _reexport
+from grpc.framework.alpha import interfaces
+
+
+def _qualified_name(service_name, method_name):
+ return '/%s/%s' % (service_name, method_name)
+
+
+# TODO(nathaniel): This structure is getting bloated; it could be shrunk if
+# implementations._Stub used a generic rather than a dynamic underlying
+# face-layer stub.
+class InvocationBreakdown(object):
+ """An intermediate representation of invocation-side views of RPC methods.
+
+ Attributes:
+ cardinalities: A dictionary from RPC method name to interfaces.Cardinality
+ value.
+ qualified_names: A dictionary from unqualified RPC method name to
+ service-qualified RPC method name.
+ face_cardinalities: A dictionary from service-qualified RPC method name to
+ to cardinality.Cardinality value.
+ request_serializers: A dictionary from service-qualified RPC method name to
+ callable behavior to be used serializing request values for the RPC.
+ response_deserializers: A dictionary from service-qualified RPC method name
+ to callable behavior to be used deserializing response values for the
+ RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class _EasyInvocationBreakdown(
+ InvocationBreakdown,
+ collections.namedtuple(
+ '_EasyInvocationBreakdown',
+ ('cardinalities', 'qualified_names', 'face_cardinalities',
+ 'request_serializers', 'response_deserializers'))):
+ pass
+
+
+class ServiceBreakdown(object):
+ """An intermediate representation of service-side views of RPC methods.
+
+ Attributes:
+ implementations: A dictionary from service-qualified RPC method name to
+ face_interfaces.MethodImplementation implementing the RPC method.
+ request_deserializers: A dictionary from service-qualified RPC method name
+ to callable behavior to be used deserializing request values for the RPC.
+ response_serializers: A dictionary from service-qualified RPC method name
+ to callable behavior to be used serializing response values for the RPC.
+ """
+ __metaclass__ = abc.ABCMeta
+
+
+class _EasyServiceBreakdown(
+ ServiceBreakdown,
+ collections.namedtuple(
+ '_EasyServiceBreakdown',
+ ('implementations', 'request_deserializers', 'response_serializers'))):
+ pass
+
+
+def break_down_invocation(service_name, method_descriptions):
+ """Derives an InvocationBreakdown from several RPC method descriptions.
+
+ Args:
+ service_name: The package-qualified full name of the service.
+ method_descriptions: A dictionary from RPC method name to
+ interfaces.RpcMethodInvocationDescription describing the RPCs.
+
+ Returns:
+ An InvocationBreakdown corresponding to the given method descriptions.
+ """
+ cardinalities = {}
+ qualified_names = {}
+ face_cardinalities = {}
+ request_serializers = {}
+ response_deserializers = {}
+ for name, method_description in method_descriptions.iteritems():
+ qualified_name = _qualified_name(service_name, name)
+ method_cardinality = method_description.cardinality()
+ cardinalities[name] = method_description.cardinality()
+ qualified_names[name] = qualified_name
+ face_cardinalities[qualified_name] = _reexport.common_cardinality(
+ method_cardinality)
+ request_serializers[qualified_name] = method_description.serialize_request
+ response_deserializers[qualified_name] = (
+ method_description.deserialize_response)
+ return _EasyInvocationBreakdown(
+ cardinalities, qualified_names, face_cardinalities, request_serializers,
+ response_deserializers)
+
+
+def break_down_service(service_name, method_descriptions):
+ """Derives a ServiceBreakdown from several RPC method descriptions.
+
+ Args:
+ method_descriptions: A dictionary from RPC method name to
+ interfaces.RpcMethodServiceDescription describing the RPCs.
+
+ Returns:
+ A ServiceBreakdown corresponding to the given method descriptions.
+ """
+ implementations = {}
+ request_deserializers = {}
+ response_serializers = {}
+ for name, method_description in method_descriptions.iteritems():
+ qualified_name = _qualified_name(service_name, name)
+ method_cardinality = method_description.cardinality()
+ if method_cardinality is interfaces.Cardinality.UNARY_UNARY:
+ def service(
+ request, face_rpc_context,
+ service_behavior=method_description.service_unary_unary):
+ return service_behavior(
+ request, _reexport.rpc_context(face_rpc_context))
+ implementations[qualified_name] = face_utilities.unary_unary_inline(
+ service)
+ elif method_cardinality is interfaces.Cardinality.UNARY_STREAM:
+ def service(
+ request, face_rpc_context,
+ service_behavior=method_description.service_unary_stream):
+ return service_behavior(
+ request, _reexport.rpc_context(face_rpc_context))
+ implementations[qualified_name] = face_utilities.unary_stream_inline(
+ service)
+ elif method_cardinality is interfaces.Cardinality.STREAM_UNARY:
+ def service(
+ request_iterator, face_rpc_context,
+ service_behavior=method_description.service_stream_unary):
+ return service_behavior(
+ request_iterator, _reexport.rpc_context(face_rpc_context))
+ implementations[qualified_name] = face_utilities.stream_unary_inline(
+ service)
+ elif method_cardinality is interfaces.Cardinality.STREAM_STREAM:
+ def service(
+ request_iterator, face_rpc_context,
+ service_behavior=method_description.service_stream_stream):
+ return service_behavior(
+ request_iterator, _reexport.rpc_context(face_rpc_context))
+ implementations[qualified_name] = face_utilities.stream_stream_inline(
+ service)
+ request_deserializers[qualified_name] = (
+ method_description.deserialize_request)
+ response_serializers[qualified_name] = (
+ method_description.serialize_response)
+
+ return _EasyServiceBreakdown(
+ implementations, request_deserializers, response_serializers)
diff --git a/src/python/src/grpc/early_adopter/_reexport.py b/src/python/src/grpc/framework/alpha/_reexport.py
similarity index 96%
rename from src/python/src/grpc/early_adopter/_reexport.py
rename to src/python/src/grpc/framework/alpha/_reexport.py
index f341602..198cb95 100644
--- a/src/python/src/grpc/early_adopter/_reexport.py
+++ b/src/python/src/grpc/framework/alpha/_reexport.py
@@ -31,8 +31,8 @@
from grpc.framework.face import exceptions as face_exceptions
from grpc.framework.face import interfaces as face_interfaces
from grpc.framework.foundation import future
-from grpc.early_adopter import exceptions
-from grpc.early_adopter import interfaces
+from grpc.framework.alpha import exceptions
+from grpc.framework.alpha import interfaces
_EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY = {
interfaces.Cardinality.UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
@@ -174,6 +174,11 @@
return _ReexportedFuture(self._underlying.future(request_iterator, timeout))
+def common_cardinality(early_adopter_cardinality):
+ return _EARLY_ADOPTER_CARDINALITY_TO_COMMON_CARDINALITY[
+ early_adopter_cardinality]
+
+
def common_cardinalities(early_adopter_cardinalities):
common_cardinalities = {}
for name, early_adopter_cardinality in early_adopter_cardinalities.iteritems():
diff --git a/src/python/src/grpc/early_adopter/exceptions.py b/src/python/src/grpc/framework/alpha/exceptions.py
similarity index 100%
rename from src/python/src/grpc/early_adopter/exceptions.py
rename to src/python/src/grpc/framework/alpha/exceptions.py
diff --git a/src/python/src/grpc/early_adopter/interfaces.py b/src/python/src/grpc/framework/alpha/interfaces.py
similarity index 99%
rename from src/python/src/grpc/early_adopter/interfaces.py
rename to src/python/src/grpc/framework/alpha/interfaces.py
index b733873..8380567 100644
--- a/src/python/src/grpc/early_adopter/interfaces.py
+++ b/src/python/src/grpc/framework/alpha/interfaces.py
@@ -33,7 +33,7 @@
import enum
# exceptions is referenced from specification in this module.
-from grpc.early_adopter import exceptions # pylint: disable=unused-import
+from grpc.framework.alpha import exceptions # pylint: disable=unused-import
from grpc.framework.foundation import activated
from grpc.framework.foundation import future
diff --git a/src/python/src/grpc/early_adopter/utilities.py b/src/python/src/grpc/framework/alpha/utilities.py
similarity index 99%
rename from src/python/src/grpc/early_adopter/utilities.py
rename to src/python/src/grpc/framework/alpha/utilities.py
index da8ef82..7d7f78f 100644
--- a/src/python/src/grpc/early_adopter/utilities.py
+++ b/src/python/src/grpc/framework/alpha/utilities.py
@@ -29,7 +29,7 @@
"""Utilities for use with GRPC."""
-from grpc.early_adopter import interfaces
+from grpc.framework.alpha import interfaces
class _RpcMethodDescription(
diff --git a/src/python/src/grpc/framework/base/packets/_cancellation.py b/src/python/src/grpc/framework/base/_cancellation.py
similarity index 89%
rename from src/python/src/grpc/framework/base/packets/_cancellation.py
rename to src/python/src/grpc/framework/base/_cancellation.py
index 4a0ced1..ffbc906 100644
--- a/src/python/src/grpc/framework/base/packets/_cancellation.py
+++ b/src/python/src/grpc/framework/base/_cancellation.py
@@ -29,9 +29,8 @@
"""State and behavior for operation cancellation."""
-from grpc.framework.base import interfaces as base_interfaces
-from grpc.framework.base.packets import _interfaces
-from grpc.framework.base.packets import packets
+from grpc.framework.base import _interfaces
+from grpc.framework.base import interfaces
class CancellationManager(_interfaces.CancellationManager):
@@ -59,7 +58,7 @@
def cancel(self):
"""See _interfaces.CancellationManager.cancel for specification."""
with self._lock:
- self._termination_manager.abort(base_interfaces.Outcome.CANCELLED)
- self._transmission_manager.abort(base_interfaces.Outcome.CANCELLED)
+ self._termination_manager.abort(interfaces.Outcome.CANCELLED)
+ self._transmission_manager.abort(interfaces.Outcome.CANCELLED)
self._ingestion_manager.abort()
self._expiration_manager.abort()
diff --git a/src/python/src/grpc/framework/base/packets/_constants.py b/src/python/src/grpc/framework/base/_constants.py
similarity index 100%
rename from src/python/src/grpc/framework/base/packets/_constants.py
rename to src/python/src/grpc/framework/base/_constants.py
diff --git a/src/python/src/grpc/framework/base/packets/_context.py b/src/python/src/grpc/framework/base/_context.py
similarity index 85%
rename from src/python/src/grpc/framework/base/packets/_context.py
rename to src/python/src/grpc/framework/base/_context.py
index 45241c6..d84871d 100644
--- a/src/python/src/grpc/framework/base/packets/_context.py
+++ b/src/python/src/grpc/framework/base/_context.py
@@ -32,12 +32,12 @@
import time
# _interfaces is referenced from specification in this module.
-from grpc.framework.base import interfaces as base_interfaces
-from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import
+from grpc.framework.base import interfaces
+from grpc.framework.base import _interfaces # pylint: disable=unused-import
-class OperationContext(base_interfaces.OperationContext):
- """An implementation of base_interfaces.OperationContext."""
+class OperationContext(interfaces.OperationContext):
+ """An implementation of interfaces.OperationContext."""
def __init__(
self, lock, operation_id, local_failure, termination_manager,
@@ -47,8 +47,8 @@
Args:
lock: The operation-wide lock.
operation_id: An object identifying the operation.
- local_failure: Whichever one of base_interfaces.Outcome.SERVICED_FAILURE
- or base_interfaces.Outcome.SERVICER_FAILURE describes local failure of
+ local_failure: Whichever one of interfaces.Outcome.SERVICED_FAILURE or
+ interfaces.Outcome.SERVICER_FAILURE describes local failure of
customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
@@ -75,12 +75,12 @@
self._expiration_manager = expiration_manager
def is_active(self):
- """See base_interfaces.OperationContext.is_active for specification."""
+ """See interfaces.OperationContext.is_active for specification."""
with self._lock:
return self._termination_manager.is_active()
def add_termination_callback(self, callback):
- """See base_interfaces.OperationContext.add_termination_callback."""
+ """See interfaces.OperationContext.add_termination_callback."""
with self._lock:
self._termination_manager.add_callback(callback)
diff --git a/src/python/src/grpc/framework/base/packets/_emission.py b/src/python/src/grpc/framework/base/_emission.py
similarity index 89%
rename from src/python/src/grpc/framework/base/packets/_emission.py
rename to src/python/src/grpc/framework/base/_emission.py
index cfc9e40..1829669 100644
--- a/src/python/src/grpc/framework/base/packets/_emission.py
+++ b/src/python/src/grpc/framework/base/_emission.py
@@ -29,8 +29,8 @@
"""State and behavior for handling emitted values."""
-from grpc.framework.base import interfaces as base_interfaces
-from grpc.framework.base.packets import _interfaces
+from grpc.framework.base import interfaces
+from grpc.framework.base import _interfaces
class _EmissionManager(_interfaces.EmissionManager):
@@ -42,10 +42,9 @@
Args:
lock: The operation-wide lock.
- failure_outcome: Whichever one of
- base_interfaces.Outcome.SERVICED_FAILURE or
- base_interfaces.Outcome.SERVICER_FAILURE describes this object's
- methods being called inappropriately by customer code.
+ failure_outcome: Whichever one of interfaces.Outcome.SERVICED_FAILURE or
+ interfaces.Outcome.SERVICER_FAILURE describes this object's methods
+ being called inappropriately by customer code.
termination_manager: The _interfaces.TerminationManager for the operation.
transmission_manager: The _interfaces.TransmissionManager for the
operation.
@@ -106,7 +105,7 @@
An _interfaces.EmissionManager appropriate for front-side use.
"""
return _EmissionManager(
- lock, base_interfaces.Outcome.SERVICED_FAILURE, termination_manager,
+ lock, interfaces.Outcome.SERVICED_FAILURE, termination_manager,
transmission_manager)
@@ -122,5 +121,5 @@
An _interfaces.EmissionManager appropriate for back-side use.
"""
return _EmissionManager(
- lock, base_interfaces.Outcome.SERVICER_FAILURE, termination_manager,
+ lock, interfaces.Outcome.SERVICER_FAILURE, termination_manager,
transmission_manager)
diff --git a/src/python/src/grpc/framework/base/packets/_ends.py b/src/python/src/grpc/framework/base/_ends.py
similarity index 85%
rename from src/python/src/grpc/framework/base/packets/_ends.py
rename to src/python/src/grpc/framework/base/_ends.py
index 614d1f6..176f3ac 100644
--- a/src/python/src/grpc/framework/base/packets/_ends.py
+++ b/src/python/src/grpc/framework/base/_ends.py
@@ -27,32 +27,30 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Implementations of Fronts and Backs."""
+"""Implementations of FrontLinks and BackLinks."""
import collections
import threading
import uuid
-# _interfaces and packets are referenced from specification in this module.
-from grpc.framework.base import interfaces as base_interfaces
-from grpc.framework.base.packets import _cancellation
-from grpc.framework.base.packets import _context
-from grpc.framework.base.packets import _emission
-from grpc.framework.base.packets import _expiration
-from grpc.framework.base.packets import _ingestion
-from grpc.framework.base.packets import _interfaces # pylint: disable=unused-import
-from grpc.framework.base.packets import _reception
-from grpc.framework.base.packets import _termination
-from grpc.framework.base.packets import _transmission
-from grpc.framework.base.packets import interfaces
-from grpc.framework.base.packets import packets # pylint: disable=unused-import
+# _interfaces is referenced from specification in this module.
+from grpc.framework.base import _cancellation
+from grpc.framework.base import _context
+from grpc.framework.base import _emission
+from grpc.framework.base import _expiration
+from grpc.framework.base import _ingestion
+from grpc.framework.base import _interfaces # pylint: disable=unused-import
+from grpc.framework.base import _reception
+from grpc.framework.base import _termination
+from grpc.framework.base import _transmission
+from grpc.framework.base import interfaces
from grpc.framework.foundation import callable_util
_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
-class _EasyOperation(base_interfaces.Operation):
- """A trivial implementation of base_interfaces.Operation."""
+class _EasyOperation(interfaces.Operation):
+ """A trivial implementation of interfaces.Operation."""
def __init__(self, emission_manager, context, cancellation_manager):
"""Constructor.
@@ -60,7 +58,7 @@
Args:
emission_manager: The _interfaces.EmissionManager for the operation that
will accept values emitted by customer code.
- context: The base_interfaces.OperationContext for use by the customer
+ context: The interfaces.OperationContext for use by the customer
during the operation.
cancellation_manager: The _interfaces.CancellationManager for the
operation.
@@ -88,7 +86,7 @@
# indicates an in-progress fire-and-forget operation for which the customer
# has chosen to ignore results.
self._operations = {}
- self._stats = {outcome: 0 for outcome in base_interfaces.Outcome}
+ self._stats = {outcome: 0 for outcome in interfaces.Outcome}
self._idle_actions = []
def terminal_action(self, operation_id):
@@ -152,9 +150,9 @@
"""Constructs objects necessary for front-side operation management.
Args:
- callback: A callable that accepts packets.FrontToBackPackets and delivers
- them to the other side of the operation. Execution of this callable may
- take any arbitrary length of time.
+ callback: A callable that accepts interfaces.FrontToBackTickets and
+ delivers them to the other side of the operation. Execution of this
+ callable may take any arbitrary length of time.
work_pool: A thread pool in which to execute customer code.
transmission_pool: A thread pool to use for transmitting to the other side
of the operation.
@@ -169,7 +167,7 @@
complete: A boolean indicating whether or not additional payloads will be
supplied by the customer.
timeout: A length of time in seconds to allow for the operation.
- subscription: A base_interfaces.ServicedSubscription describing the
+ subscription: A interfaces.ServicedSubscription describing the
customer's interest in the results of the operation.
trace_id: A uuid.UUID identifying a set of related operations to which this
operation belongs. May be None.
@@ -188,7 +186,7 @@
lock, transmission_pool, callback, operation_id, name,
subscription.kind, trace_id, timeout, termination_manager)
operation_context = _context.OperationContext(
- lock, operation_id, base_interfaces.Outcome.SERVICED_FAILURE,
+ lock, operation_id, interfaces.Outcome.SERVICED_FAILURE,
termination_manager, transmission_manager)
emission_manager = _emission.front_emission_manager(
lock, termination_manager, transmission_manager)
@@ -216,7 +214,7 @@
transmission_manager.inmit(payload, complete)
- if subscription.kind is base_interfaces.ServicedSubscription.Kind.NONE:
+ if subscription.kind is interfaces.ServicedSubscription.Kind.NONE:
returned_reception_manager = None
else:
returned_reception_manager = reception_manager
@@ -226,8 +224,8 @@
cancellation_manager)
-class Front(interfaces.Front):
- """An implementation of interfaces.Front."""
+class FrontLink(interfaces.FrontLink):
+ """An implementation of interfaces.FrontLink."""
def __init__(self, work_pool, transmission_pool, utility_pool):
"""Constructor.
@@ -252,16 +250,16 @@
self._callback = rear_link.accept_front_to_back_ticket
def operation_stats(self):
- """See base_interfaces.End.operation_stats for specification."""
+ """See interfaces.End.operation_stats for specification."""
return self._endlette.operation_stats()
def add_idle_action(self, action):
- """See base_interfaces.End.add_idle_action for specification."""
+ """See interfaces.End.add_idle_action for specification."""
self._endlette.add_idle_action(action)
def operate(
self, name, payload, complete, timeout, subscription, trace_id):
- """See base_interfaces.Front.operate for specification."""
+ """See interfaces.Front.operate for specification."""
operation_id = uuid.uuid4()
with self._endlette:
management = _front_operate(
@@ -278,7 +276,7 @@
with self._endlette:
reception_manager = self._endlette.get_operation(ticket.operation_id)
if reception_manager:
- reception_manager.receive_packet(ticket)
+ reception_manager.receive_ticket(ticket)
def _back_operate(
@@ -291,16 +289,16 @@
Args:
servicer: An interfaces.Servicer for servicing operations.
- callback: A callable that accepts packets.BackToFrontPackets and delivers
- them to the other side of the operation. Execution of this callable may
- take any arbitrary length of time.
+ callback: A callable that accepts interfaces.BackToFrontTickets and
+ delivers them to the other side of the operation. Execution of this
+ callable may take any arbitrary length of time.
work_pool: A thread pool in which to execute customer code.
transmission_pool: A thread pool to use for transmitting to the other side
of the operation.
utility_pool: A thread pool for utility tasks.
termination_action: A no-arg behavior to be called upon operation
completion.
- ticket: The first packets.FrontToBackPacket received for the operation.
+ ticket: The first interfaces.FrontToBackTicket received for the operation.
default_timeout: A length of time in seconds to be used as the default
time alloted for a single operation.
maximum_timeout: A length of time in seconds to be used as the maximum
@@ -317,7 +315,7 @@
lock, transmission_pool, callback, ticket.operation_id,
termination_manager, ticket.subscription)
operation_context = _context.OperationContext(
- lock, ticket.operation_id, base_interfaces.Outcome.SERVICER_FAILURE,
+ lock, ticket.operation_id, interfaces.Outcome.SERVICER_FAILURE,
termination_manager, transmission_manager)
emission_manager = _emission.back_emission_manager(
lock, termination_manager, transmission_manager)
@@ -340,13 +338,13 @@
ingestion_manager, expiration_manager)
ingestion_manager.set_expiration_manager(expiration_manager)
- reception_manager.receive_packet(ticket)
+ reception_manager.receive_ticket(ticket)
return reception_manager
-class Back(interfaces.Back):
- """An implementation of interfaces.Back."""
+class BackLink(interfaces.BackLink):
+ """An implementation of interfaces.BackLink."""
def __init__(
self, servicer, work_pool, transmission_pool, utility_pool,
@@ -390,12 +388,12 @@
self._default_timeout, self._maximum_timeout)
self._endlette.add_operation(ticket.operation_id, reception_manager)
else:
- reception_manager.receive_packet(ticket)
+ reception_manager.receive_ticket(ticket)
def operation_stats(self):
- """See base_interfaces.End.operation_stats for specification."""
+ """See interfaces.End.operation_stats for specification."""
return self._endlette.operation_stats()
def add_idle_action(self, action):
- """See base_interfaces.End.add_idle_action for specification."""
+ """See interfaces.End.add_idle_action for specification."""
self._endlette.add_idle_action(action)
diff --git a/src/python/src/grpc/framework/base/packets/_expiration.py b/src/python/src/grpc/framework/base/_expiration.py
similarity index 95%
rename from src/python/src/grpc/framework/base/packets/_expiration.py
rename to src/python/src/grpc/framework/base/_expiration.py
index a9ecaea..17acbef 100644
--- a/src/python/src/grpc/framework/base/packets/_expiration.py
+++ b/src/python/src/grpc/framework/base/_expiration.py
@@ -31,8 +31,8 @@
import time
-from grpc.framework.base import interfaces as base_interfaces
-from grpc.framework.base.packets import _interfaces
+from grpc.framework.base import _interfaces
+from grpc.framework.base import interfaces
from grpc.framework.foundation import later
@@ -73,8 +73,8 @@
with self._lock:
if self._future is not None and index == self._index:
self._future = None
- self._termination_manager.abort(base_interfaces.Outcome.EXPIRED)
- self._transmission_manager.abort(base_interfaces.Outcome.EXPIRED)
+ self._termination_manager.abort(interfaces.Outcome.EXPIRED)
+ self._transmission_manager.abort(interfaces.Outcome.EXPIRED)
self._ingestion_manager.abort()
def start(self):
diff --git a/src/python/src/grpc/framework/base/packets/_ingestion.py b/src/python/src/grpc/framework/base/_ingestion.py
similarity index 98%
rename from src/python/src/grpc/framework/base/packets/_ingestion.py
rename to src/python/src/grpc/framework/base/_ingestion.py
index c5c08fd..06d5b92 100644
--- a/src/python/src/grpc/framework/base/packets/_ingestion.py
+++ b/src/python/src/grpc/framework/base/_ingestion.py
@@ -32,11 +32,10 @@
import abc
import collections
+from grpc.framework.base import _constants
+from grpc.framework.base import _interfaces
from grpc.framework.base import exceptions
from grpc.framework.base import interfaces
-from grpc.framework.base.packets import _constants
-from grpc.framework.base.packets import _interfaces
-from grpc.framework.base.packets import packets
from grpc.framework.foundation import abandonment
from grpc.framework.foundation import callable_util
from grpc.framework.foundation import stream
diff --git a/src/python/src/grpc/framework/base/packets/_interfaces.py b/src/python/src/grpc/framework/base/_interfaces.py
similarity index 91%
rename from src/python/src/grpc/framework/base/packets/_interfaces.py
rename to src/python/src/grpc/framework/base/_interfaces.py
index 64184bd..d88cf76 100644
--- a/src/python/src/grpc/framework/base/packets/_interfaces.py
+++ b/src/python/src/grpc/framework/base/_interfaces.py
@@ -31,9 +31,8 @@
import abc
-# base_interfaces and packets are referenced from specification in this module.
-from grpc.framework.base import interfaces as base_interfaces # pylint: disable=unused-import
-from grpc.framework.base.packets import packets # pylint: disable=unused-import
+# interfaces is referenced from specification in this module.
+from grpc.framework.base import interfaces # pylint: disable=unused-import
from grpc.framework.foundation import stream
@@ -63,7 +62,7 @@
immediately.
Args:
- callback: A callable that will be passed a base_interfaces.Outcome value.
+ callback: A callable that will be passed an interfaces.Outcome value.
"""
raise NotImplementedError()
@@ -87,7 +86,7 @@
"""Indicates that the operation must abort for the indicated reason.
Args:
- outcome: A base_interfaces.Outcome indicating operation abortion.
+ outcome: An interfaces.Outcome indicating operation abortion.
"""
raise NotImplementedError()
@@ -113,7 +112,7 @@
"""Indicates that the operation has aborted for the indicated reason.
Args:
- outcome: A base_interfaces.Outcome indicating operation abortion.
+ outcome: An interfaces.Outcome indicating operation abortion.
"""
raise NotImplementedError()
@@ -248,15 +247,15 @@
class ReceptionManager(object):
- """A manager responsible for receiving packets from the other end."""
+ """A manager responsible for receiving tickets from the other end."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
- def receive_packet(self, packet):
- """Handle a packet from the other side of the operation.
+ def receive_ticket(self, ticket):
+ """Handle a ticket from the other side of the operation.
Args:
- packet: A packets.BackToFrontPacket or packets.FrontToBackPacket
+ ticket: An interfaces.BackToFrontTicket or interfaces.FrontToBackTicket
appropriate to this end of the operation and this object.
"""
raise NotImplementedError()
diff --git a/src/python/src/grpc/framework/base/_reception.py b/src/python/src/grpc/framework/base/_reception.py
new file mode 100644
index 0000000..dd42896
--- /dev/null
+++ b/src/python/src/grpc/framework/base/_reception.py
@@ -0,0 +1,399 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""State and behavior for ticket reception."""
+
+import abc
+
+from grpc.framework.base import interfaces
+from grpc.framework.base import _interfaces
+
+_INITIAL_FRONT_TO_BACK_TICKET_KINDS = (
+ interfaces.FrontToBackTicket.Kind.COMMENCEMENT,
+ interfaces.FrontToBackTicket.Kind.ENTIRE,
+)
+
+
+class _Receiver(object):
+ """Common specification of different ticket-handling behavior."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def abort_if_abortive(self, ticket):
+ """Aborts the operation if the ticket is abortive.
+
+ Args:
+ ticket: A just-arrived ticket.
+
+ Returns:
+ A boolean indicating whether or not this Receiver aborted the operation
+ based on the ticket.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def receive(self, ticket):
+ """Handles a just-arrived ticket.
+
+ Args:
+ ticket: A just-arrived ticket.
+
+ Returns:
+ A boolean indicating whether or not the ticket was terminal (i.e. whether
+ or not non-abortive tickets are legal after this one).
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def reception_failure(self):
+ """Aborts the operation with an indication of reception failure."""
+ raise NotImplementedError()
+
+
+def _abort(
+ outcome, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Indicates abortion with the given outcome to the given managers."""
+ termination_manager.abort(outcome)
+ transmission_manager.abort(outcome)
+ ingestion_manager.abort()
+ expiration_manager.abort()
+
+
+def _abort_if_abortive(
+ ticket, abortive, termination_manager, transmission_manager,
+ ingestion_manager, expiration_manager):
+ """Determines a ticket's being abortive and if so aborts the operation.
+
+ Args:
+ ticket: A just-arrived ticket.
+ abortive: A callable that takes a ticket and returns an interfaces.Outcome
+ indicating that the operation should be aborted or None indicating that
+ the operation should not be aborted.
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+
+ Returns:
+ True if the operation was aborted; False otherwise.
+ """
+ abortion_outcome = abortive(ticket)
+ if abortion_outcome is None:
+ return False
+ else:
+ _abort(
+ abortion_outcome, termination_manager, transmission_manager,
+ ingestion_manager, expiration_manager)
+ return True
+
+
+def _reception_failure(
+ termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Aborts the operation with an indication of reception failure."""
+ _abort(
+ interfaces.Outcome.RECEPTION_FAILURE, termination_manager,
+ transmission_manager, ingestion_manager, expiration_manager)
+
+
+class _BackReceiver(_Receiver):
+ """Ticket-handling specific to the back side of an operation."""
+
+ def __init__(
+ self, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+ """
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ self._first_ticket_seen = False
+ self._last_ticket_seen = False
+
+ def _abortive(self, ticket):
+ """Determines whether or not (and if so, how) a ticket is abortive.
+
+ Args:
+ ticket: A just-arrived ticket.
+
+ Returns:
+ An interfaces.Outcome value describing operation abortion if the
+ ticket is abortive or None if the ticket is not abortive.
+ """
+ if ticket.kind is interfaces.FrontToBackTicket.Kind.CANCELLATION:
+ return interfaces.Outcome.CANCELLED
+ elif ticket.kind is interfaces.FrontToBackTicket.Kind.EXPIRATION:
+ return interfaces.Outcome.EXPIRED
+ elif ticket.kind is interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE:
+ return interfaces.Outcome.SERVICED_FAILURE
+ elif ticket.kind is interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE:
+ return interfaces.Outcome.SERVICED_FAILURE
+ elif (ticket.kind in _INITIAL_FRONT_TO_BACK_TICKET_KINDS and
+ self._first_ticket_seen):
+ return interfaces.Outcome.RECEPTION_FAILURE
+ elif self._last_ticket_seen:
+ return interfaces.Outcome.RECEPTION_FAILURE
+ else:
+ return None
+
+ def abort_if_abortive(self, ticket):
+ """See _Receiver.abort_if_abortive for specification."""
+ return _abort_if_abortive(
+ ticket, self._abortive, self._termination_manager,
+ self._transmission_manager, self._ingestion_manager,
+ self._expiration_manager)
+
+ def receive(self, ticket):
+ """See _Receiver.receive for specification."""
+ if ticket.timeout is not None:
+ self._expiration_manager.change_timeout(ticket.timeout)
+
+ if ticket.kind is interfaces.FrontToBackTicket.Kind.COMMENCEMENT:
+ self._first_ticket_seen = True
+ self._ingestion_manager.start(ticket.name)
+ if ticket.payload is not None:
+ self._ingestion_manager.consume(ticket.payload)
+ elif ticket.kind is interfaces.FrontToBackTicket.Kind.CONTINUATION:
+ self._ingestion_manager.consume(ticket.payload)
+ elif ticket.kind is interfaces.FrontToBackTicket.Kind.COMPLETION:
+ self._last_ticket_seen = True
+ if ticket.payload is None:
+ self._ingestion_manager.terminate()
+ else:
+ self._ingestion_manager.consume_and_terminate(ticket.payload)
+ else:
+ self._first_ticket_seen = True
+ self._last_ticket_seen = True
+ self._ingestion_manager.start(ticket.name)
+ if ticket.payload is None:
+ self._ingestion_manager.terminate()
+ else:
+ self._ingestion_manager.consume_and_terminate(ticket.payload)
+
+ def reception_failure(self):
+ """See _Receiver.reception_failure for specification."""
+ _reception_failure(
+ self._termination_manager, self._transmission_manager,
+ self._ingestion_manager, self._expiration_manager)
+
+
+class _FrontReceiver(_Receiver):
+ """Ticket-handling specific to the front side of an operation."""
+
+ def __init__(
+ self, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Constructor.
+
+ Args:
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+ """
+ self._termination_manager = termination_manager
+ self._transmission_manager = transmission_manager
+ self._ingestion_manager = ingestion_manager
+ self._expiration_manager = expiration_manager
+
+ self._last_ticket_seen = False
+
+ def _abortive(self, ticket):
+ """Determines whether or not (and if so, how) a ticket is abortive.
+
+ Args:
+ ticket: A just-arrived ticket.
+
+ Returns:
+ An interfaces.Outcome value describing operation abortion if the ticket
+ is abortive or None if the ticket is not abortive.
+ """
+ if ticket.kind is interfaces.BackToFrontTicket.Kind.CANCELLATION:
+ return interfaces.Outcome.CANCELLED
+ elif ticket.kind is interfaces.BackToFrontTicket.Kind.EXPIRATION:
+ return interfaces.Outcome.EXPIRED
+ elif ticket.kind is interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE:
+ return interfaces.Outcome.SERVICER_FAILURE
+ elif ticket.kind is interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE:
+ return interfaces.Outcome.SERVICER_FAILURE
+ elif self._last_ticket_seen:
+ return interfaces.Outcome.RECEPTION_FAILURE
+ else:
+ return None
+
+ def abort_if_abortive(self, ticket):
+ """See _Receiver.abort_if_abortive for specification."""
+ return _abort_if_abortive(
+ ticket, self._abortive, self._termination_manager,
+ self._transmission_manager, self._ingestion_manager,
+ self._expiration_manager)
+
+ def receive(self, ticket):
+ """See _Receiver.receive for specification."""
+ if ticket.kind is interfaces.BackToFrontTicket.Kind.CONTINUATION:
+ self._ingestion_manager.consume(ticket.payload)
+ elif ticket.kind is interfaces.BackToFrontTicket.Kind.COMPLETION:
+ self._last_ticket_seen = True
+ if ticket.payload is None:
+ self._ingestion_manager.terminate()
+ else:
+ self._ingestion_manager.consume_and_terminate(ticket.payload)
+
+ def reception_failure(self):
+ """See _Receiver.reception_failure for specification."""
+ _reception_failure(
+ self._termination_manager, self._transmission_manager,
+ self._ingestion_manager, self._expiration_manager)
+
+
+class _ReceptionManager(_interfaces.ReceptionManager):
+ """A ReceptionManager based around a _Receiver passed to it."""
+
+ def __init__(self, lock, receiver):
+ """Constructor.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ receiver: A _Receiver responsible for handling received tickets.
+ """
+ self._lock = lock
+ self._receiver = receiver
+
+ self._lowest_unseen_sequence_number = 0
+ self._out_of_sequence_tickets = {}
+ self._completed_sequence_number = None
+ self._aborted = False
+
+ def _sequence_failure(self, ticket):
+ """Determines a just-arrived ticket's sequential legitimacy.
+
+ Args:
+ ticket: A just-arrived ticket.
+
+ Returns:
+ True if the ticket is sequentially legitimate; False otherwise.
+ """
+ if ticket.sequence_number < self._lowest_unseen_sequence_number:
+ return True
+ elif ticket.sequence_number in self._out_of_sequence_tickets:
+ return True
+ elif (self._completed_sequence_number is not None and
+ self._completed_sequence_number <= ticket.sequence_number):
+ return True
+ else:
+ return False
+
+ def _process(self, ticket):
+ """Process those tickets ready to be processed.
+
+ Args:
+ ticket: A just-arrived ticket the sequence number of which matches this
+ _ReceptionManager's _lowest_unseen_sequence_number field.
+ """
+ while True:
+ completed = self._receiver.receive(ticket)
+ if completed:
+ self._out_of_sequence_tickets.clear()
+ self._completed_sequence_number = ticket.sequence_number
+ self._lowest_unseen_sequence_number = ticket.sequence_number + 1
+ return
+ else:
+ next_ticket = self._out_of_sequence_tickets.pop(
+ ticket.sequence_number + 1, None)
+ if next_ticket is None:
+ self._lowest_unseen_sequence_number = ticket.sequence_number + 1
+ return
+ else:
+ ticket = next_ticket
+
+ def receive_ticket(self, ticket):
+ """See _interfaces.ReceptionManager.receive_ticket for specification."""
+ with self._lock:
+ if self._aborted:
+ return
+ elif self._sequence_failure(ticket):
+ self._receiver.reception_failure()
+ self._aborted = True
+ elif self._receiver.abort_if_abortive(ticket):
+ self._aborted = True
+ elif ticket.sequence_number == self._lowest_unseen_sequence_number:
+ self._process(ticket)
+ else:
+ self._out_of_sequence_tickets[ticket.sequence_number] = ticket
+
+
+def front_reception_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Creates a _interfaces.ReceptionManager for front-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+
+ Returns:
+ A _interfaces.ReceptionManager appropriate for front-side use.
+ """
+ return _ReceptionManager(
+ lock, _FrontReceiver(
+ termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager))
+
+
+def back_reception_manager(
+ lock, termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager):
+ """Creates a _interfaces.ReceptionManager for back-side use.
+
+ Args:
+ lock: The operation-servicing-wide lock object.
+ termination_manager: The operation's _interfaces.TerminationManager.
+ transmission_manager: The operation's _interfaces.TransmissionManager.
+ ingestion_manager: The operation's _interfaces.IngestionManager.
+ expiration_manager: The operation's _interfaces.ExpirationManager.
+
+ Returns:
+ A _interfaces.ReceptionManager appropriate for back-side use.
+ """
+ return _ReceptionManager(
+ lock, _BackReceiver(
+ termination_manager, transmission_manager, ingestion_manager,
+ expiration_manager))
diff --git a/src/python/src/grpc/framework/base/packets/_termination.py b/src/python/src/grpc/framework/base/_termination.py
similarity index 98%
rename from src/python/src/grpc/framework/base/packets/_termination.py
rename to src/python/src/grpc/framework/base/_termination.py
index 6afba88..ddcbc60 100644
--- a/src/python/src/grpc/framework/base/packets/_termination.py
+++ b/src/python/src/grpc/framework/base/_termination.py
@@ -31,9 +31,9 @@
import enum
+from grpc.framework.base import _constants
+from grpc.framework.base import _interfaces
from grpc.framework.base import interfaces
-from grpc.framework.base.packets import _constants
-from grpc.framework.base.packets import _interfaces
from grpc.framework.foundation import callable_util
_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
diff --git a/src/python/src/grpc/framework/base/packets/_transmission.py b/src/python/src/grpc/framework/base/_transmission.py
similarity index 68%
rename from src/python/src/grpc/framework/base/packets/_transmission.py
rename to src/python/src/grpc/framework/base/_transmission.py
index 1b18204..6845129 100644
--- a/src/python/src/grpc/framework/base/packets/_transmission.py
+++ b/src/python/src/grpc/framework/base/_transmission.py
@@ -27,14 +27,13 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""State and behavior for packet transmission during an operation."""
+"""State and behavior for ticket transmission during an operation."""
import abc
+from grpc.framework.base import _constants
+from grpc.framework.base import _interfaces
from grpc.framework.base import interfaces
-from grpc.framework.base.packets import _constants
-from grpc.framework.base.packets import _interfaces
-from grpc.framework.base.packets import packets
from grpc.framework.foundation import callable_util
_TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!'
@@ -47,53 +46,53 @@
interfaces.Outcome.SERVICED_FAILURE,
)
-_ABORTION_OUTCOME_TO_FRONT_TO_BACK_PACKET_KIND = {
+_ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND = {
interfaces.Outcome.CANCELLED:
- packets.FrontToBackPacket.Kind.CANCELLATION,
+ interfaces.FrontToBackTicket.Kind.CANCELLATION,
interfaces.Outcome.EXPIRED:
- packets.FrontToBackPacket.Kind.EXPIRATION,
+ interfaces.FrontToBackTicket.Kind.EXPIRATION,
interfaces.Outcome.RECEPTION_FAILURE:
- packets.FrontToBackPacket.Kind.RECEPTION_FAILURE,
+ interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE,
interfaces.Outcome.TRANSMISSION_FAILURE:
- packets.FrontToBackPacket.Kind.TRANSMISSION_FAILURE,
+ interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE,
interfaces.Outcome.SERVICED_FAILURE:
- packets.FrontToBackPacket.Kind.SERVICED_FAILURE,
+ interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE,
interfaces.Outcome.SERVICER_FAILURE:
- packets.FrontToBackPacket.Kind.SERVICER_FAILURE,
+ interfaces.FrontToBackTicket.Kind.SERVICER_FAILURE,
}
-_ABORTION_OUTCOME_TO_BACK_TO_FRONT_PACKET_KIND = {
+_ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND = {
interfaces.Outcome.CANCELLED:
- packets.BackToFrontPacket.Kind.CANCELLATION,
+ interfaces.BackToFrontTicket.Kind.CANCELLATION,
interfaces.Outcome.EXPIRED:
- packets.BackToFrontPacket.Kind.EXPIRATION,
+ interfaces.BackToFrontTicket.Kind.EXPIRATION,
interfaces.Outcome.RECEPTION_FAILURE:
- packets.BackToFrontPacket.Kind.RECEPTION_FAILURE,
+ interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE,
interfaces.Outcome.TRANSMISSION_FAILURE:
- packets.BackToFrontPacket.Kind.TRANSMISSION_FAILURE,
+ interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE,
interfaces.Outcome.SERVICED_FAILURE:
- packets.BackToFrontPacket.Kind.SERVICED_FAILURE,
+ interfaces.BackToFrontTicket.Kind.SERVICED_FAILURE,
interfaces.Outcome.SERVICER_FAILURE:
- packets.BackToFrontPacket.Kind.SERVICER_FAILURE,
+ interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE,
}
-class _Packetizer(object):
- """Common specification of different packet-creating behavior."""
+class _Ticketizer(object):
+ """Common specification of different ticket-creating behavior."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
- def packetize(self, operation_id, sequence_number, payload, complete):
- """Creates a packet indicating ordinary operation progress.
+ def ticketize(self, operation_id, sequence_number, payload, complete):
+ """Creates a ticket indicating ordinary operation progress.
Args:
operation_id: The operation ID for the current operation.
- sequence_number: A sequence number for the packet.
+ sequence_number: A sequence number for the ticket.
payload: A customer payload object. May be None if sequence_number is
zero or complete is true.
- complete: A boolean indicating whether or not the packet should describe
+ complete: A boolean indicating whether or not the ticket should describe
itself as (but for a later indication of operation abortion) the last
- packet to be sent.
+ ticket to be sent.
Returns:
An object of an appropriate type suitable for transmission to the other
@@ -102,12 +101,12 @@
raise NotImplementedError()
@abc.abstractmethod
- def packetize_abortion(self, operation_id, sequence_number, outcome):
- """Creates a packet indicating that the operation is aborted.
+ def ticketize_abortion(self, operation_id, sequence_number, outcome):
+ """Creates a ticket indicating that the operation is aborted.
Args:
operation_id: The operation ID for the current operation.
- sequence_number: A sequence number for the packet.
+ sequence_number: A sequence number for the ticket.
outcome: An interfaces.Outcome value describing the operation abortion.
Returns:
@@ -118,8 +117,8 @@
raise NotImplementedError()
-class _FrontPacketizer(_Packetizer):
- """Front-side packet-creating behavior."""
+class _FrontTicketizer(_Ticketizer):
+ """Front-side ticket-creating behavior."""
def __init__(self, name, subscription_kind, trace_id, timeout):
"""Constructor.
@@ -127,7 +126,7 @@
Args:
name: The name of the operation.
subscription_kind: An interfaces.ServicedSubscription.Kind value
- describing the interest the front has in packets sent from the back.
+ describing the interest the front has in tickets sent from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
@@ -137,54 +136,54 @@
self._trace_id = trace_id
self._timeout = timeout
- def packetize(self, operation_id, sequence_number, payload, complete):
- """See _Packetizer.packetize for specification."""
+ def ticketize(self, operation_id, sequence_number, payload, complete):
+ """See _Ticketizer.ticketize for specification."""
if sequence_number:
if complete:
- kind = packets.FrontToBackPacket.Kind.COMPLETION
+ kind = interfaces.FrontToBackTicket.Kind.COMPLETION
else:
- kind = packets.FrontToBackPacket.Kind.CONTINUATION
- return packets.FrontToBackPacket(
+ kind = interfaces.FrontToBackTicket.Kind.CONTINUATION
+ return interfaces.FrontToBackTicket(
operation_id, sequence_number, kind, self._name,
self._subscription_kind, self._trace_id, payload, self._timeout)
else:
if complete:
- kind = packets.FrontToBackPacket.Kind.ENTIRE
+ kind = interfaces.FrontToBackTicket.Kind.ENTIRE
else:
- kind = packets.FrontToBackPacket.Kind.COMMENCEMENT
- return packets.FrontToBackPacket(
+ kind = interfaces.FrontToBackTicket.Kind.COMMENCEMENT
+ return interfaces.FrontToBackTicket(
operation_id, 0, kind, self._name, self._subscription_kind,
self._trace_id, payload, self._timeout)
- def packetize_abortion(self, operation_id, sequence_number, outcome):
- """See _Packetizer.packetize_abortion for specification."""
+ def ticketize_abortion(self, operation_id, sequence_number, outcome):
+ """See _Ticketizer.ticketize_abortion for specification."""
if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES:
return None
else:
- kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_PACKET_KIND[outcome]
- return packets.FrontToBackPacket(
+ kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND[outcome]
+ return interfaces.FrontToBackTicket(
operation_id, sequence_number, kind, None, None, None, None, None)
-class _BackPacketizer(_Packetizer):
- """Back-side packet-creating behavior."""
+class _BackTicketizer(_Ticketizer):
+ """Back-side ticket-creating behavior."""
- def packetize(self, operation_id, sequence_number, payload, complete):
- """See _Packetizer.packetize for specification."""
+ def ticketize(self, operation_id, sequence_number, payload, complete):
+ """See _Ticketizer.ticketize for specification."""
if complete:
- kind = packets.BackToFrontPacket.Kind.COMPLETION
+ kind = interfaces.BackToFrontTicket.Kind.COMPLETION
else:
- kind = packets.BackToFrontPacket.Kind.CONTINUATION
- return packets.BackToFrontPacket(
+ kind = interfaces.BackToFrontTicket.Kind.CONTINUATION
+ return interfaces.BackToFrontTicket(
operation_id, sequence_number, kind, payload)
- def packetize_abortion(self, operation_id, sequence_number, outcome):
- """See _Packetizer.packetize_abortion for specification."""
+ def ticketize_abortion(self, operation_id, sequence_number, outcome):
+ """See _Ticketizer.ticketize_abortion for specification."""
if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES:
return None
else:
- kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_PACKET_KIND[outcome]
- return packets.BackToFrontPacket(
+ kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND[outcome]
+ return interfaces.BackToFrontTicket(
operation_id, sequence_number, kind, None)
@@ -221,21 +220,21 @@
class _TransmittingTransmissionManager(TransmissionManager):
- """A TransmissionManager implementation that sends packets."""
+ """A TransmissionManager implementation that sends tickets."""
def __init__(
- self, lock, pool, callback, operation_id, packetizer,
+ self, lock, pool, callback, operation_id, ticketizer,
termination_manager):
"""Constructor.
Args:
lock: The operation-servicing-wide lock object.
- pool: A thread pool in which the work of transmitting packets will be
+ pool: A thread pool in which the work of transmitting tickets will be
performed.
- callback: A callable that accepts packets and sends them to the other side
+ callback: A callable that accepts tickets and sends them to the other side
of the operation.
operation_id: The operation's ID.
- packetizer: A _Packetizer for packet creation.
+ ticketizer: A _Ticketizer for ticket creation.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
"""
@@ -243,7 +242,7 @@
self._pool = pool
self._callback = callback
self._operation_id = operation_id
- self._packetizer = packetizer
+ self._ticketizer = ticketizer
self._termination_manager = termination_manager
self._ingestion_manager = None
self._expiration_manager = None
@@ -260,8 +259,8 @@
self._ingestion_manager = ingestion_manager
self._expiration_manager = expiration_manager
- def _lead_packet(self, emission, complete):
- """Creates a packet suitable for leading off the transmission loop.
+ def _lead_ticket(self, emission, complete):
+ """Creates a ticket suitable for leading off the transmission loop.
Args:
emission: A customer payload object to be sent to the other side of the
@@ -270,37 +269,37 @@
the passed object.
Returns:
- A packet with which to lead off the transmission loop.
+ A ticket with which to lead off the transmission loop.
"""
sequence_number = self._lowest_unused_sequence_number
self._lowest_unused_sequence_number += 1
- return self._packetizer.packetize(
+ return self._ticketizer.ticketize(
self._operation_id, sequence_number, emission, complete)
- def _abortive_response_packet(self, outcome):
- """Creates a packet indicating operation abortion.
+ def _abortive_response_ticket(self, outcome):
+ """Creates a ticket indicating operation abortion.
Args:
outcome: An interfaces.Outcome value describing operation abortion.
Returns:
- A packet indicating operation abortion.
+ A ticket indicating operation abortion.
"""
- packet = self._packetizer.packetize_abortion(
+ ticket = self._ticketizer.ticketize_abortion(
self._operation_id, self._lowest_unused_sequence_number, outcome)
- if packet is None:
+ if ticket is None:
return None
else:
self._lowest_unused_sequence_number += 1
- return packet
+ return ticket
- def _next_packet(self):
- """Creates the next packet to be sent to the other side of the operation.
+ def _next_ticket(self):
+ """Creates the next ticket to be sent to the other side of the operation.
Returns:
- A (completed, packet) tuple comprised of a boolean indicating whether or
- not the sequence of packets has completed normally and a packet to send
- to the other side if the sequence of packets hasn't completed. The tuple
+ A (completed, ticket) tuple comprised of a boolean indicating whether or
+ not the sequence of tickets has completed normally and a ticket to send
+ to the other side if the sequence of tickets hasn't completed. The tuple
will never have both a True first element and a non-None second element.
"""
if self._emissions is None:
@@ -311,29 +310,29 @@
complete = self._emission_complete and not self._emissions
sequence_number = self._lowest_unused_sequence_number
self._lowest_unused_sequence_number += 1
- return complete, self._packetizer.packetize(
+ return complete, self._ticketizer.ticketize(
self._operation_id, sequence_number, payload, complete)
else:
return self._emission_complete, None
else:
- packet = self._abortive_response_packet(self._outcome)
+ ticket = self._abortive_response_ticket(self._outcome)
self._emissions = None
- return False, None if packet is None else packet
+ return False, None if ticket is None else ticket
- def _transmit(self, packet):
- """Commences the transmission loop sending packets.
+ def _transmit(self, ticket):
+ """Commences the transmission loop sending tickets.
Args:
- packet: A packet to be sent to the other side of the operation.
+ ticket: A ticket to be sent to the other side of the operation.
"""
- def transmit(packet):
+ def transmit(ticket):
while True:
transmission_outcome = callable_util.call_logging_exceptions(
- self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, packet)
+ self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket)
if transmission_outcome.exception is None:
with self._lock:
- complete, packet = self._next_packet()
- if packet is None:
+ complete, ticket = self._next_ticket()
+ if ticket is None:
if complete:
self._termination_manager.transmission_complete()
self._transmitting = False
@@ -349,7 +348,7 @@
return
self._pool.submit(callable_util.with_exceptions_logged(
- transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), packet)
+ transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket)
self._transmitting = True
def inmit(self, emission, complete):
@@ -359,17 +358,17 @@
if self._transmitting:
self._emissions.append(emission)
else:
- self._transmit(self._lead_packet(emission, complete))
+ self._transmit(self._lead_ticket(emission, complete))
def abort(self, outcome):
"""See _interfaces.TransmissionManager.abort for specification."""
if self._emissions is not None and self._outcome is None:
self._outcome = outcome
if not self._transmitting:
- packet = self._abortive_response_packet(outcome)
+ ticket = self._abortive_response_ticket(outcome)
self._emissions = None
- if packet is not None:
- self._transmit(packet)
+ if ticket is not None:
+ self._transmit(ticket)
def front_transmission_manager(
@@ -379,14 +378,14 @@
Args:
lock: The operation-servicing-wide lock object.
- pool: A thread pool in which the work of transmitting packets will be
+ pool: A thread pool in which the work of transmitting tickets will be
performed.
- callback: A callable that accepts packets and sends them to the other side
+ callback: A callable that accepts tickets and sends them to the other side
of the operation.
operation_id: The operation's ID.
name: The name of the operation.
subscription_kind: An interfaces.ServicedSubscription.Kind value
- describing the interest the front has in packets sent from the back.
+ describing the interest the front has in tickets sent from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
@@ -397,7 +396,7 @@
A TransmissionManager appropriate for front-side use.
"""
return _TransmittingTransmissionManager(
- lock, pool, callback, operation_id, _FrontPacketizer(
+ lock, pool, callback, operation_id, _FrontTicketizer(
name, subscription_kind, trace_id, timeout),
termination_manager)
@@ -409,15 +408,15 @@
Args:
lock: The operation-servicing-wide lock object.
- pool: A thread pool in which the work of transmitting packets will be
+ pool: A thread pool in which the work of transmitting tickets will be
performed.
- callback: A callable that accepts packets and sends them to the other side
+ callback: A callable that accepts tickets and sends them to the other side
of the operation.
operation_id: The operation's ID.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
subscription_kind: An interfaces.ServicedSubscription.Kind value
- describing the interest the front has in packets sent from the back.
+ describing the interest the front has in tickets sent from the back.
Returns:
A TransmissionManager appropriate for back-side use.
@@ -426,5 +425,5 @@
return _EmptyTransmissionManager()
else:
return _TransmittingTransmissionManager(
- lock, pool, callback, operation_id, _BackPacketizer(),
+ lock, pool, callback, operation_id, _BackTicketizer(),
termination_manager)
diff --git a/src/python/src/grpc/framework/base/packets/implementations.py b/src/python/src/grpc/framework/base/implementations.py
similarity index 75%
rename from src/python/src/grpc/framework/base/packets/implementations.py
rename to src/python/src/grpc/framework/base/implementations.py
index 28688bc..5656f9f 100644
--- a/src/python/src/grpc/framework/base/packets/implementations.py
+++ b/src/python/src/grpc/framework/base/implementations.py
@@ -27,51 +27,51 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Entry points into the packet-exchange-based implementation the base layer."""
+"""Entry points into the ticket-exchange-based base layer implementation."""
# interfaces is referenced from specification in this module.
-from grpc.framework.base.packets import _ends
-from grpc.framework.base.packets import interfaces # pylint: disable=unused-import
+from grpc.framework.base import _ends
+from grpc.framework.base import interfaces # pylint: disable=unused-import
-def front(work_pool, transmission_pool, utility_pool):
- """Factory function for creating interfaces.Fronts.
+def front_link(work_pool, transmission_pool, utility_pool):
+ """Factory function for creating interfaces.FrontLinks.
Args:
- work_pool: A thread pool to be used for doing work within the created Front
- object.
- transmission_pool: A thread pool to be used within the created Front object
- for transmitting values to some Back object.
- utility_pool: A thread pool to be used within the created Front object for
- utility tasks.
+ work_pool: A thread pool to be used for doing work within the created
+ FrontLink object.
+ transmission_pool: A thread pool to be used within the created FrontLink
+ object for transmitting values to a joined RearLink object.
+ utility_pool: A thread pool to be used within the created FrontLink object
+ for utility tasks.
Returns:
- An interfaces.Front.
+ An interfaces.FrontLink.
"""
- return _ends.Front(work_pool, transmission_pool, utility_pool)
+ return _ends.FrontLink(work_pool, transmission_pool, utility_pool)
-def back(
+def back_link(
servicer, work_pool, transmission_pool, utility_pool, default_timeout,
maximum_timeout):
- """Factory function for creating interfaces.Backs.
+ """Factory function for creating interfaces.BackLinks.
Args:
servicer: An interfaces.Servicer for servicing operations.
- work_pool: A thread pool to be used for doing work within the created Back
- object.
- transmission_pool: A thread pool to be used within the created Back object
- for transmitting values to some Front object.
- utility_pool: A thread pool to be used within the created Back object for
- utility tasks.
+ work_pool: A thread pool to be used for doing work within the created
+ BackLink object.
+ transmission_pool: A thread pool to be used within the created BackLink
+ object for transmitting values to a joined ForeLink object.
+ utility_pool: A thread pool to be used within the created BackLink object
+ for utility tasks.
default_timeout: A length of time in seconds to be used as the default
time alloted for a single operation.
maximum_timeout: A length of time in seconds to be used as the maximum
time alloted for a single operation.
Returns:
- An interfaces.Back.
+ An interfaces.BackLink.
"""
- return _ends.Back(
+ return _ends.BackLink(
servicer, work_pool, transmission_pool, utility_pool, default_timeout,
maximum_timeout)
diff --git a/src/python/src/grpc/framework/base/packets/implementations_test.py b/src/python/src/grpc/framework/base/implementations_test.py
similarity index 94%
rename from src/python/src/grpc/framework/base/packets/implementations_test.py
rename to src/python/src/grpc/framework/base/implementations_test.py
index e585570..11e49ca 100644
--- a/src/python/src/grpc/framework/base/packets/implementations_test.py
+++ b/src/python/src/grpc/framework/base/implementations_test.py
@@ -27,13 +27,13 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Tests for _framework.base.packets.implementations."""
+"""Tests for grpc.framework.base.implementations."""
import unittest
+from grpc.framework.base import implementations
from grpc.framework.base import interfaces_test_case
from grpc.framework.base import util
-from grpc.framework.base.packets import implementations
from grpc.framework.foundation import logging_pool
POOL_MAX_WORKERS = 100
@@ -54,10 +54,10 @@
self.back_utility_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.test_pool = logging_pool.pool(POOL_MAX_WORKERS)
self.test_servicer = interfaces_test_case.TestServicer(self.test_pool)
- self.front = implementations.front(
+ self.front = implementations.front_link(
self.front_work_pool, self.front_transmission_pool,
self.front_utility_pool)
- self.back = implementations.back(
+ self.back = implementations.back_link(
self.test_servicer, self.back_work_pool, self.back_transmission_pool,
self.back_utility_pool, DEFAULT_TIMEOUT, MAXIMUM_TIMEOUT)
self.front.join_rear_link(self.back)
diff --git a/src/python/src/grpc/framework/base/packets/in_memory.py b/src/python/src/grpc/framework/base/in_memory.py
similarity index 95%
rename from src/python/src/grpc/framework/base/packets/in_memory.py
rename to src/python/src/grpc/framework/base/in_memory.py
index 453fd3b..c92d0bc 100644
--- a/src/python/src/grpc/framework/base/packets/in_memory.py
+++ b/src/python/src/grpc/framework/base/in_memory.py
@@ -27,12 +27,12 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Entry points into the packet-exchange-based implementation the base layer."""
+"""In-memory implementations of base layer interfaces."""
import threading
-from grpc.framework.base.packets import _constants
-from grpc.framework.base.packets import interfaces
+from grpc.framework.base import _constants
+from grpc.framework.base import interfaces
from grpc.framework.foundation import callable_util
diff --git a/src/python/src/grpc/framework/base/interfaces.py b/src/python/src/grpc/framework/base/interfaces.py
index ed43b25..e22c10d 100644
--- a/src/python/src/grpc/framework/base/interfaces.py
+++ b/src/python/src/grpc/framework/base/interfaces.py
@@ -30,6 +30,7 @@
"""Interfaces defined and used by the base layer of RPC Framework."""
import abc
+import collections
import enum
# stream is referenced from specification in this module.
@@ -230,3 +231,133 @@
class Back(End):
"""Serverish objects that perform the work of operations."""
__metaclass__ = abc.ABCMeta
+
+
+class FrontToBackTicket(
+ collections.namedtuple(
+ 'FrontToBackTicket',
+ ['operation_id', 'sequence_number', 'kind', 'name', 'subscription',
+ 'trace_id', 'payload', 'timeout'])):
+ """A sum type for all values sent from a front to a back.
+
+ Attributes:
+ operation_id: A unique-with-respect-to-equality hashable object identifying
+ a particular operation.
+ sequence_number: A zero-indexed integer sequence number identifying the
+ ticket's place among all the tickets sent from front to back for this
+ particular operation. Must be zero if kind is Kind.COMMENCEMENT or
+ Kind.ENTIRE. Must be positive for any other kind.
+ kind: A Kind value describing the overall kind of ticket.
+ name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT
+ or Kind.ENTIRE. Must be None for any other kind.
+ subscription: An ServicedSubscription.Kind value describing the interest
+ the front has in tickets sent from the back. Must be present if
+ kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind.
+ trace_id: A uuid.UUID identifying a set of related operations to which this
+ operation belongs. May be None.
+ payload: A customer payload object. Must be present if kind is
+ Kind.CONTINUATION. Must be None if kind is Kind.CANCELLATION. May be None
+ for any other kind.
+ timeout: An optional length of time (measured from the beginning of the
+ operation) to allow for the entire operation. If None, a default value on
+ the back will be used. If present and excessively large, the back may
+ limit the operation to a smaller duration of its choice. May be present
+ for any ticket kind; setting a value on a later ticket allows fronts
+ to request time extensions (or even time reductions!) on in-progress
+ operations.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ """Identifies the overall kind of a FrontToBackTicket."""
+
+ COMMENCEMENT = 'commencement'
+ CONTINUATION = 'continuation'
+ COMPLETION = 'completion'
+ ENTIRE = 'entire'
+ CANCELLATION = 'cancellation'
+ EXPIRATION = 'expiration'
+ SERVICER_FAILURE = 'servicer failure'
+ SERVICED_FAILURE = 'serviced failure'
+ RECEPTION_FAILURE = 'reception failure'
+ TRANSMISSION_FAILURE = 'transmission failure'
+
+
+class BackToFrontTicket(
+ collections.namedtuple(
+ 'BackToFrontTicket',
+ ['operation_id', 'sequence_number', 'kind', 'payload'])):
+ """A sum type for all values sent from a back to a front.
+
+ Attributes:
+ operation_id: A unique-with-respect-to-equality hashable object identifying
+ a particular operation.
+ sequence_number: A zero-indexed integer sequence number identifying the
+ ticket's place among all the tickets sent from back to front for this
+ particular operation.
+ kind: A Kind value describing the overall kind of ticket.
+ payload: A customer payload object. Must be present if kind is
+ Kind.CONTINUATION. May be None if kind is Kind.COMPLETION. Must be None
+ otherwise.
+ """
+
+ @enum.unique
+ class Kind(enum.Enum):
+ """Identifies the overall kind of a BackToFrontTicket."""
+
+ CONTINUATION = 'continuation'
+ COMPLETION = 'completion'
+ CANCELLATION = 'cancellation'
+ EXPIRATION = 'expiration'
+ SERVICER_FAILURE = 'servicer failure'
+ SERVICED_FAILURE = 'serviced failure'
+ RECEPTION_FAILURE = 'reception failure'
+ TRANSMISSION_FAILURE = 'transmission failure'
+
+
+class ForeLink(object):
+ """Accepts back-to-front tickets and emits front-to-back tickets."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def accept_back_to_front_ticket(self, ticket):
+ """Accept a BackToFrontTicket.
+
+ Args:
+ ticket: Any BackToFrontTicket.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def join_rear_link(self, rear_link):
+ """Mates this object with a peer with which it will exchange tickets."""
+ raise NotImplementedError()
+
+
+class RearLink(object):
+ """Accepts front-to-back tickets and emits back-to-front tickets."""
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def accept_front_to_back_ticket(self, ticket):
+ """Accepts a FrontToBackTicket.
+
+ Args:
+ ticket: Any FrontToBackTicket.
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def join_fore_link(self, fore_link):
+ """Mates this object with a peer with which it will exchange tickets."""
+ raise NotImplementedError()
+
+
+class FrontLink(Front, ForeLink):
+ """Clientish objects that operate by sending and receiving tickets."""
+ __metaclass__ = abc.ABCMeta
+
+
+class BackLink(Back, RearLink):
+ """Serverish objects that operate by sending and receiving tickets."""
+ __metaclass__ = abc.ABCMeta
diff --git a/src/python/src/grpc/framework/base/interfaces_test_case.py b/src/python/src/grpc/framework/base/interfaces_test_case.py
index b86011c..dec10c2 100644
--- a/src/python/src/grpc/framework/base/interfaces_test_case.py
+++ b/src/python/src/grpc/framework/base/interfaces_test_case.py
@@ -164,7 +164,7 @@
# pylint: disable=invalid-name
def testSimplestCall(self):
- """Tests the absolute simplest call - a one-packet fire-and-forget."""
+ """Tests the absolute simplest call - a one-ticket fire-and-forget."""
self.front.operate(
SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT,
util.none_serviced_subscription(), 'test trace ID')
@@ -175,25 +175,25 @@
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
# the back could have experienced execution up to this point:
- # (1) The packet is still either in the front waiting to be transmitted
+ # (1) The ticket is still either in the front waiting to be transmitted
# or is somewhere on the link between the front and the back. The back has
# no idea that this test is even happening. Calling wait_for_idle on it
# would do no good because in this case the back is idle and the call would
- # return with the packet bound for it still in the front or on the link.
+ # return with the ticket bound for it still in the front or on the link.
back_operation_stats = self.back.operation_stats()
first_back_possibility = EMPTY_OUTCOME_DICT
- # (2) The packet arrived at the back and the back completed the operation.
+ # (2) The ticket arrived at the back and the back completed the operation.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.Outcome.COMPLETED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
- # It's true that if the packet had arrived at the back and the back had
+ # It's true that if the ticket had arrived at the back and the back had
# begun processing that wait_for_idle could hold test execution until the
# back completed the operation, but that doesn't really collapse the
# possibility space down to one solution.
def testEntireEcho(self):
- """Tests a very simple one-packet-each-way round-trip."""
+ """Tests a very simple one-ticket-each-way round-trip."""
test_payload = 'test payload'
test_consumer = stream_testing.TestConsumer()
subscription = util.full_serviced_subscription(
@@ -212,7 +212,7 @@
self.assertListEqual([(test_payload, True)], test_consumer.calls)
def testBidirectionalStreamingEcho(self):
- """Tests sending multiple packets each way."""
+ """Tests sending multiple tickets each way."""
test_payload_template = 'test_payload: %03d'
test_payloads = [test_payload_template % i for i in range(STREAM_LENGTH)]
test_consumer = stream_testing.TestConsumer()
@@ -255,16 +255,16 @@
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
# the back could have experienced execution up to this point:
- # (1) Both packets are still either in the front waiting to be transmitted
+ # (1) Both tickets are still either in the front waiting to be transmitted
# or are somewhere on the link between the front and the back. The back has
# no idea that this test is even happening. Calling wait_for_idle on it
# would do no good because in this case the back is idle and the call would
- # return with the packets bound for it still in the front or on the link.
+ # return with the tickets bound for it still in the front or on the link.
back_operation_stats = self.back.operation_stats()
first_back_possibility = EMPTY_OUTCOME_DICT
- # (2) Both packets arrived within SMALL_TIMEOUT of one another at the back.
- # The back started processing based on the first packet and then stopped
- # upon receiving the cancellation packet.
+ # (2) Both tickets arrived within SMALL_TIMEOUT of one another at the back.
+ # The back started processing based on the first ticket and then stopped
+ # upon receiving the cancellation ticket.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
second_back_possibility[interfaces.Outcome.CANCELLED] = 1
self.assertIn(
diff --git a/src/python/src/grpc/framework/base/packets/null.py b/src/python/src/grpc/framework/base/null.py
similarity index 97%
rename from src/python/src/grpc/framework/base/packets/null.py
rename to src/python/src/grpc/framework/base/null.py
index 5a21212..1e30d45 100644
--- a/src/python/src/grpc/framework/base/packets/null.py
+++ b/src/python/src/grpc/framework/base/null.py
@@ -29,7 +29,7 @@
"""Null links that ignore tickets passed to them."""
-from grpc.framework.base.packets import interfaces
+from grpc.framework.base import interfaces
class _NullForeLink(interfaces.ForeLink):
diff --git a/src/python/src/grpc/framework/base/packets/_reception.py b/src/python/src/grpc/framework/base/packets/_reception.py
deleted file mode 100644
index ef10c7f..0000000
--- a/src/python/src/grpc/framework/base/packets/_reception.py
+++ /dev/null
@@ -1,400 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""State and behavior for packet reception."""
-
-import abc
-
-from grpc.framework.base import interfaces as base_interfaces
-from grpc.framework.base.packets import _interfaces
-from grpc.framework.base.packets import packets
-
-_INITIAL_FRONT_TO_BACK_PACKET_KINDS = (
- packets.FrontToBackPacket.Kind.COMMENCEMENT,
- packets.FrontToBackPacket.Kind.ENTIRE,
-)
-
-
-class _Receiver(object):
- """Common specification of different packet-handling behavior."""
- __metaclass__ = abc.ABCMeta
-
- @abc.abstractmethod
- def abort_if_abortive(self, packet):
- """Aborts the operation if the packet is abortive.
-
- Args:
- packet: A just-arrived packet.
-
- Returns:
- A boolean indicating whether or not this Receiver aborted the operation
- based on the packet.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def receive(self, packet):
- """Handles a just-arrived packet.
-
- Args:
- packet: A just-arrived packet.
-
- Returns:
- A boolean indicating whether or not the packet was terminal (i.e. whether
- or not non-abortive packets are legal after this one).
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def reception_failure(self):
- """Aborts the operation with an indication of reception failure."""
- raise NotImplementedError()
-
-
-def _abort(
- outcome, termination_manager, transmission_manager, ingestion_manager,
- expiration_manager):
- """Indicates abortion with the given outcome to the given managers."""
- termination_manager.abort(outcome)
- transmission_manager.abort(outcome)
- ingestion_manager.abort()
- expiration_manager.abort()
-
-
-def _abort_if_abortive(
- packet, abortive, termination_manager, transmission_manager,
- ingestion_manager, expiration_manager):
- """Determines a packet's being abortive and if so aborts the operation.
-
- Args:
- packet: A just-arrived packet.
- abortive: A callable that takes a packet and returns a
- base_interfaces.Outcome indicating that the operation should be aborted
- or None indicating that the operation should not be aborted.
- termination_manager: The operation's _interfaces.TerminationManager.
- transmission_manager: The operation's _interfaces.TransmissionManager.
- ingestion_manager: The operation's _interfaces.IngestionManager.
- expiration_manager: The operation's _interfaces.ExpirationManager.
-
- Returns:
- True if the operation was aborted; False otherwise.
- """
- abortion_outcome = abortive(packet)
- if abortion_outcome is None:
- return False
- else:
- _abort(
- abortion_outcome, termination_manager, transmission_manager,
- ingestion_manager, expiration_manager)
- return True
-
-
-def _reception_failure(
- termination_manager, transmission_manager, ingestion_manager,
- expiration_manager):
- """Aborts the operation with an indication of reception failure."""
- _abort(
- base_interfaces.Outcome.RECEPTION_FAILURE, termination_manager,
- transmission_manager, ingestion_manager, expiration_manager)
-
-
-class _BackReceiver(_Receiver):
- """Packet-handling specific to the back side of an operation."""
-
- def __init__(
- self, termination_manager, transmission_manager, ingestion_manager,
- expiration_manager):
- """Constructor.
-
- Args:
- termination_manager: The operation's _interfaces.TerminationManager.
- transmission_manager: The operation's _interfaces.TransmissionManager.
- ingestion_manager: The operation's _interfaces.IngestionManager.
- expiration_manager: The operation's _interfaces.ExpirationManager.
- """
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._ingestion_manager = ingestion_manager
- self._expiration_manager = expiration_manager
-
- self._first_packet_seen = False
- self._last_packet_seen = False
-
- def _abortive(self, packet):
- """Determines whether or not (and if so, how) a packet is abortive.
-
- Args:
- packet: A just-arrived packet.
-
- Returns:
- A base_interfaces.Outcome value describing operation abortion if the
- packet is abortive or None if the packet is not abortive.
- """
- if packet.kind is packets.FrontToBackPacket.Kind.CANCELLATION:
- return base_interfaces.Outcome.CANCELLED
- elif packet.kind is packets.FrontToBackPacket.Kind.EXPIRATION:
- return base_interfaces.Outcome.EXPIRED
- elif packet.kind is packets.FrontToBackPacket.Kind.SERVICED_FAILURE:
- return base_interfaces.Outcome.SERVICED_FAILURE
- elif packet.kind is packets.FrontToBackPacket.Kind.RECEPTION_FAILURE:
- return base_interfaces.Outcome.SERVICED_FAILURE
- elif (packet.kind in _INITIAL_FRONT_TO_BACK_PACKET_KINDS and
- self._first_packet_seen):
- return base_interfaces.Outcome.RECEPTION_FAILURE
- elif self._last_packet_seen:
- return base_interfaces.Outcome.RECEPTION_FAILURE
- else:
- return None
-
- def abort_if_abortive(self, packet):
- """See _Receiver.abort_if_abortive for specification."""
- return _abort_if_abortive(
- packet, self._abortive, self._termination_manager,
- self._transmission_manager, self._ingestion_manager,
- self._expiration_manager)
-
- def receive(self, packet):
- """See _Receiver.receive for specification."""
- if packet.timeout is not None:
- self._expiration_manager.change_timeout(packet.timeout)
-
- if packet.kind is packets.FrontToBackPacket.Kind.COMMENCEMENT:
- self._first_packet_seen = True
- self._ingestion_manager.start(packet.name)
- if packet.payload is not None:
- self._ingestion_manager.consume(packet.payload)
- elif packet.kind is packets.FrontToBackPacket.Kind.CONTINUATION:
- self._ingestion_manager.consume(packet.payload)
- elif packet.kind is packets.FrontToBackPacket.Kind.COMPLETION:
- self._last_packet_seen = True
- if packet.payload is None:
- self._ingestion_manager.terminate()
- else:
- self._ingestion_manager.consume_and_terminate(packet.payload)
- else:
- self._first_packet_seen = True
- self._last_packet_seen = True
- self._ingestion_manager.start(packet.name)
- if packet.payload is None:
- self._ingestion_manager.terminate()
- else:
- self._ingestion_manager.consume_and_terminate(packet.payload)
-
- def reception_failure(self):
- """See _Receiver.reception_failure for specification."""
- _reception_failure(
- self._termination_manager, self._transmission_manager,
- self._ingestion_manager, self._expiration_manager)
-
-
-class _FrontReceiver(_Receiver):
- """Packet-handling specific to the front side of an operation."""
-
- def __init__(
- self, termination_manager, transmission_manager, ingestion_manager,
- expiration_manager):
- """Constructor.
-
- Args:
- termination_manager: The operation's _interfaces.TerminationManager.
- transmission_manager: The operation's _interfaces.TransmissionManager.
- ingestion_manager: The operation's _interfaces.IngestionManager.
- expiration_manager: The operation's _interfaces.ExpirationManager.
- """
- self._termination_manager = termination_manager
- self._transmission_manager = transmission_manager
- self._ingestion_manager = ingestion_manager
- self._expiration_manager = expiration_manager
-
- self._last_packet_seen = False
-
- def _abortive(self, packet):
- """Determines whether or not (and if so, how) a packet is abortive.
-
- Args:
- packet: A just-arrived packet.
-
- Returns:
- A base_interfaces.Outcome value describing operation abortion if the
- packet is abortive or None if the packet is not abortive.
- """
- if packet.kind is packets.BackToFrontPacket.Kind.CANCELLATION:
- return base_interfaces.Outcome.CANCELLED
- elif packet.kind is packets.BackToFrontPacket.Kind.EXPIRATION:
- return base_interfaces.Outcome.EXPIRED
- elif packet.kind is packets.BackToFrontPacket.Kind.SERVICER_FAILURE:
- return base_interfaces.Outcome.SERVICER_FAILURE
- elif packet.kind is packets.BackToFrontPacket.Kind.RECEPTION_FAILURE:
- return base_interfaces.Outcome.SERVICER_FAILURE
- elif self._last_packet_seen:
- return base_interfaces.Outcome.RECEPTION_FAILURE
- else:
- return None
-
- def abort_if_abortive(self, packet):
- """See _Receiver.abort_if_abortive for specification."""
- return _abort_if_abortive(
- packet, self._abortive, self._termination_manager,
- self._transmission_manager, self._ingestion_manager,
- self._expiration_manager)
-
- def receive(self, packet):
- """See _Receiver.receive for specification."""
- if packet.kind is packets.BackToFrontPacket.Kind.CONTINUATION:
- self._ingestion_manager.consume(packet.payload)
- elif packet.kind is packets.BackToFrontPacket.Kind.COMPLETION:
- self._last_packet_seen = True
- if packet.payload is None:
- self._ingestion_manager.terminate()
- else:
- self._ingestion_manager.consume_and_terminate(packet.payload)
-
- def reception_failure(self):
- """See _Receiver.reception_failure for specification."""
- _reception_failure(
- self._termination_manager, self._transmission_manager,
- self._ingestion_manager, self._expiration_manager)
-
-
-class _ReceptionManager(_interfaces.ReceptionManager):
- """A ReceptionManager based around a _Receiver passed to it."""
-
- def __init__(self, lock, receiver):
- """Constructor.
-
- Args:
- lock: The operation-servicing-wide lock object.
- receiver: A _Receiver responsible for handling received packets.
- """
- self._lock = lock
- self._receiver = receiver
-
- self._lowest_unseen_sequence_number = 0
- self._out_of_sequence_packets = {}
- self._completed_sequence_number = None
- self._aborted = False
-
- def _sequence_failure(self, packet):
- """Determines a just-arrived packet's sequential legitimacy.
-
- Args:
- packet: A just-arrived packet.
-
- Returns:
- True if the packet is sequentially legitimate; False otherwise.
- """
- if packet.sequence_number < self._lowest_unseen_sequence_number:
- return True
- elif packet.sequence_number in self._out_of_sequence_packets:
- return True
- elif (self._completed_sequence_number is not None and
- self._completed_sequence_number <= packet.sequence_number):
- return True
- else:
- return False
-
- def _process(self, packet):
- """Process those packets ready to be processed.
-
- Args:
- packet: A just-arrived packet the sequence number of which matches this
- _ReceptionManager's _lowest_unseen_sequence_number field.
- """
- while True:
- completed = self._receiver.receive(packet)
- if completed:
- self._out_of_sequence_packets.clear()
- self._completed_sequence_number = packet.sequence_number
- self._lowest_unseen_sequence_number = packet.sequence_number + 1
- return
- else:
- next_packet = self._out_of_sequence_packets.pop(
- packet.sequence_number + 1, None)
- if next_packet is None:
- self._lowest_unseen_sequence_number = packet.sequence_number + 1
- return
- else:
- packet = next_packet
-
- def receive_packet(self, packet):
- """See _interfaces.ReceptionManager.receive_packet for specification."""
- with self._lock:
- if self._aborted:
- return
- elif self._sequence_failure(packet):
- self._receiver.reception_failure()
- self._aborted = True
- elif self._receiver.abort_if_abortive(packet):
- self._aborted = True
- elif packet.sequence_number == self._lowest_unseen_sequence_number:
- self._process(packet)
- else:
- self._out_of_sequence_packets[packet.sequence_number] = packet
-
-
-def front_reception_manager(
- lock, termination_manager, transmission_manager, ingestion_manager,
- expiration_manager):
- """Creates a _interfaces.ReceptionManager for front-side use.
-
- Args:
- lock: The operation-servicing-wide lock object.
- termination_manager: The operation's _interfaces.TerminationManager.
- transmission_manager: The operation's _interfaces.TransmissionManager.
- ingestion_manager: The operation's _interfaces.IngestionManager.
- expiration_manager: The operation's _interfaces.ExpirationManager.
-
- Returns:
- A _interfaces.ReceptionManager appropriate for front-side use.
- """
- return _ReceptionManager(
- lock, _FrontReceiver(
- termination_manager, transmission_manager, ingestion_manager,
- expiration_manager))
-
-
-def back_reception_manager(
- lock, termination_manager, transmission_manager, ingestion_manager,
- expiration_manager):
- """Creates a _interfaces.ReceptionManager for back-side use.
-
- Args:
- lock: The operation-servicing-wide lock object.
- termination_manager: The operation's _interfaces.TerminationManager.
- transmission_manager: The operation's _interfaces.TransmissionManager.
- ingestion_manager: The operation's _interfaces.IngestionManager.
- expiration_manager: The operation's _interfaces.ExpirationManager.
-
- Returns:
- A _interfaces.ReceptionManager appropriate for back-side use.
- """
- return _ReceptionManager(
- lock, _BackReceiver(
- termination_manager, transmission_manager, ingestion_manager,
- expiration_manager))
diff --git a/src/python/src/grpc/framework/base/packets/interfaces.py b/src/python/src/grpc/framework/base/packets/interfaces.py
deleted file mode 100644
index 7c48956..0000000
--- a/src/python/src/grpc/framework/base/packets/interfaces.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Interfaces defined and used by the base layer of RPC Framework."""
-
-import abc
-
-# packets is referenced from specifications in this module.
-from grpc.framework.base import interfaces
-from grpc.framework.base.packets import packets # pylint: disable=unused-import
-
-
-class ForeLink(object):
- """Accepts back-to-front tickets and emits front-to-back tickets."""
- __metaclass__ = abc.ABCMeta
-
- @abc.abstractmethod
- def accept_back_to_front_ticket(self, ticket):
- """Accept a packets.BackToFrontPacket.
-
- Args:
- ticket: Any packets.BackToFrontPacket.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def join_rear_link(self, rear_link):
- """Mates this object with a peer with which it will exchange tickets."""
- raise NotImplementedError()
-
-
-class RearLink(object):
- """Accepts front-to-back tickets and emits back-to-front tickets."""
- __metaclass__ = abc.ABCMeta
-
- @abc.abstractmethod
- def accept_front_to_back_ticket(self, ticket):
- """Accepts a packets.FrontToBackPacket.
-
- Args:
- ticket: Any packets.FrontToBackPacket.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
- def join_fore_link(self, fore_link):
- """Mates this object with a peer with which it will exchange tickets."""
- raise NotImplementedError()
-
-
-class Front(ForeLink, interfaces.Front):
- """Clientish objects that operate by sending and receiving tickets."""
- __metaclass__ = abc.ABCMeta
-
-
-class Back(RearLink, interfaces.Back):
- """Serverish objects that operate by sending and receiving tickets."""
- __metaclass__ = abc.ABCMeta
diff --git a/src/python/src/grpc/framework/base/packets/packets.py b/src/python/src/grpc/framework/base/packets/packets.py
deleted file mode 100644
index 1b14048..0000000
--- a/src/python/src/grpc/framework/base/packets/packets.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Packets used between fronts and backs."""
-
-import collections
-import enum
-
-# interfaces is referenced from specifications in this module.
-from grpc.framework.base import interfaces # pylint: disable=unused-import
-
-
-class FrontToBackPacket(
- collections.namedtuple(
- 'FrontToBackPacket',
- ['operation_id', 'sequence_number', 'kind', 'name', 'subscription',
- 'trace_id', 'payload', 'timeout'])):
- """A sum type for all values sent from a front to a back.
-
- Attributes:
- operation_id: A unique-with-respect-to-equality hashable object identifying
- a particular operation.
- sequence_number: A zero-indexed integer sequence number identifying the
- packet's place among all the packets sent from front to back for this
- particular operation. Must be zero if kind is Kind.COMMENCEMENT or
- Kind.ENTIRE. Must be positive for any other kind.
- kind: A Kind value describing the overall kind of ticket.
- name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT
- or Kind.ENTIRE. Must be None for any other kind.
- subscription: An interfaces.ServicedSubscription.Kind value describing the
- interest the front has in packets sent from the back. Must be present if
- kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind.
- trace_id: A uuid.UUID identifying a set of related operations to which this
- operation belongs. May be None.
- payload: A customer payload object. Must be present if kind is
- Kind.CONTINUATION. Must be None if kind is Kind.CANCELLATION. May be None
- for any other kind.
- timeout: An optional length of time (measured from the beginning of the
- operation) to allow for the entire operation. If None, a default value on
- the back will be used. If present and excessively large, the back may
- limit the operation to a smaller duration of its choice. May be present
- for any ticket kind; setting a value on a later ticket allows fronts
- to request time extensions (or even time reductions!) on in-progress
- operations.
- """
-
- @enum.unique
- class Kind(enum.Enum):
- """Identifies the overall kind of a FrontToBackPacket."""
-
- COMMENCEMENT = 'commencement'
- CONTINUATION = 'continuation'
- COMPLETION = 'completion'
- ENTIRE = 'entire'
- CANCELLATION = 'cancellation'
- EXPIRATION = 'expiration'
- SERVICER_FAILURE = 'servicer failure'
- SERVICED_FAILURE = 'serviced failure'
- RECEPTION_FAILURE = 'reception failure'
- TRANSMISSION_FAILURE = 'transmission failure'
-
-
-class BackToFrontPacket(
- collections.namedtuple(
- 'BackToFrontPacket',
- ['operation_id', 'sequence_number', 'kind', 'payload'])):
- """A sum type for all values sent from a back to a front.
-
- Attributes:
- operation_id: A unique-with-respect-to-equality hashable object identifying
- a particular operation.
- sequence_number: A zero-indexed integer sequence number identifying the
- packet's place among all the packets sent from back to front for this
- particular operation.
- kind: A Kind value describing the overall kind of ticket.
- payload: A customer payload object. Must be present if kind is
- Kind.CONTINUATION. May be None if kind is Kind.COMPLETION. Must be None
- otherwise.
- """
-
- @enum.unique
- class Kind(enum.Enum):
- """Identifies the overall kind of a BackToFrontPacket."""
-
- CONTINUATION = 'continuation'
- COMPLETION = 'completion'
- CANCELLATION = 'cancellation'
- EXPIRATION = 'expiration'
- SERVICER_FAILURE = 'servicer failure'
- SERVICED_FAILURE = 'serviced failure'
- RECEPTION_FAILURE = 'reception failure'
- TRANSMISSION_FAILURE = 'transmission failure'
diff --git a/src/python/src/grpc/framework/face/demonstration.py b/src/python/src/grpc/framework/face/demonstration.py
index d922f6e..eabeac4 100644
--- a/src/python/src/grpc/framework/face/demonstration.py
+++ b/src/python/src/grpc/framework/face/demonstration.py
@@ -30,7 +30,7 @@
"""Demonstration-suitable implementation of the face layer of RPC Framework."""
from grpc.framework.base import util as _base_util
-from grpc.framework.base.packets import implementations as _tickets_implementations
+from grpc.framework.base import implementations as _base_implementations
from grpc.framework.face import implementations
from grpc.framework.foundation import logging_pool
@@ -105,9 +105,9 @@
event_stream_in_stream_out_methods=event_stream_in_stream_out_methods,
multi_method=multi_method)
- front = _tickets_implementations.front(
+ front = _base_implementations.front_link(
front_work_pool, front_transmission_pool, front_utility_pool)
- back = _tickets_implementations.back(
+ back = _base_implementations.back_link(
servicer, back_work_pool, back_transmission_pool, back_utility_pool,
default_timeout, _MAXIMUM_TIMEOUT)
front.join_rear_link(back)
diff --git a/src/python/src/grpc/framework/face/testing/base_util.py b/src/python/src/grpc/framework/face/testing/base_util.py
index 7872a6b..151d0ef 100644
--- a/src/python/src/grpc/framework/face/testing/base_util.py
+++ b/src/python/src/grpc/framework/face/testing/base_util.py
@@ -33,9 +33,9 @@
# interfaces is referenced from specification in this module.
from grpc.framework.base import util as _base_util
-from grpc.framework.base.packets import implementations
-from grpc.framework.base.packets import in_memory
-from grpc.framework.base.packets import interfaces # pylint: disable=unused-import
+from grpc.framework.base import implementations
+from grpc.framework.base import in_memory
+from grpc.framework.base import interfaces # pylint: disable=unused-import
from grpc.framework.foundation import logging_pool
_POOL_SIZE_LIMIT = 20
@@ -89,9 +89,9 @@
back_work_pool, back_transmission_pool, back_utility_pool)
link = in_memory.Link(link_pool)
- front = implementations.front(
+ front = implementations.front_link(
front_work_pool, front_transmission_pool, front_utility_pool)
- back = implementations.back(
+ back = implementations.back_link(
servicer, back_work_pool, back_transmission_pool, back_utility_pool,
default_timeout, _MAXIMUM_TIMEOUT)
front.join_rear_link(link)
diff --git a/src/python/src/grpc/framework/foundation/_logging_pool_test.py b/src/python/src/grpc/framework/foundation/_logging_pool_test.py
index 11463a8..c92cf8c 100644
--- a/src/python/src/grpc/framework/foundation/_logging_pool_test.py
+++ b/src/python/src/grpc/framework/foundation/_logging_pool_test.py
@@ -27,7 +27,7 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Tests for _framework.foundation.logging_pool."""
+"""Tests for grpc.framework.foundation.logging_pool."""
import unittest
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index 7d93aa7..bd70634 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -64,8 +64,8 @@
'grpc._junkdrawer',
'grpc.early_adopter',
'grpc.framework',
+ 'grpc.framework.alpha',
'grpc.framework.base',
- 'grpc.framework.base.packets',
'grpc.framework.common',
'grpc.framework.face',
'grpc.framework.face.testing',
diff --git a/src/ruby/bin/interop/interop_client.rb b/src/ruby/bin/interop/interop_client.rb
index b0b24b9..b2a8711 100755
--- a/src/ruby/bin/interop/interop_client.rb
+++ b/src/ruby/bin/interop/interop_client.rb
@@ -57,7 +57,7 @@
require 'signet/ssl_config'
-AUTH_ENV = Google::Auth::ServiceAccountCredentials::ENV_VAR
+AUTH_ENV = Google::Auth::CredentialsLoader::ENV_VAR
# loads the certificates used to access the test server securely.
def load_test_certs
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 7b69f1f..6256330 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -505,12 +505,12 @@
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
- SingleReqView = view_class(:cancelled, :deadline)
+ SingleReqView = view_class(:cancelled, :deadline, :metadata)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
MultiReqView = view_class(:cancelled, :deadline, :each_queued_msg,
- :each_remote_read)
+ :each_remote_read, :metadata)
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index 5e3d3c9..2cb3d2e 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -81,6 +81,7 @@
active_call.run_server_bidi(mth)
end
send_status(active_call, OK, 'OK')
+ active_call.finished
rescue BadStatus => e
# this is raised by handlers that want GRPC to send an application
# error code and detail message.
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 12cb5c1..8914225 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -68,7 +68,7 @@
describe '#multi_req_view' do
xit 'exposes a fixed subset of the ActiveCall methods' do
- want = %w(cancelled, deadline, each_remote_read, shutdown)
+ want = %w(cancelled, deadline, each_remote_read, metadata, shutdown)
v = @client_call.multi_req_view
want.each do |w|
expect(v.methods.include?(w))
@@ -78,7 +78,7 @@
describe '#single_req_view' do
xit 'exposes a fixed subset of the ActiveCall methods' do
- want = %w(cancelled, deadline, shutdown)
+ want = %w(cancelled, deadline, metadata, shutdown)
v = @client_call.single_req_view
want.each do |w|
expect(v.methods.include?(w))
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index adf354f..73f2d37 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -434,7 +434,7 @@
end
expect(c.remote_read).to eq(expected_input)
replys.each { |r| c.remote_send(r) }
- c.send_status(status, status == @pass ? 'OK' : 'NOK')
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
end
end
@@ -444,7 +444,7 @@
c = expect_server_to_be_invoked(mtx, cnd)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
replys.each { |r| c.remote_send(r) }
- c.send_status(status, status == @pass ? 'OK' : 'NOK')
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
end
end
@@ -460,7 +460,7 @@
expect(c.remote_read).to eq(i)
end
end
- c.send_status(status, status == @pass ? 'OK' : 'NOK')
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
end
end
@@ -473,7 +473,7 @@
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
- c.send_status(status, status == @pass ? 'OK' : 'NOK')
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
end
end
@@ -486,7 +486,7 @@
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
- c.send_status(status, status == @pass ? 'OK' : 'NOK')
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
end
end
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 8bff2a9..39d1e83 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -94,6 +94,7 @@
expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).once.with(@ok_response)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
+ expect(@call).to receive(:finished).once
@request_response.run_server_method(@call, method(:fake_reqresp))
end
end
@@ -134,6 +135,7 @@
it 'sends a response and closes the stream if there no errors' do
expect(@call).to receive(:remote_send).once.with(@ok_response)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
+ expect(@call).to receive(:finished).once
@client_streamer.run_server_method(@call, method(:fake_clstream))
end
end
@@ -178,6 +180,7 @@
expect(@call).to receive(:remote_read).once.and_return(req)
expect(@call).to receive(:remote_send).twice.with(@ok_response)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
+ expect(@call).to receive(:finished).once
@server_streamer.run_server_method(@call, method(:fake_svstream))
end
end
@@ -207,6 +210,7 @@
it 'closes the stream if there no errors' do
expect(@call).to receive(:run_server_bidi)
expect(@call).to receive(:send_status).once.with(OK, 'OK')
+ expect(@call).to receive(:finished).once
@bidi_streamer.run_server_method(@call, method(:fake_bidistream))
end
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index d5421d4..f3b89b5 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -62,12 +62,15 @@
class EchoService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
+ attr_reader :received_md
def initialize(_default_var = 'ignored')
+ @received_md = []
end
- def an_rpc(req, _call)
+ def an_rpc(req, call)
logger.info('echo service received a request')
+ @received_md << call.metadata unless call.metadata.nil?
req
end
end
@@ -337,6 +340,38 @@
t.join
end
+ it 'should receive metadata sent as rpc keyword args', server: true do
+ service = EchoService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ stub = EchoStub.new(@host, **@client_opts)
+ expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
+ wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
+ expect(service.received_md).to eq(wanted_md)
+ @srv.stop
+ t.join
+ end
+
+ it 'should receive updated metadata', server: true do
+ service = EchoService.new
+ @srv.handle(service)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ @client_opts[:update_metadata] = proc do |md|
+ md[:k1] = 'updated-v1'
+ md
+ end
+ stub = EchoStub.new(@host, **@client_opts)
+ expect(stub.an_rpc(req, k1: 'v1', k2: 'v2')).to be_a(EchoMsg)
+ wanted_md = [{ 'k1' => 'updated-v1', 'k2' => 'v2' }]
+ expect(service.received_md).to eq(wanted_md)
+ @srv.stop
+ t.join
+ end
+
it 'should handle multiple parallel requests', server: true do
@srv.handle(EchoService)
Thread.new { @srv.run }
diff --git a/templates/Makefile.template b/templates/Makefile.template
index 148fa7c..6845b91 100644
--- a/templates/Makefile.template
+++ b/templates/Makefile.template
@@ -653,6 +653,11 @@
% endfor
+test_python: static_c
+ $(E) "[RUN] Testing python code"
+ $(Q) tools/run_tests/run_tests.py -lpython -c$(CONFIG)
+
+
tools: privatelibs\
% for tgt in targets:
% if tgt.build == 'tool':
diff --git a/test/compiler/python_plugin_test.py b/test/compiler/python_plugin_test.py
index 9cf3c62..3d2f117 100644
--- a/test/compiler/python_plugin_test.py
+++ b/test/compiler/python_plugin_test.py
@@ -39,7 +39,7 @@
import time
import unittest
-from grpc.early_adopter import exceptions
+from grpc.framework.alpha import exceptions
from grpc.framework.foundation import future
# Identifiers of entities we expect to find in the generated module.
diff --git a/test/cpp/end2end/async_end2end_test.cc b/test/cpp/end2end/async_end2end_test.cc
index 70df9e1..e011b78 100644
--- a/test/cpp/end2end/async_end2end_test.cc
+++ b/test/cpp/end2end/async_end2end_test.cc
@@ -76,6 +76,20 @@
EXPECT_EQ(tag(i), got_tag);
}
+void verify_timed_ok(CompletionQueue* cq, int i, bool expect_ok,
+ std::chrono::system_clock::time_point deadline =
+ std::chrono::system_clock::time_point::max(),
+ CompletionQueue::NextStatus expected_outcome =
+ CompletionQueue::GOT_EVENT) {
+ bool ok;
+ void* got_tag;
+ EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline), expected_outcome);
+ if (expected_outcome == CompletionQueue::GOT_EVENT) {
+ EXPECT_EQ(expect_ok, ok);
+ EXPECT_EQ(tag(i), got_tag);
+ }
+}
+
class AsyncEnd2endTest : public ::testing::Test {
protected:
AsyncEnd2endTest() : service_(&srv_cq_) {}
@@ -166,6 +180,50 @@
SendRpc(10);
}
+// Test a simple RPC using the async version of Next
+TEST_F(AsyncEnd2endTest, AsyncNextRpc) {
+ ResetStub();
+
+ EchoRequest send_request;
+ EchoRequest recv_request;
+ EchoResponse send_response;
+ EchoResponse recv_response;
+ Status recv_status;
+
+ ClientContext cli_ctx;
+ ServerContext srv_ctx;
+ grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
+
+ send_request.set_message("Hello");
+ std::unique_ptr<ClientAsyncResponseReader<EchoResponse> >
+ response_reader(stub_->AsyncEcho(&cli_ctx, send_request,
+ &cli_cq_, tag(1)));
+
+ std::chrono::system_clock::time_point
+ time_now(std::chrono::system_clock::now()),
+ time_limit(std::chrono::system_clock::now()+std::chrono::seconds(5));
+ verify_timed_ok(&srv_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
+ verify_timed_ok(&cli_cq_, -1, true, time_now, CompletionQueue::TIMEOUT);
+
+ service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, &srv_cq_,
+ tag(2));
+
+ verify_timed_ok(&srv_cq_, 2, true, time_limit);
+ EXPECT_EQ(send_request.message(), recv_request.message());
+ verify_timed_ok(&cli_cq_, 1, true, time_limit);
+
+ send_response.set_message(recv_request.message());
+ response_writer.Finish(send_response, Status::OK, tag(3));
+ verify_timed_ok(&srv_cq_, 3, true);
+
+ response_reader->Finish(&recv_response, &recv_status, tag(4));
+ verify_timed_ok(&cli_cq_, 4, true);
+
+ EXPECT_EQ(send_response.message(), recv_response.message());
+ EXPECT_TRUE(recv_status.IsOk());
+
+}
+
// Two pings and a final pong.
TEST_F(AsyncEnd2endTest, SimpleClientStreaming) {
ResetStub();
diff --git a/tools/dockerfile/grpc_dist_proto/Dockerfile b/tools/dockerfile/grpc_dist_proto/Dockerfile
new file mode 100644
index 0000000..b4ed3b6
--- /dev/null
+++ b/tools/dockerfile/grpc_dist_proto/Dockerfile
@@ -0,0 +1,76 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# Dockerfile to build protoc and plugins for inclusion in a release.
+FROM grpc/base
+
+# Add the file containing the gRPC version
+ADD version.txt version.txt
+
+# Install tools needed for building protoc.
+RUN apt-get update && apt-get -y install libgflags-dev libgtest-dev
+
+# Get the protobuf source from GitHub.
+RUN mkdir -p /var/local/git
+RUN git clone https://github.com/google/protobuf.git /var/local/git/protobuf
+
+# Build the protobuf library statically and install to /tmp/protoc_static.
+WORKDIR /var/local/git/protobuf
+RUN ./autogen.sh && \
+ ./configure --disable-shared --prefix=/tmp/protoc_static \
+ LDFLAGS="-lgcc_eh -static-libgcc -static-libstdc++" && \
+ make -j12 && make check && make install
+
+# Build the protobuf library dynamically and install to /usr/local.
+WORKDIR /var/local/git/protobuf
+RUN ./autogen.sh && \
+ ./configure --prefix=/usr/local && \
+ make -j12 && make check && make install
+
+# Build the grpc plugins.
+RUN git clone https://github.com/google/grpc.git /var/local/git/grpc
+WORKDIR /var/local/git/grpc
+RUN LDFLAGS=-static make plugins
+
+# Create an archive containing all the generated binaries.
+RUN mkdir /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m)
+RUN cp -v bins/opt/* /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m)
+RUN cp -v /tmp/protoc_static/bin/protoc /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m)
+RUN cd /tmp && \
+ tar -czf proto-bins_$(cat /version.txt)_linux-$(uname -m).tar.gz proto-bins_$(cat /version.txt)_linux-$(uname -m)
+
+# List the tar contents: provides a way to visually confirm that the contents
+# are correct.
+RUN echo 'proto-bins_$(cat /version.txt)_linux-tar-$(uname -m) contents:' && \
+ tar -ztf /tmp/proto-bins_$(cat /version.txt)_linux-$(uname -m).tar.gz
+
+
+
+
+
diff --git a/tools/dockerfile/grpc_dist_proto/version.txt b/tools/dockerfile/grpc_dist_proto/version.txt
new file mode 100644
index 0000000..8f0916f
--- /dev/null
+++ b/tools/dockerfile/grpc_dist_proto/version.txt
@@ -0,0 +1 @@
+0.5.0
diff --git a/tools/dockerfile/grpc_python/Dockerfile b/tools/dockerfile/grpc_python/Dockerfile
index fd07e9c..62ef785 100644
--- a/tools/dockerfile/grpc_python/Dockerfile
+++ b/tools/dockerfile/grpc_python/Dockerfile
@@ -54,7 +54,7 @@
&& python2.7 -B -m grpc._adapter._lonely_rear_link_test \
&& python2.7 -B -m grpc._adapter._low_test \
&& python2.7 -B -m grpc.early_adopter.implementations_test \
- && python2.7 -B -m grpc.framework.base.packets.implementations_test \
+ && python2.7 -B -m grpc.framework.base.implementations_test \
&& python2.7 -B -m grpc.framework.face.blocking_invocation_inline_service_test \
&& python2.7 -B -m grpc.framework.face.event_invocation_synchronous_event_service_test \
&& python2.7 -B -m grpc.framework.face.future_invocation_asynchronous_event_service_test \
diff --git a/tools/gce_setup/grpc_docker.sh b/tools/gce_setup/grpc_docker.sh
index 3deef05..2e721a8 100755
--- a/tools/gce_setup/grpc_docker.sh
+++ b/tools/gce_setup/grpc_docker.sh
@@ -560,7 +560,7 @@
_grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment
- # where they this func is used.
+ # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
local grpc_hosts grpc_gce_script_root
@@ -600,7 +600,7 @@
_grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment
- # where they this func is used.
+ # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
local grpc_hosts
@@ -645,7 +645,7 @@
# Shows the grpc servers on the GCE instance <server_name>
grpc_show_servers() {
# declare vars local so that they don't pollute the shell environment
- # where they this func is used.
+ # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# set by _grpc_show_servers
local host
@@ -663,6 +663,58 @@
gcloud compute $project_opt ssh $zone_opt $host --command "$cmd"
}
+_grpc_build_proto_bins_args() {
+ [[ -n $1 ]] && { # host
+ host=$1
+ shift
+ } || {
+ host='grpc-docker-builder'
+ }
+}
+
+# grpc_build_proto_bins
+#
+# - rebuilds the dist_proto docker image
+# * doing this builds the protoc and the ruby, python and cpp bins statically
+#
+# - runs a docker command that copies the built protos to the GCE host
+# - copies the built protos to the local machine
+grpc_build_proto_bins() {
+ _grpc_ensure_gcloud_ssh || return 1;
+
+ # declare vars local so that they don't pollute the shell environment
+ # where this func is used.
+ local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
+ # set by _grpc_build_proto_bins_args
+ local host
+
+ # set the project zone and check that all necessary args are provided
+ _grpc_set_project_and_zone -f _grpc_build_proto_bins_args "$@" || return 1
+ gce_has_instance $grpc_project $host || return 1;
+ local project_opt="--project $grpc_project"
+ local zone_opt="--zone $grpc_zone"
+
+ # rebuild the dist_proto image
+ local label='dist_proto'
+ grpc_update_image -- -h $host $label || return 1
+
+ # run a command to copy the generated archive to the docker host
+ local docker_prefix='sudo docker run -v /tmp:/tmp/proto_bins_out'
+ local tar_name='proto-bins*.tar.gz'
+ local cp_cmd="/bin/bash -c 'cp -v /tmp/$tar_name /tmp/proto_bins_out'"
+ local cmd="$docker_prefix grpc/$label $cp_cmd"
+ local ssh_cmd="bash -l -c \"$cmd\""
+ echo "will run:"
+ echo " $ssh_cmd"
+ echo "on $host"
+ gcloud compute $project_opt ssh $zone_opt $host --command "$cmd" || return 1
+
+ # copy the tar.gz locally
+ local rmt_tar="$host:/tmp/$tar_name"
+ local local_copy="$(pwd)"
+ gcloud compute copy-files $rmt_tar $local_copy $project_opt $zone_opt || return 1
+}
+
_grpc_launch_servers_args() {
[[ -n $1 ]] && { # host
host=$1
@@ -690,7 +742,7 @@
# If no servers are specified, it launches all known servers
grpc_launch_servers() {
# declare vars local so that they don't pollute the shell environment
- # where they this func is used.
+ # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# set by _grpc_launch_servers_args
local host servers
@@ -811,7 +863,7 @@
grpc_interop_test() {
_grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment
- # where they this func is used.
+ # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# grpc_interop_test_args
@@ -853,7 +905,7 @@
grpc_cloud_prod_test() {
_grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment
- # where they this func is used.
+ # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# grpc_cloud_prod_test_args
@@ -892,7 +944,7 @@
grpc_cloud_prod_auth_test() {
_grpc_ensure_gcloud_ssh || return 1;
# declare vars local so that they don't pollute the shell environment
- # where they this func is used.
+ # where this func is used.
local grpc_zone grpc_project dry_run # set by _grpc_set_project_and_zone
# grpc_cloud_prod_test_args
diff --git a/tools/gce_setup/interop_test_runner.sh b/tools/gce_setup/interop_test_runner.sh
index 430ad09..7f0b5ba 100755
--- a/tools/gce_setup/interop_test_runner.sh
+++ b/tools/gce_setup/interop_test_runner.sh
@@ -37,7 +37,7 @@
source grpc_docker.sh
test_cases=(large_unary empty_unary ping_pong client_streaming server_streaming cancel_after_begin cancel_after_first_response)
clients=(cxx java go ruby node python csharp_mono)
- servers=(cxx java go ruby node python)
+ servers=(cxx java go ruby node python csharp_mono)
for test_case in "${test_cases[@]}"
do
for client in "${clients[@]}"
diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh
index 0eba1c6..b145978 100755
--- a/tools/run_tests/build_python.sh
+++ b/tools/run_tests/build_python.sh
@@ -38,5 +38,5 @@
virtualenv -p /usr/bin/python2.7 python2.7_virtual_environment
source python2.7_virtual_environment/bin/activate
pip install enum34==1.0.4 futures==2.2.0 protobuf==3.0.0-alpha-1
-CFLAGS=-I$root/include LDFLAGS=-L$root/libs/opt pip install src/python/src
+CFLAGS=-I$root/include LDFLAGS=-L$root/libs/$CONFIG pip install src/python/src
pip install src/python/interop
diff --git a/tools/run_tests/jobset.py b/tools/run_tests/jobset.py
index 26caf03..81cdd0e 100755
--- a/tools/run_tests/jobset.py
+++ b/tools/run_tests/jobset.py
@@ -192,7 +192,7 @@
self._tempfile.seek(0)
stdout = self._tempfile.read()
message('FAILED', '%s [ret=%d]' % (
- self._spec.shortname, self._process.returncode), stdout)
+ self._spec.shortname, self._process.returncode), stdout, do_newline=True)
else:
self._state = _SUCCESS
message('PASSED', '%s [time=%.1fsec]' % (self._spec.shortname, elapsed),
@@ -200,7 +200,7 @@
if self._bin_hash:
update_cache.finished(self._spec.identity(), self._bin_hash)
elif self._state == _RUNNING and time.time() - self._start > 300:
- message('TIMEOUT', self._spec.shortname, do_newline=self._travis)
+ message('TIMEOUT', self._spec.shortname, do_newline=True)
self.kill()
return self._state
diff --git a/tools/run_tests/python_tests.json b/tools/run_tests/python_tests.json
index ef483d9..dff0537 100755
--- a/tools/run_tests/python_tests.json
+++ b/tools/run_tests/python_tests.json
@@ -27,7 +27,7 @@
"module": "grpc.early_adopter.implementations_test"
},
{
- "module": "grpc.framework.base.packets.implementations_test"
+ "module": "grpc.framework.base.implementations_test"
},
{
"module": "grpc.framework.face.blocking_invocation_inline_service_test"
diff --git a/tools/run_tests/run_python.sh b/tools/run_tests/run_python.sh
index fa1497a..f0e091f 100755
--- a/tools/run_tests/run_python.sh
+++ b/tools/run_tests/run_python.sh
@@ -34,6 +34,6 @@
cd $(dirname $0)/../..
root=`pwd`
-export LD_LIBRARY_PATH=$root/libs/opt
+export LD_LIBRARY_PATH=$root/libs/$CONFIG
source python2.7_virtual_environment/bin/activate
python2.7 -B $*