blob: a33682c8721297052defe374bbba2fbab66d2acc [file] [log] [blame]
Shuyi Chend7955ce2013-05-22 14:51:55 -07001/**
2 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
3 * you may not use this file except in compliance with the License.
4 * You may obtain a copy of the License at
5 *
6 * http://www.apache.org/licenses/LICENSE-2.0
7 *
8 * Unless required by applicable law or agreed to in writing, software
9 * distributed under the License is distributed on an "AS IS" BASIS,
10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 * See the License for the specific language governing permissions and
12 * limitations under the License.
13 */
14package org.jivesoftware.smackx.bytestreams.ibb;
15
16import java.io.IOException;
17import java.io.InputStream;
18import java.io.OutputStream;
19import java.net.SocketTimeoutException;
20import java.util.concurrent.BlockingQueue;
21import java.util.concurrent.LinkedBlockingQueue;
22import java.util.concurrent.TimeUnit;
23
24import org.jivesoftware.smack.Connection;
25import org.jivesoftware.smack.PacketListener;
26import org.jivesoftware.smack.XMPPException;
27import org.jivesoftware.smack.filter.AndFilter;
28import org.jivesoftware.smack.filter.PacketFilter;
29import org.jivesoftware.smack.filter.PacketTypeFilter;
30import org.jivesoftware.smack.packet.IQ;
31import org.jivesoftware.smack.packet.Message;
32import org.jivesoftware.smack.packet.Packet;
33import org.jivesoftware.smack.packet.PacketExtension;
34import org.jivesoftware.smack.packet.XMPPError;
35import org.jivesoftware.smack.util.StringUtils;
36import org.jivesoftware.smack.util.SyncPacketSend;
37import org.jivesoftware.smackx.bytestreams.BytestreamSession;
38import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
39import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
40import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
41import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
42
43/**
44 * InBandBytestreamSession class represents an In-Band Bytestream session.
45 * <p>
46 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
47 * directions.
48 * <p>
49 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
50 * closed individually the session will be closed automatically once the second stream is closed.
51 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
52 * automatically if one of them is closed.
53 *
54 * @author Henning Staib
55 */
56public class InBandBytestreamSession implements BytestreamSession {
57
58 /* XMPP connection */
59 private final Connection connection;
60
61 /* the In-Band Bytestream open request for this session */
62 private final Open byteStreamRequest;
63
64 /*
65 * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
66 */
67 private IBBInputStream inputStream;
68
69 /*
70 * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
71 */
72 private IBBOutputStream outputStream;
73
74 /* JID of the remote peer */
75 private String remoteJID;
76
77 /* flag to close both streams if one of them is closed */
78 private boolean closeBothStreamsEnabled = false;
79
80 /* flag to indicate if session is closed */
81 private boolean isClosed = false;
82
83 /**
84 * Constructor.
85 *
86 * @param connection the XMPP connection
87 * @param byteStreamRequest the In-Band Bytestream open request for this session
88 * @param remoteJID JID of the remote peer
89 */
90 protected InBandBytestreamSession(Connection connection, Open byteStreamRequest,
91 String remoteJID) {
92 this.connection = connection;
93 this.byteStreamRequest = byteStreamRequest;
94 this.remoteJID = remoteJID;
95
96 // initialize streams dependent to the uses stanza type
97 switch (byteStreamRequest.getStanza()) {
98 case IQ:
99 this.inputStream = new IQIBBInputStream();
100 this.outputStream = new IQIBBOutputStream();
101 break;
102 case MESSAGE:
103 this.inputStream = new MessageIBBInputStream();
104 this.outputStream = new MessageIBBOutputStream();
105 break;
106 }
107
108 }
109
110 public InputStream getInputStream() {
111 return this.inputStream;
112 }
113
114 public OutputStream getOutputStream() {
115 return this.outputStream;
116 }
117
118 public int getReadTimeout() {
119 return this.inputStream.readTimeout;
120 }
121
122 public void setReadTimeout(int timeout) {
123 if (timeout < 0) {
124 throw new IllegalArgumentException("Timeout must be >= 0");
125 }
126 this.inputStream.readTimeout = timeout;
127 }
128
129 /**
130 * Returns whether both streams should be closed automatically if one of the streams is closed.
131 * Default is <code>false</code>.
132 *
133 * @return <code>true</code> if both streams will be closed if one of the streams is closed,
134 * <code>false</code> if both streams can be closed independently.
135 */
136 public boolean isCloseBothStreamsEnabled() {
137 return closeBothStreamsEnabled;
138 }
139
140 /**
141 * Sets whether both streams should be closed automatically if one of the streams is closed.
142 * Default is <code>false</code>.
143 *
144 * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
145 * the streams is closed, <code>false</code> if both streams should be closed
146 * independently
147 */
148 public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
149 this.closeBothStreamsEnabled = closeBothStreamsEnabled;
150 }
151
152 public void close() throws IOException {
153 closeByLocal(true); // close input stream
154 closeByLocal(false); // close output stream
155 }
156
157 /**
158 * This method is invoked if a request to close the In-Band Bytestream has been received.
159 *
160 * @param closeRequest the close request from the remote peer
161 */
162 protected void closeByPeer(Close closeRequest) {
163
164 /*
165 * close streams without flushing them, because stream is already considered closed on the
166 * remote peers side
167 */
168 this.inputStream.closeInternal();
169 this.inputStream.cleanup();
170 this.outputStream.closeInternal(false);
171
172 // acknowledge close request
173 IQ confirmClose = IQ.createResultIQ(closeRequest);
174 this.connection.sendPacket(confirmClose);
175
176 }
177
178 /**
179 * This method is invoked if one of the streams has been closed locally, if an error occurred
180 * locally or if the whole session should be closed.
181 *
182 * @throws IOException if an error occurs while sending the close request
183 */
184 protected synchronized void closeByLocal(boolean in) throws IOException {
185 if (this.isClosed) {
186 return;
187 }
188
189 if (this.closeBothStreamsEnabled) {
190 this.inputStream.closeInternal();
191 this.outputStream.closeInternal(true);
192 }
193 else {
194 if (in) {
195 this.inputStream.closeInternal();
196 }
197 else {
198 // close stream but try to send any data left
199 this.outputStream.closeInternal(true);
200 }
201 }
202
203 if (this.inputStream.isClosed && this.outputStream.isClosed) {
204 this.isClosed = true;
205
206 // send close request
207 Close close = new Close(this.byteStreamRequest.getSessionID());
208 close.setTo(this.remoteJID);
209 try {
210 SyncPacketSend.getReply(this.connection, close);
211 }
212 catch (XMPPException e) {
213 throw new IOException("Error while closing stream: " + e.getMessage());
214 }
215
216 this.inputStream.cleanup();
217
218 // remove session from manager
219 InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
220 }
221
222 }
223
224 /**
225 * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
226 * Subclasses of this input stream must provide a packet listener along with a packet filter to
227 * collect the In-Band Bytestream data packets.
228 */
229 private abstract class IBBInputStream extends InputStream {
230
231 /* the data packet listener to fill the data queue */
232 private final PacketListener dataPacketListener;
233
234 /* queue containing received In-Band Bytestream data packets */
235 protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
236
237 /* buffer containing the data from one data packet */
238 private byte[] buffer;
239
240 /* pointer to the next byte to read from buffer */
241 private int bufferPointer = -1;
242
243 /* data packet sequence (range from 0 to 65535) */
244 private long seq = -1;
245
246 /* flag to indicate if input stream is closed */
247 private boolean isClosed = false;
248
249 /* flag to indicate if close method was invoked */
250 private boolean closeInvoked = false;
251
252 /* timeout for read operations */
253 private int readTimeout = 0;
254
255 /**
256 * Constructor.
257 */
258 public IBBInputStream() {
259 // add data packet listener to connection
260 this.dataPacketListener = getDataPacketListener();
261 connection.addPacketListener(this.dataPacketListener, getDataPacketFilter());
262 }
263
264 /**
265 * Returns the packet listener that processes In-Band Bytestream data packets.
266 *
267 * @return the data packet listener
268 */
269 protected abstract PacketListener getDataPacketListener();
270
271 /**
272 * Returns the packet filter that accepts In-Band Bytestream data packets.
273 *
274 * @return the data packet filter
275 */
276 protected abstract PacketFilter getDataPacketFilter();
277
278 public synchronized int read() throws IOException {
279 checkClosed();
280
281 // if nothing read yet or whole buffer has been read fill buffer
282 if (bufferPointer == -1 || bufferPointer >= buffer.length) {
283 // if no data available and stream was closed return -1
284 if (!loadBuffer()) {
285 return -1;
286 }
287 }
288
289 // return byte and increment buffer pointer
290 return ((int) buffer[bufferPointer++]) & 0xff;
291 }
292
293 public synchronized int read(byte[] b, int off, int len) throws IOException {
294 if (b == null) {
295 throw new NullPointerException();
296 }
297 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
298 || ((off + len) < 0)) {
299 throw new IndexOutOfBoundsException();
300 }
301 else if (len == 0) {
302 return 0;
303 }
304
305 checkClosed();
306
307 // if nothing read yet or whole buffer has been read fill buffer
308 if (bufferPointer == -1 || bufferPointer >= buffer.length) {
309 // if no data available and stream was closed return -1
310 if (!loadBuffer()) {
311 return -1;
312 }
313 }
314
315 // if more bytes wanted than available return all available
316 int bytesAvailable = buffer.length - bufferPointer;
317 if (len > bytesAvailable) {
318 len = bytesAvailable;
319 }
320
321 System.arraycopy(buffer, bufferPointer, b, off, len);
322 bufferPointer += len;
323 return len;
324 }
325
326 public synchronized int read(byte[] b) throws IOException {
327 return read(b, 0, b.length);
328 }
329
330 /**
331 * This method blocks until a data packet is received, the stream is closed or the current
332 * thread is interrupted.
333 *
334 * @return <code>true</code> if data was received, otherwise <code>false</code>
335 * @throws IOException if data packets are out of sequence
336 */
337 private synchronized boolean loadBuffer() throws IOException {
338
339 // wait until data is available or stream is closed
340 DataPacketExtension data = null;
341 try {
342 if (this.readTimeout == 0) {
343 while (data == null) {
344 if (isClosed && this.dataQueue.isEmpty()) {
345 return false;
346 }
347 data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
348 }
349 }
350 else {
351 data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
352 if (data == null) {
353 throw new SocketTimeoutException();
354 }
355 }
356 }
357 catch (InterruptedException e) {
358 // Restore the interrupted status
359 Thread.currentThread().interrupt();
360 return false;
361 }
362
363 // handle sequence overflow
364 if (this.seq == 65535) {
365 this.seq = -1;
366 }
367
368 // check if data packets sequence is successor of last seen sequence
369 long seq = data.getSeq();
370 if (seq - 1 != this.seq) {
371 // packets out of order; close stream/session
372 InBandBytestreamSession.this.close();
373 throw new IOException("Packets out of sequence");
374 }
375 else {
376 this.seq = seq;
377 }
378
379 // set buffer to decoded data
380 buffer = data.getDecodedData();
381 bufferPointer = 0;
382 return true;
383 }
384
385 /**
386 * Checks if this stream is closed and throws an IOException if necessary
387 *
388 * @throws IOException if stream is closed and no data should be read anymore
389 */
390 private void checkClosed() throws IOException {
391 /* throw no exception if there is data available, but not if close method was invoked */
392 if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) {
393 // clear data queue in case additional data was received after stream was closed
394 this.dataQueue.clear();
395 throw new IOException("Stream is closed");
396 }
397 }
398
399 public boolean markSupported() {
400 return false;
401 }
402
403 public void close() throws IOException {
404 if (isClosed) {
405 return;
406 }
407
408 this.closeInvoked = true;
409
410 InBandBytestreamSession.this.closeByLocal(true);
411 }
412
413 /**
414 * This method sets the close flag and removes the data packet listener.
415 */
416 private void closeInternal() {
417 if (isClosed) {
418 return;
419 }
420 isClosed = true;
421 }
422
423 /**
424 * Invoked if the session is closed.
425 */
426 private void cleanup() {
427 connection.removePacketListener(this.dataPacketListener);
428 }
429
430 }
431
432 /**
433 * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
434 * data packets.
435 */
436 private class IQIBBInputStream extends IBBInputStream {
437
438 protected PacketListener getDataPacketListener() {
439 return new PacketListener() {
440
441 private long lastSequence = -1;
442
443 public void processPacket(Packet packet) {
444 // get data packet extension
445 DataPacketExtension data = (DataPacketExtension) packet.getExtension(
446 DataPacketExtension.ELEMENT_NAME,
447 InBandBytestreamManager.NAMESPACE);
448
449 /*
450 * check if sequence was not used already (see XEP-0047 Section 2.2)
451 */
452 if (data.getSeq() <= this.lastSequence) {
453 IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
454 XMPPError.Condition.unexpected_request));
455 connection.sendPacket(unexpectedRequest);
456 return;
457
458 }
459
460 // check if encoded data is valid (see XEP-0047 Section 2.2)
461 if (data.getDecodedData() == null) {
462 // data is invalid; respond with bad-request error
463 IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
464 XMPPError.Condition.bad_request));
465 connection.sendPacket(badRequest);
466 return;
467 }
468
469 // data is valid; add to data queue
470 dataQueue.offer(data);
471
472 // confirm IQ
473 IQ confirmData = IQ.createResultIQ((IQ) packet);
474 connection.sendPacket(confirmData);
475
476 // set last seen sequence
477 this.lastSequence = data.getSeq();
478 if (this.lastSequence == 65535) {
479 this.lastSequence = -1;
480 }
481
482 }
483
484 };
485 }
486
487 protected PacketFilter getDataPacketFilter() {
488 /*
489 * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
490 * data packet extension, matching session ID and recipient
491 */
492 return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter());
493 }
494
495 }
496
497 /**
498 * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
499 * encapsulating the data packets.
500 */
501 private class MessageIBBInputStream extends IBBInputStream {
502
503 protected PacketListener getDataPacketListener() {
504 return new PacketListener() {
505
506 public void processPacket(Packet packet) {
507 // get data packet extension
508 DataPacketExtension data = (DataPacketExtension) packet.getExtension(
509 DataPacketExtension.ELEMENT_NAME,
510 InBandBytestreamManager.NAMESPACE);
511
512 // check if encoded data is valid
513 if (data.getDecodedData() == null) {
514 /*
515 * TODO once a majority of XMPP server implementation support XEP-0079
516 * Advanced Message Processing the invalid message could be answered with an
517 * appropriate error. For now we just ignore the packet. Subsequent packets
518 * with an increased sequence will cause the input stream to close the
519 * stream/session.
520 */
521 return;
522 }
523
524 // data is valid; add to data queue
525 dataQueue.offer(data);
526
527 // TODO confirm packet once XMPP servers support XEP-0079
528 }
529
530 };
531 }
532
533 @Override
534 protected PacketFilter getDataPacketFilter() {
535 /*
536 * filter all message stanzas containing a data packet extension, matching session ID
537 * and recipient
538 */
539 return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter());
540 }
541
542 }
543
544 /**
545 * IBBDataPacketFilter class filters all packets from the remote peer of this session,
546 * containing an In-Band Bytestream data packet extension whose session ID matches this sessions
547 * ID.
548 */
549 private class IBBDataPacketFilter implements PacketFilter {
550
551 public boolean accept(Packet packet) {
552 // sender equals remote peer
553 if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
554 return false;
555 }
556
557 // stanza contains data packet extension
558 PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME,
559 InBandBytestreamManager.NAMESPACE);
560 if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) {
561 return false;
562 }
563
564 // session ID equals this session ID
565 DataPacketExtension data = (DataPacketExtension) packetExtension;
566 if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
567 return false;
568 }
569
570 return true;
571 }
572
573 }
574
575 /**
576 * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
577 * Subclasses of this output stream must provide a method to send data over XMPP stream.
578 */
579 private abstract class IBBOutputStream extends OutputStream {
580
581 /* buffer with the size of this sessions block size */
582 protected final byte[] buffer;
583
584 /* pointer to next byte to write to buffer */
585 protected int bufferPointer = 0;
586
587 /* data packet sequence (range from 0 to 65535) */
588 protected long seq = 0;
589
590 /* flag to indicate if output stream is closed */
591 protected boolean isClosed = false;
592
593 /**
594 * Constructor.
595 */
596 public IBBOutputStream() {
597 this.buffer = new byte[(byteStreamRequest.getBlockSize()/4)*3];
598 }
599
600 /**
601 * Writes the given data packet to the XMPP stream.
602 *
603 * @param data the data packet
604 * @throws IOException if an I/O error occurred while sending or if the stream is closed
605 */
606 protected abstract void writeToXML(DataPacketExtension data) throws IOException;
607
608 public synchronized void write(int b) throws IOException {
609 if (this.isClosed) {
610 throw new IOException("Stream is closed");
611 }
612
613 // if buffer is full flush buffer
614 if (bufferPointer >= buffer.length) {
615 flushBuffer();
616 }
617
618 buffer[bufferPointer++] = (byte) b;
619 }
620
621 public synchronized void write(byte b[], int off, int len) throws IOException {
622 if (b == null) {
623 throw new NullPointerException();
624 }
625 else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
626 || ((off + len) < 0)) {
627 throw new IndexOutOfBoundsException();
628 }
629 else if (len == 0) {
630 return;
631 }
632
633 if (this.isClosed) {
634 throw new IOException("Stream is closed");
635 }
636
637 // is data to send greater than buffer size
638 if (len >= buffer.length) {
639
640 // "byte" off the first chunk to write out
641 writeOut(b, off, buffer.length);
642
643 // recursively call this method with the lesser amount
644 write(b, off + buffer.length, len - buffer.length);
645 }
646 else {
647 writeOut(b, off, len);
648 }
649 }
650
651 public synchronized void write(byte[] b) throws IOException {
652 write(b, 0, b.length);
653 }
654
655 /**
656 * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
657 * capacity has been reached. This method is only called from this class so it is assured
658 * that the amount of data to send is <= buffer capacity
659 *
660 * @param b the data
661 * @param off the data
662 * @param len the number of bytes to write
663 * @throws IOException if an I/O error occurred while sending or if the stream is closed
664 */
665 private synchronized void writeOut(byte b[], int off, int len) throws IOException {
666 if (this.isClosed) {
667 throw new IOException("Stream is closed");
668 }
669
670 // set to 0 in case the next 'if' block is not executed
671 int available = 0;
672
673 // is data to send greater that buffer space left
674 if (len > buffer.length - bufferPointer) {
675 // fill buffer to capacity and send it
676 available = buffer.length - bufferPointer;
677 System.arraycopy(b, off, buffer, bufferPointer, available);
678 bufferPointer += available;
679 flushBuffer();
680 }
681
682 // copy the data left to buffer
683 System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
684 bufferPointer += len - available;
685 }
686
687 public synchronized void flush() throws IOException {
688 if (this.isClosed) {
689 throw new IOException("Stream is closed");
690 }
691 flushBuffer();
692 }
693
694 private synchronized void flushBuffer() throws IOException {
695
696 // do nothing if no data to send available
697 if (bufferPointer == 0) {
698 return;
699 }
700
701 // create data packet
702 String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false);
703 DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
704 this.seq, enc);
705
706 // write to XMPP stream
707 writeToXML(data);
708
709 // reset buffer pointer
710 bufferPointer = 0;
711
712 // increment sequence, considering sequence overflow
713 this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
714
715 }
716
717 public void close() throws IOException {
718 if (isClosed) {
719 return;
720 }
721 InBandBytestreamSession.this.closeByLocal(false);
722 }
723
724 /**
725 * Sets the close flag and optionally flushes the stream.
726 *
727 * @param flush if <code>true</code> flushes the stream
728 */
729 protected void closeInternal(boolean flush) {
730 if (this.isClosed) {
731 return;
732 }
733 this.isClosed = true;
734
735 try {
736 if (flush) {
737 flushBuffer();
738 }
739 }
740 catch (IOException e) {
741 /*
742 * ignore, because writeToXML() will not throw an exception if stream is already
743 * closed
744 */
745 }
746 }
747
748 }
749
750 /**
751 * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
752 * the data packets.
753 */
754 private class IQIBBOutputStream extends IBBOutputStream {
755
756 @Override
757 protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
758 // create IQ stanza containing data packet
759 IQ iq = new Data(data);
760 iq.setTo(remoteJID);
761
762 try {
763 SyncPacketSend.getReply(connection, iq);
764 }
765 catch (XMPPException e) {
766 // close session unless it is already closed
767 if (!this.isClosed) {
768 InBandBytestreamSession.this.close();
769 throw new IOException("Error while sending Data: " + e.getMessage());
770 }
771 }
772
773 }
774
775 }
776
777 /**
778 * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
779 * encapsulating the data packets.
780 */
781 private class MessageIBBOutputStream extends IBBOutputStream {
782
783 @Override
784 protected synchronized void writeToXML(DataPacketExtension data) {
785 // create message stanza containing data packet
786 Message message = new Message(remoteJID);
787 message.addExtension(data);
788
789 connection.sendPacket(message);
790
791 }
792
793 }
794
795}