Api_core: Convert 'DatetimeWithNanos' to / from 'google.protobuf.timestamp_pb2.Timestamp' (#6919)

Toward #6547.
diff --git a/google/api_core/datetime_helpers.py b/google/api_core/datetime_helpers.py
index 3f3523b..b0d9105 100644
--- a/google/api_core/datetime_helpers.py
+++ b/google/api_core/datetime_helpers.py
@@ -20,6 +20,8 @@
 
 import pytz
 
+from google.protobuf import timestamp_pb2
+
 
 _UTC_EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=pytz.utc)
 _RFC3339_MICROS = "%Y-%m-%dT%H:%M:%S.%fZ"
@@ -263,3 +265,39 @@
             nanosecond=nanos,
             tzinfo=pytz.UTC,
         )
+
+    def timestamp_pb(self):
+        """Return a timestamp message.
+
+        Returns:
+            (:class:`~google.protobuf.timestamp_pb2.Timestamp`): Timestamp message
+        """
+        inst = self if self.tzinfo is not None else self.replace(tzinfo=pytz.UTC)
+        delta = inst - _UTC_EPOCH
+        seconds = int(delta.total_seconds())
+        nanos = self._nanosecond or self.microsecond * 1000
+        return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
+
+    @classmethod
+    def from_timestamp_pb(cls, stamp):
+        """Parse RFC 3339-compliant timestamp, preserving nanoseconds.
+
+        Args:
+            stamp (:class:`~google.protobuf.timestamp_pb2.Timestamp`): timestamp message
+
+        Returns:
+            :class:`DatetimeWithNanoseconds`:
+                an instance matching the timestamp message
+        """
+        microseconds = int(stamp.seconds * 1e6)
+        bare = from_microseconds(microseconds)
+        return cls(
+            bare.year,
+            bare.month,
+            bare.day,
+            bare.hour,
+            bare.minute,
+            bare.second,
+            nanosecond=stamp.nanos,
+            tzinfo=pytz.UTC,
+        )
diff --git a/tests/unit/test_datetime_helpers.py b/tests/unit/test_datetime_helpers.py
index 2f99235..e5220ae 100644
--- a/tests/unit/test_datetime_helpers.py
+++ b/tests/unit/test_datetime_helpers.py
@@ -12,12 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import calendar
 import datetime
 
 import pytest
 import pytz
 
 from google.api_core import datetime_helpers
+from google.protobuf import timestamp_pb2
 
 
 ONE_MINUTE_IN_MICROSECONDS = 60 * 1e6
@@ -154,93 +156,159 @@
     assert datetime_helpers.to_rfc3339(value, ignore_zone=True) == expected
 
 
-def test_datetimewithnanos_ctor_wo_nanos():
-    stamp = datetime_helpers.DatetimeWithNanoseconds(2016, 12, 20, 21, 13, 47, 123456)
-    assert stamp.year == 2016
-    assert stamp.month == 12
-    assert stamp.day == 20
-    assert stamp.hour == 21
-    assert stamp.minute == 13
-    assert stamp.second == 47
-    assert stamp.microsecond == 123456
-    assert stamp.nanosecond == 0
+class Test_DateTimeWithNanos(object):
 
+    @staticmethod
+    def test_ctor_wo_nanos():
+        stamp = datetime_helpers.DatetimeWithNanoseconds(2016, 12, 20, 21, 13, 47, 123456)
+        assert stamp.year == 2016
+        assert stamp.month == 12
+        assert stamp.day == 20
+        assert stamp.hour == 21
+        assert stamp.minute == 13
+        assert stamp.second == 47
+        assert stamp.microsecond == 123456
+        assert stamp.nanosecond == 0
 
