Adding C++ auth metadata processor.

- We always do the processing asynchronously but maintain a synchronous
  API for the implementor of the processor.
- there are a lot of string copies right now. Having a StringPiece
  object in grpc++ would really help with that (as we would use it for
  C++ metadata).
- Please review the API carefully and if you're happy with it, I'll
  proceed with tests.
diff --git a/include/grpc++/auth_context.h b/include/grpc++/auth_context.h
index f8ea8ad..0b5f856 100644
--- a/include/grpc++/auth_context.h
+++ b/include/grpc++/auth_context.h
@@ -87,6 +87,11 @@
   // Iteration over all the properties.
   virtual AuthPropertyIterator begin() const = 0;
   virtual AuthPropertyIterator end() const = 0;
+
+  // Mutation functions: should only be used by an AuthMetadataProcessor.
+  virtual void AddProperty(const grpc::string& key,
+                           const grpc::string& value) = 0;
+  virtual bool SetPeerIdentityPropertyName(const grpc::string& name) = 0;
 };
 
 }  // namespace grpc
diff --git a/include/grpc++/auth_metadata_processor.h b/include/grpc++/auth_metadata_processor.h
new file mode 100644
index 0000000..e077ec0
--- /dev/null
+++ b/include/grpc++/auth_metadata_processor.h
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPCXX_AUTH_METADATA_PROCESSOR_H_
+#define GRPCXX_AUTH_METADATA_PROCESSOR_H_
+
+#include <map>
+#include <string>
+
+#include <grpc++/auth_context.h>
+
+namespace grpc {
+
+class AuthMetadataProcessor {
+ public:
+  virtual ~AuthMetadataProcessor() {}
+
+  // context is read/write: it contains the properties of the channel peer and
+  // it is the job of the Process method to augment it with properties derived
+  // from the passed-in auth_metadata.
+  virtual bool Process(
+      std::multimap<grpc::string, grpc::string>& auth_metadata,
+      AuthContext* context,
+      std::multimap<grpc::string, grpc::string>* consumed_auth_metadata) = 0;
+};
+
+}  // namespace grpc
+
+#endif  // GRPCXX_AUTH_METADATA_PROCESSOR_H_
+
diff --git a/include/grpc++/server_credentials.h b/include/grpc++/server_credentials.h
index 11acd67..d540b95 100644
--- a/include/grpc++/server_credentials.h
+++ b/include/grpc++/server_credentials.h
@@ -38,6 +38,7 @@
 #include <vector>
 
 #include <grpc++/config.h>
+#include <grpc++/auth_metadata_processor.h>
 
 struct grpc_server;
 
@@ -54,6 +55,10 @@
 
   virtual int AddPortToServer(const grpc::string& addr,
                               grpc_server* server) = 0;
+
+  // Has to be called before the server is started.
+  virtual void SetAuthMetadataProcessor(
+      const std::shared_ptr<AuthMetadataProcessor>& processor) = 0;
 };
 
 // Options to create ServerCredentials with SSL
diff --git a/src/cpp/common/secure_auth_context.cc b/src/cpp/common/secure_auth_context.cc
index 87d7bab..dc41083 100644
--- a/src/cpp/common/secure_auth_context.cc
+++ b/src/cpp/common/secure_auth_context.cc
@@ -93,4 +93,16 @@
   return AuthPropertyIterator();
 }
 
+void SecureAuthContext::AddProperty(const grpc::string& key,
+                                    const grpc::string& value) {
+  if (!ctx_) return;
+  grpc_auth_context_add_property(ctx_, key.c_str(), value.data(), value.size());
+}
+
+bool SecureAuthContext::SetPeerIdentityPropertyName(const grpc::string& name) {
+  if (!ctx_) return false;
+  return grpc_auth_context_set_peer_identity_property_name(ctx_,
+                                                           name.c_str()) != 0;
+}
+
 }  // namespace grpc
diff --git a/src/cpp/common/secure_auth_context.h b/src/cpp/common/secure_auth_context.h
index 264ed62..1b27bf5 100644
--- a/src/cpp/common/secure_auth_context.h
+++ b/src/cpp/common/secure_auth_context.h
@@ -57,6 +57,12 @@
 
   AuthPropertyIterator end() const GRPC_OVERRIDE;
 
