blob: b77e89304472189665930d6819b7caeb4a13c14c [file] [log] [blame]
Jan Tattermuscha7fff862015-02-13 11:08:08 -08001#region Copyright notice and license
2
Jan Tattermuschaf77b3d2015-02-13 11:22:21 -08003// Copyright 2015, Google Inc.
Jan Tattermuscha7fff862015-02-13 11:08:08 -08004// All rights reserved.
Craig Tiller190d3602015-02-18 09:23:38 -08005//
Jan Tattermuscha7fff862015-02-13 11:08:08 -08006// Redistribution and use in source and binary forms, with or without
7// modification, are permitted provided that the following conditions are
8// met:
Craig Tiller190d3602015-02-18 09:23:38 -08009//
Jan Tattermuscha7fff862015-02-13 11:08:08 -080010// * Redistributions of source code must retain the above copyright
11// notice, this list of conditions and the following disclaimer.
12// * Redistributions in binary form must reproduce the above
13// copyright notice, this list of conditions and the following disclaimer
14// in the documentation and/or other materials provided with the
15// distribution.
16// * Neither the name of Google Inc. nor the names of its
17// contributors may be used to endorse or promote products derived from
18// this software without specific prior written permission.
Craig Tiller190d3602015-02-18 09:23:38 -080019//
Jan Tattermuscha7fff862015-02-13 11:08:08 -080020// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31
32#endregion
33
Jan Tattermuscha7608b02015-02-03 17:54:38 -080034using System;
Jan Tattermusch30868622015-02-19 09:22:33 -080035using System.Collections.Generic;
Jan Tattermuscha7608b02015-02-03 17:54:38 -080036using System.Runtime.InteropServices;
37using System.Threading;
38using System.Threading.Tasks;
Jan Tattermusch30868622015-02-19 09:22:33 -080039using Grpc.Core.Internal;
Jan Tattermuscha7608b02015-02-03 17:54:38 -080040
Jan Tattermusch30868622015-02-19 09:22:33 -080041namespace Grpc.Core.Internal
Jan Tattermuscha7608b02015-02-03 17:54:38 -080042{
43 /// <summary>
44 /// Pool of threads polling on the same completion queue.
45 /// </summary>
46 internal class GrpcThreadPool
47 {
Jan Tattermusch04eb89c2015-06-12 13:03:05 -070048 readonly GrpcEnvironment environment;
Jan Tattermuscha7608b02015-02-03 17:54:38 -080049 readonly object myLock = new object();
50 readonly List<Thread> threads = new List<Thread>();
51 readonly int poolSize;
Jan Tattermuscha7608b02015-02-03 17:54:38 -080052
53 CompletionQueueSafeHandle cq;
54
Jan Tattermusch04eb89c2015-06-12 13:03:05 -070055 public GrpcThreadPool(GrpcEnvironment environment, int poolSize)
Jan Tattermusch075dde42015-03-11 18:21:00 -070056 {
Jan Tattermusch04eb89c2015-06-12 13:03:05 -070057 this.environment = environment;
Jan Tattermuscha7608b02015-02-03 17:54:38 -080058 this.poolSize = poolSize;
59 }
60
Jan Tattermusch075dde42015-03-11 18:21:00 -070061 public void Start()
62 {
Jan Tattermuscha7608b02015-02-03 17:54:38 -080063 lock (myLock)
64 {
65 if (cq != null)
66 {
67 throw new InvalidOperationException("Already started.");
68 }
69
70 cq = CompletionQueueSafeHandle.Create();
71
72 for (int i = 0; i < poolSize; i++)
73 {
74 threads.Add(CreateAndStartThread(i));
75 }
76 }
77 }
78
Jan Tattermusch075dde42015-03-11 18:21:00 -070079 public void Stop()
80 {
Jan Tattermuscha7608b02015-02-03 17:54:38 -080081 lock (myLock)
82 {
83 cq.Shutdown();
84
Jan Tattermusch04eb89c2015-06-12 13:03:05 -070085 Console.WriteLine("Waiting for GRPC threads to finish.");
Jan Tattermuscha7608b02015-02-03 17:54:38 -080086 foreach (var thread in threads)
87 {
88 thread.Join();
89 }
90
91 cq.Dispose();
Jan Tattermuscha7608b02015-02-03 17:54:38 -080092 }
93 }
94
95 internal CompletionQueueSafeHandle CompletionQueue
96 {
97 get
98 {
99 return cq;
100 }
101 }
102
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800103 private Thread CreateAndStartThread(int i)
104 {
105 var thread = new Thread(new ThreadStart(RunHandlerLoop));
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800106 thread.IsBackground = false;
107 thread.Start();
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800108 thread.Name = "grpc " + i;
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800109 return thread;
110 }
111
112 /// <summary>
113 /// Body of the polling thread.
114 /// </summary>
Jan Tattermusch337a2dd2015-02-13 15:41:41 -0800115 private void RunHandlerLoop()
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800116 {
Jan Tattermuschd3677482015-06-01 19:27:40 -0700117 CompletionQueueEvent ev;
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800118 do
119 {
Jan Tattermuschd3677482015-06-01 19:27:40 -0700120 ev = cq.Next();
121 if (ev.type == GRPCCompletionType.OpComplete)
122 {
123 bool success = (ev.success != 0);
124 IntPtr tag = ev.tag;
125 try
126 {
Jan Tattermusch04eb89c2015-06-12 13:03:05 -0700127 var callback = environment.CompletionRegistry.Extract(tag);
Jan Tattermuschd3677482015-06-01 19:27:40 -0700128 callback(success);
129 }
130 catch (Exception e)
131 {
132 Console.WriteLine("Exception occured while invoking completion delegate: " + e);
133 }
134 }
Jan Tattermusch075dde42015-03-11 18:21:00 -0700135 }
Jan Tattermuschd3677482015-06-01 19:27:40 -0700136 while (ev.type != GRPCCompletionType.Shutdown);
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800137 Console.WriteLine("Completion queue has shutdown successfully, thread " + Thread.CurrentThread.Name + " exiting.");
138 }
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800139 }
Jan Tattermuscha7608b02015-02-03 17:54:38 -0800140}