blob: c4aeb7d40266f84d84c03023f482102d70d6b3df [file] [log] [blame]
Linus Torvalds1da177e2005-04-16 15:20:36 -07001/* call.c: Rx call routines
2 *
3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 */
11
12#include <linux/sched.h>
13#include <linux/slab.h>
14#include <linux/module.h>
15#include <rxrpc/rxrpc.h>
16#include <rxrpc/transport.h>
17#include <rxrpc/peer.h>
18#include <rxrpc/connection.h>
19#include <rxrpc/call.h>
20#include <rxrpc/message.h>
21#include "internal.h"
22
23__RXACCT_DECL(atomic_t rxrpc_call_count);
24__RXACCT_DECL(atomic_t rxrpc_message_count);
25
26LIST_HEAD(rxrpc_calls);
27DECLARE_RWSEM(rxrpc_calls_sem);
28
29unsigned rxrpc_call_rcv_timeout = HZ/3;
30static unsigned rxrpc_call_acks_timeout = HZ/3;
31static unsigned rxrpc_call_dfr_ack_timeout = HZ/20;
32static unsigned short rxrpc_call_max_resend = HZ/10;
33
34const char *rxrpc_call_states[] = {
35 "COMPLETE",
36 "ERROR",
37 "SRVR_RCV_OPID",
38 "SRVR_RCV_ARGS",
39 "SRVR_GOT_ARGS",
40 "SRVR_SND_REPLY",
41 "SRVR_RCV_FINAL_ACK",
42 "CLNT_SND_ARGS",
43 "CLNT_RCV_REPLY",
44 "CLNT_GOT_REPLY"
45};
46
47const char *rxrpc_call_error_states[] = {
48 "NO_ERROR",
49 "LOCAL_ABORT",
50 "PEER_ABORT",
51 "LOCAL_ERROR",
52 "REMOTE_ERROR"
53};
54
55const char *rxrpc_pkts[] = {
56 "?00",
57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug",
58 "?09", "?10", "?11", "?12", "?13", "?14", "?15"
59};
60
61static const char *rxrpc_acks[] = {
62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
63 "-?-"
64};
65
66static const char _acktype[] = "NA-";
67
68static void rxrpc_call_receive_packet(struct rxrpc_call *call);
69static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
70 struct rxrpc_message *msg);
71static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
72 struct rxrpc_message *msg);
73static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
74 rxrpc_seq_t higest);
75static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest);
76static int __rxrpc_call_read_data(struct rxrpc_call *call);
77
78static int rxrpc_call_record_ACK(struct rxrpc_call *call,
79 struct rxrpc_message *msg,
80 rxrpc_seq_t seq,
81 size_t count);
82
83static int rxrpc_call_flush(struct rxrpc_call *call);
84
/*
 * emit a debugging message naming the call's current application state
 * - wrapped in do { } while (0) with no trailing semicolon of its own, so that
 *   "_state(call);" is exactly one statement even as an unbraced if/else body
 * - the argument is parenthesised so that any pointer expression may be passed
 */
#define _state(call)							\
	do {								\
		_debug("[[[ state %s ]]]",				\
		       rxrpc_call_states[(call)->app_call_state]);	\
	} while (0)
