Add custom_metadata test case
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index cb9b396..8e81e43 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -74,6 +74,7 @@
               "oauth2_auth_token: raw oauth2 access token auth; "
               "per_rpc_creds: raw oauth2 access token on a single rpc; "
               "status_code_and_message: verify status code & message; "
+              "custom_metadata: server will echo custom metadata;"
               "all : all of above.");
 DEFINE_string(default_service_account, "",
               "Email of GCE default service account");
@@ -129,6 +130,8 @@
     client.DoPerRpcCreds(json_key);
   } else if (FLAGS_test_case == "status_code_and_message") {
     client.DoStatusWithMessage();
+  } else if (FLAGS_test_case == "custom_metadata") {
+    client.DoCustomMetadata();
   } else if (FLAGS_test_case == "all") {
     client.DoEmpty();
     client.DoLargeUnary();
@@ -142,6 +145,7 @@
     client.DoTimeoutOnSleepingServer();
     client.DoEmptyStream();
     client.DoStatusWithMessage();
+    client.DoCustomMetadata();
     // service_account_creds and jwt_token_creds can only run with ssl.
     if (FLAGS_use_tls) {
       grpc::string json_key = GetServiceAccountJsonKey();
@@ -159,7 +163,7 @@
         "server_compressed_streaming|half_duplex|ping_pong|cancel_after_begin|"
         "cancel_after_first_response|timeout_on_sleeping_server|empty_stream|"
         "compute_engine_creds|jwt_token_creds|oauth2_auth_token|per_rpc_creds",
-        FLAGS_test_case.c_str());
+        "status_code_and_message|custom_metadata", FLAGS_test_case.c_str());
     ret = 1;
   }
 
diff --git a/test/cpp/interop/interop_client.cc b/test/cpp/interop/interop_client.cc
index b063107..46f6fda 100644
--- a/test/cpp/interop/interop_client.cc
+++ b/test/cpp/interop/interop_client.cc
@@ -625,5 +625,77 @@
   gpr_log(GPR_DEBUG, "Done testing Status and Message");
 }
 
+void InteropClient::DoCustomMetadata() {
+  const grpc::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
+  const grpc::string kInitialMetadataValue("test_initial_metadata_value");
+  const grpc::string kEchoTrailingBinMetadataKey(
+      "x-grpc-test-echo-trailing-bin");
+  const grpc::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
+  ;
+
+  {
+    gpr_log(GPR_DEBUG, "Sending RPC with custom metadata");
+    ClientContext context;
+    context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
+    context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
+    SimpleRequest request;
+    SimpleResponse response;
+    request.set_response_size(kLargeResponseSize);
+    grpc::string payload(kLargeRequestSize, '\0');
+    request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
+
+    Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
+    AssertOkOrPrintErrorStatus(s);
+    const auto& server_initial_metadata = context.GetServerInitialMetadata();
+    auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
+    GPR_ASSERT(iter != server_initial_metadata.end());
+    GPR_ASSERT(iter->second.data() == kInitialMetadataValue);
+    const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
+    iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
+    GPR_ASSERT(iter != server_trailing_metadata.end());
+    GPR_ASSERT(grpc::string(iter->second.begin(), iter->second.end()) ==
+               kTrailingBinValue);
+
+    gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata");
+  }
+
+  {
+    gpr_log(GPR_DEBUG, "Sending stream with custom metadata");
+    ClientContext context;
+    context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
+    context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
+    std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
+                                       StreamingOutputCallResponse>>
+        stream(serviceStub_.Get()->FullDuplexCall(&context));
+
+    StreamingOutputCallRequest request;
+    request.set_response_type(PayloadType::COMPRESSABLE);
+    ResponseParameters* response_parameter = request.add_response_parameters();
+    response_parameter->set_size(kLargeResponseSize);
+    grpc::string payload(kLargeRequestSize, '\0');
+    request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
+    StreamingOutputCallResponse response;
+    GPR_ASSERT(stream->Write(request));
+    stream->WritesDone();
+    GPR_ASSERT(stream->Read(&response));
+    GPR_ASSERT(response.payload().body() ==
+               grpc::string(kLargeResponseSize, '\0'));
+    GPR_ASSERT(!stream->Read(&response));
+    Status s = stream->Finish();
+    AssertOkOrPrintErrorStatus(s);
+    const auto& server_initial_metadata = context.GetServerInitialMetadata();
+    auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
+    GPR_ASSERT(iter != server_initial_metadata.end());
+    GPR_ASSERT(iter->second.data() == kInitialMetadataValue);
+    const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
+    iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
+    GPR_ASSERT(iter != server_trailing_metadata.end());
+    GPR_ASSERT(grpc::string(iter->second.begin(), iter->second.end()) ==
+               kTrailingBinValue);
+
+    gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
+  }
+}
+
 }  // namespace testing
 }  // namespace grpc
diff --git a/test/cpp/interop/interop_client.h b/test/cpp/interop/interop_client.h
index 3ecd380..92f57f7 100644
--- a/test/cpp/interop/interop_client.h
+++ b/test/cpp/interop/interop_client.h
@@ -75,6 +75,7 @@
   void DoTimeoutOnSleepingServer();
   void DoEmptyStream();
   void DoStatusWithMessage();
+  void DoCustomMetadata();
   // Auth tests.
   // username is a string containing the user email
   void DoJwtTokenCreds(const grpc::string& username);
diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc
index cdca060..bf57632 100644
--- a/test/cpp/interop/server.cc
+++ b/test/cpp/interop/server.cc
@@ -80,6 +80,26 @@
 static bool got_sigint = false;
 static const char* kRandomFile = "test/cpp/interop/rnd.dat";
 
+const char kEchoInitialMetadataKey[] = "x-grpc-test-echo-initial";
+const char kEchoTrailingBinMetadataKey[] = "x-grpc-test-echo-trailing-bin";
+
+void MaybeEchoMetadata(ServerContext* context) {
+  const auto& client_metadata = context->client_metadata();
+  GPR_ASSERT(client_metadata.count(kEchoInitialMetadataKey) <= 1);
+  GPR_ASSERT(client_metadata.count(kEchoTrailingBinMetadataKey) <= 1);
+
+  auto iter = client_metadata.find(kEchoInitialMetadataKey);
+  if (iter != client_metadata.end()) {
+    context->AddInitialMetadata(kEchoInitialMetadataKey, iter->second.data());
+  }
+  iter = client_metadata.find(kEchoTrailingBinMetadataKey);
+  if (iter != client_metadata.end()) {
+    context->AddTrailingMetadata(
+        kEchoTrailingBinMetadataKey,
+        grpc::string(iter->second.begin(), iter->second.end()));
+  }
+}
+
 bool SetPayload(PayloadType type, int size, Payload* payload) {
   PayloadType response_type;
   if (type == PayloadType::RANDOM) {
@@ -135,6 +155,7 @@
 
   Status UnaryCall(ServerContext* context, const SimpleRequest* request,
                    SimpleResponse* response) {
+    MaybeEchoMetadata(context);
     SetResponseCompression(context, *request);
     if (request->response_size() > 0) {
       if (!SetPayload(request->response_type(), request->response_size(),
@@ -192,6 +213,7 @@
       ServerContext* context,
       ServerReaderWriter<StreamingOutputCallResponse,
                          StreamingOutputCallRequest>* stream) {
+    MaybeEchoMetadata(context);
     StreamingOutputCallRequest request;
     StreamingOutputCallResponse response;
     bool write_success = true;