| # Copyright 2015, Google Inc. |
| # All rights reserved. |
| # |
| # Redistribution and use in source and binary forms, with or without |
| # modification, are permitted provided that the following conditions are |
| # met: |
| # |
| # * Redistributions of source code must retain the above copyright |
| # notice, this list of conditions and the following disclaimer. |
| # * Redistributions in binary form must reproduce the above |
| # copyright notice, this list of conditions and the following disclaimer |
| # in the documentation and/or other materials provided with the |
| # distribution. |
| # * Neither the name of Google Inc. nor the names of its |
| # contributors may be used to endorse or promote products derived from |
| # this software without specific prior written permission. |
| # |
| # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| |
| """State and behavior for ticket transmission during an operation.""" |
| |
| import abc |
| |
| from grpc.framework.base import _constants |
| from grpc.framework.base import _interfaces |
| from grpc.framework.base import interfaces |
| from grpc.framework.foundation import callable_util |
| |
| _TRANSMISSION_EXCEPTION_LOG_MESSAGE = 'Exception during transmission!' |
| |
| _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES = ( |
| interfaces.Outcome.SERVICER_FAILURE, |
| ) |
| _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES = ( |
| interfaces.Outcome.CANCELLED, |
| interfaces.Outcome.SERVICED_FAILURE, |
| ) |
| |
| _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND = { |
| interfaces.Outcome.CANCELLED: |
| interfaces.FrontToBackTicket.Kind.CANCELLATION, |
| interfaces.Outcome.EXPIRED: |
| interfaces.FrontToBackTicket.Kind.EXPIRATION, |
| interfaces.Outcome.RECEPTION_FAILURE: |
| interfaces.FrontToBackTicket.Kind.RECEPTION_FAILURE, |
| interfaces.Outcome.TRANSMISSION_FAILURE: |
| interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, |
| interfaces.Outcome.SERVICED_FAILURE: |
| interfaces.FrontToBackTicket.Kind.SERVICED_FAILURE, |
| interfaces.Outcome.SERVICER_FAILURE: |
| interfaces.FrontToBackTicket.Kind.SERVICER_FAILURE, |
| } |
| |
| _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND = { |
| interfaces.Outcome.CANCELLED: |
| interfaces.BackToFrontTicket.Kind.CANCELLATION, |
| interfaces.Outcome.EXPIRED: |
| interfaces.BackToFrontTicket.Kind.EXPIRATION, |
| interfaces.Outcome.RECEPTION_FAILURE: |
| interfaces.BackToFrontTicket.Kind.RECEPTION_FAILURE, |
| interfaces.Outcome.TRANSMISSION_FAILURE: |
| interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE, |
| interfaces.Outcome.SERVICED_FAILURE: |
| interfaces.BackToFrontTicket.Kind.SERVICED_FAILURE, |
| interfaces.Outcome.SERVICER_FAILURE: |
| interfaces.BackToFrontTicket.Kind.SERVICER_FAILURE, |
| } |
| |
| |
| class _Ticketizer(object): |
| """Common specification of different ticket-creating behavior.""" |
| __metaclass__ = abc.ABCMeta |
| |
| @abc.abstractmethod |
| def ticketize(self, operation_id, sequence_number, payload, complete): |
| """Creates a ticket indicating ordinary operation progress. |
| |
| Args: |
| operation_id: The operation ID for the current operation. |
| sequence_number: A sequence number for the ticket. |
| payload: A customer payload object. May be None if sequence_number is |
| zero or complete is true. |
| complete: A boolean indicating whether or not the ticket should describe |
| itself as (but for a later indication of operation abortion) the last |
| ticket to be sent. |
| |
| Returns: |
| An object of an appropriate type suitable for transmission to the other |
| side of the operation. |
| """ |
| raise NotImplementedError() |
| |
| @abc.abstractmethod |
| def ticketize_abortion(self, operation_id, sequence_number, outcome): |
| """Creates a ticket indicating that the operation is aborted. |
| |
| Args: |
| operation_id: The operation ID for the current operation. |
| sequence_number: A sequence number for the ticket. |
| outcome: An interfaces.Outcome value describing the operation abortion. |
| |
| Returns: |
| An object of an appropriate type suitable for transmission to the other |
| side of the operation, or None if transmission is not appropriate for |
| the given outcome. |
| """ |
| raise NotImplementedError() |
| |
| |
| class _FrontTicketizer(_Ticketizer): |
| """Front-side ticket-creating behavior.""" |
| |
| def __init__(self, name, subscription_kind, trace_id, timeout): |
| """Constructor. |
| |
| Args: |
| name: The name of the operation. |
| subscription_kind: An interfaces.ServicedSubscription.Kind value |
| describing the interest the front has in tickets sent from the back. |
| trace_id: A uuid.UUID identifying a set of related operations to which |
| this operation belongs. |
| timeout: A length of time in seconds to allow for the entire operation. |
| """ |
| self._name = name |
| self._subscription_kind = subscription_kind |
| self._trace_id = trace_id |
| self._timeout = timeout |
| |
| def ticketize(self, operation_id, sequence_number, payload, complete): |
| """See _Ticketizer.ticketize for specification.""" |
| if sequence_number: |
| if complete: |
| kind = interfaces.FrontToBackTicket.Kind.COMPLETION |
| else: |
| kind = interfaces.FrontToBackTicket.Kind.CONTINUATION |
| return interfaces.FrontToBackTicket( |
| operation_id, sequence_number, kind, self._name, |
| self._subscription_kind, self._trace_id, payload, self._timeout) |
| else: |
| if complete: |
| kind = interfaces.FrontToBackTicket.Kind.ENTIRE |
| else: |
| kind = interfaces.FrontToBackTicket.Kind.COMMENCEMENT |
| return interfaces.FrontToBackTicket( |
| operation_id, 0, kind, self._name, self._subscription_kind, |
| self._trace_id, payload, self._timeout) |
| |
| def ticketize_abortion(self, operation_id, sequence_number, outcome): |
| """See _Ticketizer.ticketize_abortion for specification.""" |
| if outcome in _FRONT_TO_BACK_NO_TRANSMISSION_OUTCOMES: |
| return None |
| else: |
| kind = _ABORTION_OUTCOME_TO_FRONT_TO_BACK_TICKET_KIND[outcome] |
| return interfaces.FrontToBackTicket( |
| operation_id, sequence_number, kind, None, None, None, None, None) |
| |
| |
| class _BackTicketizer(_Ticketizer): |
| """Back-side ticket-creating behavior.""" |
| |
| def ticketize(self, operation_id, sequence_number, payload, complete): |
| """See _Ticketizer.ticketize for specification.""" |
| if complete: |
| kind = interfaces.BackToFrontTicket.Kind.COMPLETION |
| else: |
| kind = interfaces.BackToFrontTicket.Kind.CONTINUATION |
| return interfaces.BackToFrontTicket( |
| operation_id, sequence_number, kind, payload) |
| |
| def ticketize_abortion(self, operation_id, sequence_number, outcome): |
| """See _Ticketizer.ticketize_abortion for specification.""" |
| if outcome in _BACK_TO_FRONT_NO_TRANSMISSION_OUTCOMES: |
| return None |
| else: |
| kind = _ABORTION_OUTCOME_TO_BACK_TO_FRONT_TICKET_KIND[outcome] |
| return interfaces.BackToFrontTicket( |
| operation_id, sequence_number, kind, None) |
| |
| |
| class TransmissionManager(_interfaces.TransmissionManager): |
| """A _interfaces.TransmissionManager on which other managers may be set.""" |
| __metaclass__ = abc.ABCMeta |
| |
| @abc.abstractmethod |
| def set_ingestion_and_expiration_managers( |
| self, ingestion_manager, expiration_manager): |
| """Sets two of the other managers with which this manager may interact. |
| |
| Args: |
| ingestion_manager: The _interfaces.IngestionManager associated with the |
| current operation. |
| expiration_manager: The _interfaces.ExpirationManager associated with the |
| current operation. |
| """ |
| raise NotImplementedError() |
| |
| |
| class _EmptyTransmissionManager(TransmissionManager): |
| """A completely no-operative _interfaces.TransmissionManager.""" |
| |
| def set_ingestion_and_expiration_managers( |
| self, ingestion_manager, expiration_manager): |
| """See overriden method for specification.""" |
| |
| def inmit(self, emission, complete): |
| """See _interfaces.TransmissionManager.inmit for specification.""" |
| |
| def abort(self, outcome): |
| """See _interfaces.TransmissionManager.abort for specification.""" |
| |
| |
| class _TransmittingTransmissionManager(TransmissionManager): |
| """A TransmissionManager implementation that sends tickets.""" |
| |
| def __init__( |
| self, lock, pool, callback, operation_id, ticketizer, |
| termination_manager): |
| """Constructor. |
| |
| Args: |
| lock: The operation-servicing-wide lock object. |
| pool: A thread pool in which the work of transmitting tickets will be |
| performed. |
| callback: A callable that accepts tickets and sends them to the other side |
| of the operation. |
| operation_id: The operation's ID. |
| ticketizer: A _Ticketizer for ticket creation. |
| termination_manager: The _interfaces.TerminationManager associated with |
| this operation. |
| """ |
| self._lock = lock |
| self._pool = pool |
| self._callback = callback |
| self._operation_id = operation_id |
| self._ticketizer = ticketizer |
| self._termination_manager = termination_manager |
| self._ingestion_manager = None |
| self._expiration_manager = None |
| |
| self._emissions = [] |
| self._emission_complete = False |
| self._outcome = None |
| self._lowest_unused_sequence_number = 0 |
| self._transmitting = False |
| |
| def set_ingestion_and_expiration_managers( |
| self, ingestion_manager, expiration_manager): |
| """See overridden method for specification.""" |
| self._ingestion_manager = ingestion_manager |
| self._expiration_manager = expiration_manager |
| |
| def _lead_ticket(self, emission, complete): |
| """Creates a ticket suitable for leading off the transmission loop. |
| |
| Args: |
| emission: A customer payload object to be sent to the other side of the |
| operation. |
| complete: Whether or not the sequence of customer payloads ends with |
| the passed object. |
| |
| Returns: |
| A ticket with which to lead off the transmission loop. |
| """ |
| sequence_number = self._lowest_unused_sequence_number |
| self._lowest_unused_sequence_number += 1 |
| return self._ticketizer.ticketize( |
| self._operation_id, sequence_number, emission, complete) |
| |
| def _abortive_response_ticket(self, outcome): |
| """Creates a ticket indicating operation abortion. |
| |
| Args: |
| outcome: An interfaces.Outcome value describing operation abortion. |
| |
| Returns: |
| A ticket indicating operation abortion. |
| """ |
| ticket = self._ticketizer.ticketize_abortion( |
| self._operation_id, self._lowest_unused_sequence_number, outcome) |
| if ticket is None: |
| return None |
| else: |
| self._lowest_unused_sequence_number += 1 |
| return ticket |
| |
| def _next_ticket(self): |
| """Creates the next ticket to be sent to the other side of the operation. |
| |
| Returns: |
| A (completed, ticket) tuple comprised of a boolean indicating whether or |
| not the sequence of tickets has completed normally and a ticket to send |
| to the other side if the sequence of tickets hasn't completed. The tuple |
| will never have both a True first element and a non-None second element. |
| """ |
| if self._emissions is None: |
| return False, None |
| elif self._outcome is None: |
| if self._emissions: |
| payload = self._emissions.pop(0) |
| complete = self._emission_complete and not self._emissions |
| sequence_number = self._lowest_unused_sequence_number |
| self._lowest_unused_sequence_number += 1 |
| return complete, self._ticketizer.ticketize( |
| self._operation_id, sequence_number, payload, complete) |
| else: |
| return self._emission_complete, None |
| else: |
| ticket = self._abortive_response_ticket(self._outcome) |
| self._emissions = None |
| return False, None if ticket is None else ticket |
| |
| def _transmit(self, ticket): |
| """Commences the transmission loop sending tickets. |
| |
| Args: |
| ticket: A ticket to be sent to the other side of the operation. |
| """ |
| def transmit(ticket): |
| while True: |
| transmission_outcome = callable_util.call_logging_exceptions( |
| self._callback, _TRANSMISSION_EXCEPTION_LOG_MESSAGE, ticket) |
| if transmission_outcome.exception is None: |
| with self._lock: |
| complete, ticket = self._next_ticket() |
| if ticket is None: |
| if complete: |
| self._termination_manager.transmission_complete() |
| self._transmitting = False |
| return |
| else: |
| with self._lock: |
| self._emissions = None |
| self._termination_manager.abort( |
| interfaces.Outcome.TRANSMISSION_FAILURE) |
| self._ingestion_manager.abort() |
| self._expiration_manager.abort() |
| self._transmitting = False |
| return |
| |
| self._pool.submit(callable_util.with_exceptions_logged( |
| transmit, _constants.INTERNAL_ERROR_LOG_MESSAGE), ticket) |
| self._transmitting = True |
| |
| def inmit(self, emission, complete): |
| """See _interfaces.TransmissionManager.inmit for specification.""" |
| if self._emissions is not None and self._outcome is None: |
| self._emission_complete = complete |
| if self._transmitting: |
| self._emissions.append(emission) |
| else: |
| self._transmit(self._lead_ticket(emission, complete)) |
| |
| def abort(self, outcome): |
| """See _interfaces.TransmissionManager.abort for specification.""" |
| if self._emissions is not None and self._outcome is None: |
| self._outcome = outcome |
| if not self._transmitting: |
| ticket = self._abortive_response_ticket(outcome) |
| self._emissions = None |
| if ticket is not None: |
| self._transmit(ticket) |
| |
| |
| def front_transmission_manager( |
| lock, pool, callback, operation_id, name, subscription_kind, trace_id, |
| timeout, termination_manager): |
| """Creates a TransmissionManager appropriate for front-side use. |
| |
| Args: |
| lock: The operation-servicing-wide lock object. |
| pool: A thread pool in which the work of transmitting tickets will be |
| performed. |
| callback: A callable that accepts tickets and sends them to the other side |
| of the operation. |
| operation_id: The operation's ID. |
| name: The name of the operation. |
| subscription_kind: An interfaces.ServicedSubscription.Kind value |
| describing the interest the front has in tickets sent from the back. |
| trace_id: A uuid.UUID identifying a set of related operations to which |
| this operation belongs. |
| timeout: A length of time in seconds to allow for the entire operation. |
| termination_manager: The _interfaces.TerminationManager associated with |
| this operation. |
| |
| Returns: |
| A TransmissionManager appropriate for front-side use. |
| """ |
| return _TransmittingTransmissionManager( |
| lock, pool, callback, operation_id, _FrontTicketizer( |
| name, subscription_kind, trace_id, timeout), |
| termination_manager) |
| |
| |
| def back_transmission_manager( |
| lock, pool, callback, operation_id, termination_manager, |
| subscription_kind): |
| """Creates a TransmissionManager appropriate for back-side use. |
| |
| Args: |
| lock: The operation-servicing-wide lock object. |
| pool: A thread pool in which the work of transmitting tickets will be |
| performed. |
| callback: A callable that accepts tickets and sends them to the other side |
| of the operation. |
| operation_id: The operation's ID. |
| termination_manager: The _interfaces.TerminationManager associated with |
| this operation. |
| subscription_kind: An interfaces.ServicedSubscription.Kind value |
| describing the interest the front has in tickets sent from the back. |
| |
| Returns: |
| A TransmissionManager appropriate for back-side use. |
| """ |
| if subscription_kind is interfaces.ServicedSubscription.Kind.NONE: |
| return _EmptyTransmissionManager() |
| else: |
| return _TransmittingTransmissionManager( |
| lock, pool, callback, operation_id, _BackTicketizer(), |
| termination_manager) |