87
88static void rxrpc_call_default_attn_func(struct rxrpc_call *call)
89{
90 wake_up(&call->waitq);
91}
92
93static void rxrpc_call_default_error_func(struct rxrpc_call *call)
94{
95 wake_up(&call->waitq);
96}
97
98static void rxrpc_call_default_aemap_func(struct rxrpc_call *call)
99{
100 switch (call->app_err_state) {
101 case RXRPC_ESTATE_LOCAL_ABORT:
102 call->app_abort_code = -call->app_errno;
103 case RXRPC_ESTATE_PEER_ABORT:
104 call->app_errno = -ECONNABORTED;
105 default:
106 break;
107 }
108}
109
110static void __rxrpc_call_acks_timeout(unsigned long _call)
111{
112 struct rxrpc_call *call = (struct rxrpc_call *) _call;
113
114 _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif);
115
116 call->flags |= RXRPC_CALL_ACKS_TIMO;
117 rxrpc_krxiod_queue_call(call);
118}
119
120static void __rxrpc_call_rcv_timeout(unsigned long _call)
121{
122 struct rxrpc_call *call = (struct rxrpc_call *) _call;
123
124 _debug("RCV TIMEOUT %05lu", jiffies - call->cjif);
125
126 call->flags |= RXRPC_CALL_RCV_TIMO;
127 rxrpc_krxiod_queue_call(call);
128}
129
130static void __rxrpc_call_ackr_timeout(unsigned long _call)
131{
132 struct rxrpc_call *call = (struct rxrpc_call *) _call;
133
134 _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif);
135
136 call->flags |= RXRPC_CALL_ACKR_TIMO;
137 rxrpc_krxiod_queue_call(call);
138}
139
140/*****************************************************************************/
141/*
142 * calculate a timeout based on an RTT value
143 */
144static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call,
145 unsigned long val)
146{
147 unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ);
148
149 expiry += 10;
150 if (expiry < HZ / 25)
151 expiry = HZ / 25;
152 if (expiry > HZ)
153 expiry = HZ;
154
155 _leave(" = %lu jiffies", expiry);
156 return jiffies + expiry;
157} /* end __rxrpc_rtt_based_timeout() */
158
159/*****************************************************************************/
160/*
161 * create a new call record
162 */
163static inline int __rxrpc_create_call(struct rxrpc_connection *conn,
164 struct rxrpc_call **_call)
165{
166 struct rxrpc_call *call;
167
168 _enter("%p", conn);
169
170 /* allocate and initialise a call record */
171 call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL);
172 if (!call) {
173 _leave(" ENOMEM");
174 return -ENOMEM;
175 }
176
177 atomic_set(&call->usage, 1);
178
179 init_waitqueue_head(&call->waitq);
180 spin_lock_init(&call->lock);
181 INIT_LIST_HEAD(&call->link);
182 INIT_LIST_HEAD(&call->acks_pendq);
183 INIT_LIST_HEAD(&call->rcv_receiveq);
184 INIT_LIST_HEAD(&call->rcv_krxiodq_lk);
185 INIT_LIST_HEAD(&call->app_readyq);
186 INIT_LIST_HEAD(&call->app_unreadyq);
187 INIT_LIST_HEAD(&call->app_link);
188 INIT_LIST_HEAD(&call->app_attn_link);
189
190 init_timer(&call->acks_timeout);
191 call->acks_timeout.data = (unsigned long) call;
192 call->acks_timeout.function = __rxrpc_call_acks_timeout;
193
194 init_timer(&call->rcv_timeout);
195 call->rcv_timeout.data = (unsigned long) call;
196 call->rcv_timeout.function = __rxrpc_call_rcv_timeout;
197
198 init_timer(&call->ackr_dfr_timo);
199 call->ackr_dfr_timo.data = (unsigned long) call;
200 call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout;
201
202 call->conn = conn;
203 call->ackr_win_bot = 1;
204 call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1;
205 call->ackr_prev_seq = 0;
206 call->app_mark = RXRPC_APP_MARK_EOF;
207 call->app_attn_func = rxrpc_call_default_attn_func;
208 call->app_error_func = rxrpc_call_default_error_func;
209 call->app_aemap_func = rxrpc_call_default_aemap_func;
210 call->app_scr_alloc = call->app_scratch;
211
212 call->cjif = jiffies;
213
214 _leave(" = 0 (%p)", call);
215
216 *_call = call;
217
218 return 0;
219} /* end __rxrpc_create_call() */
220
221/*****************************************************************************/
222/*
223 * create a new call record for outgoing calls
224 */
225int rxrpc_create_call(struct rxrpc_connection *conn,
226 rxrpc_call_attn_func_t attn,
227 rxrpc_call_error_func_t error,
228 rxrpc_call_aemap_func_t aemap,
229 struct rxrpc_call **_call)
230{
231 DECLARE_WAITQUEUE(myself, current);
232
233 struct rxrpc_call *call;
234 int ret, cix, loop;
235
236 _enter("%p", conn);
237
238 /* allocate and initialise a call record */
239 ret = __rxrpc_create_call(conn, &call);
240 if (ret < 0) {
241 _leave(" = %d", ret);
242 return ret;
243 }
244
245 call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS;
246 if (attn)
247 call->app_attn_func = attn;
248 if (error)
249 call->app_error_func = error;
250 if (aemap)
251 call->app_aemap_func = aemap;
252
253 _state(call);
254
255 spin_lock(&conn->lock);
256 set_current_state(TASK_INTERRUPTIBLE);
257 add_wait_queue(&conn->chanwait, &myself);
258
259 try_again:
260 /* try to find an unused channel */
261 for (cix = 0; cix < 4; cix++)
262 if (!conn->channels[cix])
263 goto obtained_chan;
264
265 /* no free channels - wait for one to become available */
266 ret = -EINTR;
267 if (signal_pending(current))
268 goto error_unwait;
269
270 spin_unlock(&conn->lock);
271
272 schedule();
273 set_current_state(TASK_INTERRUPTIBLE);
274
275 spin_lock(&conn->lock);
276 goto try_again;
277
278 /* got a channel - now attach to the connection */
279 obtained_chan:
280 remove_wait_queue(&conn->chanwait, &myself);
281 set_current_state(TASK_RUNNING);
282
283 /* concoct a unique call number */
284 next_callid:
285 call->call_id = htonl(++conn->call_counter);
286 for (loop = 0; loop < 4; loop++)
287 if (conn->channels[loop] &&
288 conn->channels[loop]->call_id == call->call_id)
289 goto next_callid;
290
291 rxrpc_get_connection(conn);
292 conn->channels[cix] = call; /* assign _after_ done callid check loop */
293 do_gettimeofday(&conn->atime);
294 call->chan_ix = htonl(cix);
295
296 spin_unlock(&conn->lock);
297
298 down_write(&rxrpc_calls_sem);
299 list_add_tail(&call->call_link, &rxrpc_calls);
300 up_write(&rxrpc_calls_sem);
301
302 __RXACCT(atomic_inc(&rxrpc_call_count));
303 *_call = call;
304
305 _leave(" = 0 (call=%p cix=%u)", call, cix);
306 return 0;
307
308 error_unwait:
309 remove_wait_queue(&conn->chanwait, &myself);
310 set_current_state(TASK_RUNNING);
311 spin_unlock(&conn->lock);
312
313 free_page((unsigned long) call);
314 _leave(" = %d", ret);
315 return ret;
316} /* end rxrpc_create_call() */
317
318/*****************************************************************************/
319/*
320 * create a new call record for incoming calls
321 */
322int rxrpc_incoming_call(struct rxrpc_connection *conn,
323 struct rxrpc_message *msg,
324 struct rxrpc_call **_call)
325{
326 struct rxrpc_call *call;
327 unsigned cix;
328 int ret;
329
330 cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK;
331
332 _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix);
333
334 /* allocate and initialise a call record */
335 ret = __rxrpc_create_call(conn, &call);
336 if (ret < 0) {
337 _leave(" = %d", ret);
338 return ret;
339 }
340
341 call->pkt_rcv_count = 1;
342 call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID;
343 call->app_mark = sizeof(uint32_t);
344
345 _state(call);
346
347 /* attach to the connection */
348 ret = -EBUSY;
349 call->chan_ix = htonl(cix);
350 call->call_id = msg->hdr.callNumber;
351
352 spin_lock(&conn->lock);
353
354 if (!conn->channels[cix] ||
355 conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE ||
356 conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR
357 ) {
358 conn->channels[cix] = call;
359 rxrpc_get_connection(conn);
360 ret = 0;
361 }
362
363 spin_unlock(&conn->lock);
364
365 if (ret < 0) {
366 free_page((unsigned long) call);
367 call = NULL;
368 }
369
370 if (ret == 0) {
371 down_write(&rxrpc_calls_sem);
372 list_add_tail(&call->call_link, &rxrpc_calls);
373 up_write(&rxrpc_calls_sem);
374 __RXACCT(atomic_inc(&rxrpc_call_count));
375 *_call = call;
376 }
377
378 _leave(" = %d [%p]", ret, call);
379 return ret;
380} /* end rxrpc_incoming_call() */
381
382/*****************************************************************************/
383/*
384 * free a call record
385 */
386void rxrpc_put_call(struct rxrpc_call *call)
387{
388 struct rxrpc_connection *conn = call->conn;
389 struct rxrpc_message *msg;
390
391 _enter("%p{u=%d}",call,atomic_read(&call->usage));
392
393 /* sanity check */
394 if (atomic_read(&call->usage) <= 0)
395 BUG();
396
397 /* to prevent a race, the decrement and the de-list must be effectively
398 * atomic */
399 spin_lock(&conn->lock);
400 if (likely(!atomic_dec_and_test(&call->usage))) {
401 spin_unlock(&conn->lock);
402 _leave("");
403 return;
404 }
405
406 if (conn->channels[ntohl(call->chan_ix)] == call)
407 conn->channels[ntohl(call->chan_ix)] = NULL;
408
409 spin_unlock(&conn->lock);
410
411 wake_up(&conn->chanwait);
412
413 rxrpc_put_connection(conn);
414
415 /* clear the timers and dequeue from krxiod */
416 del_timer_sync(&call->acks_timeout);
417 del_timer_sync(&call->rcv_timeout);
418 del_timer_sync(&call->ackr_dfr_timo);
419
420 rxrpc_krxiod_dequeue_call(call);
421
422 /* clean up the contents of the struct */
423 if (call->snd_nextmsg)
424 rxrpc_put_message(call->snd_nextmsg);
425
426 if (call->snd_ping)
427 rxrpc_put_message(call->snd_ping);
428
429 while (!list_empty(&call->acks_pendq)) {
430 msg = list_entry(call->acks_pendq.next,
431 struct rxrpc_message, link);
432 list_del(&msg->link);
433 rxrpc_put_message(msg);
434 }
435
436 while (!list_empty(&call->rcv_receiveq)) {
437 msg = list_entry(call->rcv_receiveq.next,
438 struct rxrpc_message, link);
439 list_del(&msg->link);
440 rxrpc_put_message(msg);
441 }
442
443 while (!list_empty(&call->app_readyq)) {
444 msg = list_entry(call->app_readyq.next,
445 struct rxrpc_message, link);
446 list_del(&msg->link);
447 rxrpc_put_message(msg);
448 }
449
450 while (!list_empty(&call->app_unreadyq)) {
451 msg = list_entry(call->app_unreadyq.next,
452 struct rxrpc_message, link);
453 list_del(&msg->link);
454 rxrpc_put_message(msg);
455 }
456
457 module_put(call->owner);
458
459 down_write(&rxrpc_calls_sem);
460 list_del(&call->call_link);
461 up_write(&rxrpc_calls_sem);
462
463 __RXACCT(atomic_dec(&rxrpc_call_count));
464 free_page((unsigned long) call);
465
466 _leave(" [destroyed]");
467} /* end rxrpc_put_call() */
468
469/*****************************************************************************/
470/*
471 * actually generate a normal ACK
472 */
473static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call,
474 rxrpc_seq_t seq)
475{
476 struct rxrpc_message *msg;
477 struct kvec diov[3];
478 __be32 aux[4];
479 int delta, ret;
480
481 /* ACKs default to DELAY */
482 if (!call->ackr.reason)
483 call->ackr.reason = RXRPC_ACK_DELAY;
484
485 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
486 jiffies - call->cjif,
487 ntohs(call->ackr.maxSkew),
488 ntohl(call->ackr.firstPacket),
489 ntohl(call->ackr.previousPacket),
490 ntohl(call->ackr.serial),
491 rxrpc_acks[call->ackr.reason],
492 call->ackr.nAcks);
493
494 aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */
495 aux[1] = htonl(1444); /* max MTU */
496 aux[2] = htonl(16); /* rwind */
497 aux[3] = htonl(4); /* max packets */
498
499 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
500 diov[0].iov_base = &call->ackr;
501 diov[1].iov_len = call->ackr_pend_cnt + 3;
502 diov[1].iov_base = call->ackr_array;
503 diov[2].iov_len = sizeof(aux);
504 diov[2].iov_base = &aux;
505
506 /* build and send the message */
507 ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
508 3, diov, GFP_KERNEL, &msg);
509 if (ret < 0)
510 goto out;
511
512 msg->seq = seq;
513 msg->hdr.seq = htonl(seq);
514 msg->hdr.flags |= RXRPC_SLOW_START_OK;
515
516 ret = rxrpc_conn_sendmsg(call->conn, msg);
517 rxrpc_put_message(msg);
518 if (ret < 0)
519 goto out;
520 call->pkt_snd_count++;
521
522 /* count how many actual ACKs there were at the front */
523 for (delta = 0; delta < call->ackr_pend_cnt; delta++)
524 if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK)
525 break;
526
527 call->ackr_pend_cnt -= delta; /* all ACK'd to this point */
528
529 /* crank the ACK window around */
530 if (delta == 0) {
531 /* un-ACK'd window */
532 }
533 else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) {
534 /* partially ACK'd window
535 * - shuffle down to avoid losing out-of-sequence packets
536 */
537 call->ackr_win_bot += delta;
538 call->ackr_win_top += delta;
539
540 memmove(&call->ackr_array[0],
541 &call->ackr_array[delta],
542 call->ackr_pend_cnt);
543
544 memset(&call->ackr_array[call->ackr_pend_cnt],
545 RXRPC_ACK_TYPE_NACK,
546 sizeof(call->ackr_array) - call->ackr_pend_cnt);
547 }
548 else {
549 /* fully ACK'd window
550 * - just clear the whole thing
551 */
552 memset(&call->ackr_array,
553 RXRPC_ACK_TYPE_NACK,
554 sizeof(call->ackr_array));
555 }
556
557 /* clear this ACK */
558 memset(&call->ackr, 0, sizeof(call->ackr));
559
560 out:
561 if (!call->app_call_state)
562 printk("___ STATE 0 ___\n");
563 return ret;
564} /* end __rxrpc_call_gen_normal_ACK() */
565
566/*****************************************************************************/
567/*
568 * note the reception of a packet in the call's ACK records and generate an
569 * appropriate ACK packet if necessary
570 * - returns 0 if packet should be processed, 1 if packet should be ignored
571 * and -ve on an error
572 */
573static int rxrpc_call_generate_ACK(struct rxrpc_call *call,
574 struct rxrpc_header *hdr,
575 struct rxrpc_ackpacket *ack)
576{
577 struct rxrpc_message *msg;
578 rxrpc_seq_t seq;
579 unsigned offset;
580 int ret = 0, err;
581 u8 special_ACK, do_ACK, force;
582
583 _enter("%p,%p { seq=%d tp=%d fl=%02x }",
584 call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags);
585
586 seq = ntohl(hdr->seq);
587 offset = seq - call->ackr_win_bot;
588 do_ACK = RXRPC_ACK_DELAY;
589 special_ACK = 0;
590 force = (seq == 1);
591
592 if (call->ackr_high_seq < seq)
593 call->ackr_high_seq = seq;
594
595 /* deal with generation of obvious special ACKs first */
596 if (ack && ack->reason == RXRPC_ACK_PING) {
597 special_ACK = RXRPC_ACK_PING_RESPONSE;
598 ret = 1;
599 goto gen_ACK;
600 }
601
602 if (seq < call->ackr_win_bot) {
603 special_ACK = RXRPC_ACK_DUPLICATE;
604 ret = 1;
605 goto gen_ACK;
606 }
607
608 if (seq >= call->ackr_win_top) {
609 special_ACK = RXRPC_ACK_EXCEEDS_WINDOW;
610 ret = 1;
611 goto gen_ACK;
612 }
613
614 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) {
615 special_ACK = RXRPC_ACK_DUPLICATE;
616 ret = 1;
617 goto gen_ACK;
618 }
619
620 /* okay... it's a normal data packet inside the ACK window */
621 call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK;
622
623 if (offset < call->ackr_pend_cnt) {
624 }
625 else if (offset > call->ackr_pend_cnt) {
626 do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE;
627 call->ackr_pend_cnt = offset;
628 goto gen_ACK;
629 }
630
631 if (hdr->flags & RXRPC_REQUEST_ACK) {
632 do_ACK = RXRPC_ACK_REQUESTED;
633 }
634
635 /* generate an ACK on the final packet of a reply just received */
636 if (hdr->flags & RXRPC_LAST_PACKET) {
637 if (call->conn->out_clientflag)
638 force = 1;
639 }
640 else if (!(hdr->flags & RXRPC_MORE_PACKETS)) {
641 do_ACK = RXRPC_ACK_REQUESTED;
642 }
643
644 /* re-ACK packets previously received out-of-order */
645 for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++)
646 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK)
647 break;
648
649 call->ackr_pend_cnt = offset;
650
651 /* generate an ACK if we fill up the window */
652 if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE)
653 force = 1;
654
655 gen_ACK:
656 _debug("%05lu ACKs pend=%u norm=%s special=%s%s",
657 jiffies - call->cjif,
658 call->ackr_pend_cnt,
659 rxrpc_acks[do_ACK],
660 rxrpc_acks[special_ACK],
661 force ? " immediate" :
662 do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" :
663 hdr->flags & RXRPC_LAST_PACKET ? " finalise" :
664 " defer"
665 );
666
667 /* send any pending normal ACKs if need be */
668 if (call->ackr_pend_cnt > 0) {
669 /* fill out the appropriate form */
670 call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
671 call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq,
672 65535U));
673 call->ackr.firstPacket = htonl(call->ackr_win_bot);
674 call->ackr.previousPacket = call->ackr_prev_seq;
675 call->ackr.serial = hdr->serial;
676 call->ackr.nAcks = call->ackr_pend_cnt;
677
678 if (do_ACK == RXRPC_ACK_REQUESTED)
679 call->ackr.reason = do_ACK;
680
681 /* generate the ACK immediately if necessary */
682 if (special_ACK || force) {
683 err = __rxrpc_call_gen_normal_ACK(
684 call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq);
685 if (err < 0) {
686 ret = err;
687 goto out;
688 }
689 }
690 }
691
692 if (call->ackr.reason == RXRPC_ACK_REQUESTED)
693 call->ackr_dfr_seq = seq;
694
695 /* start the ACK timer if not running if there are any pending deferred
696 * ACKs */
697 if (call->ackr_pend_cnt > 0 &&
698 call->ackr.reason != RXRPC_ACK_REQUESTED &&
699 !timer_pending(&call->ackr_dfr_timo)
700 ) {
701 unsigned long timo;
702
703 timo = rxrpc_call_dfr_ack_timeout + jiffies;
704
705 _debug("START ACKR TIMER for cj=%lu", timo - call->cjif);
706
707 spin_lock(&call->lock);
708 mod_timer(&call->ackr_dfr_timo, timo);
709 spin_unlock(&call->lock);
710 }
711 else if ((call->ackr_pend_cnt == 0 ||
712 call->ackr.reason == RXRPC_ACK_REQUESTED) &&
713 timer_pending(&call->ackr_dfr_timo)
714 ) {
715 /* stop timer if no pending ACKs */
716 _debug("CLEAR ACKR TIMER");
717 del_timer_sync(&call->ackr_dfr_timo);
718 }
719
720 /* send a special ACK if one is required */
721 if (special_ACK) {
722 struct rxrpc_ackpacket ack;
723 struct kvec diov[2];
724 uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK };
725
726 /* fill out the appropriate form */
727 ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
728 ack.maxSkew = htons(min(call->ackr_high_seq - seq,
729 65535U));
730 ack.firstPacket = htonl(call->ackr_win_bot);
731 ack.previousPacket = call->ackr_prev_seq;
732 ack.serial = hdr->serial;
733 ack.reason = special_ACK;
734 ack.nAcks = 0;
735
736 _proto("Rx Sending s-ACK"
737 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
738 ntohs(ack.maxSkew),
739 ntohl(ack.firstPacket),
740 ntohl(ack.previousPacket),
741 ntohl(ack.serial),
742 rxrpc_acks[ack.reason],
743 ack.nAcks);
744
745 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
746 diov[0].iov_base = &ack;
747 diov[1].iov_len = sizeof(acks);
748 diov[1].iov_base = acks;
749
750 /* build and send the message */
751 err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
752 hdr->seq ? 2 : 1, diov,
753 GFP_KERNEL,
754 &msg);
755 if (err < 0) {
756 ret = err;
757 goto out;
758 }
759
760 msg->seq = seq;
761 msg->hdr.seq = htonl(seq);
762 msg->hdr.flags |= RXRPC_SLOW_START_OK;
763
764 err = rxrpc_conn_sendmsg(call->conn, msg);
765 rxrpc_put_message(msg);
766 if (err < 0) {
767 ret = err;
768 goto out;
769 }
770 call->pkt_snd_count++;
771 }
772
773 out:
774 if (hdr->seq)
775 call->ackr_prev_seq = hdr->seq;
776
777 _leave(" = %d", ret);
778 return ret;
779} /* end rxrpc_call_generate_ACK() */
780
781/*****************************************************************************/
782/*
783 * handle work to be done on a call
784 * - includes packet reception and timeout processing
785 */
786void rxrpc_call_do_stuff(struct rxrpc_call *call)
787{
788 _enter("%p{flags=%lx}", call, call->flags);
789
790 /* handle packet reception */
791 if (call->flags & RXRPC_CALL_RCV_PKT) {
792 _debug("- receive packet");
793 call->flags &= ~RXRPC_CALL_RCV_PKT;
794 rxrpc_call_receive_packet(call);
795 }
796
797 /* handle overdue ACKs */
798 if (call->flags & RXRPC_CALL_ACKS_TIMO) {
799 _debug("- overdue ACK timeout");
800 call->flags &= ~RXRPC_CALL_ACKS_TIMO;
801 rxrpc_call_resend(call, call->snd_seq_count);
802 }
803
804 /* handle lack of reception */
805 if (call->flags & RXRPC_CALL_RCV_TIMO) {
806 _debug("- reception timeout");
807 call->flags &= ~RXRPC_CALL_RCV_TIMO;
808 rxrpc_call_abort(call, -EIO);
809 }
810
811 /* handle deferred ACKs */
812 if (call->flags & RXRPC_CALL_ACKR_TIMO ||
813 (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED)
814 ) {
815 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
816 jiffies - call->cjif,
817 rxrpc_acks[call->ackr.reason],
818 call->ackr.nAcks);
819
820 call->flags &= ~RXRPC_CALL_ACKR_TIMO;
821
822 if (call->ackr.nAcks > 0 &&
823 call->app_call_state != RXRPC_CSTATE_ERROR) {
824 /* generate ACK */
825 __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq);
826 call->ackr_dfr_seq = 0;
827 }
828 }
829
830 _leave("");
831
832} /* end rxrpc_call_do_stuff() */
833
834/*****************************************************************************/
835/*
836 * send an abort message at call or connection level
837 * - must be called with call->lock held
838 * - the supplied error code is sent as the packet data
839 */
840static int __rxrpc_call_abort(struct rxrpc_call *call, int errno)
841{
842 struct rxrpc_connection *conn = call->conn;
843 struct rxrpc_message *msg;
844 struct kvec diov[1];
845 int ret;
846 __be32 _error;
847
848 _enter("%p{%08x},%p{%d},%d",
849 conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno);
850
851 /* if this call is already aborted, then just wake up any waiters */
852 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
853 spin_unlock(&call->lock);
854 call->app_error_func(call);
855 _leave(" = 0");
856 return 0;
857 }
858
859 rxrpc_get_call(call);
860
861 /* change the state _with_ the lock still held */
862 call->app_call_state = RXRPC_CSTATE_ERROR;
863 call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT;
864 call->app_errno = errno;
865 call->app_mark = RXRPC_APP_MARK_EOF;
866 call->app_read_buf = NULL;
867 call->app_async_read = 0;
868
869 _state(call);
870
871 /* ask the app to translate the error code */
872 call->app_aemap_func(call);
873
874 spin_unlock(&call->lock);
875
876 /* flush any outstanding ACKs */
877 del_timer_sync(&call->acks_timeout);
878 del_timer_sync(&call->rcv_timeout);
879 del_timer_sync(&call->ackr_dfr_timo);
880
881 if (rxrpc_call_is_ack_pending(call))
882 __rxrpc_call_gen_normal_ACK(call, 0);
883
884 /* send the abort packet only if we actually traded some other
885 * packets */
886 ret = 0;
887 if (call->pkt_snd_count || call->pkt_rcv_count) {
888 /* actually send the abort */
889 _proto("Rx Sending Call ABORT { data=%d }",
890 call->app_abort_code);
891
892 _error = htonl(call->app_abort_code);
893
894 diov[0].iov_len = sizeof(_error);
895 diov[0].iov_base = &_error;
896
897 ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT,
898 1, diov, GFP_KERNEL, &msg);
899 if (ret == 0) {
900 ret = rxrpc_conn_sendmsg(conn, msg);
901 rxrpc_put_message(msg);
902 }
903 }
904
905 /* tell the app layer to let go */
906 call->app_error_func(call);
907
908 rxrpc_put_call(call);
909
910 _leave(" = %d", ret);
911 return ret;
912} /* end __rxrpc_call_abort() */
913
914/*****************************************************************************/
915/*
916 * send an abort message at call or connection level
917 * - the supplied error code is sent as the packet data
918 */
919int rxrpc_call_abort(struct rxrpc_call *call, int error)
920{
921 spin_lock(&call->lock);
922
923 return __rxrpc_call_abort(call, error);
924
925} /* end rxrpc_call_abort() */
926
927/*****************************************************************************/
928/*
929 * process packets waiting for this call
930 */
931static void rxrpc_call_receive_packet(struct rxrpc_call *call)
932{
933 struct rxrpc_message *msg;
934 struct list_head *_p;
935
936 _enter("%p", call);
937
938 rxrpc_get_call(call); /* must not go away too soon if aborted by
939 * app-layer */
940
941 while (!list_empty(&call->rcv_receiveq)) {
942 /* try to get next packet */
943 _p = NULL;
944 spin_lock(&call->lock);
945 if (!list_empty(&call->rcv_receiveq)) {
946 _p = call->rcv_receiveq.next;
947 list_del_init(_p);
948 }
949 spin_unlock(&call->lock);
950
951 if (!_p)
952 break;
953
954 msg = list_entry(_p, struct rxrpc_message, link);
955
956 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
957 jiffies - call->cjif,
958 rxrpc_pkts[msg->hdr.type],
959 ntohl(msg->hdr.serial),
960 msg->seq,
961 msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-',
962 msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-',
963 msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-',
964 msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-',
965 msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S'
966 );
967
968 switch (msg->hdr.type) {
969 /* deal with data packets */
970 case RXRPC_PACKET_TYPE_DATA:
971 /* ACK the packet if necessary */
972 switch (rxrpc_call_generate_ACK(call, &msg->hdr,
973 NULL)) {
974 case 0: /* useful packet */
975 rxrpc_call_receive_data_packet(call, msg);
976 break;
977 case 1: /* duplicate or out-of-window packet */
978 break;
979 default:
980 rxrpc_put_message(msg);
981 goto out;
982 }
983 break;
984
985 /* deal with ACK packets */
986 case RXRPC_PACKET_TYPE_ACK:
987 rxrpc_call_receive_ack_packet(call, msg);
988 break;
989
990 /* deal with abort packets */
991 case RXRPC_PACKET_TYPE_ABORT: {
992 __be32 _dbuf, *dp;
993
994 dp = skb_header_pointer(msg->pkt, msg->offset,
995 sizeof(_dbuf), &_dbuf);
996 if (dp == NULL)
997 printk("Rx Received short ABORT packet\n");
998
999 _proto("Rx Received Call ABORT { data=%d }",
1000 (dp ? ntohl(*dp) : 0));
1001
1002 spin_lock(&call->lock);
1003 call->app_call_state = RXRPC_CSTATE_ERROR;
1004 call->app_err_state = RXRPC_ESTATE_PEER_ABORT;
1005 call->app_abort_code = (dp ? ntohl(*dp) : 0);
1006 call->app_errno = -ECONNABORTED;
1007 call->app_mark = RXRPC_APP_MARK_EOF;
1008 call->app_read_buf = NULL;
1009 call->app_async_read = 0;
1010
1011 /* ask the app to translate the error code */
1012 call->app_aemap_func(call);
1013 _state(call);
1014 spin_unlock(&call->lock);
1015 call->app_error_func(call);
1016 break;
1017 }
1018 default:
1019 /* deal with other packet types */
1020 _proto("Rx Unsupported packet type %u (#%u)",
1021 msg->hdr.type, msg->seq);
1022 break;
1023 }
1024
1025 rxrpc_put_message(msg);
1026 }
1027
1028 out:
1029 rxrpc_put_call(call);
1030 _leave("");
1031} /* end rxrpc_call_receive_packet() */
1032
1033/*****************************************************************************/
/*
 * process next data packet
 * - as the next data packet arrives:
 *   - it is queued on app_readyq _if_ it is the next one expected
 *     (app_ready_seq+1)
 *   - it is queued on app_unreadyq _if_ it is not the next one expected
 *   - if a packet placed on app_readyq completely fills a hole leading up to
 *     the first packet on app_unreadyq, then packets now in sequence are
 *     transferred to app_readyq
 * - the application layer can only see packets on app_readyq
 *   (app_ready_qty bytes)
 * - the application layer is prodded every time a new packet arrives
 */
