Shuyi Chen | d7955ce | 2013-05-22 14:51:55 -0700 | [diff] [blame] | 1 | /**
|
| 2 | * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
|
| 3 | * you may not use this file except in compliance with the License.
|
| 4 | * You may obtain a copy of the License at
|
| 5 | *
|
| 6 | * http://www.apache.org/licenses/LICENSE-2.0
|
| 7 | *
|
| 8 | * Unless required by applicable law or agreed to in writing, software
|
| 9 | * distributed under the License is distributed on an "AS IS" BASIS,
|
| 10 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| 11 | * See the License for the specific language governing permissions and
|
| 12 | * limitations under the License.
|
| 13 | */
|
| 14 | package org.jivesoftware.smackx.bytestreams.ibb;
|
| 15 |
|
| 16 | import java.io.IOException;
|
| 17 | import java.io.InputStream;
|
| 18 | import java.io.OutputStream;
|
| 19 | import java.net.SocketTimeoutException;
|
| 20 | import java.util.concurrent.BlockingQueue;
|
| 21 | import java.util.concurrent.LinkedBlockingQueue;
|
| 22 | import java.util.concurrent.TimeUnit;
|
| 23 |
|
| 24 | import org.jivesoftware.smack.Connection;
|
| 25 | import org.jivesoftware.smack.PacketListener;
|
| 26 | import org.jivesoftware.smack.XMPPException;
|
| 27 | import org.jivesoftware.smack.filter.AndFilter;
|
| 28 | import org.jivesoftware.smack.filter.PacketFilter;
|
| 29 | import org.jivesoftware.smack.filter.PacketTypeFilter;
|
| 30 | import org.jivesoftware.smack.packet.IQ;
|
| 31 | import org.jivesoftware.smack.packet.Message;
|
| 32 | import org.jivesoftware.smack.packet.Packet;
|
| 33 | import org.jivesoftware.smack.packet.PacketExtension;
|
| 34 | import org.jivesoftware.smack.packet.XMPPError;
|
| 35 | import org.jivesoftware.smack.util.StringUtils;
|
| 36 | import org.jivesoftware.smack.util.SyncPacketSend;
|
| 37 | import org.jivesoftware.smackx.bytestreams.BytestreamSession;
|
| 38 | import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
|
| 39 | import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
|
| 40 | import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
|
| 41 | import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
|
| 42 |
|
| 43 | /**
|
| 44 | * InBandBytestreamSession class represents an In-Band Bytestream session.
|
| 45 | * <p>
|
| 46 | * In-band bytestreams are bidirectional and this session encapsulates the streams for both
|
| 47 | * directions.
|
| 48 | * <p>
|
| 49 | * Note that closing the In-Band Bytestream session will close both streams. If both streams are
|
| 50 | * closed individually the session will be closed automatically once the second stream is closed.
|
| 51 | * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
|
| 52 | * automatically if one of them is closed.
|
| 53 | *
|
| 54 | * @author Henning Staib
|
| 55 | */
|
| 56 | public class InBandBytestreamSession implements BytestreamSession {
|
| 57 |
|
| 58 | /* XMPP connection */
|
| 59 | private final Connection connection;
|
| 60 |
|
| 61 | /* the In-Band Bytestream open request for this session */
|
| 62 | private final Open byteStreamRequest;
|
| 63 |
|
| 64 | /*
|
| 65 | * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
|
| 66 | */
|
| 67 | private IBBInputStream inputStream;
|
| 68 |
|
| 69 | /*
|
| 70 | * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
|
| 71 | */
|
| 72 | private IBBOutputStream outputStream;
|
| 73 |
|
| 74 | /* JID of the remote peer */
|
| 75 | private String remoteJID;
|
| 76 |
|
| 77 | /* flag to close both streams if one of them is closed */
|
| 78 | private boolean closeBothStreamsEnabled = false;
|
| 79 |
|
| 80 | /* flag to indicate if session is closed */
|
| 81 | private boolean isClosed = false;
|
| 82 |
|
| 83 | /**
|
| 84 | * Constructor.
|
| 85 | *
|
| 86 | * @param connection the XMPP connection
|
| 87 | * @param byteStreamRequest the In-Band Bytestream open request for this session
|
| 88 | * @param remoteJID JID of the remote peer
|
| 89 | */
|
| 90 | protected InBandBytestreamSession(Connection connection, Open byteStreamRequest,
|
| 91 | String remoteJID) {
|
| 92 | this.connection = connection;
|
| 93 | this.byteStreamRequest = byteStreamRequest;
|
| 94 | this.remoteJID = remoteJID;
|
| 95 |
|
| 96 | // initialize streams dependent to the uses stanza type
|
| 97 | switch (byteStreamRequest.getStanza()) {
|
| 98 | case IQ:
|
| 99 | this.inputStream = new IQIBBInputStream();
|
| 100 | this.outputStream = new IQIBBOutputStream();
|
| 101 | break;
|
| 102 | case MESSAGE:
|
| 103 | this.inputStream = new MessageIBBInputStream();
|
| 104 | this.outputStream = new MessageIBBOutputStream();
|
| 105 | break;
|
| 106 | }
|
| 107 |
|
| 108 | }
|
| 109 |
|
| 110 | public InputStream getInputStream() {
|
| 111 | return this.inputStream;
|
| 112 | }
|
| 113 |
|
| 114 | public OutputStream getOutputStream() {
|
| 115 | return this.outputStream;
|
| 116 | }
|
| 117 |
|
| 118 | public int getReadTimeout() {
|
| 119 | return this.inputStream.readTimeout;
|
| 120 | }
|
| 121 |
|
| 122 | public void setReadTimeout(int timeout) {
|
| 123 | if (timeout < 0) {
|
| 124 | throw new IllegalArgumentException("Timeout must be >= 0");
|
| 125 | }
|
| 126 | this.inputStream.readTimeout = timeout;
|
| 127 | }
|
| 128 |
|
| 129 | /**
|
| 130 | * Returns whether both streams should be closed automatically if one of the streams is closed.
|
| 131 | * Default is <code>false</code>.
|
| 132 | *
|
| 133 | * @return <code>true</code> if both streams will be closed if one of the streams is closed,
|
| 134 | * <code>false</code> if both streams can be closed independently.
|
| 135 | */
|
| 136 | public boolean isCloseBothStreamsEnabled() {
|
| 137 | return closeBothStreamsEnabled;
|
| 138 | }
|
| 139 |
|
| 140 | /**
|
| 141 | * Sets whether both streams should be closed automatically if one of the streams is closed.
|
| 142 | * Default is <code>false</code>.
|
| 143 | *
|
| 144 | * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
|
| 145 | * the streams is closed, <code>false</code> if both streams should be closed
|
| 146 | * independently
|
| 147 | */
|
| 148 | public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
|
| 149 | this.closeBothStreamsEnabled = closeBothStreamsEnabled;
|
| 150 | }
|
| 151 |
|
| 152 | public void close() throws IOException {
|
| 153 | closeByLocal(true); // close input stream
|
| 154 | closeByLocal(false); // close output stream
|
| 155 | }
|
| 156 |
|
| 157 | /**
|
| 158 | * This method is invoked if a request to close the In-Band Bytestream has been received.
|
| 159 | *
|
| 160 | * @param closeRequest the close request from the remote peer
|
| 161 | */
|
| 162 | protected void closeByPeer(Close closeRequest) {
|
| 163 |
|
| 164 | /*
|
| 165 | * close streams without flushing them, because stream is already considered closed on the
|
| 166 | * remote peers side
|
| 167 | */
|
| 168 | this.inputStream.closeInternal();
|
| 169 | this.inputStream.cleanup();
|
| 170 | this.outputStream.closeInternal(false);
|
| 171 |
|
| 172 | // acknowledge close request
|
| 173 | IQ confirmClose = IQ.createResultIQ(closeRequest);
|
| 174 | this.connection.sendPacket(confirmClose);
|
| 175 |
|
| 176 | }
|
| 177 |
|
| 178 | /**
|
| 179 | * This method is invoked if one of the streams has been closed locally, if an error occurred
|
| 180 | * locally or if the whole session should be closed.
|
| 181 | *
|
| 182 | * @throws IOException if an error occurs while sending the close request
|
| 183 | */
|
| 184 | protected synchronized void closeByLocal(boolean in) throws IOException {
|
| 185 | if (this.isClosed) {
|
| 186 | return;
|
| 187 | }
|
| 188 |
|
| 189 | if (this.closeBothStreamsEnabled) {
|
| 190 | this.inputStream.closeInternal();
|
| 191 | this.outputStream.closeInternal(true);
|
| 192 | }
|
| 193 | else {
|
| 194 | if (in) {
|
| 195 | this.inputStream.closeInternal();
|
| 196 | }
|
| 197 | else {
|
| 198 | // close stream but try to send any data left
|
| 199 | this.outputStream.closeInternal(true);
|
| 200 | }
|
| 201 | }
|
| 202 |
|
| 203 | if (this.inputStream.isClosed && this.outputStream.isClosed) {
|
| 204 | this.isClosed = true;
|
| 205 |
|
| 206 | // send close request
|
| 207 | Close close = new Close(this.byteStreamRequest.getSessionID());
|
| 208 | close.setTo(this.remoteJID);
|
| 209 | try {
|
| 210 | SyncPacketSend.getReply(this.connection, close);
|
| 211 | }
|
| 212 | catch (XMPPException e) {
|
| 213 | throw new IOException("Error while closing stream: " + e.getMessage());
|
| 214 | }
|
| 215 |
|
| 216 | this.inputStream.cleanup();
|
| 217 |
|
| 218 | // remove session from manager
|
| 219 | InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
|
| 220 | }
|
| 221 |
|
| 222 | }
|
| 223 |
|
| 224 | /**
|
| 225 | * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
|
| 226 | * Subclasses of this input stream must provide a packet listener along with a packet filter to
|
| 227 | * collect the In-Band Bytestream data packets.
|
| 228 | */
|
| 229 | private abstract class IBBInputStream extends InputStream {
|
| 230 |
|
| 231 | /* the data packet listener to fill the data queue */
|
| 232 | private final PacketListener dataPacketListener;
|
| 233 |
|
| 234 | /* queue containing received In-Band Bytestream data packets */
|
| 235 | protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
|
| 236 |
|
| 237 | /* buffer containing the data from one data packet */
|
| 238 | private byte[] buffer;
|
| 239 |
|
| 240 | /* pointer to the next byte to read from buffer */
|
| 241 | private int bufferPointer = -1;
|
| 242 |
|
| 243 | /* data packet sequence (range from 0 to 65535) */
|
| 244 | private long seq = -1;
|
| 245 |
|
| 246 | /* flag to indicate if input stream is closed */
|
| 247 | private boolean isClosed = false;
|
| 248 |
|
| 249 | /* flag to indicate if close method was invoked */
|
| 250 | private boolean closeInvoked = false;
|
| 251 |
|
| 252 | /* timeout for read operations */
|
| 253 | private int readTimeout = 0;
|
| 254 |
|
| 255 | /**
|
| 256 | * Constructor.
|
| 257 | */
|
| 258 | public IBBInputStream() {
|
| 259 | // add data packet listener to connection
|
| 260 | this.dataPacketListener = getDataPacketListener();
|
| 261 | connection.addPacketListener(this.dataPacketListener, getDataPacketFilter());
|
| 262 | }
|
| 263 |
|
| 264 | /**
|
| 265 | * Returns the packet listener that processes In-Band Bytestream data packets.
|
| 266 | *
|
| 267 | * @return the data packet listener
|
| 268 | */
|
| 269 | protected abstract PacketListener getDataPacketListener();
|
| 270 |
|
| 271 | /**
|
| 272 | * Returns the packet filter that accepts In-Band Bytestream data packets.
|
| 273 | *
|
| 274 | * @return the data packet filter
|
| 275 | */
|
| 276 | protected abstract PacketFilter getDataPacketFilter();
|
| 277 |
|
| 278 | public synchronized int read() throws IOException {
|
| 279 | checkClosed();
|
| 280 |
|
| 281 | // if nothing read yet or whole buffer has been read fill buffer
|
| 282 | if (bufferPointer == -1 || bufferPointer >= buffer.length) {
|
| 283 | // if no data available and stream was closed return -1
|
| 284 | if (!loadBuffer()) {
|
| 285 | return -1;
|
| 286 | }
|
| 287 | }
|
| 288 |
|
| 289 | // return byte and increment buffer pointer
|
| 290 | return ((int) buffer[bufferPointer++]) & 0xff;
|
| 291 | }
|
| 292 |
|
| 293 | public synchronized int read(byte[] b, int off, int len) throws IOException {
|
| 294 | if (b == null) {
|
| 295 | throw new NullPointerException();
|
| 296 | }
|
| 297 | else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|
| 298 | || ((off + len) < 0)) {
|
| 299 | throw new IndexOutOfBoundsException();
|
| 300 | }
|
| 301 | else if (len == 0) {
|
| 302 | return 0;
|
| 303 | }
|
| 304 |
|
| 305 | checkClosed();
|
| 306 |
|
| 307 | // if nothing read yet or whole buffer has been read fill buffer
|
| 308 | if (bufferPointer == -1 || bufferPointer >= buffer.length) {
|
| 309 | // if no data available and stream was closed return -1
|
| 310 | if (!loadBuffer()) {
|
| 311 | return -1;
|
| 312 | }
|
| 313 | }
|
| 314 |
|
| 315 | // if more bytes wanted than available return all available
|
| 316 | int bytesAvailable = buffer.length - bufferPointer;
|
| 317 | if (len > bytesAvailable) {
|
| 318 | len = bytesAvailable;
|
| 319 | }
|
| 320 |
|
| 321 | System.arraycopy(buffer, bufferPointer, b, off, len);
|
| 322 | bufferPointer += len;
|
| 323 | return len;
|
| 324 | }
|
| 325 |
|
| 326 | public synchronized int read(byte[] b) throws IOException {
|
| 327 | return read(b, 0, b.length);
|
| 328 | }
|
| 329 |
|
| 330 | /**
|
| 331 | * This method blocks until a data packet is received, the stream is closed or the current
|
| 332 | * thread is interrupted.
|
| 333 | *
|
| 334 | * @return <code>true</code> if data was received, otherwise <code>false</code>
|
| 335 | * @throws IOException if data packets are out of sequence
|
| 336 | */
|
| 337 | private synchronized boolean loadBuffer() throws IOException {
|
| 338 |
|
| 339 | // wait until data is available or stream is closed
|
| 340 | DataPacketExtension data = null;
|
| 341 | try {
|
| 342 | if (this.readTimeout == 0) {
|
| 343 | while (data == null) {
|
| 344 | if (isClosed && this.dataQueue.isEmpty()) {
|
| 345 | return false;
|
| 346 | }
|
| 347 | data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
|
| 348 | }
|
| 349 | }
|
| 350 | else {
|
| 351 | data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
|
| 352 | if (data == null) {
|
| 353 | throw new SocketTimeoutException();
|
| 354 | }
|
| 355 | }
|
| 356 | }
|
| 357 | catch (InterruptedException e) {
|
| 358 | // Restore the interrupted status
|
| 359 | Thread.currentThread().interrupt();
|
| 360 | return false;
|
| 361 | }
|
| 362 |
|
| 363 | // handle sequence overflow
|
| 364 | if (this.seq == 65535) {
|
| 365 | this.seq = -1;
|
| 366 | }
|
| 367 |
|
| 368 | // check if data packets sequence is successor of last seen sequence
|
| 369 | long seq = data.getSeq();
|
| 370 | if (seq - 1 != this.seq) {
|
| 371 | // packets out of order; close stream/session
|
| 372 | InBandBytestreamSession.this.close();
|
| 373 | throw new IOException("Packets out of sequence");
|
| 374 | }
|
| 375 | else {
|
| 376 | this.seq = seq;
|
| 377 | }
|
| 378 |
|
| 379 | // set buffer to decoded data
|
| 380 | buffer = data.getDecodedData();
|
| 381 | bufferPointer = 0;
|
| 382 | return true;
|
| 383 | }
|
| 384 |
|
| 385 | /**
|
| 386 | * Checks if this stream is closed and throws an IOException if necessary
|
| 387 | *
|
| 388 | * @throws IOException if stream is closed and no data should be read anymore
|
| 389 | */
|
| 390 | private void checkClosed() throws IOException {
|
| 391 | /* throw no exception if there is data available, but not if close method was invoked */
|
| 392 | if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) {
|
| 393 | // clear data queue in case additional data was received after stream was closed
|
| 394 | this.dataQueue.clear();
|
| 395 | throw new IOException("Stream is closed");
|
| 396 | }
|
| 397 | }
|
| 398 |
|
| 399 | public boolean markSupported() {
|
| 400 | return false;
|
| 401 | }
|
| 402 |
|
| 403 | public void close() throws IOException {
|
| 404 | if (isClosed) {
|
| 405 | return;
|
| 406 | }
|
| 407 |
|
| 408 | this.closeInvoked = true;
|
| 409 |
|
| 410 | InBandBytestreamSession.this.closeByLocal(true);
|
| 411 | }
|
| 412 |
|
| 413 | /**
|
| 414 | * This method sets the close flag and removes the data packet listener.
|
| 415 | */
|
| 416 | private void closeInternal() {
|
| 417 | if (isClosed) {
|
| 418 | return;
|
| 419 | }
|
| 420 | isClosed = true;
|
| 421 | }
|
| 422 |
|
| 423 | /**
|
| 424 | * Invoked if the session is closed.
|
| 425 | */
|
| 426 | private void cleanup() {
|
| 427 | connection.removePacketListener(this.dataPacketListener);
|
| 428 | }
|
| 429 |
|
| 430 | }
|
| 431 |
|
| 432 | /**
|
| 433 | * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
|
| 434 | * data packets.
|
| 435 | */
|
| 436 | private class IQIBBInputStream extends IBBInputStream {
|
| 437 |
|
| 438 | protected PacketListener getDataPacketListener() {
|
| 439 | return new PacketListener() {
|
| 440 |
|
| 441 | private long lastSequence = -1;
|
| 442 |
|
| 443 | public void processPacket(Packet packet) {
|
| 444 | // get data packet extension
|
| 445 | DataPacketExtension data = (DataPacketExtension) packet.getExtension(
|
| 446 | DataPacketExtension.ELEMENT_NAME,
|
| 447 | InBandBytestreamManager.NAMESPACE);
|
| 448 |
|
| 449 | /*
|
| 450 | * check if sequence was not used already (see XEP-0047 Section 2.2)
|
| 451 | */
|
| 452 | if (data.getSeq() <= this.lastSequence) {
|
| 453 | IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
|
| 454 | XMPPError.Condition.unexpected_request));
|
| 455 | connection.sendPacket(unexpectedRequest);
|
| 456 | return;
|
| 457 |
|
| 458 | }
|
| 459 |
|
| 460 | // check if encoded data is valid (see XEP-0047 Section 2.2)
|
| 461 | if (data.getDecodedData() == null) {
|
| 462 | // data is invalid; respond with bad-request error
|
| 463 | IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
|
| 464 | XMPPError.Condition.bad_request));
|
| 465 | connection.sendPacket(badRequest);
|
| 466 | return;
|
| 467 | }
|
| 468 |
|
| 469 | // data is valid; add to data queue
|
| 470 | dataQueue.offer(data);
|
| 471 |
|
| 472 | // confirm IQ
|
| 473 | IQ confirmData = IQ.createResultIQ((IQ) packet);
|
| 474 | connection.sendPacket(confirmData);
|
| 475 |
|
| 476 | // set last seen sequence
|
| 477 | this.lastSequence = data.getSeq();
|
| 478 | if (this.lastSequence == 65535) {
|
| 479 | this.lastSequence = -1;
|
| 480 | }
|
| 481 |
|
| 482 | }
|
| 483 |
|
| 484 | };
|
| 485 | }
|
| 486 |
|
| 487 | protected PacketFilter getDataPacketFilter() {
|
| 488 | /*
|
| 489 | * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
|
| 490 | * data packet extension, matching session ID and recipient
|
| 491 | */
|
| 492 | return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter());
|
| 493 | }
|
| 494 |
|
| 495 | }
|
| 496 |
|
| 497 | /**
|
| 498 | * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
|
| 499 | * encapsulating the data packets.
|
| 500 | */
|
| 501 | private class MessageIBBInputStream extends IBBInputStream {
|
| 502 |
|
| 503 | protected PacketListener getDataPacketListener() {
|
| 504 | return new PacketListener() {
|
| 505 |
|
| 506 | public void processPacket(Packet packet) {
|
| 507 | // get data packet extension
|
| 508 | DataPacketExtension data = (DataPacketExtension) packet.getExtension(
|
| 509 | DataPacketExtension.ELEMENT_NAME,
|
| 510 | InBandBytestreamManager.NAMESPACE);
|
| 511 |
|
| 512 | // check if encoded data is valid
|
| 513 | if (data.getDecodedData() == null) {
|
| 514 | /*
|
| 515 | * TODO once a majority of XMPP server implementation support XEP-0079
|
| 516 | * Advanced Message Processing the invalid message could be answered with an
|
| 517 | * appropriate error. For now we just ignore the packet. Subsequent packets
|
| 518 | * with an increased sequence will cause the input stream to close the
|
| 519 | * stream/session.
|
| 520 | */
|
| 521 | return;
|
| 522 | }
|
| 523 |
|
| 524 | // data is valid; add to data queue
|
| 525 | dataQueue.offer(data);
|
| 526 |
|
| 527 | // TODO confirm packet once XMPP servers support XEP-0079
|
| 528 | }
|
| 529 |
|
| 530 | };
|
| 531 | }
|
| 532 |
|
| 533 | @Override
|
| 534 | protected PacketFilter getDataPacketFilter() {
|
| 535 | /*
|
| 536 | * filter all message stanzas containing a data packet extension, matching session ID
|
| 537 | * and recipient
|
| 538 | */
|
| 539 | return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter());
|
| 540 | }
|
| 541 |
|
| 542 | }
|
| 543 |
|
| 544 | /**
|
| 545 | * IBBDataPacketFilter class filters all packets from the remote peer of this session,
|
| 546 | * containing an In-Band Bytestream data packet extension whose session ID matches this sessions
|
| 547 | * ID.
|
| 548 | */
|
| 549 | private class IBBDataPacketFilter implements PacketFilter {
|
| 550 |
|
| 551 | public boolean accept(Packet packet) {
|
| 552 | // sender equals remote peer
|
| 553 | if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
|
| 554 | return false;
|
| 555 | }
|
| 556 |
|
| 557 | // stanza contains data packet extension
|
| 558 | PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME,
|
| 559 | InBandBytestreamManager.NAMESPACE);
|
| 560 | if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) {
|
| 561 | return false;
|
| 562 | }
|
| 563 |
|
| 564 | // session ID equals this session ID
|
| 565 | DataPacketExtension data = (DataPacketExtension) packetExtension;
|
| 566 | if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
|
| 567 | return false;
|
| 568 | }
|
| 569 |
|
| 570 | return true;
|
| 571 | }
|
| 572 |
|
| 573 | }
|
| 574 |
|
| 575 | /**
|
| 576 | * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
|
| 577 | * Subclasses of this output stream must provide a method to send data over XMPP stream.
|
| 578 | */
|
| 579 | private abstract class IBBOutputStream extends OutputStream {
|
| 580 |
|
| 581 | /* buffer with the size of this sessions block size */
|
| 582 | protected final byte[] buffer;
|
| 583 |
|
| 584 | /* pointer to next byte to write to buffer */
|
| 585 | protected int bufferPointer = 0;
|
| 586 |
|
| 587 | /* data packet sequence (range from 0 to 65535) */
|
| 588 | protected long seq = 0;
|
| 589 |
|
| 590 | /* flag to indicate if output stream is closed */
|
| 591 | protected boolean isClosed = false;
|
| 592 |
|
| 593 | /**
|
| 594 | * Constructor.
|
| 595 | */
|
| 596 | public IBBOutputStream() {
|
| 597 | this.buffer = new byte[(byteStreamRequest.getBlockSize()/4)*3];
|
| 598 | }
|
| 599 |
|
| 600 | /**
|
| 601 | * Writes the given data packet to the XMPP stream.
|
| 602 | *
|
| 603 | * @param data the data packet
|
| 604 | * @throws IOException if an I/O error occurred while sending or if the stream is closed
|
| 605 | */
|
| 606 | protected abstract void writeToXML(DataPacketExtension data) throws IOException;
|
| 607 |
|
| 608 | public synchronized void write(int b) throws IOException {
|
| 609 | if (this.isClosed) {
|
| 610 | throw new IOException("Stream is closed");
|
| 611 | }
|
| 612 |
|
| 613 | // if buffer is full flush buffer
|
| 614 | if (bufferPointer >= buffer.length) {
|
| 615 | flushBuffer();
|
| 616 | }
|
| 617 |
|
| 618 | buffer[bufferPointer++] = (byte) b;
|
| 619 | }
|
| 620 |
|
| 621 | public synchronized void write(byte b[], int off, int len) throws IOException {
|
| 622 | if (b == null) {
|
| 623 | throw new NullPointerException();
|
| 624 | }
|
| 625 | else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|
| 626 | || ((off + len) < 0)) {
|
| 627 | throw new IndexOutOfBoundsException();
|
| 628 | }
|
| 629 | else if (len == 0) {
|
| 630 | return;
|
| 631 | }
|
| 632 |
|
| 633 | if (this.isClosed) {
|
| 634 | throw new IOException("Stream is closed");
|
| 635 | }
|
| 636 |
|
| 637 | // is data to send greater than buffer size
|
| 638 | if (len >= buffer.length) {
|
| 639 |
|
| 640 | // "byte" off the first chunk to write out
|
| 641 | writeOut(b, off, buffer.length);
|
| 642 |
|
| 643 | // recursively call this method with the lesser amount
|
| 644 | write(b, off + buffer.length, len - buffer.length);
|
| 645 | }
|
| 646 | else {
|
| 647 | writeOut(b, off, len);
|
| 648 | }
|
| 649 | }
|
| 650 |
|
| 651 | public synchronized void write(byte[] b) throws IOException {
|
| 652 | write(b, 0, b.length);
|
| 653 | }
|
| 654 |
|
| 655 | /**
|
| 656 | * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
|
| 657 | * capacity has been reached. This method is only called from this class so it is assured
|
| 658 | * that the amount of data to send is <= buffer capacity
|
| 659 | *
|
| 660 | * @param b the data
|
| 661 | * @param off the data
|
| 662 | * @param len the number of bytes to write
|
| 663 | * @throws IOException if an I/O error occurred while sending or if the stream is closed
|
| 664 | */
|
| 665 | private synchronized void writeOut(byte b[], int off, int len) throws IOException {
|
| 666 | if (this.isClosed) {
|
| 667 | throw new IOException("Stream is closed");
|
| 668 | }
|
| 669 |
|
| 670 | // set to 0 in case the next 'if' block is not executed
|
| 671 | int available = 0;
|
| 672 |
|
| 673 | // is data to send greater that buffer space left
|
| 674 | if (len > buffer.length - bufferPointer) {
|
| 675 | // fill buffer to capacity and send it
|
| 676 | available = buffer.length - bufferPointer;
|
| 677 | System.arraycopy(b, off, buffer, bufferPointer, available);
|
| 678 | bufferPointer += available;
|
| 679 | flushBuffer();
|
| 680 | }
|
| 681 |
|
| 682 | // copy the data left to buffer
|
| 683 | System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
|
| 684 | bufferPointer += len - available;
|
| 685 | }
|
| 686 |
|
| 687 | public synchronized void flush() throws IOException {
|
| 688 | if (this.isClosed) {
|
| 689 | throw new IOException("Stream is closed");
|
| 690 | }
|
| 691 | flushBuffer();
|
| 692 | }
|
| 693 |
|
| 694 | private synchronized void flushBuffer() throws IOException {
|
| 695 |
|
| 696 | // do nothing if no data to send available
|
| 697 | if (bufferPointer == 0) {
|
| 698 | return;
|
| 699 | }
|
| 700 |
|
| 701 | // create data packet
|
| 702 | String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false);
|
| 703 | DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
|
| 704 | this.seq, enc);
|
| 705 |
|
| 706 | // write to XMPP stream
|
| 707 | writeToXML(data);
|
| 708 |
|
| 709 | // reset buffer pointer
|
| 710 | bufferPointer = 0;
|
| 711 |
|
| 712 | // increment sequence, considering sequence overflow
|
| 713 | this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
|
| 714 |
|
| 715 | }
|
| 716 |
|
| 717 | public void close() throws IOException {
|
| 718 | if (isClosed) {
|
| 719 | return;
|
| 720 | }
|
| 721 | InBandBytestreamSession.this.closeByLocal(false);
|
| 722 | }
|
| 723 |
|
| 724 | /**
|
| 725 | * Sets the close flag and optionally flushes the stream.
|
| 726 | *
|
| 727 | * @param flush if <code>true</code> flushes the stream
|
| 728 | */
|
| 729 | protected void closeInternal(boolean flush) {
|
| 730 | if (this.isClosed) {
|
| 731 | return;
|
| 732 | }
|
| 733 | this.isClosed = true;
|
| 734 |
|
| 735 | try {
|
| 736 | if (flush) {
|
| 737 | flushBuffer();
|
| 738 | }
|
| 739 | }
|
| 740 | catch (IOException e) {
|
| 741 | /*
|
| 742 | * ignore, because writeToXML() will not throw an exception if stream is already
|
| 743 | * closed
|
| 744 | */
|
| 745 | }
|
| 746 | }
|
| 747 |
|
| 748 | }
|
| 749 |
|
| 750 | /**
|
| 751 | * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
|
| 752 | * the data packets.
|
| 753 | */
|
| 754 | private class IQIBBOutputStream extends IBBOutputStream {
|
| 755 |
|
| 756 | @Override
|
| 757 | protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
|
| 758 | // create IQ stanza containing data packet
|
| 759 | IQ iq = new Data(data);
|
| 760 | iq.setTo(remoteJID);
|
| 761 |
|
| 762 | try {
|
| 763 | SyncPacketSend.getReply(connection, iq);
|
| 764 | }
|
| 765 | catch (XMPPException e) {
|
| 766 | // close session unless it is already closed
|
| 767 | if (!this.isClosed) {
|
| 768 | InBandBytestreamSession.this.close();
|
| 769 | throw new IOException("Error while sending Data: " + e.getMessage());
|
| 770 | }
|
| 771 | }
|
| 772 |
|
| 773 | }
|
| 774 |
|
| 775 | }
|
| 776 |
|
| 777 | /**
|
| 778 | * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
|
| 779 | * encapsulating the data packets.
|
| 780 | */
|
| 781 | private class MessageIBBOutputStream extends IBBOutputStream {
|
| 782 |
|
| 783 | @Override
|
| 784 | protected synchronized void writeToXML(DataPacketExtension data) {
|
| 785 | // create message stanza containing data packet
|
| 786 | Message message = new Message(remoteJID);
|
| 787 | message.addExtension(data);
|
| 788 |
|
| 789 | connection.sendPacket(message);
|
| 790 |
|
| 791 | }
|
| 792 |
|
| 793 | }
|
| 794 |
|
| 795 | }
|