-def test_datetimewithnanos_ctor_w_nanos():
-    stamp = datetime_helpers.DatetimeWithNanoseconds(
-        2016, 12, 20, 21, 13, 47, nanosecond=123456789
-    )
-    assert stamp.year == 2016
-    assert stamp.month == 12
-    assert stamp.day == 20
-    assert stamp.hour == 21
-    assert stamp.minute == 13
-    assert stamp.second == 47
-    assert stamp.microsecond == 123456
-    assert stamp.nanosecond == 123456789
-
-
-def test_datetimewithnanos_ctor_w_micros_positional_and_nanos():
-    with pytest.raises(TypeError):
-        datetime_helpers.DatetimeWithNanoseconds(
-            2016, 12, 20, 21, 13, 47, 123456, nanosecond=123456789
+    @staticmethod
+    def test_ctor_w_nanos():
+        stamp = datetime_helpers.DatetimeWithNanoseconds(
+            2016, 12, 20, 21, 13, 47, nanosecond=123456789
         )
+        assert stamp.year == 2016
+        assert stamp.month == 12
+        assert stamp.day == 20
+        assert stamp.hour == 21
+        assert stamp.minute == 13
+        assert stamp.second == 47
+        assert stamp.microsecond == 123456
+        assert stamp.nanosecond == 123456789
 
+    @staticmethod
+    def test_ctor_w_micros_positional_and_nanos():
+        with pytest.raises(TypeError):
+            datetime_helpers.DatetimeWithNanoseconds(
+                2016, 12, 20, 21, 13, 47, 123456, nanosecond=123456789
+            )
 
-def test_datetimewithnanos_ctor_w_micros_keyword_and_nanos():
-    with pytest.raises(TypeError):
-        datetime_helpers.DatetimeWithNanoseconds(
-            2016, 12, 20, 21, 13, 47, microsecond=123456, nanosecond=123456789
+    @staticmethod
+    def test_ctor_w_micros_keyword_and_nanos():
+        with pytest.raises(TypeError):
+            datetime_helpers.DatetimeWithNanoseconds(
+                2016, 12, 20, 21, 13, 47, microsecond=123456, nanosecond=123456789
+            )
+
+    @staticmethod
+    def test_rfc3339_wo_nanos():
+        stamp = datetime_helpers.DatetimeWithNanoseconds(2016, 12, 20, 21, 13, 47, 123456)
+        assert stamp.rfc3339() == "2016-12-20T21:13:47.123456Z"
+
+    @staticmethod
+    def test_rfc3339_w_nanos():
+        stamp = datetime_helpers.DatetimeWithNanoseconds(
+            2016, 12, 20, 21, 13, 47, nanosecond=123456789
         )
+        assert stamp.rfc3339() == "2016-12-20T21:13:47.123456789Z"
+
+    @staticmethod
+    def test_rfc3339_w_nanos_no_trailing_zeroes():
+        stamp = datetime_helpers.DatetimeWithNanoseconds(
+            2016, 12, 20, 21, 13, 47, nanosecond=100000000
+        )
+        assert stamp.rfc3339() == "2016-12-20T21:13:47.1Z"
+
+    @staticmethod
+    def test_from_rfc3339_w_invalid():
+        stamp = "2016-12-20T21:13:47"
+        with pytest.raises(ValueError):
+            datetime_helpers.DatetimeWithNanoseconds.from_rfc3339(stamp)
+
+    @staticmethod
+    def test_from_rfc3339_wo_fraction():
+        timestamp = "2016-12-20T21:13:47Z"
+        expected = datetime_helpers.DatetimeWithNanoseconds(
+            2016, 12, 20, 21, 13, 47, tzinfo=pytz.UTC
+        )
+        stamp = datetime_helpers.DatetimeWithNanoseconds.from_rfc3339(timestamp)
+        assert stamp == expected
+
+    @staticmethod
+    def test_from_rfc3339_w_partial_precision():
+        timestamp = "2016-12-20T21:13:47.1Z"
+        expected = datetime_helpers.DatetimeWithNanoseconds(
+            2016, 12, 20, 21, 13, 47, microsecond=100000, tzinfo=pytz.UTC
+        )
+        stamp = datetime_helpers.DatetimeWithNanoseconds.from_rfc3339(timestamp)
+        assert stamp == expected
+
+    @staticmethod
+    def test_from_rfc3339_w_full_precision():
+        timestamp = "2016-12-20T21:13:47.123456789Z"
+        expected = datetime_helpers.DatetimeWithNanoseconds(
+            2016, 12, 20, 21, 13, 47, nanosecond=123456789, tzinfo=pytz.UTC
+        )
+        stamp = datetime_helpers.DatetimeWithNanoseconds.from_rfc3339(timestamp)
+        assert stamp == expected
+
+    @staticmethod
+    def test_timestamp_pb_wo_nanos_naive():
+        stamp = datetime_helpers.DatetimeWithNanoseconds(
+            2016, 12, 20, 21, 13, 47, 123456)
+        delta = stamp.replace(tzinfo=pytz.UTC) - datetime_helpers._UTC_EPOCH
+        seconds = int(delta.total_seconds())
+        nanos = 123456000
+        timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
+        assert stamp.timestamp_pb() == timestamp
+
+    @staticmethod
+    def test_timestamp_pb_w_nanos():
+        stamp = datetime_helpers.DatetimeWithNanoseconds(
+            2016, 12, 20, 21, 13, 47, nanosecond=123456789, tzinfo=pytz.UTC
+        )
+        delta = stamp - datetime_helpers._UTC_EPOCH
+        timestamp = timestamp_pb2.Timestamp(
+            seconds=int(delta.total_seconds()), nanos=123456789)
+        assert stamp.timestamp_pb() == timestamp
+
+    @staticmethod
+    def test_from_timestamp_pb_wo_nanos():
+        when = datetime.datetime(2016, 12, 20, 21, 13, 47, 123456, tzinfo=pytz.UTC)
+        delta = when - datetime_helpers._UTC_EPOCH
+        seconds = int(delta.total_seconds())
+        timestamp = timestamp_pb2.Timestamp(seconds=seconds)
+
+        stamp = datetime_helpers.DatetimeWithNanoseconds.from_timestamp_pb(
+            timestamp)
+
+        assert _to_seconds(when) == _to_seconds(stamp)
+        assert stamp.microsecond == 0
+        assert stamp.nanosecond == 0
+        assert stamp.tzinfo == pytz.UTC
+
+    @staticmethod
+    def test_from_timestamp_pb_w_nanos():
+        when = datetime.datetime(2016, 12, 20, 21, 13, 47, 123456, tzinfo=pytz.UTC)
+        delta = when - datetime_helpers._UTC_EPOCH
+        seconds = int(delta.total_seconds())
+        timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=123456789)
+
+        stamp = datetime_helpers.DatetimeWithNanoseconds.from_timestamp_pb(
+            timestamp)
+
+        assert _to_seconds(when) == _to_seconds(stamp)
+        assert stamp.microsecond == 123456
+        assert stamp.nanosecond == 123456789
+        assert stamp.tzinfo == pytz.UTC
 
 
-def test_datetimewithnanos_rfc339_wo_nanos():
-    stamp = datetime_helpers.DatetimeWithNanoseconds(2016, 12, 20, 21, 13, 47, 123456)
-    assert stamp.rfc3339() == "2016-12-20T21:13:47.123456Z"
+def _to_seconds(value):
+    """Convert a datetime to seconds since the unix epoch.
 
+    Args:
+        value (datetime.datetime): The datetime to covert.
 
-def test_datetimewithnanos_rfc339_w_nanos():
-    stamp = datetime_helpers.DatetimeWithNanoseconds(
-        2016, 12, 20, 21, 13, 47, nanosecond=123456789
-    )
-    assert stamp.rfc3339() == "2016-12-20T21:13:47.123456789Z"
-
-
-def test_datetimewithnanos_rfc339_w_nanos_no_trailing_zeroes():
-    stamp = datetime_helpers.DatetimeWithNanoseconds(
-        2016, 12, 20, 21, 13, 47, nanosecond=100000000
-    )
-    assert stamp.rfc3339() == "2016-12-20T21:13:47.1Z"
-
-
-def test_datetimewithnanos_from_rfc3339_w_invalid():
-    stamp = "2016-12-20T21:13:47"
-    with pytest.raises(ValueError):
-        datetime_helpers.DatetimeWithNanoseconds.from_rfc3339(stamp)
-
-
-def test_datetimewithnanos_from_rfc3339_wo_fraction():
-    timestamp = "2016-12-20T21:13:47Z"
-    expected = datetime_helpers.DatetimeWithNanoseconds(
-        2016, 12, 20, 21, 13, 47, tzinfo=pytz.UTC
-    )
-    stamp = datetime_helpers.DatetimeWithNanoseconds.from_rfc3339(timestamp)
-    assert stamp == expected
-
-
-def test_datetimewithnanos_from_rfc3339_w_partial_precision():
-    timestamp = "2016-12-20T21:13:47.1Z"
-    expected = datetime_helpers.DatetimeWithNanoseconds(
-        2016, 12, 20, 21, 13, 47, microsecond=100000, tzinfo=pytz.UTC
-    )
-    stamp = datetime_helpers.DatetimeWithNanoseconds.from_rfc3339(timestamp)
-    assert stamp == expected
-
-
-def test_datetimewithnanos_from_rfc3339_w_full_precision():
-    timestamp = "2016-12-20T21:13:47.123456789Z"
-    expected = datetime_helpers.DatetimeWithNanoseconds(
-        2016, 12, 20, 21, 13, 47, nanosecond=123456789, tzinfo=pytz.UTC
-    )
-    stamp = datetime_helpers.DatetimeWithNanoseconds.from_rfc3339(timestamp)
-    assert stamp == expected
+    Returns:
+        int: Microseconds since the unix epoch.
+    """
+    assert value.tzinfo is pytz.UTC
+    return calendar.timegm(value.timetuple())