+  void AddProperty(const grpc::string& key,
+                   const grpc::string& value) GRPC_OVERRIDE;
+
+  virtual bool SetPeerIdentityPropertyName(const grpc::string& name)
+      GRPC_OVERRIDE;
+
  private:
   grpc_auth_context* ctx_;
 };
diff --git a/src/cpp/server/insecure_server_credentials.cc b/src/cpp/server/insecure_server_credentials.cc
index 800cd36..9645847 100644
--- a/src/cpp/server/insecure_server_credentials.cc
+++ b/src/cpp/server/insecure_server_credentials.cc
@@ -43,6 +43,8 @@
                       grpc_server* server) GRPC_OVERRIDE {
     return grpc_server_add_insecure_http2_port(server, addr.c_str());
   }
+  void SetAuthMetadataProcessor(
+      const std::shared_ptr<AuthMetadataProcessor>& processor) GRPC_OVERRIDE {}
 };
 }  // namespace
 
diff --git a/src/cpp/server/secure_server_credentials.cc b/src/cpp/server/secure_server_credentials.cc
index 32c45e2..bdb7ba6 100644
--- a/src/cpp/server/secure_server_credentials.cc
+++ b/src/cpp/server/secure_server_credentials.cc
@@ -31,15 +31,65 @@
  *
  */
 
+#include <functional>
+#include <map>
+#include <memory>
+
+
+#include "src/cpp/common/secure_auth_context.h"
 #include "src/cpp/server/secure_server_credentials.h"
 
