blob: 2ee02204c43deb83a3241df62947b5b990ee6d8f [file] [log] [blame]
/*
 * r2net.c
 *
 * Copyright (c) 2011, Dan Magenheimer, Oracle Corp.
 *
 * Ramster_r2net provides an interface between zcache and r2net.
 *
 * FIXME: support more than two nodes
 */
10
11#include <linux/list.h>
12#include "cluster/tcp.h"
13#include "cluster/nodemanager.h"
14#include "tmem.h"
15#include "zcache.h"
16#include "ramster.h"
17
/* Compiles in the extra diagnostics guarded by #ifdef RAMSTER_TESTING. */
#define RAMSTER_TESTING

/* Key passed along with every ramster message type to the r2net calls. */
#define RMSTR_KEY 0x77347734
21
/*
 * r2net message types used by ramster.  The numeric values are spelled
 * out because they identify messages exchanged between nodes.
 */
enum {
	RMSTR_TMEM_PUT_EPH			= 100,
	RMSTR_TMEM_PUT_PERS			= 101,
	RMSTR_TMEM_ASYNC_GET_REQUEST		= 102,
	RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST	= 103,
	RMSTR_TMEM_ASYNC_GET_REPLY		= 104,
	RMSTR_TMEM_FLUSH			= 105,
	RMSTR_TMEM_FLOBJ			= 106,
	RMSTR_TMEM_DESTROY_POOL			= 107,
};
32
/*
 * Largest data payload a ramster message can carry: what is left of an
 * r2net payload once the leading struct tmem_xhandle is accounted for.
 */
#define RMSTR_R2NET_MAX_LEN \
	(R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
35
36#include "cluster/tcp_internal.h"
37
38static struct r2nm_node *r2net_target_node;
39static int r2net_target_nodenum;
40
41int r2net_remote_target_node_set(int node_num)
42{
43 int ret = -1;
44
45 r2net_target_node = r2nm_get_node_by_num(node_num);
46 if (r2net_target_node != NULL) {
47 r2net_target_nodenum = node_num;
48 r2nm_node_put(r2net_target_node);
49 ret = 0;
50 }
51 return ret;
52}
53
54/* FIXME following buffer should be per-cpu, protected by preempt_disable */
55static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
56
57static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58 u32 len, void *data, void **ret_data)
59{
60 char *pdata;
61 struct tmem_xhandle xh;
62 int found;
63 size_t size = RMSTR_R2NET_MAX_LEN;
64 u16 msgtype = be16_to_cpu(msg->msg_type);
65 bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
66 unsigned long flags;
67
68 xh = *(struct tmem_xhandle *)msg->buf;
69 if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
70 BUG();
71 pdata = ramster_async_get_buf;
72 *(struct tmem_xhandle *)pdata = xh;
73 pdata += sizeof(struct tmem_xhandle);
74 local_irq_save(flags);
75 found = zcache_get(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76 pdata, &size, 1, get_and_free ? 1 : -1);
77 local_irq_restore(flags);
78 if (found < 0) {
79 /* a zero size indicates the get failed */
80 size = 0;
81 }
82 if (size > RMSTR_R2NET_MAX_LEN)
83 BUG();
84 *ret_data = pdata - sizeof(struct tmem_xhandle);
85 /* now make caller (r2net_process_message) handle specially */
86 r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
87 return size + sizeof(struct tmem_xhandle);
88}
89
90static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
91 u32 len, void *data, void **ret_data)
92{
93 char *in = (char *)msg->buf;
94 int datalen = len - sizeof(struct r2net_msg);
95 int ret = -1;
96 struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
97
98 in += sizeof(struct tmem_xhandle);
99 datalen -= sizeof(struct tmem_xhandle);
100 BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
101 ret = zcache_localify(xh->pool_id, &xh->oid, xh->index,
102 in, datalen, xh->extra);
103#ifdef RAMSTER_TESTING
104 if (ret == -EEXIST)
105 pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
106#endif
107 return ret;
108}
109
110int ramster_remote_put_handler(struct r2net_msg *msg,
111 u32 len, void *data, void **ret_data)
112{
113 struct tmem_xhandle *xh;
114 char *p = (char *)msg->buf;
115 int datalen = len - sizeof(struct r2net_msg) -
116 sizeof(struct tmem_xhandle);
117 u16 msgtype = be16_to_cpu(msg->msg_type);
118 bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
119 unsigned long flags;
120 int ret;
121
122 xh = (struct tmem_xhandle *)p;
123 p += sizeof(struct tmem_xhandle);
124 zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
125 local_irq_save(flags);
126 ret = zcache_put(xh->client_id, xh->pool_id, &xh->oid, xh->index,
127 p, datalen, 1, ephemeral ? 1 : -1);
128 local_irq_restore(flags);
129 return ret;
130}
131
132int ramster_remote_flush_handler(struct r2net_msg *msg,
133 u32 len, void *data, void **ret_data)
134{
135 struct tmem_xhandle *xh;
136 char *p = (char *)msg->buf;
137
138 xh = (struct tmem_xhandle *)p;
139 p += sizeof(struct tmem_xhandle);
140 (void)zcache_flush(xh->client_id, xh->pool_id, &xh->oid, xh->index);
141 return 0;
142}
143
144int ramster_remote_flobj_handler(struct r2net_msg *msg,
145 u32 len, void *data, void **ret_data)
146{
147 struct tmem_xhandle *xh;
148 char *p = (char *)msg->buf;
149
150 xh = (struct tmem_xhandle *)p;
151 p += sizeof(struct tmem_xhandle);
152 (void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
153 return 0;
154}
155
156int ramster_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
157 size_t expect_size, uint8_t expect_cksum,
158 void *extra)
159{
160 int ret = -1, status;
161 struct r2nm_node *node = NULL;
162 struct kvec vec[1];
163 size_t veclen = 1;
164 u32 msg_type;
165
166 node = r2nm_get_node_by_num(remotenode);
167 if (node == NULL)
168 goto out;
169 xh->client_id = r2nm_this_node(); /* which node is getting */
170 xh->xh_data_cksum = expect_cksum;
171 xh->xh_data_size = expect_size;
172 xh->extra = extra;
173 vec[0].iov_len = sizeof(*xh);
174 vec[0].iov_base = xh;
175 if (free)
176 msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
177 else
178 msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
179 ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
180 vec, veclen, remotenode, &status);
181 r2nm_node_put(node);
182 if (ret < 0) {
183 /* FIXME handle bad message possibilities here? */
184 pr_err("UNTESTED ret<0 in ramster_remote_async_get\n");
185 }
186 ret = status;
187out:
188 return ret;
189}
190
#ifdef RAMSTER_TESTING
/*
 * Record cur in *last when it exceeds the value stored there.  Returns
 * true only when a new value was recorded and that value has a single
 * bit set, so callers log at 1, 2, 4, 8, ... instead of on every rise.
 */
