feat: add mtls feature (#917)

diff --git a/googleapiclient/discovery.py b/googleapiclient/discovery.py
index 609cead..115609f 100644
--- a/googleapiclient/discovery.py
+++ b/googleapiclient/discovery.py
@@ -47,6 +47,13 @@
 import httplib2
 import uritemplate
 import google.api_core.client_options
+from google.auth.transport import mtls
+from google.auth.exceptions import MutualTLSChannelError
+
+try:
+    import google_auth_httplib2
+except ImportError:  # pragma: NO COVER
+    google_auth_httplib2 = None
 
 # Local imports
 from googleapiclient import _auth
@@ -132,7 +139,7 @@
 
   Returns:
     The name with '_' appended if the name is a reserved word and '$' and '-'
-    replaced with '_'. 
+    replaced with '_'.
   """
     name = name.replace("$", "_").replace("-", "_")
     if keyword.iskeyword(name) or name in RESERVED_WORDS:
@@ -178,6 +185,8 @@
     cache_discovery=True,
     cache=None,
     client_options=None,
+    adc_cert_path=None,
+    adc_key_path=None,
 ):
     """Construct a Resource for interacting with an API.
 
@@ -206,9 +215,21 @@
       cache object for the discovery documents.
     client_options: Dictionary or google.api_core.client_options, Client options to set user
       options on the client. API endpoint should be set through client_options.
+      client_cert_source is not supported, client cert should be provided using
+      client_encrypted_cert_source instead.
+    adc_cert_path: str, client certificate file path to save the application
+      default client certificate for mTLS. This field is required if you want to
+      use the default client certificate.
+    adc_key_path: str, client encrypted private key file path to save the
+      application default client encrypted private key for mTLS. This field is
+      required if you want to use the default client certificate.
 
   Returns:
     A Resource object with methods for interacting with the service.
+
+  Raises:
+    google.auth.exceptions.MutualTLSChannelError: if there are any problems
+      setting up mutual TLS channel.
   """
     params = {"api": serviceName, "apiVersion": version}
 
@@ -232,7 +253,9 @@
                 model=model,
                 requestBuilder=requestBuilder,
                 credentials=credentials,
-                client_options=client_options
+                client_options=client_options,
+                adc_cert_path=adc_cert_path,
+                adc_key_path=adc_key_path,
             )
         except HttpError as e:
             if e.resp.status == http_client.NOT_FOUND:
@@ -309,7 +332,9 @@
     model=None,
     requestBuilder=HttpRequest,
     credentials=None,
-    client_options=None
+    client_options=None,
+    adc_cert_path=None,
+    adc_key_path=None,
 ):
     """Create a Resource for interacting with an API.
 
@@ -336,9 +361,21 @@
       authentication.
     client_options: Dictionary or google.api_core.client_options, Client options to set user
       options on the client. API endpoint should be set through client_options.
+      client_cert_source is not supported, client cert should be provided using
+      client_encrypted_cert_source instead.
+    adc_cert_path: str, client certificate file path to save the application
+      default client certificate for mTLS. This field is required if you want to
+      use the default client certificate.
+    adc_key_path: str, client encrypted private key file path to save the
+      application default client encrypted private key for mTLS. This field is
+      required if you want to use the default client certificate.
 
   Returns:
     A Resource object with methods for interacting with the service.
+
+  Raises:
+    google.auth.exceptions.MutualTLSChannelError: if there are any problems
+      setting up mutual TLS channel.
   """
 
     if http is not None and credentials is not None:
@@ -349,7 +386,7 @@
     elif isinstance(service, six.binary_type):
         service = json.loads(service.decode("utf-8"))
 
-    if "rootUrl" not in service and (isinstance(http, (HttpMock, HttpMockSequence))):
+    if "rootUrl" not in service and isinstance(http, (HttpMock, HttpMockSequence)):
         logger.error(
             "You are using HttpMock or HttpMockSequence without"
             + "having the service discovery doc in cache. Try calling "
@@ -359,12 +396,10 @@
         raise InvalidJsonError()
 
     # If an API Endpoint is provided on client options, use that as the base URL
-    base = urljoin(service['rootUrl'], service["servicePath"])
+    base = urljoin(service["rootUrl"], service["servicePath"])
     if client_options:
         if type(client_options) == dict:
-            client_options = google.api_core.client_options.from_dict(
-                client_options
-            )
+            client_options = google.api_core.client_options.from_dict(client_options)
         if client_options.api_endpoint:
             base = client_options.api_endpoint
 
@@ -400,6 +435,52 @@
         else:
             http = build_http()
 
+        # Obtain client cert and create mTLS http channel if cert exists.
+        client_cert_to_use = None
+        if client_options and client_options.client_cert_source:
+            raise MutualTLSChannelError(
+                "ClientOptions.client_cert_source is not supported, please use ClientOptions.client_encrypted_cert_source."
+            )
+        if client_options and client_options.client_encrypted_cert_source:
+            client_cert_to_use = client_options.client_encrypted_cert_source
+        elif adc_cert_path and adc_key_path and mtls.has_default_client_cert_source():
+            client_cert_to_use = mtls.default_client_encrypted_cert_source(
+                adc_cert_path, adc_key_path
+            )
+        if client_cert_to_use:
+            cert_path, key_path, passphrase = client_cert_to_use()
+
+            # The http object we built could be google_auth_httplib2.AuthorizedHttp
+            # or httplib2.Http. In the first case we need to extract the wrapped
+            # httplib2.Http object from google_auth_httplib2.AuthorizedHttp.
+            http_channel = (
+                http.http
+                if google_auth_httplib2
+                and isinstance(http, google_auth_httplib2.AuthorizedHttp)
+                else http
+            )
+            http_channel.add_certificate(key_path, cert_path, "", passphrase)
+
+        # If user doesn't provide api endpoint via client options, decide which
+        # api endpoint to use.
+        if "mtlsRootUrl" in service and (
+            not client_options or not client_options.api_endpoint
+        ):
+            mtls_endpoint = urljoin(service["mtlsRootUrl"], service["servicePath"])
+            use_mtls_env = os.getenv("GOOGLE_API_USE_MTLS", "Never")
+
+            if not use_mtls_env in ("Never", "Auto", "Always"):
+                raise MutualTLSChannelError(
+                    "Unsupported GOOGLE_API_USE_MTLS value. Accepted values: Never, Auto, Always"
+                )
+
+            # Switch to mTLS endpoint, if environment variable is "Always", or
+            # environment varibable is "Auto" and client cert exists.
+            if use_mtls_env == "Always" or (
+                use_mtls_env == "Auto" and client_cert_to_use
+            ):
+                base = mtls_endpoint
+
     if model is None:
         features = service.get("features", [])
         model = JsonModel("dataWrapper" in features)
diff --git a/noxfile.py b/noxfile.py
index 6523b32..ced9cf5 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -19,6 +19,7 @@
     "google-auth",
     "google-auth-httplib2",
     "mox",
+    "parameterized",
     "pyopenssl",
     "pytest",
     "pytest-cov",
@@ -54,6 +55,10 @@
     ],
 )
 def unit(session, oauth2client):
+    session.install(
+        "-e",
+        "git+https://github.com/googleapis/python-api-core.git@master#egg=google-api-core",
+    )
     session.install(*test_dependencies)
     session.install(oauth2client)
     if session.python < "3.0":
@@ -75,4 +80,4 @@
         "--cov-fail-under=85",
         "tests",
         *session.posargs,
-    )
\ No newline at end of file
+    )
diff --git a/setup.py b/setup.py
index 4c740e6..6554583 100644
--- a/setup.py
+++ b/setup.py
@@ -39,7 +39,7 @@
     # currently upgrade their httplib2 version.
     # Please see https://github.com/googleapis/google-api-python-client/pull/841
     "httplib2>=0.9.2,<1dev",
-    "google-auth>=1.4.1",
+    "google-auth>=1.16.0",
     "google-auth-httplib2>=0.0.3",
     "google-api-core>=1.13.0,<2dev",
     "six>=1.6.1,<2dev",
diff --git a/tests/data/bigquery.json b/tests/data/bigquery.json
index c9f63e3..2bfa173 100644
--- a/tests/data/bigquery.json
+++ b/tests/data/bigquery.json
@@ -19,6 +19,7 @@
  "baseUrl": "https://www.googleapis.com/bigquery/v2/",
  "basePath": "/bigquery/v2/",
  "rootUrl": "https://www.googleapis.com/",
+ "mtlsRootUrl": "https://www.mtls.googleapis.com/",
  "servicePath": "bigquery/v2/",
  "batchPath": "batch",
  "parameters": {
diff --git a/tests/data/drive.json b/tests/data/drive.json
index af7b244..100831f 100644
--- a/tests/data/drive.json
+++ b/tests/data/drive.json
@@ -19,6 +19,7 @@
  "baseUrl": "https://www.googleapis.com/drive/v3/",
  "basePath": "/drive/v3/",
  "rootUrl": "https://www.googleapis.com/",
+ "mtlsRootUrl": "https://www.mtls.googleapis.com/",
  "servicePath": "drive/v3/",
  "batchPath": "batch",
  "parameters": {
diff --git a/tests/data/latitude.json b/tests/data/latitude.json
index 7717f90..84b7aca 100644
--- a/tests/data/latitude.json
+++ b/tests/data/latitude.json
@@ -14,6 +14,7 @@
  "protocol": "rest",
  "basePath": "/latitude/v1/",
  "rootUrl": "https://www.googleapis.com/",
+ "mtlsRootUrl": "https://www.mtls.googleapis.com/",
  "servicePath": "latitude/v1/",
  "auth": {
   "oauth2": {
diff --git a/tests/data/logging.json b/tests/data/logging.json
index b702ea8..6bcb3b7 100644
--- a/tests/data/logging.json
+++ b/tests/data/logging.json
@@ -2086,5 +2086,6 @@
   "ownerName": "Google",
   "version": "v2",
   "rootUrl": "https://logging.googleapis.com/",
+  "mtlsRootUrl": "https://logging.mtls.googleapis.com/",
   "kind": "discovery#restDescription"
 }
diff --git a/tests/data/plus.json b/tests/data/plus.json
index 36d3ae9..8e4815e 100644
--- a/tests/data/plus.json
+++ b/tests/data/plus.json
@@ -16,6 +16,7 @@
  "protocol": "rest",
  "basePath": "/plus/v1/",
  "rootUrl": "https://www.googleapis.com/",
+ "mtlsRootUrl": "https://www.mtls.googleapis.com/",
  "servicePath": "plus/v1/",
  "parameters": {
   "alt": {
diff --git a/tests/data/tasks.json b/tests/data/tasks.json
index ee7be10..ada88cc 100644
--- a/tests/data/tasks.json
+++ b/tests/data/tasks.json
@@ -16,6 +16,7 @@
  "protocol": "rest",
  "basePath": "/tasks/v1/",
  "rootUrl": "https://www.googleapis.com/",
+ "mtlsRootUrl": "https://www.mtls.googleapis.com/",
  "servicePath": "tasks/v1/",
  "parameters": {
   "alt": {
diff --git a/tests/data/zoo.json b/tests/data/zoo.json
index 3a4c775..978da3c 100644
--- a/tests/data/zoo.json
+++ b/tests/data/zoo.json
@@ -6,6 +6,7 @@
  "basePath": "/zoo/",
  "batchPath": "batchZoo",
  "rootUrl": "https://www.googleapis.com/",
+ "mtlsRootUrl": "https://www.mtls.googleapis.com/",
  "servicePath": "zoo/v1/",
  "rpcPath": "/rpc",
  "parameters": {
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index a07e861..9015066 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -39,9 +39,12 @@
 import sys
 import unittest2 as unittest
 
+from parameterized import parameterized
 import mock
 
 import google.auth.credentials
+from google.auth.transport import mtls
+from google.auth.exceptions import MutualTLSChannelError
 import google_auth_httplib2
 from googleapiclient.discovery import _fix_up_media_upload
 from googleapiclient.discovery import _fix_up_method_description
@@ -224,7 +227,11 @@
         final_max_size,
         final_media_path_url,
     ):
-        fake_root_desc = {"rootUrl": "http://root/", "servicePath": "fake/"}
+        fake_root_desc = {
+            "rootUrl": "http://root/",
+            "servicePath": "fake/",
+            "mtlsRootUrl": "http://root/",
+        }
         fake_path_url = "fake-path/"
 
         accept, max_size, media_path_url = _fix_up_media_upload(
@@ -445,7 +452,7 @@
             base="https://www.googleapis.com/",
             credentials=self.MOCK_CREDENTIALS,
         )
-        self.assertTrue(plus is not None)
+        self.assertIsNotNone(plus)
         self.assertTrue(hasattr(plus, "activities"))
 
     def test_can_build_from_local_deserialized_document(self):
@@ -456,7 +463,7 @@
             base="https://www.googleapis.com/",
             credentials=self.MOCK_CREDENTIALS,
         )
-        self.assertTrue(plus is not None)
+        self.assertIsNotNone(plus)
         self.assertTrue(hasattr(plus, "activities"))
 
     def test_building_with_base_remembers_base(self):
@@ -522,9 +529,7 @@
             api_endpoint=api_endpoint
         )
         plus = build_from_document(
-            discovery,
-            client_options=options,
-            credentials=self.MOCK_CREDENTIALS
+            discovery, client_options=options, credentials=self.MOCK_CREDENTIALS
         )
 
         self.assertEqual(plus._baseUrl, api_endpoint)
@@ -533,14 +538,161 @@
         discovery = open(datafile("plus.json")).read()
         api_endpoint = "https://foo.googleapis.com/"
         plus = build_from_document(
-            discovery, 
+            discovery,
             client_options={"api_endpoint": api_endpoint},
-            credentials=self.MOCK_CREDENTIALS
+            credentials=self.MOCK_CREDENTIALS,
         )
 
         self.assertEqual(plus._baseUrl, api_endpoint)
 
 
+REGULAR_ENDPOINT = "https://www.googleapis.com/plus/v1/"
+MTLS_ENDPOINT = "https://www.mtls.googleapis.com/plus/v1/"
+
+
+class DiscoveryFromDocumentMutualTLS(unittest.TestCase):
+    MOCK_CREDENTIALS = mock.Mock(spec=google.auth.credentials.Credentials)
+    ADC_CERT_PATH = "adc_cert_path"
+    ADC_KEY_PATH = "adc_key_path"
+    ADC_PASSPHRASE = "adc_passphrase"
+
+    def check_http_client_cert(self, resource, has_client_cert=False):
+        if isinstance(resource._http, google_auth_httplib2.AuthorizedHttp):
+            certs = list(resource._http.http.certificates.iter(""))
+        else:
+            certs = list(resource._http.certificates.iter(""))
+        if has_client_cert:
+            self.assertEqual(len(certs), 1)
+            self.assertEqual(
+                certs[0], (self.ADC_KEY_PATH, self.ADC_CERT_PATH, self.ADC_PASSPHRASE)
+            )
+        else:
+            self.assertEqual(len(certs), 0)
+
+    def client_encrypted_cert_source(self):
+        return self.ADC_CERT_PATH, self.ADC_KEY_PATH, self.ADC_PASSPHRASE
+
+    def test_mtls_not_trigger_if_http_provided(self):
+        discovery = open(datafile("plus.json")).read()
+        plus = build_from_document(discovery, http=httplib2.Http())
+        self.assertIsNotNone(plus)
+        self.assertEqual(plus._baseUrl, REGULAR_ENDPOINT)
+        self.check_http_client_cert(plus, has_client_cert=False)
+
+    def test_exception_with_client_cert_source(self):
+        discovery = open(datafile("plus.json")).read()
+        with self.assertRaises(MutualTLSChannelError):
+            build_from_document(
+                discovery,
+                credentials=self.MOCK_CREDENTIALS,
+                client_options={"client_cert_source": mock.Mock()},
+            )
+
+    @parameterized.expand(
+        [
+            ("Never", REGULAR_ENDPOINT),
+            ("Auto", MTLS_ENDPOINT),
+            ("Always", MTLS_ENDPOINT),
+        ]
+    )
+    def test_mtls_with_provided_client_cert(self, use_mtls_env, base_url):
+        discovery = open(datafile("plus.json")).read()
+
+        with mock.patch.dict("os.environ", {"GOOGLE_API_USE_MTLS": use_mtls_env}):
+            plus = build_from_document(
+                discovery,
+                credentials=self.MOCK_CREDENTIALS,
+                client_options={
+                    "client_encrypted_cert_source": self.client_encrypted_cert_source
+                },
+            )
+            self.assertIsNotNone(plus)
+            self.check_http_client_cert(plus, has_client_cert=True)
+            self.assertEqual(plus._baseUrl, base_url)
+
+    @parameterized.expand(["Never", "Auto", "Always"])
+    def test_endpoint_not_switch(self, use_mtls_env):
+        # Test endpoint is not switched if user provided api endpoint
+        discovery = open(datafile("plus.json")).read()
+
+        with mock.patch.dict("os.environ", {"GOOGLE_API_USE_MTLS": use_mtls_env}):
+            plus = build_from_document(
+                discovery,
+                credentials=self.MOCK_CREDENTIALS,
+                client_options={
+                    "api_endpoint": "https://foo.googleapis.com",
+                    "client_encrypted_cert_source": self.client_encrypted_cert_source,
+                },
+            )
+            self.assertIsNotNone(plus)
+            self.check_http_client_cert(plus, has_client_cert=True)
+            self.assertEqual(plus._baseUrl, "https://foo.googleapis.com")
+
+    @parameterized.expand(
+        [
+            ("Never", REGULAR_ENDPOINT),
+            ("Auto", MTLS_ENDPOINT),
+            ("Always", MTLS_ENDPOINT),
+        ]
+    )
+    @mock.patch(
+        "google.auth.transport.mtls.has_default_client_cert_source", autospec=True
+    )
+    @mock.patch(
+        "google.auth.transport.mtls.default_client_encrypted_cert_source", autospec=True
+    )
+    def test_mtls_with_default_client_cert(
+        self,
+        use_mtls_env,
+        base_url,
+        default_client_encrypted_cert_source,
+        has_default_client_cert_source,
+    ):
+        has_default_client_cert_source.return_value = True
+        default_client_encrypted_cert_source.return_value = (
+            self.client_encrypted_cert_source
+        )
+        discovery = open(datafile("plus.json")).read()
+
+        with mock.patch.dict("os.environ", {"GOOGLE_API_USE_MTLS": use_mtls_env}):
+            plus = build_from_document(
+                discovery,
+                credentials=self.MOCK_CREDENTIALS,
+                adc_cert_path=self.ADC_CERT_PATH,
+                adc_key_path=self.ADC_KEY_PATH,
+            )
+            self.assertIsNotNone(plus)
+            self.check_http_client_cert(plus, has_client_cert=True)
+            self.assertEqual(plus._baseUrl, base_url)
+
+    @parameterized.expand(
+        [
+            ("Never", REGULAR_ENDPOINT),
+            ("Auto", REGULAR_ENDPOINT),
+            ("Always", MTLS_ENDPOINT),
+        ]
+    )
+    @mock.patch(
+        "google.auth.transport.mtls.has_default_client_cert_source", autospec=True
+    )
+    def test_mtls_with_no_client_cert(
+        self, use_mtls_env, base_url, has_default_client_cert_source
+    ):
+        has_default_client_cert_source.return_value = False
+        discovery = open(datafile("plus.json")).read()
+
+        with mock.patch.dict("os.environ", {"GOOGLE_API_USE_MTLS": use_mtls_env}):
+            plus = build_from_document(
+                discovery,
+                credentials=self.MOCK_CREDENTIALS,
+                adc_cert_path=self.ADC_CERT_PATH,
+                adc_key_path=self.ADC_KEY_PATH,
+            )
+            self.assertIsNotNone(plus)
+            self.check_http_client_cert(plus, has_client_cert=False)
+            self.assertEqual(plus._baseUrl, base_url)
+
+
 class DiscoveryFromHttp(unittest.TestCase):
     def setUp(self):
         self.old_environ = os.environ.copy()
@@ -648,7 +800,6 @@
 
 
 class DiscoveryFromAppEngineCache(unittest.TestCase):
-
     def setUp(self):
         self.old_environ = os.environ.copy()
         os.environ["APPENGINE_RUNTIME"] = "python27"