blob: e36a96fcc6cee69af6171b8c439aeceb3263e4bb [file] [log] [blame]
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc.
# http://code.google.com/p/protobuf/
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test for google.protobuf.internal.decoder."""
__author__ = 'robinson@google.com (Will Robinson)'
import struct
import unittest
from google.protobuf.internal import wire_format
from google.protobuf.internal import encoder
from google.protobuf.internal import decoder
import logging
import mox
from google.protobuf.internal import input_stream
from google.protobuf import message
class DecoderTest(unittest.TestCase):
def setUp(self):
self.mox = mox.Mox()
self.mock_stream = self.mox.CreateMock(input_stream.InputStream)
self.mock_message = self.mox.CreateMock(message.Message)
def testReadFieldNumberAndWireType(self):
# Test field numbers that will require various varint sizes.
for expected_field_number in (1, 15, 16, 2047, 2048):
for expected_wire_type in range(6): # Highest-numbered wiretype is 5.
e = encoder.Encoder()
e._AppendTag(expected_field_number, expected_wire_type)
s = e.ToString()
d = decoder.Decoder(s)
field_number, wire_type = d.ReadFieldNumberAndWireType()
self.assertEqual(expected_field_number, field_number)
self.assertEqual(expected_wire_type, wire_type)
def ReadScalarTestHelper(self, test_name, decoder_method, expected_result,
expected_stream_method_name,
stream_method_return, *args):
"""Helper for testReadScalars below.
Calls one of the Decoder.Read*() methods and ensures that the results are
as expected.
Args:
test_name: Name of this test, used for logging only.
decoder_method: Unbound decoder.Decoder method to call.
expected_result: Value we expect returned from decoder_method().
expected_stream_method_name: (string) Name of the InputStream
method we expect Decoder to call to actually read the value
on the wire.
stream_method_return: Value our mocked-out stream method should
return to the decoder.
args: Additional arguments that we expect to be passed to the
stream method.
"""
logging.info('Testing %s scalar input.\n'
'Calling %r(), and expecting that to call the '
'stream method %s(%r), which will return %r. Finally, '
'expecting the Decoder method to return %r'% (
test_name, decoder_method,
expected_stream_method_name, args, stream_method_return,
expected_result))
d = decoder.Decoder('')
d._stream = self.mock_stream
if decoder_method in (decoder.Decoder.ReadString,
decoder.Decoder.ReadBytes):
self.mock_stream.ReadVarUInt32().AndReturn(len(stream_method_return))
# We have to use names instead of methods to work around some
# mox weirdness. (ResetAll() is overzealous).
expected_stream_method = getattr(self.mock_stream,
expected_stream_method_name)
expected_stream_method(*args).AndReturn(stream_method_return)
self.mox.ReplayAll()
self.assertEqual(expected_result, decoder_method(d))
self.mox.VerifyAll()
self.mox.ResetAll()
def testReadScalars(self):
test_string = 'I can feel myself getting sutpider.'
scalar_tests = [
['int32', decoder.Decoder.ReadInt32, 0, 'ReadVarint32', 0],
['int64', decoder.Decoder.ReadInt64, 0, 'ReadVarint64', 0],
['uint32', decoder.Decoder.ReadUInt32, 0, 'ReadVarUInt32', 0],
['uint64', decoder.Decoder.ReadUInt64, 0, 'ReadVarUInt64', 0],
['fixed32', decoder.Decoder.ReadFixed32, 0xffffffff,
'ReadLittleEndian32', 0xffffffff],
['fixed64', decoder.Decoder.ReadFixed64, 0xffffffffffffffff,
'ReadLittleEndian64', 0xffffffffffffffff],
['sfixed32', decoder.Decoder.ReadSFixed32, -1,
'ReadLittleEndian32', 0xffffffff],
['sfixed64', decoder.Decoder.ReadSFixed64, -1,
'ReadLittleEndian64', 0xffffffffffffffff],
['float', decoder.Decoder.ReadFloat, 0.0,
'ReadString', struct.pack('f', 0.0), 4],
['double', decoder.Decoder.ReadDouble, 0.0,
'ReadString', struct.pack('d', 0.0), 8],
['bool', decoder.Decoder.ReadBool, True, 'ReadVarUInt32', 1],
['enum', decoder.Decoder.ReadEnum, 23, 'ReadVarUInt32', 23],
['string', decoder.Decoder.ReadString,
test_string, 'ReadString', test_string, len(test_string)],
['bytes', decoder.Decoder.ReadBytes,
test_string, 'ReadString', test_string, len(test_string)],
# We test zigzag decoding routines more extensively below.
['sint32', decoder.Decoder.ReadSInt32, -1, 'ReadVarUInt32', 1],
['sint64', decoder.Decoder.ReadSInt64, -1, 'ReadVarUInt64', 1],
]
# Ensure that we're testing different Decoder methods and using
# different test names in all test cases above.
self.assertEqual(len(scalar_tests), len(set(t[0] for t in scalar_tests)))
self.assertEqual(len(scalar_tests), len(set(t[1] for t in scalar_tests)))
for args in scalar_tests:
self.ReadScalarTestHelper(*args)
def testReadMessageInto(self):
length = 23
def Test(simulate_error):
d = decoder.Decoder('')
d._stream = self.mock_stream
self.mock_stream.ReadVarUInt32().AndReturn(length)
sub_buffer = object()
self.mock_stream.GetSubBuffer(length).AndReturn(sub_buffer)
if simulate_error:
self.mock_message.MergeFromString(sub_buffer).AndReturn(length - 1)
self.mox.ReplayAll()
self.assertRaises(
message.DecodeError, d.ReadMessageInto, self.mock_message)
else:
self.mock_message.MergeFromString(sub_buffer).AndReturn(length)
self.mock_stream.SkipBytes(length)
self.mox.ReplayAll()
d.ReadMessageInto(self.mock_message)
self.mox.VerifyAll()
self.mox.ResetAll()
Test(simulate_error=False)
Test(simulate_error=True)
def testReadGroupInto_Success(self):
# Test both the empty and nonempty cases.
for num_bytes in (5, 0):
field_number = expected_field_number = 10
d = decoder.Decoder('')
d._stream = self.mock_stream
sub_buffer = object()
self.mock_stream.GetSubBuffer().AndReturn(sub_buffer)
self.mock_message.MergeFromString(sub_buffer).AndReturn(num_bytes)
self.mock_stream.SkipBytes(num_bytes)
self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag(
field_number, wire_format.WIRETYPE_END_GROUP))
self.mox.ReplayAll()
d.ReadGroupInto(expected_field_number, self.mock_message)
self.mox.VerifyAll()
self.mox.ResetAll()
def ReadGroupInto_FailureTestHelper(self, bytes_read):
d = decoder.Decoder('')
d._stream = self.mock_stream
sub_buffer = object()
self.mock_stream.GetSubBuffer().AndReturn(sub_buffer)
self.mock_message.MergeFromString(sub_buffer).AndReturn(bytes_read)
return d
def testReadGroupInto_NegativeBytesReported(self):
expected_field_number = 10
d = self.ReadGroupInto_FailureTestHelper(bytes_read=-1)
self.mox.ReplayAll()
self.assertRaises(message.DecodeError,
d.ReadGroupInto, expected_field_number,
self.mock_message)
self.mox.VerifyAll()
def testReadGroupInto_NoEndGroupTag(self):
field_number = expected_field_number = 10
num_bytes = 5
d = self.ReadGroupInto_FailureTestHelper(bytes_read=num_bytes)
self.mock_stream.SkipBytes(num_bytes)
# Right field number, wrong wire type.
self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag(
field_number, wire_format.WIRETYPE_LENGTH_DELIMITED))
self.mox.ReplayAll()
self.assertRaises(message.DecodeError,
d.ReadGroupInto, expected_field_number,
self.mock_message)
self.mox.VerifyAll()
def testReadGroupInto_WrongFieldNumberInEndGroupTag(self):
expected_field_number = 10
field_number = expected_field_number + 1
num_bytes = 5
d = self.ReadGroupInto_FailureTestHelper(bytes_read=num_bytes)
self.mock_stream.SkipBytes(num_bytes)
# Wrong field number, right wire type.
self.mock_stream.ReadVarUInt32().AndReturn(wire_format.PackTag(
field_number, wire_format.WIRETYPE_END_GROUP))
self.mox.ReplayAll()
self.assertRaises(message.DecodeError,
d.ReadGroupInto, expected_field_number,
self.mock_message)
self.mox.VerifyAll()
def testSkipBytes(self):
d = decoder.Decoder('')
num_bytes = 1024
self.mock_stream.SkipBytes(num_bytes)
d._stream = self.mock_stream
self.mox.ReplayAll()
d.SkipBytes(num_bytes)
self.mox.VerifyAll()
if __name__ == '__main__':
unittest.main()