/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <net/sock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

/***************************** sndbuf producer *******************************/

/* callback implementation for sk.sk_write_space()
 * to wake up sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct smc_sock *smc = smc_sk(sk);
	struct socket_wq *wq;

	/* similar to sk_stream_write_space */
	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);
		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq);
		if (skwq_has_sleeper(wq))
			wake_up_interruptible_poll(&wq->wait,
						   POLLOUT | POLLWRNORM |
						   POLLWRBAND);
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
		rcu_read_unlock();
	}
}

/* Wake up sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
	if (smc->sk.sk_socket &&
	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
		smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space is available */
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool noblock;
	long timeo;
	int rc = 0;

	/* similar to sk_stream_wait_memory */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	noblock = timeo ? false : true;
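	/* a zero send timeout marks this request as non-blocking; remembered
	 * here so that SOCK_NOSPACE can be set before returning -EAGAIN below
	 */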
	add_wait_queue(sk_sleep(sk), &wait);
	while (1) {
		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (sk->sk_err ||
		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
			rc = -EPIPE;
			break;
		}
		if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
			rc = -ECONNRESET;
			break;
		}
		if (!timeo) {
			if (noblock)
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			rc = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			rc = sock_intr_errno(timeo);
			break;
		}
		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (atomic_read(&conn->sndbuf_space))
			break; /* at least 1 byte of free space available */
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk->sk_write_pending++;
		sk_wait_event(sk, &timeo,
			      sk->sk_err ||
			      (sk->sk_shutdown & SEND_SHUTDOWN) ||
			      smc_cdc_rxed_any_close_or_senddone(conn) ||
			      atomic_read(&conn->sndbuf_space),
			      &wait);
		sk->sk_write_pending--;
	}
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
	size_t copylen, send_done = 0, send_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor prep;
	struct sock *sk = &smc->sk;
	char *sndbuf_base;
	int tx_cnt_prep;
	int writespace;
	int rc, chunk;

	/* This should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
		rc = -EPIPE;
		goto out_err;
	}

	while (msg_data_left(msg)) {
		if (sk->sk_state == SMC_INIT)
			return -ENOTCONN;
		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
			return -EPIPE;
		if (smc_cdc_rxed_any_close(conn))
			return send_done ?: -ECONNRESET;

		if (!atomic_read(&conn->sndbuf_space)) {
			rc = smc_tx_wait_memory(smc, msg->msg_flags);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			continue;
		}

		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_tx_wait_memory above */
		writespace = atomic_read(&conn->sndbuf_space);
		/* not more than what user space asked for */
		copylen = min_t(size_t, send_remaining, writespace);
		/* determine start of sndbuf */
		sndbuf_base = conn->sndbuf_desc->cpu_addr;
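		/* take a private snapshot of the producer cursor; it is
		 * written back to conn->tx_curs_prep only after copying
		 */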
		smc_curs_write(&prep,
			       smc_curs_read(&conn->tx_curs_prep, conn),
			       conn);
		tx_cnt_prep = prep.count;
		/* determine chunks where to write into sndbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t,
				  copylen, conn->sndbuf_size - tx_cnt_prep);
		chunk_len_sum = chunk_len;
		chunk_off = tx_cnt_prep;
		for (chunk = 0; chunk < 2; chunk++) {
			rc = memcpy_from_msg(sndbuf_base + chunk_off,
					     msg, chunk_len);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			send_done += chunk_len;
			send_remaining -= chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in send ring buffer */
		}
		/* update cursors */
		smc_curs_add(conn->sndbuf_size, &prep, copylen);
		smc_curs_write(&conn->tx_curs_prep,
			       smc_curs_read(&prep, conn),
			       conn);
		/* increased in send tasklet smc_cdc_tx_handler() */
		smp_mb__before_atomic();
		atomic_sub(copylen, &conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_size */
		smp_mb__after_atomic();
		/* since we just produced more new data into sndbuf,
		 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
		 */
		smc_tx_sndbuf_nonempty(conn);
	} /* while (msg_data_left(msg)) */

	return send_done;

out_err:
	rc = sk_stream_error(sk, msg->msg_flags, rc);
	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(rc == -EAGAIN))
		sk->sk_write_space(sk);
	return rc;
}

/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
			     int num_sges, struct ib_sge sges[])
{
	struct smc_link_group *lgr = conn->lgr;
	struct ib_send_wr *failed_wr = NULL;
	struct ib_rdma_wr rdma_wr;
	struct smc_link *link;
	int rc;

	memset(&rdma_wr, 0, sizeof(rdma_wr));
	link = &lgr->lnk[SMC_SINGLE_LINK];
	rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
	rdma_wr.wr.sg_list = sges;
	rdma_wr.wr.num_sge = num_sges;
	rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
	rdma_wr.remote_addr =
		lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
		/* RMBE within RMB */
		((conn->peer_conn_idx - 1) * conn->peer_rmbe_size) +
		/* offset within RMBE */
		peer_rmbe_offset;
	rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
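	/* note: send_flags stays zero from the memset above, i.e. the RDMA
	 * write is posted without IB_SEND_SIGNALED (no completion requested)
	 */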
	rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
	if (rc)
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
	return rc;
}

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
					  union smc_host_cursor *prod,
					  union smc_host_cursor *sent,
					  size_t len)
{
	smc_curs_add(conn->peer_rmbe_size, prod, len);
	/* increased in recv tasklet smc_cdc_msg_rcv() */
	smp_mb__before_atomic();
	/* data in flight reduces usable snd_wnd */
	atomic_sub(len, &conn->peer_rmbe_space);
	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
	smp_mb__after_atomic();
	smc_curs_add(conn->sndbuf_size, sent, len);
}

/* sndbuf consumer: prepare all necessary (src & dst) chunks of data transmit;
 * the usable snd_wnd is the upper limit for the amount transmitted
 */
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
	size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
	size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
	union smc_host_cursor sent, prep, prod, cons;
	struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
	struct smc_link_group *lgr = conn->lgr;
	int to_send, rmbespace;
	struct smc_link *link;
	int num_sges;
	int rc;

	/* source: sndbuf */
	smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
	smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
	/* cf. wmem_alloc - (snd_max - snd_una) */
	to_send = smc_curs_diff(conn->sndbuf_size, &sent, &prep);
	if (to_send <= 0)
		return 0;

	/* destination: RMBE */
	/* cf. snd_wnd */
	rmbespace = atomic_read(&conn->peer_rmbe_space);
	if (rmbespace <= 0)
		return 0;
	smc_curs_write(&prod,
		       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
		       conn);
	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
		       conn);
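	/* cons mirrors the peer's consumer cursor as last announced via CDC;
	 * together with prod it determines whether the free RMBE area wraps
	 */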

	/* if usable snd_wnd closes ask peer to advertise once it opens again */
	conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
	/* cf. usable snd_wnd */
	len = min(to_send, rmbespace);

	/* initialize variables for first iteration of subsequent nested loop */
	link = &lgr->lnk[SMC_SINGLE_LINK];
	dst_off = prod.count;
	if (prod.wrap == cons.wrap) {
		/* the filled destination area is unwrapped,
		 * hence the available free destination space is wrapped
		 * and we need 2 destination chunks of sum len; start with 1st
		 * which is limited by what's available in sndbuf
		 */
		dst_len = min_t(size_t,
				conn->peer_rmbe_size - prod.count, len);
	} else {
		/* the filled destination area is wrapped,
		 * hence the available free destination space is unwrapped
		 * and we need a single destination chunk of entire len
		 */
		dst_len = len;
	}
	dst_len_sum = dst_len;
	src_off = sent.count;
	/* dst_len determines the maximum src_len */
	if (sent.count + dst_len <= conn->sndbuf_size) {
		/* unwrapped src case: single chunk of entire dst_len */
		src_len = dst_len;
	} else {
		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
		src_len = conn->sndbuf_size - sent.count;
	}
	src_len_sum = src_len;
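	/* outer loop: at most 2 destination chunks (free RMBE area may wrap);
	 * inner loop: at most 2 source chunks (sndbuf may wrap), gathered as
	 * SGEs into a single RDMA write per destination chunk
	 */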
	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		num_sges = 0;
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			sges[srcchunk].addr =
				conn->sndbuf_desc->dma_addr[SMC_SINGLE_LINK] +
				src_off;
			sges[srcchunk].length = src_len;
			sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
			num_sges++;
			src_off += src_len;
			if (src_off >= conn->sndbuf_size)
				src_off -= conn->sndbuf_size;
				/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
		if (rc)
			return rc;
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int,
				dst_len, conn->sndbuf_size - sent.count);
		src_len_sum = src_len;
	}

	smc_tx_advance_cursors(conn, &prod, &sent, len);
	/* update connection's cursors with advanced local cursors */
	smc_curs_write(&conn->local_tx_ctrl.prod,
		       smc_curs_read(&prod, conn),
		       conn);
	/* dst: peer RMBE */
	smc_curs_write(&conn->tx_curs_sent,
		       smc_curs_read(&sent, conn),
		       conn);
	/* src: local sndbuf */

	return 0;
}

/* Wake up sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; the usable snd_wnd limits the transmit
 */
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int rc;

	spin_lock_bh(&conn->send_lock);
	rc = smc_cdc_get_free_slot(&conn->lgr->lnk[SMC_SINGLE_LINK], &wr_buf,
				   &pend);
	if (rc < 0) {
		if (rc == -EBUSY) {
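			/* no free send slot right now: do not report an error,
			 * let the tx worker retry from process context
			 */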
			rc = 0;
			schedule_work(&conn->tx_work);
		}
		goto out_unlock;
	}

	rc = smc_tx_rdma_writes(conn);
	if (rc) {
		smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
				   (struct smc_wr_tx_pend_priv *)pend);
		goto out_unlock;
	}

	rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
	spin_unlock_bh(&conn->send_lock);
	return rc;
}

/* Wake up sndbuf consumers from process context
 * since there is more data to transmit
 */
static void smc_tx_work(struct work_struct *work)
{
	struct smc_connection *conn = container_of(work,
						   struct smc_connection,
						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

	lock_sock(&smc->sk);
	smc_tx_sndbuf_nonempty(conn);
	release_sock(&smc->sk);
}


/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
	smc->sk.sk_write_space = smc_tx_write_space;
	INIT_WORK(&smc->conn.tx_work, smc_tx_work);
	spin_lock_init(&smc->conn.send_lock);
}