static bool ramster_irq_cnt_new_high(int cur, int *last)
{
	if (cur <= *last)
		return false;
	*last = cur;
	return (cur & (cur - 1)) == 0;
}

/* leave me here to see if it catches a weird crash */
static void ramster_check_irq_counts(void)
{
	static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt;

	if (ramster_irq_cnt_new_high(hardirq_count() >> HARDIRQ_SHIFT,
					&last_hardirq_cnt))
		pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
			last_hardirq_cnt);

	if (ramster_irq_cnt_new_high(softirq_count() >> SOFTIRQ_SHIFT,
					&last_softirq_cnt))
		pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
			last_softirq_cnt);

	if (ramster_irq_cnt_new_high(preempt_count() & PREEMPT_MASK,
					&last_preempt_cnt))
		pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
			last_preempt_cnt);
}
#endif
221
222int ramster_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
223 bool ephemeral, int *remotenode)
224{
225 int nodenum, ret = -1, status;
226 struct r2nm_node *node = NULL;
227 struct kvec vec[2];
228 size_t veclen = 2;
229 u32 msg_type;
230#ifdef RAMSTER_TESTING
231 struct r2net_node *nn;
232#endif
233
234 BUG_ON(size > RMSTR_R2NET_MAX_LEN);
235 xh->client_id = r2nm_this_node(); /* which node is putting */
236 vec[0].iov_len = sizeof(*xh);
237 vec[0].iov_base = xh;
238 vec[1].iov_len = size;
239 vec[1].iov_base = data;
240 node = r2net_target_node;
241 if (!node)
242 goto out;
243
244 nodenum = r2net_target_nodenum;
245
246 r2nm_node_get(node);
247
248#ifdef RAMSTER_TESTING
249 nn = r2net_nn_from_num(nodenum);
250 WARN_ON_ONCE(nn->nn_persistent_error || !nn->nn_sc_valid);
251#endif
252
253 if (ephemeral)
254 msg_type = RMSTR_TMEM_PUT_EPH;
255 else
256 msg_type = RMSTR_TMEM_PUT_PERS;
257#ifdef RAMSTER_TESTING
258 /* leave me here to see if it catches a weird crash */
259 ramster_check_irq_counts();
260#endif
261
262 ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
263 nodenum, &status);
264#ifdef RAMSTER_TESTING
265 if (ret != 0) {
266 static unsigned long cnt;
267 cnt++;
268 if (!(cnt&(cnt-1)))
269 pr_err("ramster_remote_put: message failed, "
270 "ret=%d, cnt=%lu\n", ret, cnt);
271 ret = -1;
272 }
273#endif
274 if (ret < 0)
275 ret = -1;
276 else {
277 ret = status;
278 *remotenode = nodenum;
279 }
280
281 r2nm_node_put(node);
282out:
283 return ret;
284}
285
286int ramster_remote_flush(struct tmem_xhandle *xh, int remotenode)
287{
288 int ret = -1, status;
289 struct r2nm_node *node = NULL;
290 struct kvec vec[1];
291 size_t veclen = 1;
292
293 node = r2nm_get_node_by_num(remotenode);
294 BUG_ON(node == NULL);
295 xh->client_id = r2nm_this_node(); /* which node is flushing */
296 vec[0].iov_len = sizeof(*xh);
297 vec[0].iov_base = xh;
298 BUG_ON(irqs_disabled());
299 BUG_ON(in_softirq());
300 ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
301 vec, veclen, remotenode, &status);
302 r2nm_node_put(node);
303 return ret;
304}
305
306int ramster_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
307{
308 int ret = -1, status;
309 struct r2nm_node *node = NULL;
310 struct kvec vec[1];
311 size_t veclen = 1;
312
313 node = r2nm_get_node_by_num(remotenode);
314 BUG_ON(node == NULL);
315 xh->client_id = r2nm_this_node(); /* which node is flobjing */
316 vec[0].iov_len = sizeof(*xh);
317 vec[0].iov_base = xh;
318 ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
319 vec, veclen, remotenode, &status);
320 r2nm_node_put(node);
321 return ret;
322}
323
324/*
325 * Handler registration
326 */
327
328static LIST_HEAD(r2net_unreg_list);
329
330static void r2net_unregister_handlers(void)
331{
332 r2net_unregister_handler_list(&r2net_unreg_list);
333}
334
335int r2net_register_handlers(void)
336{
337 int status;
338
339 status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
340 RMSTR_R2NET_MAX_LEN,
341 ramster_remote_put_handler,
342 NULL, NULL, &r2net_unreg_list);
343 if (status)
344 goto bail;
345
346 status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
347 RMSTR_R2NET_MAX_LEN,
348 ramster_remote_put_handler,
349 NULL, NULL, &r2net_unreg_list);
350 if (status)
351 goto bail;
352
353 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
354 RMSTR_R2NET_MAX_LEN,
355 ramster_remote_async_get_request_handler,
356 NULL, NULL,
357 &r2net_unreg_list);
358 if (status)
359 goto bail;
360
361 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
362 RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
363 ramster_remote_async_get_request_handler,
364 NULL, NULL,
365 &r2net_unreg_list);
366 if (status)
367 goto bail;
368
369 status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
370 RMSTR_R2NET_MAX_LEN,
371 ramster_remote_async_get_reply_handler,
372 NULL, NULL,
373 &r2net_unreg_list);
374 if (status)
375 goto bail;
376
377 status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
378 RMSTR_R2NET_MAX_LEN,
379 ramster_remote_flush_handler,
380 NULL, NULL,
381 &r2net_unreg_list);
382 if (status)
383 goto bail;
384
385 status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
386 RMSTR_R2NET_MAX_LEN,
387 ramster_remote_flobj_handler,
388 NULL, NULL,
389 &r2net_unreg_list);
390 if (status)
391 goto bail;
392
393 pr_info("ramster: r2net handlers registered\n");
394
395bail:
396 if (status) {
397 r2net_unregister_handlers();
398 pr_err("ramster: couldn't register r2net handlers\n");
399 }
400 return status;
401}