/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */
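/* Transmit path at a glance (summary of the functions below):
 * smc_tx_sendmsg() copies user data into the ring-shaped send buffer and
 * advances tx_curs_prep; smc_tx_sndbuf_nonempty() then reserves a CDC send
 * slot, lets smc_tx_rdma_writes() RDMA-write the new data into the peer's
 * RMBE, and announces it with a CDC message, advancing tx_curs_sent and the
 * producer cursor of local_tx_ctrl.
 */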

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <net/sock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

/***************************** sndbuf producer *******************************/

/* callback implementation for sk.sk_write_space()
 * to wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct smc_sock *smc = smc_sk(sk);
	struct socket_wq *wq;

	/* similar to sk_stream_write_space */
	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);
		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq);
		if (skwq_has_sleeper(wq))
			wake_up_interruptible_poll(&wq->wait,
						   POLLOUT | POLLWRNORM |
						   POLLWRBAND);
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
		rcu_read_unlock();
	}
}

/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
	if (smc->sk.sk_socket &&
	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
		smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space available */
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool noblock;
	long timeo;
	int rc = 0;

	/* similar to sk_stream_wait_memory */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	noblock = timeo ? false : true;
	add_wait_queue(sk_sleep(sk), &wait);
	while (1) {
		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (sk->sk_err ||
		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
			rc = -EPIPE;
			break;
		}
		if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
			rc = -ECONNRESET;
			break;
		}
		if (!timeo) {
			if (noblock)
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			rc = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			rc = sock_intr_errno(timeo);
			break;
		}
		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (atomic_read(&conn->sndbuf_space))
			break; /* at least 1 byte of free space available */
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk->sk_write_pending++;
		sk_wait_event(sk, &timeo,
			      sk->sk_err ||
			      (sk->sk_shutdown & SEND_SHUTDOWN) ||
			      smc_cdc_rxed_any_close_or_senddone(conn) ||
			      atomic_read(&conn->sndbuf_space),
			      &wait);
		sk->sk_write_pending--;
	}
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
	size_t copylen, send_done = 0, send_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor prep;
	struct sock *sk = &smc->sk;
	char *sndbuf_base;
	int tx_cnt_prep;
	int writespace;
	int rc, chunk;

	/* This should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
		rc = -EPIPE;
		goto out_err;
	}

	while (msg_data_left(msg)) {
		if (sk->sk_state == SMC_INIT)
			return -ENOTCONN;
		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
		    (smc->sk.sk_err == ECONNABORTED) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
			return -EPIPE;
		if (smc_cdc_rxed_any_close(conn))
			return send_done ?: -ECONNRESET;

		if (!atomic_read(&conn->sndbuf_space)) {
			rc = smc_tx_wait_memory(smc, msg->msg_flags);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			continue;
		}

		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_tx_wait_memory above */
		writespace = atomic_read(&conn->sndbuf_space);
		/* not more than what user space asked for */
		copylen = min_t(size_t, send_remaining, writespace);
		/* determine start of sndbuf */
		sndbuf_base = conn->sndbuf_desc->cpu_addr;
		smc_curs_write(&prep,
			       smc_curs_read(&conn->tx_curs_prep, conn),
			       conn);
		tx_cnt_prep = prep.count;
		/* determine chunks where to write into sndbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t,
				  copylen, conn->sndbuf_size - tx_cnt_prep);
		chunk_len_sum = chunk_len;
		chunk_off = tx_cnt_prep;
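		/* example: with sndbuf_size 16KB, tx_cnt_prep 15KB and
		 * copylen 2KB, the 1st chunk copies 1KB at offset 15KB and
		 * the 2nd chunk copies the remaining 1KB at offset 0
		 */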
		for (chunk = 0; chunk < 2; chunk++) {
			rc = memcpy_from_msg(sndbuf_base + chunk_off,
					     msg, chunk_len);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			send_done += chunk_len;
			send_remaining -= chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in send ring buffer */
		}
		/* update cursors */
		smc_curs_add(conn->sndbuf_size, &prep, copylen);
		smc_curs_write(&conn->tx_curs_prep,
			       smc_curs_read(&prep, conn),
			       conn);
		/* increased in send tasklet smc_cdc_tx_handler() */
		smp_mb__before_atomic();
		atomic_sub(copylen, &conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_size */
		smp_mb__after_atomic();
		/* since we just produced more new data into sndbuf,
		 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
		 */
		smc_tx_sndbuf_nonempty(conn);
	} /* while (msg_data_left(msg)) */

	return send_done;

out_err:
	rc = sk_stream_error(sk, msg->msg_flags, rc);
	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(rc == -EAGAIN))
		sk->sk_write_space(sk);
	return rc;
}

/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
			     int num_sges, struct ib_sge sges[])
{
	struct smc_link_group *lgr = conn->lgr;
	struct ib_send_wr *failed_wr = NULL;
	struct ib_rdma_wr rdma_wr;
	struct smc_link *link;
	int rc;

	memset(&rdma_wr, 0, sizeof(rdma_wr));
	link = &lgr->lnk[SMC_SINGLE_LINK];
	rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
	rdma_wr.wr.sg_list = sges;
	rdma_wr.wr.num_sge = num_sges;
	rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
	rdma_wr.remote_addr =
		lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
		/* RMBE within RMB */
		((conn->peer_conn_idx - 1) * conn->peer_rmbe_size) +
		/* offset within RMBE */
		peer_rmbe_offset;
	rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
	rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
	if (rc)
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
	return rc;
}

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
					  union smc_host_cursor *prod,
					  union smc_host_cursor *sent,
					  size_t len)
{
	smc_curs_add(conn->peer_rmbe_size, prod, len);
	/* increased in recv tasklet smc_cdc_msg_rcv() */
	smp_mb__before_atomic();
	/* data in flight reduces usable snd_wnd */
	atomic_sub(len, &conn->peer_rmbe_space);
	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
	smp_mb__after_atomic();
	smc_curs_add(conn->sndbuf_size, sent, len);
}

/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
 * usable snd_wnd as max transmit
 */
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
	size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
	size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
	union smc_host_cursor sent, prep, prod, cons;
	struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
	struct smc_link_group *lgr = conn->lgr;
	int to_send, rmbespace;
	struct smc_link *link;
	int num_sges;
	int rc;

	/* source: sndbuf */
	smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
	smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
	/* cf. wmem_alloc - (snd_max - snd_una) */
	to_send = smc_curs_diff(conn->sndbuf_size, &sent, &prep);
	if (to_send <= 0)
		return 0;

	/* destination: RMBE */
	/* cf. snd_wnd */
	rmbespace = atomic_read(&conn->peer_rmbe_space);
	if (rmbespace <= 0)
		return 0;
	smc_curs_write(&prod,
		       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
		       conn);
	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
		       conn);

	/* if usable snd_wnd closes ask peer to advertise once it opens again */
	conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
	/* cf. usable snd_wnd */
	len = min(to_send, rmbespace);

	/* initialize variables for first iteration of subsequent nested loop */
	link = &lgr->lnk[SMC_SINGLE_LINK];
	dst_off = prod.count;
	if (prod.wrap == cons.wrap) {
		/* the filled destination area is unwrapped,
		 * hence the available free destination space is wrapped
		 * and we need 2 destination chunks of sum len; start with 1st
		 * which is limited by what's available in sndbuf
		 */
		dst_len = min_t(size_t,
				conn->peer_rmbe_size - prod.count, len);
	} else {
		/* the filled destination area is wrapped,
		 * hence the available free destination space is unwrapped
		 * and we need a single destination chunk of entire len
		 */
		dst_len = len;
	}
	dst_len_sum = dst_len;
	src_off = sent.count;
	/* dst_len determines the maximum src_len */
	if (sent.count + dst_len <= conn->sndbuf_size) {
		/* unwrapped src case: single chunk of entire dst_len */
		src_len = dst_len;
	} else {
		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
		src_len = conn->sndbuf_size - sent.count;
	}
	src_len_sum = src_len;
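	/* nested loop below: up to 2 destination chunks if the free RMBE
	 * area wraps, each sent with a single RDMA write built from up to
	 * 2 source SGEs if the sndbuf data wraps
	 */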
	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		num_sges = 0;
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			sges[srcchunk].addr =
				conn->sndbuf_desc->dma_addr[SMC_SINGLE_LINK] +
				src_off;
			sges[srcchunk].length = src_len;
			sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
			num_sges++;
			src_off += src_len;
			if (src_off >= conn->sndbuf_size)
				src_off -= conn->sndbuf_size;
						/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
		if (rc)
			return rc;
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int,
				dst_len, conn->sndbuf_size - sent.count);
		src_len_sum = src_len;
	}

	smc_tx_advance_cursors(conn, &prod, &sent, len);
	/* update connection's cursors with advanced local cursors */
	smc_curs_write(&conn->local_tx_ctrl.prod,
		       smc_curs_read(&prod, conn),
		       conn);
	/* dst: peer RMBE */
	smc_curs_write(&conn->tx_curs_sent,
		       smc_curs_read(&sent, conn),
		       conn);
	/* src: local sndbuf */

	return 0;
}

/* Wakeup sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int rc;

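	/* reserve a CDC send slot before issuing the RDMA writes, so the
	 * transferred data can always be announced to the peer; if no slot
	 * is free, retry later from the tx worker (unless the connection
	 * was already aborted)
	 */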
	spin_lock_bh(&conn->send_lock);
	rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], &wr_buf,
				   &pend);
	if (rc < 0) {
		if (rc == -EBUSY) {
			struct smc_sock *smc =
				container_of(conn, struct smc_sock, conn);

			if (smc->sk.sk_err == ECONNABORTED) {
				rc = sock_error(&smc->sk);
				goto out_unlock;
			}
			rc = 0;
			schedule_work(&conn->tx_work);
		}
		goto out_unlock;
	}

	rc = smc_tx_rdma_writes(conn);
	if (rc) {
		smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
				   (struct smc_wr_tx_pend_priv *)pend);
		goto out_unlock;
	}

	rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
	spin_unlock_bh(&conn->send_lock);
	return rc;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit
 */
static void smc_tx_work(struct work_struct *work)
{
	struct smc_connection *conn = container_of(work,
						   struct smc_connection,
						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

	lock_sock(&smc->sk);
	smc_tx_sndbuf_nonempty(conn);
	release_sock(&smc->sk);
}

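/* Send a consumer cursor update to the peer if required, i.e. if the peer
 * explicitly requested one, or if the not yet confirmed consumed bytes
 * exceed rmbe_update_limit and either exceed half of the RMBE or the peer
 * reported write_blocked. The locally stored write_blocked indication of
 * the peer is reset once the receive buffer has been drained.
 */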
void smc_tx_consumer_update(struct smc_connection *conn)
{
	union smc_host_cursor cfed, cons;
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int to_confirm, rc;

	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
		       conn);
	smc_curs_write(&cfed,
		       smc_curs_read(&conn->rx_curs_confirmed, conn),
		       conn);
	to_confirm = smc_curs_diff(conn->rmbe_size, &cfed, &cons);

	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    ((to_confirm > conn->rmbe_update_limit) &&
	     ((to_confirm > (conn->rmbe_size / 2)) ||
	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
		rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
					   &wr_buf, &pend);
		if (!rc)
			rc = smc_cdc_msg_send(conn, wr_buf, pend);
		if (rc < 0) {
			schedule_work(&conn->tx_work);
			return;
		}
		smc_curs_write(&conn->rx_curs_confirmed,
			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
			       conn);
		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	}
	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
	smc->sk.sk_write_space = smc_tx_write_space;
	INIT_WORK(&smc->conn.tx_work, smc_tx_work);
	spin_lock_init(&smc->conn.send_lock);
}