blob: ca50eddac3860515d30ece69ed12256d8f6ae1cb [file] [log] [blame]
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package textproto
6
7import (
8 "sync"
9)
10
11// A Pipeline manages a pipelined in-order request/response sequence.
12//
13// To use a Pipeline p to manage multiple clients on a connection,
14// each client should run:
15//
16// id := p.Next() // take a number
17//
18// p.StartRequest(id) // wait for turn to send request
19// «send request»
20// p.EndRequest(id) // notify Pipeline that request is sent
21//
22// p.StartResponse(id) // wait for turn to read response
23// «read response»
24// p.EndResponse(id) // notify Pipeline that response is read
25//
26// A pipelined server can use the same calls to ensure that
27// responses computed in parallel are written in the correct order.
28type Pipeline struct {
29 mu sync.Mutex
30 id uint
31 request sequencer
32 response sequencer
33}
34
35// Next returns the next id for a request/response pair.
36func (p *Pipeline) Next() uint {
37 p.mu.Lock()
38 id := p.id
39 p.id++
40 p.mu.Unlock()
41 return id
42}
43
44// StartRequest blocks until it is time to send (or, if this is a server, receive)
45// the request with the given id.
46func (p *Pipeline) StartRequest(id uint) {
47 p.request.Start(id)
48}
49
50// EndRequest notifies p that the request with the given id has been sent
51// (or, if this is a server, received).
52func (p *Pipeline) EndRequest(id uint) {
53 p.request.End(id)
54}
55
56// StartResponse blocks until it is time to receive (or, if this is a server, send)
57// the request with the given id.
58func (p *Pipeline) StartResponse(id uint) {
59 p.response.Start(id)
60}
61
62// EndResponse notifies p that the response with the given id has been received
63// (or, if this is a server, sent).
64func (p *Pipeline) EndResponse(id uint) {
65 p.response.End(id)
66}
67
// A sequencer schedules a sequence of numbered events that must
// happen in order, one after the other. The event numbering must start
// at 0 and increment without skipping. The event number wraps around
// safely as long as there are not 2^32 simultaneous events pending.
//
// The zero value is ready to use.
type sequencer struct {
	// mu guards id and wait.
	mu sync.Mutex
	// id is the number of the event that is currently allowed to run.
	id uint
	// wait holds one channel per event blocked in Start, keyed by event
	// number. It is allocated lazily by Start; each channel is closed by
	// End when that event's turn arrives.
	wait map[uint]chan struct{}
}

// Start waits until it is time for the event numbered id to begin.
// That is, except for the first event, it waits until End(id-1) has
// been called.
func (s *sequencer) Start(id uint) {
	s.mu.Lock()
	if s.id == id {
		// Already this event's turn; no need to park.
		s.mu.Unlock()
		return
	}
	c := make(chan struct{})
	if s.wait == nil {
		s.wait = make(map[uint]chan struct{})
	}
	s.wait[id] = c
	s.mu.Unlock()
	// Block outside the lock until End(id-1) closes the channel.
	<-c
}

// End notifies the sequencer that the event numbered id has completed,
// allowing it to schedule the event numbered id+1. It is a run-time error
// to call End with an id that is not the number of the active event;
// in that case End panics with "out of sync" and leaves the sequencer's
// state unchanged.
func (s *sequencer) End(id uint) {
	s.mu.Lock()
	if s.id != id {
		// Release the lock before panicking so that a caller that
		// recovers does not find the sequencer locked forever.
		s.mu.Unlock()
		panic("out of sync")
	}
	id++
	s.id = id
	// Looking up and deleting from a nil map is safe, so the map does
	// not need to be allocated here.
	c, ok := s.wait[id]
	if ok {
		delete(s.wait, id)
	}
	s.mu.Unlock()
	if ok {
		// Wake the goroutine parked in Start(id). Closing never blocks,
		// unlike sending a value on an unbuffered channel.
		close(c)
	}
}
117}