blob: 020c0c9ed9addae3c27fbe311954a2b02d2d1c56 [file] [log] [blame]
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +00001# Copyright 2015, Google Inc.
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright
9# notice, this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above
11# copyright notice, this list of conditions and the following disclaimer
12# in the documentation and/or other materials provided with the
13# distribution.
14# * Neither the name of Google Inc. nor the names of its
15# contributors may be used to endorse or promote products derived from
16# this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30"""Implementation of operations."""
31
32import threading
33
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +000034from grpc.framework.core import _context
35from grpc.framework.core import _emission
36from grpc.framework.core import _expiration
37from grpc.framework.core import _ingestion
38from grpc.framework.core import _interfaces
Nathaniel Manista13db8e52015-09-04 19:03:32 +000039from grpc.framework.core import _protocol
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +000040from grpc.framework.core import _reception
41from grpc.framework.core import _termination
42from grpc.framework.core import _transmission
Nathaniel Manista69210e52015-09-02 19:24:41 +000043from grpc.framework.core import _utilities
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +000044
45
46class _EasyOperation(_interfaces.Operation):
47 """A trivial implementation of interfaces.Operation."""
48
49 def __init__(
50 self, lock, termination_manager, transmission_manager, expiration_manager,
51 context, operator, reception_manager):
52 """Constructor.
53
54 Args:
55 lock: The operation-wide lock.
56 termination_manager: The _interfaces.TerminationManager for the operation.
57 transmission_manager: The _interfaces.TransmissionManager for the
58 operation.
59 expiration_manager: The _interfaces.ExpirationManager for the operation.
60 context: A base.OperationContext for use by the customer during the
61 operation.
62 operator: A base.Operator for use by the customer during the operation.
63 reception_manager: The _interfaces.ReceptionManager for the operation.
64 """
65 self._lock = lock
66 self._termination_manager = termination_manager
67 self._transmission_manager = transmission_manager
68 self._expiration_manager = expiration_manager
69 self._reception_manager = reception_manager
70
71 self.context = context
72 self.operator = operator
73
74 def handle_ticket(self, ticket):
75 with self._lock:
76 self._reception_manager.receive_ticket(ticket)
77
Nathaniel Manista69210e52015-09-02 19:24:41 +000078 def abort(self, outcome_kind):
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +000079 with self._lock:
80 if self._termination_manager.outcome is None:
Nathaniel Manista69210e52015-09-02 19:24:41 +000081 outcome = _utilities.Outcome(outcome_kind, None, None)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +000082 self._termination_manager.abort(outcome)
Nathaniel Manista69210e52015-09-02 19:24:41 +000083 self._transmission_manager.abort(outcome)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +000084 self._expiration_manager.terminate()
85
86
87def invocation_operate(
Nathaniel Manistad2aa1cf2015-09-03 20:31:16 +000088 operation_id, group, method, subscription, timeout, protocol_options,
89 initial_metadata, payload, completion, ticket_sink, termination_action,
90 pool):
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +000091 """Constructs objects necessary for front-side operation management.
92
93 Args:
94 operation_id: An object identifying the operation.
95 group: The group identifier of the operation.
96 method: The method identifier of the operation.
97 subscription: A base.Subscription describing the customer's interest in the
98 results of the operation.
99 timeout: A length of time in seconds to allow for the operation.
Nathaniel Manistad2aa1cf2015-09-03 20:31:16 +0000100 protocol_options: A transport-specific, application-specific, and/or
101 protocol-specific value relating to the invocation. May be None.
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +0000102 initial_metadata: An initial metadata value to be sent to the other side of
103 the operation. May be None if the initial metadata will be passed later or
104 if there will be no initial metadata passed at all.
105 payload: The first payload value to be transmitted to the other side. May be
106 None if there is no such value or if the customer chose not to pass it at
107 operation invocation.
108 completion: A base.Completion value indicating the end of values passed to
109 the other side of the operation.
110 ticket_sink: A callable that accepts links.Tickets and delivers them to the
111 other side of the operation.
112 termination_action: A callable that accepts the outcome of the operation as
113 a base.Outcome value to be called on operation completion.
114 pool: A thread pool with which to do the work of the operation.
115
116 Returns:
117 An _interfaces.Operation for the operation.
118 """
119 lock = threading.Lock()
120 with lock:
121 termination_manager = _termination.invocation_termination_manager(
122 termination_action, pool)
123 transmission_manager = _transmission.TransmissionManager(
124 operation_id, ticket_sink, lock, pool, termination_manager)
125 expiration_manager = _expiration.invocation_expiration_manager(
126 timeout, lock, termination_manager, transmission_manager)
Nathaniel Manista13db8e52015-09-04 19:03:32 +0000127 protocol_manager = _protocol.invocation_protocol_manager(
128 subscription, lock, pool, termination_manager, transmission_manager,
129 expiration_manager)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +0000130 operation_context = _context.OperationContext(
131 lock, termination_manager, transmission_manager, expiration_manager)
132 emission_manager = _emission.EmissionManager(
133 lock, termination_manager, transmission_manager, expiration_manager)
134 ingestion_manager = _ingestion.invocation_ingestion_manager(
135 subscription, lock, pool, termination_manager, transmission_manager,
Nathaniel Manista13db8e52015-09-04 19:03:32 +0000136 expiration_manager, protocol_manager)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +0000137 reception_manager = _reception.ReceptionManager(
138 termination_manager, transmission_manager, expiration_manager,
Nathaniel Manista13db8e52015-09-04 19:03:32 +0000139 protocol_manager, ingestion_manager)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +0000140
141 termination_manager.set_expiration_manager(expiration_manager)
142 transmission_manager.set_expiration_manager(expiration_manager)
143 emission_manager.set_ingestion_manager(ingestion_manager)
144
145 transmission_manager.kick_off(
Nathaniel Manistad2aa1cf2015-09-03 20:31:16 +0000146 group, method, timeout, protocol_options, initial_metadata, payload,
147 completion, None)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +0000148
149 return _EasyOperation(
150 lock, termination_manager, transmission_manager, expiration_manager,
151 operation_context, emission_manager, reception_manager)
152
153
154def service_operate(
155 servicer_package, ticket, ticket_sink, termination_action, pool):
156 """Constructs an Operation for service of an operation.
157
158 Args:
159 servicer_package: A _utilities.ServicerPackage to be used servicing the
160 operation.
161 ticket: The first links.Ticket received for the operation.
162 ticket_sink: A callable that accepts links.Tickets and delivers them to the
163 other side of the operation.
164 termination_action: A callable that accepts the outcome of the operation as
165 a base.Outcome value to be called on operation completion.
166 pool: A thread pool with which to do the work of the operation.
167
168 Returns:
169 An _interfaces.Operation for the operation.
170 """
171 lock = threading.Lock()
172 with lock:
173 termination_manager = _termination.service_termination_manager(
174 termination_action, pool)
175 transmission_manager = _transmission.TransmissionManager(
176 ticket.operation_id, ticket_sink, lock, pool, termination_manager)
177 expiration_manager = _expiration.service_expiration_manager(
178 ticket.timeout, servicer_package.default_timeout,
179 servicer_package.maximum_timeout, lock, termination_manager,
180 transmission_manager)
Nathaniel Manista13db8e52015-09-04 19:03:32 +0000181 protocol_manager = _protocol.service_protocol_manager(
182 lock, pool, termination_manager, transmission_manager,
183 expiration_manager)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +0000184 operation_context = _context.OperationContext(
185 lock, termination_manager, transmission_manager, expiration_manager)
186 emission_manager = _emission.EmissionManager(
187 lock, termination_manager, transmission_manager, expiration_manager)
188 ingestion_manager = _ingestion.service_ingestion_manager(
189 servicer_package.servicer, operation_context, emission_manager, lock,
Nathaniel Manista13db8e52015-09-04 19:03:32 +0000190 pool, termination_manager, transmission_manager, expiration_manager,
191 protocol_manager)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +0000192 reception_manager = _reception.ReceptionManager(
193 termination_manager, transmission_manager, expiration_manager,
Nathaniel Manista13db8e52015-09-04 19:03:32 +0000194 protocol_manager, ingestion_manager)
Nathaniel Manistaf36e1b72015-08-18 01:30:29 +0000195
196 termination_manager.set_expiration_manager(expiration_manager)
197 transmission_manager.set_expiration_manager(expiration_manager)
198 emission_manager.set_ingestion_manager(ingestion_manager)
199
200 reception_manager.receive_ticket(ticket)
201
202 return _EasyOperation(
203 lock, termination_manager, transmission_manager, expiration_manager,
204 operation_context, emission_manager, reception_manager)