+#include <grpc++/auth_metadata_processor.h>
+
 namespace grpc {
 
+void AuthMetadataProcessorAyncWrapper::Process(
+    void* self, grpc_auth_context* context, const grpc_metadata* md,
+    size_t md_count, grpc_process_auth_metadata_done_cb cb, void* user_data) {
+  AuthMetadataProcessorAyncWrapper* instance =
+      reinterpret_cast<AuthMetadataProcessorAyncWrapper*>(self);
+  instance->thread_pool_->Add(
+      std::bind(&AuthMetadataProcessorAyncWrapper::ProcessAsync, instance,
+                context, md, md_count, cb, user_data));
+}
+
+void AuthMetadataProcessorAyncWrapper::ProcessAsync(
+    grpc_auth_context* ctx, const grpc_metadata* md, size_t md_count,
+    grpc_process_auth_metadata_done_cb cb, void* user_data) {
+  SecureAuthContext context(ctx);
+  std::multimap<grpc::string, grpc::string> metadata;
+  for (size_t i = 0; i < md_count; i++) {
+    metadata.insert(std::make_pair(
+        md[i].key, grpc::string(md[i].value, md[i].value_length)));
+  }
+  std::multimap<grpc::string, grpc::string> consumed_metadata;
+  bool ok = processor_->Process(metadata, &context, &consumed_metadata);
+  if (ok) {
+    std::vector<grpc_metadata> consumed_md(consumed_metadata.size());
+    for (const auto& entry : consumed_metadata) {
+      consumed_md.push_back({entry.first.c_str(),
+                             entry.second.data(),
+                             entry.second.size(),
+                             {{nullptr, nullptr, nullptr}}});
+    }
+    cb(user_data, &consumed_md[0], consumed_md.size(), 1);
+  } else {
+    cb(user_data, nullptr, 0, 0);
+  }
+}
+
 int SecureServerCredentials::AddPortToServer(
     const grpc::string& addr, grpc_server* server) {
   return grpc_server_add_secure_http2_port(server, addr.c_str(), creds_);
 }
 
+void SecureServerCredentials::SetAuthMetadataProcessor(
+    const std::shared_ptr<AuthMetadataProcessor>& processor) {
+  processor_.reset(new AuthMetadataProcessorAyncWrapper(processor));
+  grpc_server_credentials_set_auth_metadata_processor(
+      creds_, {AuthMetadataProcessorAyncWrapper::Process, processor_.get()});
+}
+
 std::shared_ptr<ServerCredentials> SslServerCredentials(
     const SslServerCredentialsOptions& options) {
   std::vector<grpc_ssl_pem_key_cert_pair> pem_key_cert_pairs;
diff --git a/src/cpp/server/secure_server_credentials.h b/src/cpp/server/secure_server_credentials.h
index b9803f1..2707336 100644
--- a/src/cpp/server/secure_server_credentials.h
+++ b/src/cpp/server/secure_server_credentials.h
@@ -34,12 +34,33 @@
 #ifndef GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H
 #define GRPC_INTERNAL_CPP_SERVER_SECURE_SERVER_CREDENTIALS_H
 
+#include <memory>
+
 #include <grpc/grpc_security.h>
 
 #include <grpc++/server_credentials.h>
+#include <grpc++/thread_pool_interface.h>
 
 namespace grpc {
 
+class AuthMetadataProcessorAyncWrapper GRPC_FINAL {
+ public:
+  static void Process(void* self, grpc_auth_context* context,
+                      const grpc_metadata* md, size_t md_count,
+                      grpc_process_auth_metadata_done_cb cb, void* user_data);
+
+  AuthMetadataProcessorAyncWrapper(
+      const std::shared_ptr<AuthMetadataProcessor>& processor)
+      : thread_pool_(CreateDefaultThreadPool()), processor_(processor) {}
+
+ private:
+  void ProcessAsync(grpc_auth_context* context, const grpc_metadata* md,
+                    size_t md_count, grpc_process_auth_metadata_done_cb cb,
+                    void* user_data);
+  std::unique_ptr<ThreadPoolInterface> thread_pool_;
+  std::shared_ptr<AuthMetadataProcessor> processor_;
+};
+
 class SecureServerCredentials GRPC_FINAL : public ServerCredentials {
  public:
   explicit SecureServerCredentials(grpc_server_credentials* creds)
@@ -51,8 +72,12 @@
   int AddPortToServer(const grpc::string& addr,
                       grpc_server* server) GRPC_OVERRIDE;
 
+  void SetAuthMetadataProcessor(
+      const std::shared_ptr<AuthMetadataProcessor>& processor) GRPC_OVERRIDE;
+
  private:
-  grpc_server_credentials* const creds_;
+  grpc_server_credentials* creds_;
+  std::unique_ptr<AuthMetadataProcessorAyncWrapper> processor_;
 };
 
 }  // namespace grpc
diff --git a/test/cpp/common/secure_auth_context_test.cc b/test/cpp/common/secure_auth_context_test.cc
index 075d4ce..e208189 100644
--- a/test/cpp/common/secure_auth_context_test.cc
+++ b/test/cpp/common/secure_auth_context_test.cc
@@ -57,12 +57,12 @@
 
 TEST_F(SecureAuthContextTest, Properties) {
   grpc_auth_context* ctx = grpc_auth_context_create(NULL);
-  grpc_auth_context_add_cstring_property(ctx, "name", "chapi");
-  grpc_auth_context_add_cstring_property(ctx, "name", "chapo");
-  grpc_auth_context_add_cstring_property(ctx, "foo", "bar");
-  EXPECT_EQ(1, grpc_auth_context_set_peer_identity_property_name(ctx, "name"));
-
   SecureAuthContext context(ctx);
+  context.AddProperty("name", "chapi");
+  context.AddProperty("name", "chapo");
+  context.AddProperty("foo", "bar");
+  EXPECT_TRUE(context.SetPeerIdentityPropertyName("name"));
+
   std::vector<grpc::string> peer_identity = context.GetPeerIdentity();
   EXPECT_EQ(2u, peer_identity.size());
   EXPECT_EQ("chapi", peer_identity[0]);
@@ -75,12 +75,12 @@
 
 TEST_F(SecureAuthContextTest, Iterators) {
   grpc_auth_context* ctx = grpc_auth_context_create(NULL);
-  grpc_auth_context_add_cstring_property(ctx, "name", "chapi");
-  grpc_auth_context_add_cstring_property(ctx, "name", "chapo");
-  grpc_auth_context_add_cstring_property(ctx, "foo", "bar");
-  EXPECT_EQ(1, grpc_auth_context_set_peer_identity_property_name(ctx, "name"));
-
   SecureAuthContext context(ctx);
+  context.AddProperty("name", "chapi");
+  context.AddProperty("name", "chapo");
+  context.AddProperty("foo", "bar");
+  EXPECT_TRUE(context.SetPeerIdentityPropertyName("name"));
+
   AuthPropertyIterator iter = context.begin();
   EXPECT_TRUE(context.end() != iter);
   AuthProperty p0 = *iter;