1047static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
1048 struct rxrpc_message *msg)
1049{
1050 const struct rxrpc_operation *optbl, *op;
1051 struct rxrpc_message *pmsg;
1052 struct list_head *_p;
1053 int ret, lo, hi, rmtimo;
1054 __be32 opid;
1055
1056 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1057
1058 rxrpc_get_message(msg);
1059
1060 /* add to the unready queue if we'd have to create a hole in the ready
1061 * queue otherwise */
1062 if (msg->seq != call->app_ready_seq + 1) {
1063 _debug("Call add packet %d to unreadyq", msg->seq);
1064
1065 /* insert in seq order */
1066 list_for_each(_p, &call->app_unreadyq) {
1067 pmsg = list_entry(_p, struct rxrpc_message, link);
1068 if (pmsg->seq > msg->seq)
1069 break;
1070 }
1071
1072 list_add_tail(&msg->link, _p);
1073
1074 _leave(" [unreadyq]");
1075 return;
1076 }
1077
1078 /* next in sequence - simply append into the call's ready queue */
1079 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",
1080 msg->seq, msg->dsize, call->app_ready_qty);
1081
1082 spin_lock(&call->lock);
1083 call->app_ready_seq = msg->seq;
1084 call->app_ready_qty += msg->dsize;
1085 list_add_tail(&msg->link, &call->app_readyq);
1086
1087 /* move unready packets to the readyq if we got rid of a hole */
1088 while (!list_empty(&call->app_unreadyq)) {
1089 pmsg = list_entry(call->app_unreadyq.next,
1090 struct rxrpc_message, link);
1091
1092 if (pmsg->seq != call->app_ready_seq + 1)
1093 break;
1094
1095 /* next in sequence - just move list-to-list */
1096 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",
1097 pmsg->seq, pmsg->dsize, call->app_ready_qty);
1098
1099 call->app_ready_seq = pmsg->seq;
1100 call->app_ready_qty += pmsg->dsize;
1101 list_del_init(&pmsg->link);
1102 list_add_tail(&pmsg->link, &call->app_readyq);
1103 }
1104
1105 /* see if we've got the last packet yet */
1106 if (!list_empty(&call->app_readyq)) {
1107 pmsg = list_entry(call->app_readyq.prev,
1108 struct rxrpc_message, link);
1109 if (pmsg->hdr.flags & RXRPC_LAST_PACKET) {
1110 call->app_last_rcv = 1;
1111 _debug("Last packet on readyq");
1112 }
1113 }
1114
1115 switch (call->app_call_state) {
1116 /* do nothing if call already aborted */
1117 case RXRPC_CSTATE_ERROR:
1118 spin_unlock(&call->lock);
1119 _leave(" [error]");
1120 return;
1121
1122 /* extract the operation ID from an incoming call if that's not
1123 * yet been done */
1124 case RXRPC_CSTATE_SRVR_RCV_OPID:
1125 spin_unlock(&call->lock);
1126
1127 /* handle as yet insufficient data for the operation ID */
1128 if (call->app_ready_qty < 4) {
1129 if (call->app_last_rcv)
1130 /* trouble - last packet seen */
1131 rxrpc_call_abort(call, -EINVAL);
1132
1133 _leave("");
1134 return;
1135 }
1136
1137 /* pull the operation ID out of the buffer */
1138 ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0);
1139 if (ret < 0) {
1140 printk("Unexpected error from read-data: %d\n", ret);
1141 if (call->app_call_state != RXRPC_CSTATE_ERROR)
1142 rxrpc_call_abort(call, ret);
1143 _leave("");
1144 return;
1145 }
1146 call->app_opcode = ntohl(opid);
1147
1148 /* locate the operation in the available ops table */
1149 optbl = call->conn->service->ops_begin;
1150 lo = 0;
1151 hi = call->conn->service->ops_end - optbl;
1152
1153 while (lo < hi) {
1154 int mid = (hi + lo) / 2;
1155 op = &optbl[mid];
1156 if (call->app_opcode == op->id)
1157 goto found_op;
1158 if (call->app_opcode > op->id)
1159 lo = mid + 1;
1160 else
1161 hi = mid;
1162 }
1163
1164 /* search failed */
1165 kproto("Rx Client requested operation %d from %s service",
1166 call->app_opcode, call->conn->service->name);
1167 rxrpc_call_abort(call, -EINVAL);
1168 _leave(" [inval]");
1169 return;
1170
1171 found_op:
1172 _proto("Rx Client requested operation %s from %s service",
1173 op->name, call->conn->service->name);
1174
1175 /* we're now waiting for the argument block (unless the call
1176 * was aborted) */
1177 spin_lock(&call->lock);
1178 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID ||
1179 call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) {
1180 if (!call->app_last_rcv)
1181 call->app_call_state =
1182 RXRPC_CSTATE_SRVR_RCV_ARGS;
1183 else if (call->app_ready_qty > 0)
1184 call->app_call_state =
1185 RXRPC_CSTATE_SRVR_GOT_ARGS;
1186 else
1187 call->app_call_state =
1188 RXRPC_CSTATE_SRVR_SND_REPLY;
1189 call->app_mark = op->asize;
1190 call->app_user = op->user;
1191 }
1192 spin_unlock(&call->lock);
1193
1194 _state(call);
1195 break;
1196
1197 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1198 /* change state if just received last packet of arg block */
1199 if (call->app_last_rcv)
1200 call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS;
1201 spin_unlock(&call->lock);
1202
1203 _state(call);
1204 break;
1205
1206 case RXRPC_CSTATE_CLNT_RCV_REPLY:
1207 /* change state if just received last packet of reply block */
1208 rmtimo = 0;
1209 if (call->app_last_rcv) {
1210 call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY;
1211 rmtimo = 1;
1212 }
1213 spin_unlock(&call->lock);
1214
1215 if (rmtimo) {
1216 del_timer_sync(&call->acks_timeout);
1217 del_timer_sync(&call->rcv_timeout);
1218 del_timer_sync(&call->ackr_dfr_timo);
1219 }
1220
1221 _state(call);
1222 break;
1223
1224 default:
1225 /* deal with data reception in an unexpected state */
1226 printk("Unexpected state [[[ %u ]]]\n", call->app_call_state);
1227 __rxrpc_call_abort(call, -EBADMSG);
1228 _leave("");
1229 return;
1230 }
1231
1232 if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY &&
1233 call->app_last_rcv)
1234 BUG();
1235
1236 /* otherwise just invoke the data function whenever we can satisfy its desire for more
1237 * data
1238 */
1239 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s",
1240 call->app_call_state, call->app_ready_qty, call->app_mark,
1241 call->app_last_rcv ? " last-rcvd" : "");
1242
1243 spin_lock(&call->lock);
1244
1245 ret = __rxrpc_call_read_data(call);
1246 switch (ret) {
1247 case 0:
1248 spin_unlock(&call->lock);
1249 call->app_attn_func(call);
1250 break;
1251 case -EAGAIN:
1252 spin_unlock(&call->lock);
1253 break;
1254 case -ECONNABORTED:
1255 spin_unlock(&call->lock);
1256 break;
1257 default:
1258 __rxrpc_call_abort(call, ret);
1259 break;
1260 }
1261
1262 _state(call);
1263
1264 _leave("");
1265
1266} /* end rxrpc_call_receive_data_packet() */
1267
1268/*****************************************************************************/
1269/*
1270 * received an ACK packet
1271 */
1272static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
1273 struct rxrpc_message *msg)
1274{
1275 struct rxrpc_ackpacket _ack, *ap;
1276 rxrpc_serial_net_t serial;
1277 rxrpc_seq_t seq;
1278 int ret;
1279
1280 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1281
1282 /* extract the basic ACK record */
1283 ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack);
1284 if (ap == NULL) {
1285 printk("Rx Received short ACK packet\n");
1286 return;
1287 }
1288 msg->offset += sizeof(_ack);
1289
1290 serial = ap->serial;
1291 seq = ntohl(ap->firstPacket);
1292
1293 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }",
1294 ntohl(msg->hdr.serial),
1295 ntohs(ap->bufferSpace),
1296 ntohs(ap->maxSkew),
1297 seq,
1298 ntohl(ap->previousPacket),
1299 ntohl(serial),
1300 rxrpc_acks[ap->reason],
1301 call->ackr.nAcks
1302 );
1303
1304 /* check the other side isn't ACK'ing a sequence number I haven't sent
1305 * yet */
1306 if (ap->nAcks > 0 &&
1307 (seq > call->snd_seq_count ||
1308 seq + ap->nAcks - 1 > call->snd_seq_count)) {
1309 printk("Received ACK (#%u-#%u) for unsent packet\n",
1310 seq, seq + ap->nAcks - 1);
1311 rxrpc_call_abort(call, -EINVAL);
1312 _leave("");
1313 return;
1314 }
1315
1316 /* deal with RTT calculation */
1317 if (serial) {
1318 struct rxrpc_message *rttmsg;
1319
1320 /* find the prompting packet */
1321 spin_lock(&call->lock);
1322 if (call->snd_ping && call->snd_ping->hdr.serial == serial) {
1323 /* it was a ping packet */
1324 rttmsg = call->snd_ping;
1325 call->snd_ping = NULL;
1326 spin_unlock(&call->lock);
1327
1328 if (rttmsg) {
1329 rttmsg->rttdone = 1;
1330 rxrpc_peer_calculate_rtt(call->conn->peer,
1331 rttmsg, msg);
1332 rxrpc_put_message(rttmsg);
1333 }
1334 }
1335 else {
1336 struct list_head *_p;
1337
1338 /* it ought to be a data packet - look in the pending
1339 * ACK list */
1340 list_for_each(_p, &call->acks_pendq) {
1341 rttmsg = list_entry(_p, struct rxrpc_message,
1342 link);
1343 if (rttmsg->hdr.serial == serial) {
1344 if (rttmsg->rttdone)
1345 /* never do RTT twice without
1346 * resending */
1347 break;
1348
1349 rttmsg->rttdone = 1;
1350 rxrpc_peer_calculate_rtt(
1351 call->conn->peer, rttmsg, msg);
1352 break;
1353 }
1354 }
1355 spin_unlock(&call->lock);
1356 }
1357 }
1358
1359 switch (ap->reason) {
1360 /* deal with negative/positive acknowledgement of data
1361 * packets */
1362 case RXRPC_ACK_REQUESTED:
1363 case RXRPC_ACK_DELAY:
1364 case RXRPC_ACK_IDLE:
1365 rxrpc_call_definitively_ACK(call, seq - 1);
1366
1367 case RXRPC_ACK_DUPLICATE:
1368 case RXRPC_ACK_OUT_OF_SEQUENCE:
1369 case RXRPC_ACK_EXCEEDS_WINDOW:
1370 call->snd_resend_cnt = 0;
1371 ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks);
1372 if (ret < 0)
1373 rxrpc_call_abort(call, ret);
1374 break;
1375
1376 /* respond to ping packets immediately */
1377 case RXRPC_ACK_PING:
1378 rxrpc_call_generate_ACK(call, &msg->hdr, ap);
1379 break;
1380
1381 /* only record RTT on ping response packets */
1382 case RXRPC_ACK_PING_RESPONSE:
1383 if (call->snd_ping) {
1384 struct rxrpc_message *rttmsg;
1385
1386 /* only do RTT stuff if the response matches the
1387 * retained ping */
1388 rttmsg = NULL;
1389 spin_lock(&call->lock);
1390 if (call->snd_ping &&
1391 call->snd_ping->hdr.serial == ap->serial) {
1392 rttmsg = call->snd_ping;
1393 call->snd_ping = NULL;
1394 }
1395 spin_unlock(&call->lock);
1396
1397 if (rttmsg) {
1398 rttmsg->rttdone = 1;
1399 rxrpc_peer_calculate_rtt(call->conn->peer,
1400 rttmsg, msg);
1401 rxrpc_put_message(rttmsg);
1402 }
1403 }
1404 break;
1405
1406 default:
1407 printk("Unsupported ACK reason %u\n", ap->reason);
1408 break;
1409 }
1410
1411 _leave("");
1412} /* end rxrpc_call_receive_ack_packet() */
1413
1414/*****************************************************************************/
1415/*
1416 * record definitive ACKs for all messages up to and including the one with the
1417 * 'highest' seq
1418 */
1419static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
1420 rxrpc_seq_t highest)
1421{
1422 struct rxrpc_message *msg;
1423 int now_complete;
1424
1425 _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest);
1426
1427 while (call->acks_dftv_seq < highest) {
1428 call->acks_dftv_seq++;
1429
1430 _proto("Definitive ACK on packet #%u", call->acks_dftv_seq);
1431
1432 /* discard those at front of queue until message with highest
1433 * ACK is found */
1434 spin_lock(&call->lock);
1435 msg = NULL;
1436 if (!list_empty(&call->acks_pendq)) {
1437 msg = list_entry(call->acks_pendq.next,
1438 struct rxrpc_message, link);
1439 list_del_init(&msg->link); /* dequeue */
1440 if (msg->state == RXRPC_MSG_SENT)
1441 call->acks_pend_cnt--;
1442 }
1443 spin_unlock(&call->lock);
1444
1445 /* insanity check */
1446 if (!msg)
1447 panic("%s(): acks_pendq unexpectedly empty\n",
1448 __FUNCTION__);
1449
1450 if (msg->seq != call->acks_dftv_seq)
1451 panic("%s(): Packet #%u expected at front of acks_pendq"
1452 " (#%u found)\n",
1453 __FUNCTION__, call->acks_dftv_seq, msg->seq);
1454
1455 /* discard the message */
1456 msg->state = RXRPC_MSG_DONE;
1457 rxrpc_put_message(msg);
1458 }
1459
1460 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */
1461 now_complete = 0;
1462 spin_lock(&call->lock);
1463 if (call->acks_dftv_seq == call->snd_seq_count) {
1464 if (call->app_call_state != RXRPC_CSTATE_COMPLETE) {
1465 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1466 _state(call);
1467 now_complete = 1;
1468 }
1469 }
1470 spin_unlock(&call->lock);
1471
1472 if (now_complete) {
1473 del_timer_sync(&call->acks_timeout);
1474 del_timer_sync(&call->rcv_timeout);
1475 del_timer_sync(&call->ackr_dfr_timo);
1476 call->app_attn_func(call);
1477 }
1478
1479 _leave("");
1480} /* end rxrpc_call_definitively_ACK() */
1481
1482/*****************************************************************************/
1483/*
1484 * record the specified amount of ACKs/NAKs
1485 */
1486static int rxrpc_call_record_ACK(struct rxrpc_call *call,
1487 struct rxrpc_message *msg,
1488 rxrpc_seq_t seq,
1489 size_t count)
1490{
1491 struct rxrpc_message *dmsg;
1492 struct list_head *_p;
1493 rxrpc_seq_t highest;
1494 unsigned ix;
1495 size_t chunk;
1496 char resend, now_complete;
1497 u8 acks[16];
1498
1499 _enter("%p{apc=%u ads=%u},%p,%u,%Zu",
1500 call, call->acks_pend_cnt, call->acks_dftv_seq,
1501 msg, seq, count);
1502
1503 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order
1504 * ACKs) */
1505 if (seq <= call->acks_dftv_seq) {
1506 unsigned delta = call->acks_dftv_seq - seq;
1507
1508 if (count <= delta) {
1509 _leave(" = 0 [all definitively ACK'd]");
1510 return 0;
1511 }
1512
1513 seq += delta;
1514 count -= delta;
1515 msg->offset += delta;
1516 }
1517
1518 highest = seq + count - 1;
1519 resend = 0;
1520 while (count > 0) {
1521 /* extract up to 16 ACK slots at a time */
1522 chunk = min(count, sizeof(acks));
1523 count -= chunk;
1524
1525 memset(acks, 2, sizeof(acks));
1526
1527 if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) {
1528 printk("Rx Received short ACK packet\n");
1529 _leave(" = -EINVAL");
1530 return -EINVAL;
1531 }
1532 msg->offset += chunk;
1533
1534 /* check that the ACK set is valid */
1535 for (ix = 0; ix < chunk; ix++) {
1536 switch (acks[ix]) {
1537 case RXRPC_ACK_TYPE_ACK:
1538 break;
1539 case RXRPC_ACK_TYPE_NACK:
1540 resend = 1;
1541 break;
1542 default:
1543 printk("Rx Received unsupported ACK state"
1544 " %u\n", acks[ix]);
1545 _leave(" = -EINVAL");
1546 return -EINVAL;
1547 }
1548 }
1549
1550 _proto("Rx ACK of packets #%u-#%u "
1551 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)",
1552 seq, (unsigned) (seq + chunk - 1),
1553 _acktype[acks[0x0]],
1554 _acktype[acks[0x1]],
1555 _acktype[acks[0x2]],
1556 _acktype[acks[0x3]],
1557 _acktype[acks[0x4]],
1558 _acktype[acks[0x5]],
1559 _acktype[acks[0x6]],
1560 _acktype[acks[0x7]],
1561 _acktype[acks[0x8]],
1562 _acktype[acks[0x9]],
1563 _acktype[acks[0xA]],
1564 _acktype[acks[0xB]],
1565 _acktype[acks[0xC]],
1566 _acktype[acks[0xD]],
1567 _acktype[acks[0xE]],
1568 _acktype[acks[0xF]],
1569 call->acks_pend_cnt
1570 );
1571
1572 /* mark the packets in the ACK queue as being provisionally
1573 * ACK'd */
1574 ix = 0;
1575 spin_lock(&call->lock);
1576
1577 /* find the first packet ACK'd/NAK'd here */
1578 list_for_each(_p, &call->acks_pendq) {
1579 dmsg = list_entry(_p, struct rxrpc_message, link);
1580 if (dmsg->seq == seq)
1581 goto found_first;
1582 _debug("- %u: skipping #%u", ix, dmsg->seq);
1583 }
1584 goto bad_queue;
1585
1586 found_first:
1587 do {
1588 _debug("- %u: processing #%u (%c) apc=%u",
1589 ix, dmsg->seq, _acktype[acks[ix]],
1590 call->acks_pend_cnt);
1591
1592 if (acks[ix] == RXRPC_ACK_TYPE_ACK) {
1593 if (dmsg->state == RXRPC_MSG_SENT)
1594 call->acks_pend_cnt--;
1595 dmsg->state = RXRPC_MSG_ACKED;
1596 }
1597 else {
1598 if (dmsg->state == RXRPC_MSG_ACKED)
1599 call->acks_pend_cnt++;
1600 dmsg->state = RXRPC_MSG_SENT;
1601 }
1602 ix++;
1603 seq++;
1604
1605 _p = dmsg->link.next;
1606 dmsg = list_entry(_p, struct rxrpc_message, link);
1607 } while(ix < chunk &&
1608 _p != &call->acks_pendq &&
1609 dmsg->seq == seq);
1610
1611 if (ix < chunk)
1612 goto bad_queue;
1613
1614 spin_unlock(&call->lock);
1615 }
1616
1617 if (resend)
1618 rxrpc_call_resend(call, highest);
1619
1620 /* if all packets are provisionally ACK'd, then wake up anyone who's
1621 * waiting for that */
1622 now_complete = 0;
1623 spin_lock(&call->lock);
1624 if (call->acks_pend_cnt == 0) {
1625 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) {
1626 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1627 _state(call);
1628 }
1629 now_complete = 1;
1630 }
1631 spin_unlock(&call->lock);
1632
1633 if (now_complete) {
1634 _debug("- wake up waiters");
1635 del_timer_sync(&call->acks_timeout);
1636 del_timer_sync(&call->rcv_timeout);
1637 del_timer_sync(&call->ackr_dfr_timo);
1638 call->app_attn_func(call);
1639 }
1640
1641 _leave(" = 0 (apc=%u)", call->acks_pend_cnt);
1642 return 0;
1643
1644 bad_queue:
1645 panic("%s(): acks_pendq in bad state (packet #%u absent)\n",
1646 __FUNCTION__, seq);
1647
1648} /* end rxrpc_call_record_ACK() */
1649
1650/*****************************************************************************/
1651/*
1652 * transfer data from the ready packet queue to the asynchronous read buffer
1653 * - since this func is the only one going to look at packets queued on
1654 * app_readyq, we don't need a lock to modify or access them, only to modify
1655 * the queue pointers
1656 * - called with call->lock held
1657 * - the buffer must be in kernel space
1658 * - returns:
1659 * 0 if buffer filled
1660 * -EAGAIN if buffer not filled and more data to come
1661 * -EBADMSG if last packet received and insufficient data left
1662 * -ECONNABORTED if the call has in an error state
1663 */
1664static int __rxrpc_call_read_data(struct rxrpc_call *call)
1665{
1666 struct rxrpc_message *msg;
1667 size_t qty;
1668 int ret;
1669
1670 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}",
1671 call,
1672 call->app_async_read, call->app_read_buf,
1673 call->app_ready_qty, call->app_mark);
1674
1675 /* check the state */
1676 switch (call->app_call_state) {
1677 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1678 case RXRPC_CSTATE_CLNT_RCV_REPLY:
1679 if (call->app_last_rcv) {
1680 printk("%s(%p,%p,%Zd):"
1681 " Inconsistent call state (%s, last pkt)",
1682 __FUNCTION__,
1683 call, call->app_read_buf, call->app_mark,
1684 rxrpc_call_states[call->app_call_state]);
1685 BUG();
1686 }
1687 break;
1688
1689 case RXRPC_CSTATE_SRVR_RCV_OPID:
1690 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1691 case RXRPC_CSTATE_CLNT_GOT_REPLY:
1692 break;
1693
1694 case RXRPC_CSTATE_SRVR_SND_REPLY:
1695 if (!call->app_last_rcv) {
1696 printk("%s(%p,%p,%Zd):"
1697 " Inconsistent call state (%s, not last pkt)",
1698 __FUNCTION__,
1699 call, call->app_read_buf, call->app_mark,
1700 rxrpc_call_states[call->app_call_state]);
1701 BUG();
1702 }
1703 _debug("Trying to read data from call in SND_REPLY state");
1704 break;
1705
1706 case RXRPC_CSTATE_ERROR:
1707 _leave(" = -ECONNABORTED");
1708 return -ECONNABORTED;
1709
1710 default:
1711 printk("reading in unexpected state [[[ %u ]]]\n",
1712 call->app_call_state);
1713 BUG();
1714 }
1715
1716 /* handle the case of not having an async buffer */
1717 if (!call->app_async_read) {
1718 if (call->app_mark == RXRPC_APP_MARK_EOF) {
1719 ret = call->app_last_rcv ? 0 : -EAGAIN;
1720 }
1721 else {
1722 if (call->app_mark >= call->app_ready_qty) {
1723 call->app_mark = RXRPC_APP_MARK_EOF;
1724 ret = 0;
1725 }
1726 else {
1727 ret = call->app_last_rcv ? -EBADMSG : -EAGAIN;
1728 }
1729 }
1730
1731 _leave(" = %d [no buf]", ret);
1732 return 0;
1733 }
1734
1735 while (!list_empty(&call->app_readyq) && call->app_mark > 0) {
1736 msg = list_entry(call->app_readyq.next,
1737 struct rxrpc_message, link);
1738
1739 /* drag as much data as we need out of this packet */
1740 qty = min(call->app_mark, msg->dsize);
1741
1742 _debug("reading %Zu from skb=%p off=%lu",
1743 qty, msg->pkt, msg->offset);
1744
1745 if (call->app_read_buf)
1746 if (skb_copy_bits(msg->pkt, msg->offset,
1747 call->app_read_buf, qty) < 0)
1748 panic("%s: Failed to copy data from packet:"
1749 " (%p,%p,%Zd)",
1750 __FUNCTION__,
1751 call, call->app_read_buf, qty);
1752
1753 /* if that packet is now empty, discard it */
1754 call->app_ready_qty -= qty;
1755 msg->dsize -= qty;
1756
1757 if (msg->dsize == 0) {
1758 list_del_init(&msg->link);
1759 rxrpc_put_message(msg);
1760 }
1761 else {
1762 msg->offset += qty;
1763 }
1764
1765 call->app_mark -= qty;
1766 if (call->app_read_buf)
1767 call->app_read_buf += qty;
1768 }
1769
1770 if (call->app_mark == 0) {
1771 call->app_async_read = 0;
1772 call->app_mark = RXRPC_APP_MARK_EOF;
1773 call->app_read_buf = NULL;
1774
1775 /* adjust the state if used up all packets */
1776 if (list_empty(&call->app_readyq) && call->app_last_rcv) {
1777 switch (call->app_call_state) {
1778 case RXRPC_CSTATE_SRVR_RCV_OPID:
1779 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1780 call->app_mark = RXRPC_APP_MARK_EOF;
1781 _state(call);
1782 del_timer_sync(&call->rcv_timeout);
1783 break;
1784 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1785 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1786 _state(call);
1787 del_timer_sync(&call->rcv_timeout);
1788 break;
1789 default:
1790 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1791 _state(call);
1792 del_timer_sync(&call->acks_timeout);
1793 del_timer_sync(&call->ackr_dfr_timo);
1794 del_timer_sync(&call->rcv_timeout);
1795 break;
1796 }
1797 }
1798
1799 _leave(" = 0");
1800 return 0;
1801 }
1802
1803 if (call->app_last_rcv) {
1804 _debug("Insufficient data (%Zu/%Zu)",
1805 call->app_ready_qty, call->app_mark);
1806 call->app_async_read = 0;
1807 call->app_mark = RXRPC_APP_MARK_EOF;
1808 call->app_read_buf = NULL;
1809
1810 _leave(" = -EBADMSG");
1811 return -EBADMSG;
1812 }
1813
1814 _leave(" = -EAGAIN");
1815 return -EAGAIN;
1816} /* end __rxrpc_call_read_data() */
1817
1818/*****************************************************************************/
1819/*
1820 * attempt to read the specified amount of data from the call's ready queue
1821 * into the buffer provided
1822 * - since this func is the only one going to look at packets queued on
1823 * app_readyq, we don't need a lock to modify or access them, only to modify
1824 * the queue pointers
1825 * - if the buffer pointer is NULL, then data is merely drained, not copied
1826 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is
1827 * enough data or an error will be generated
1828 * - note that the caller must have added the calling task to the call's wait
1829 * queue beforehand
1830 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this
1831 * function doesn't read all available data
1832 */
1833int rxrpc_call_read_data(struct rxrpc_call *call,
1834 void *buffer, size_t size, int flags)
1835{
1836 int ret;
1837
1838 _enter("%p{arq=%Zu},%p,%Zd,%x",
1839 call, call->app_ready_qty, buffer, size, flags);
1840
1841 spin_lock(&call->lock);
1842
1843 if (unlikely(!!call->app_read_buf)) {
1844 spin_unlock(&call->lock);
1845 _leave(" = -EBUSY");
1846 return -EBUSY;
1847 }
1848
1849 call->app_mark = size;
1850 call->app_read_buf = buffer;
1851 call->app_async_read = 1;
1852 call->app_read_count++;
1853
1854 /* read as much data as possible */
1855 ret = __rxrpc_call_read_data(call);
1856 switch (ret) {
1857 case 0:
1858 if (flags & RXRPC_CALL_READ_ALL &&
1859 (!call->app_last_rcv || call->app_ready_qty > 0)) {
1860 _leave(" = -EBADMSG");
1861 __rxrpc_call_abort(call, -EBADMSG);
1862 return -EBADMSG;
1863 }
1864
1865 spin_unlock(&call->lock);
1866 call->app_attn_func(call);
1867 _leave(" = 0");
1868 return ret;
1869
1870 case -ECONNABORTED:
1871 spin_unlock(&call->lock);
1872 _leave(" = %d [aborted]", ret);
1873 return ret;
1874
1875 default:
1876 __rxrpc_call_abort(call, ret);
1877 _leave(" = %d", ret);
1878 return ret;
1879
1880 case -EAGAIN:
1881 spin_unlock(&call->lock);
1882
1883 if (!(flags & RXRPC_CALL_READ_BLOCK)) {
1884 _leave(" = -EAGAIN");
1885 return -EAGAIN;
1886 }
1887
1888 /* wait for the data to arrive */
1889 _debug("blocking for data arrival");
1890
1891 for (;;) {
1892 set_current_state(TASK_INTERRUPTIBLE);
1893 if (!call->app_async_read || signal_pending(current))
1894 break;
1895 schedule();
1896 }
1897 set_current_state(TASK_RUNNING);
1898
1899 if (signal_pending(current)) {
1900 _leave(" = -EINTR");
1901 return -EINTR;
1902 }
1903
1904 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
1905 _leave(" = -ECONNABORTED");
1906 return -ECONNABORTED;
1907 }
1908
1909 _leave(" = 0");
1910 return 0;
1911 }
1912
1913} /* end rxrpc_call_read_data() */
1914
1915/*****************************************************************************/
1916/*
1917 * write data to a call
1918 * - the data may not be sent immediately if it doesn't fill a buffer
1919 * - if we can't queue all the data for buffering now, siov[] will have been
1920 * adjusted to take account of what has been sent
1921 */
1922int rxrpc_call_write_data(struct rxrpc_call *call,
1923 size_t sioc,
1924 struct kvec *siov,
1925 u8 rxhdr_flags,
Al Virodd0fc662005-10-07 07:46:04 +01001926 gfp_t alloc_flags,
Linus Torvalds1da177e2005-04-16 15:20:36 -07001927 int dup_data,
1928 size_t *size_sent)
1929{
1930 struct rxrpc_message *msg;
1931 struct kvec *sptr;
1932 size_t space, size, chunk, tmp;
1933 char *buf;
1934 int ret;
1935
1936 _enter("%p,%Zu,%p,%02x,%x,%d,%p",
1937 call, sioc, siov, rxhdr_flags, alloc_flags, dup_data,
1938 size_sent);
1939
1940 *size_sent = 0;
1941 size = 0;
1942 ret = -EINVAL;
1943
1944 /* can't send more if we've sent last packet from this end */
1945 switch (call->app_call_state) {
1946 case RXRPC_CSTATE_SRVR_SND_REPLY:
1947 case RXRPC_CSTATE_CLNT_SND_ARGS:
1948 break;
1949 case RXRPC_CSTATE_ERROR:
1950 ret = call->app_errno;
1951 default:
1952 goto out;
1953 }
1954
1955 /* calculate how much data we've been given */
1956 sptr = siov;
1957 for (; sioc > 0; sptr++, sioc--) {
1958 if (!sptr->iov_len)
1959 continue;
1960
1961 if (!sptr->iov_base)
1962 goto out;
1963
1964 size += sptr->iov_len;
1965 }
1966
1967 _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size);
1968
1969 do {
1970 /* make sure there's a message under construction */
1971 if (!call->snd_nextmsg) {
1972 /* no - allocate a message with no data yet attached */
1973 ret = rxrpc_conn_newmsg(call->conn, call,
1974 RXRPC_PACKET_TYPE_DATA,
1975 0, NULL, alloc_flags,
1976 &call->snd_nextmsg);
1977 if (ret < 0)
1978 goto out;
1979 _debug("- allocated new message [ds=%Zu]",
1980 call->snd_nextmsg->dsize);
1981 }
1982
1983 msg = call->snd_nextmsg;
1984 msg->hdr.flags |= rxhdr_flags;
1985
1986 /* deal with zero-length terminal packet */
1987 if (size == 0) {
1988 if (rxhdr_flags & RXRPC_LAST_PACKET) {
1989 ret = rxrpc_call_flush(call);
1990 if (ret < 0)
1991 goto out;
1992 }
1993 break;
1994 }
1995
1996 /* work out how much space current packet has available */
1997 space = call->conn->mtu_size - msg->dsize;
1998 chunk = min(space, size);
1999
2000 _debug("- [before] space=%Zu chunk=%Zu", space, chunk);
2001
2002 while (!siov->iov_len)
2003 siov++;
2004
2005 /* if we are going to have to duplicate the data then coalesce
2006 * it too */
2007 if (dup_data) {
2008 /* don't allocate more that 1 page at a time */
2009 if (chunk > PAGE_SIZE)
2010 chunk = PAGE_SIZE;
2011
2012 /* allocate a data buffer and attach to the message */
2013 buf = kmalloc(chunk, alloc_flags);
2014 if (unlikely(!buf)) {
2015 if (msg->dsize ==
2016 sizeof(struct rxrpc_header)) {
2017 /* discard an empty msg and wind back
2018 * the seq counter */
2019 rxrpc_put_message(msg);
2020 call->snd_nextmsg = NULL;
2021 call->snd_seq_count--;
2022 }
2023
2024 ret = -ENOMEM;
2025 goto out;
2026 }
2027
2028 tmp = msg->dcount++;
2029 set_bit(tmp, &msg->dfree);
2030 msg->data[tmp].iov_base = buf;
2031 msg->data[tmp].iov_len = chunk;
2032 msg->dsize += chunk;
2033 *size_sent += chunk;
2034 size -= chunk;
2035
2036 /* load the buffer with data */
2037 while (chunk > 0) {
2038 tmp = min(chunk, siov->iov_len);
2039 memcpy(buf, siov->iov_base, tmp);
2040 buf += tmp;
2041 siov->iov_base += tmp;
2042 siov->iov_len -= tmp;
2043 if (!siov->iov_len)
2044 siov++;
2045 chunk -= tmp;
2046 }
2047 }
2048 else {
2049 /* we want to attach the supplied buffers directly */
2050 while (chunk > 0 &&
2051 msg->dcount < RXRPC_MSG_MAX_IOCS) {
2052 tmp = msg->dcount++;
2053 msg->data[tmp].iov_base = siov->iov_base;
2054 msg->data[tmp].iov_len = siov->iov_len;
2055 msg->dsize += siov->iov_len;
2056 *size_sent += siov->iov_len;
2057 size -= siov->iov_len;
2058 chunk -= siov->iov_len;
2059 siov++;
2060 }
2061 }
2062
2063 _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size);
2064
2065 /* dispatch the message when full, final or requesting ACK */
2066 if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) {
2067 ret = rxrpc_call_flush(call);
2068 if (ret < 0)
2069 goto out;
2070 }
2071
2072 } while(size > 0);
2073
2074 ret = 0;
2075 out:
2076 _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size);
2077 return ret;
2078
2079} /* end rxrpc_call_write_data() */
2080
2081/*****************************************************************************/
2082/*
2083 * flush outstanding packets to the network
2084 */
2085static int rxrpc_call_flush(struct rxrpc_call *call)
2086{
2087 struct rxrpc_message *msg;
2088 int ret = 0;
2089
2090 _enter("%p", call);
2091
2092 rxrpc_get_call(call);
2093
2094 /* if there's a packet under construction, then dispatch it now */
2095 if (call->snd_nextmsg) {
2096 msg = call->snd_nextmsg;
2097 call->snd_nextmsg = NULL;
2098
2099 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2100 msg->hdr.flags &= ~RXRPC_MORE_PACKETS;
2101 if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS)
2102 msg->hdr.flags |= RXRPC_REQUEST_ACK;
2103 }
2104 else {
2105 msg->hdr.flags |= RXRPC_MORE_PACKETS;
2106 }
2107
2108 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }",
2109 msg->dsize, msg->dcount, msg->dfree);
2110
2111 /* queue and adjust call state */
2112 spin_lock(&call->lock);
2113 list_add_tail(&msg->link, &call->acks_pendq);
2114
2115 /* decide what to do depending on current state and if this is
2116 * the last packet */
2117 ret = -EINVAL;
2118 switch (call->app_call_state) {
2119 case RXRPC_CSTATE_SRVR_SND_REPLY:
2120 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2121 call->app_call_state =
2122 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK;
2123 _state(call);
2124 }
2125 break;
2126
2127 case RXRPC_CSTATE_CLNT_SND_ARGS:
2128 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2129 call->app_call_state =
2130 RXRPC_CSTATE_CLNT_RCV_REPLY;
2131 _state(call);
2132 }
2133 break;
2134
2135 case RXRPC_CSTATE_ERROR:
2136 ret = call->app_errno;
2137 default:
2138 spin_unlock(&call->lock);
2139 goto out;
2140 }
2141
2142 call->acks_pend_cnt++;
2143
2144 mod_timer(&call->acks_timeout,
2145 __rxrpc_rtt_based_timeout(call,
2146 rxrpc_call_acks_timeout));
2147
2148 spin_unlock(&call->lock);
2149
2150 ret = rxrpc_conn_sendmsg(call->conn, msg);
2151 if (ret == 0)
2152 call->pkt_snd_count++;
2153 }
2154
2155 out:
2156 rxrpc_put_call(call);
2157
2158 _leave(" = %d", ret);
2159 return ret;
2160
2161} /* end rxrpc_call_flush() */
2162
2163/*****************************************************************************/
2164/*
2165 * resend NAK'd or unacknowledged packets up to the highest one specified
2166 */
2167static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest)
2168{
2169 struct rxrpc_message *msg;
2170 struct list_head *_p;
2171 rxrpc_seq_t seq = 0;
2172
2173 _enter("%p,%u", call, highest);
2174
2175 _proto("Rx Resend required");
2176
2177 /* handle too many resends */
2178 if (call->snd_resend_cnt >= rxrpc_call_max_resend) {
2179 _debug("Aborting due to too many resends (rcv=%d)",
2180 call->pkt_rcv_count);
2181 rxrpc_call_abort(call,
2182 call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT);
2183 _leave("");
2184 return;
2185 }
2186
2187 spin_lock(&call->lock);
2188 call->snd_resend_cnt++;
2189 for (;;) {
2190 /* determine which the next packet we might need to ACK is */
2191 if (seq <= call->acks_dftv_seq)
2192 seq = call->acks_dftv_seq;
2193 seq++;
2194
2195 if (seq > highest)
2196 break;
2197
2198 /* look for the packet in the pending-ACK queue */
2199 list_for_each(_p, &call->acks_pendq) {
2200 msg = list_entry(_p, struct rxrpc_message, link);
2201 if (msg->seq == seq)
2202 goto found_msg;
2203 }
2204
2205 panic("%s(%p,%d):"
2206 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n",
2207 __FUNCTION__, call, highest,
2208 call->acks_dftv_seq, call->snd_seq_count, seq);
2209
2210 found_msg:
2211 if (msg->state != RXRPC_MSG_SENT)
2212 continue; /* only un-ACK'd packets */
2213
2214 rxrpc_get_message(msg);
2215 spin_unlock(&call->lock);
2216
2217 /* send each message again (and ignore any errors we might
2218 * incur) */
2219 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }",
2220 msg->dsize, msg->dcount, msg->dfree);
2221
2222 if (rxrpc_conn_sendmsg(call->conn, msg) == 0)
2223 call->pkt_snd_count++;
2224
2225 rxrpc_put_message(msg);
2226
2227 spin_lock(&call->lock);
2228 }
2229
2230 /* reset the timeout */
2231 mod_timer(&call->acks_timeout,
2232 __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout));
2233
2234 spin_unlock(&call->lock);
2235
2236 _leave("");
2237} /* end rxrpc_call_resend() */
2238
2239/*****************************************************************************/
2240/*
2241 * handle an ICMP error being applied to a call
2242 */
2243void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno)
2244{
2245 _enter("%p{%u},%d", call, ntohl(call->call_id), errno);
2246
2247 /* if this call is already aborted, then just wake up any waiters */
2248 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
2249 call->app_error_func(call);
2250 }
2251 else {
2252 /* tell the app layer what happened */
2253 spin_lock(&call->lock);
2254 call->app_call_state = RXRPC_CSTATE_ERROR;
2255 _state(call);
2256 if (local)
2257 call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR;
2258 else
2259 call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR;
2260 call->app_errno = errno;
2261 call->app_mark = RXRPC_APP_MARK_EOF;
2262 call->app_read_buf = NULL;
2263 call->app_async_read = 0;
2264
2265 /* map the error */
2266 call->app_aemap_func(call);
2267
2268 del_timer_sync(&call->acks_timeout);
2269 del_timer_sync(&call->rcv_timeout);
2270 del_timer_sync(&call->ackr_dfr_timo);
2271
2272 spin_unlock(&call->lock);
2273
2274 call->app_error_func(call);
2275 }
2276
2277 _leave("");
2278} /* end rxrpc_call_handle_error() */