blob: 675af251de2ddd144575e0147b45c6c31b01537e [file] [log] [blame]
Shuyi Chend7955ce2013-05-22 14:51:55 -07001/**
2 * $RCSfile$
3 * $Revision$
4 * $Date$
5 *
6 * Copyright 2003-2007 Jive Software.
7 *
8 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21package org.jivesoftware.smack;
22
23import org.jivesoftware.smack.packet.Packet;
24
25import java.io.IOException;
26import java.io.Writer;
27import java.util.concurrent.ArrayBlockingQueue;
28import java.util.concurrent.BlockingQueue;
29
30/**
31 * Writes packets to a XMPP server. Packets are sent using a dedicated thread. Packet
32 * interceptors can be registered to dynamically modify packets before they're actually
33 * sent. Packet listeners can be registered to listen for all outgoing packets.
34 *
35 * @see Connection#addPacketInterceptor
36 * @see Connection#addPacketSendingListener
37 *
38 * @author Matt Tucker
39 */
40class PacketWriter {
41
42 private Thread writerThread;
43 private Thread keepAliveThread;
44 private Writer writer;
45 private XMPPConnection connection;
46 private final BlockingQueue<Packet> queue;
47 volatile boolean done;
48
49 /**
50 * Creates a new packet writer with the specified connection.
51 *
52 * @param connection the connection.
53 */
54 protected PacketWriter(XMPPConnection connection) {
55 this.queue = new ArrayBlockingQueue<Packet>(500, true);
56 this.connection = connection;
57 init();
58 }
59
60 /**
61 * Initializes the writer in order to be used. It is called at the first connection and also
62 * is invoked if the connection is disconnected by an error.
63 */
64 protected void init() {
65 this.writer = connection.writer;
66 done = false;
67
68 writerThread = new Thread() {
69 public void run() {
70 writePackets(this);
71 }
72 };
73 writerThread.setName("Smack Packet Writer (" + connection.connectionCounterValue + ")");
74 writerThread.setDaemon(true);
75 }
76
77 /**
78 * Sends the specified packet to the server.
79 *
80 * @param packet the packet to send.
81 */
82 public void sendPacket(Packet packet) {
83 if (!done) {
84 // Invoke interceptors for the new packet that is about to be sent. Interceptors
85 // may modify the content of the packet.
86 connection.firePacketInterceptors(packet);
87
88 try {
89 queue.put(packet);
90 }
91 catch (InterruptedException ie) {
92 ie.printStackTrace();
93 return;
94 }
95 synchronized (queue) {
96 queue.notifyAll();
97 }
98
99 // Process packet writer listeners. Note that we're using the sending
100 // thread so it's expected that listeners are fast.
101 connection.firePacketSendingListeners(packet);
102 }
103 }
104
105 /**
106 * Starts the packet writer thread and opens a connection to the server. The
107 * packet writer will continue writing packets until {@link #shutdown} or an
108 * error occurs.
109 */
110 public void startup() {
111 writerThread.start();
112 }
113
114 void setWriter(Writer writer) {
115 this.writer = writer;
116 }
117
118 /**
119 * Shuts down the packet writer. Once this method has been called, no further
120 * packets will be written to the server.
121 */
122 public void shutdown() {
123 done = true;
124 synchronized (queue) {
125 queue.notifyAll();
126 }
127 // Interrupt the keep alive thread if one was created
128 if (keepAliveThread != null)
129 keepAliveThread.interrupt();
130 }
131
132 /**
133 * Cleans up all resources used by the packet writer.
134 */
135 void cleanup() {
136 connection.interceptors.clear();
137 connection.sendListeners.clear();
138 }
139
140 /**
141 * Returns the next available packet from the queue for writing.
142 *
143 * @return the next packet for writing.
144 */
145 private Packet nextPacket() {
146 Packet packet = null;
147 // Wait until there's a packet or we're done.
148 while (!done && (packet = queue.poll()) == null) {
149 try {
150 synchronized (queue) {
151 queue.wait();
152 }
153 }
154 catch (InterruptedException ie) {
155 // Do nothing
156 }
157 }
158 return packet;
159 }
160
161 private void writePackets(Thread thisThread) {
162 try {
163 // Open the stream.
164 openStream();
165 // Write out packets from the queue.
166 while (!done && (writerThread == thisThread)) {
167 Packet packet = nextPacket();
168 if (packet != null) {
169 writer.write(packet.toXML());
170 if (queue.isEmpty()) {
171 writer.flush();
172 }
173 }
174 }
175 // Flush out the rest of the queue. If the queue is extremely large, it's possible
176 // we won't have time to entirely flush it before the socket is forced closed
177 // by the shutdown process.
178 try {
179 while (!queue.isEmpty()) {
180 Packet packet = queue.remove();
181 writer.write(packet.toXML());
182 }
183 writer.flush();
184 }
185 catch (Exception e) {
186 e.printStackTrace();
187 }
188
189 // Delete the queue contents (hopefully nothing is left).
190 queue.clear();
191
192 // Close the stream.
193 try {
194 writer.write("</stream:stream>");
195 writer.flush();
196 }
197 catch (Exception e) {
198 // Do nothing
199 }
200 finally {
201 try {
202 writer.close();
203 }
204 catch (Exception e) {
205 // Do nothing
206 }
207 }
208 }
209 catch (IOException ioe) {
210 // The exception can be ignored if the the connection is 'done'
211 // or if the it was caused because the socket got closed
212 if (!(done || connection.isSocketClosed())) {
213 done = true;
214 // packetReader could be set to null by an concurrent disconnect() call.
215 // Therefore Prevent NPE exceptions by checking packetReader.
216 if (connection.packetReader != null) {
217 connection.notifyConnectionError(ioe);
218 }
219 }
220 }
221 }
222
223 /**
224 * Sends to the server a new stream element. This operation may be requested several times
225 * so we need to encapsulate the logic in one place. This message will be sent while doing
226 * TLS, SASL and resource binding.
227 *
228 * @throws IOException If an error occurs while sending the stanza to the server.
229 */
230 void openStream() throws IOException {
231 StringBuilder stream = new StringBuilder();
232 stream.append("<stream:stream");
233 stream.append(" to=\"").append(connection.getServiceName()).append("\"");
234 stream.append(" xmlns=\"jabber:client\"");
235 stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
236 stream.append(" version=\"1.0\">");
237 writer.write(stream.toString());
238 writer.flush();
239 }
240}