/*
URL: svn://svnanon.samba.org/samba/branches/SAMBA_4_0/source/lib/tdb
Rev: 22080
Last Changed: 2007-04-03 05:08:18 -0400
*/
6 /*
7 trivial database library - standalone version
8
9 Copyright (C) Andrew Tridgell 1999-2005
10 Copyright (C) Jeremy Allison 2000-2006
11 Copyright (C) Paul `Rusty' Russell 2000
12
13 ** NOTE! The following LGPL license applies to the tdb
14 ** library. This does NOT imply that all of Samba is released
15 ** under the LGPL
16
17 This library is free software; you can redistribute it and/or
18 modify it under the terms of the GNU Lesser General Public
19 License as published by the Free Software Foundation; either
20 version 2 of the License, or (at your option) any later version.
21
22 This library is distributed in the hope that it will be useful,
23 but WITHOUT ANY WARRANTY; without even the implied warranty of
24 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
25 Lesser General Public License for more details.
26
27 You should have received a copy of the GNU Lesser General Public
28 License along with this library; if not, write to the Free Software
29 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
30*/
31
32#ifdef CONFIG_STAND_ALONE
33#define HAVE_MMAP
34#define HAVE_STRDUP
35#define HAVE_SYS_MMAN_H
36#define HAVE_UTIME_H
37#define HAVE_UTIME
38#endif
39#define _XOPEN_SOURCE 500
40
41#include <unistd.h>
42#include <stdio.h>
43#include <stdlib.h>
44#include <stdarg.h>
45#include <stddef.h>
46#include <errno.h>
47#include <string.h>
48#include <sys/select.h>
49#include <sys/time.h>
50#include <sys/types.h>
51#include <time.h>
52#ifdef HAVE_UTIME_H
53#include <utime.h>
54#endif
55#include <sys/stat.h>
56#include <sys/file.h>
57#include <fcntl.h>
58
59#ifdef HAVE_SYS_MMAN_H
60#include <sys/mman.h>
61#endif
62
63#ifndef MAP_FILE
64#define MAP_FILE 0
65#endif
66
67#ifndef MAP_FAILED
68#define MAP_FAILED ((void *)-1)
69#endif
70
71#ifndef HAVE_STRDUP
72#define strdup rep_strdup
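/* minimal strdup() replacement for platforms whose libc lacks one */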
static char *rep_strdup(const char *s)
{
	char *ret;
	int length;
	if (!s)
		return NULL;

	length = strlen(s);

	ret = malloc(length + 1);
	if (ret) {
		strncpy(ret, s, length);
		ret[length] = '\0';
	}
	return ret;
}
90#endif
91
92#ifndef PRINTF_ATTRIBUTE
93#if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
94/** Use gcc attribute to check printf fns. a1 is the 1-based index of
95 * the parameter containing the format, and a2 the index of the first
96 * argument. Note that some gcc 2.x versions don't handle this
97 * properly **/
98#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
99#else
100#define PRINTF_ATTRIBUTE(a1, a2)
101#endif
102#endif
103
104#include "tdb.h"
105
106#ifndef u32
107#define u32 unsigned
108#endif
109
110#ifndef HAVE_GETPAGESIZE
111#define getpagesize() 0x2000
112#endif
113
114typedef u32 tdb_len_t;
115typedef u32 tdb_off_t;
116
117#ifndef offsetof
118#define offsetof(t,f) ((unsigned int)&((t *)0)->f)
119#endif
120
121#define TDB_MAGIC_FOOD "TDB file\n"
122#define TDB_VERSION (0x26011967 + 6)
123#define TDB_MAGIC (0x26011999U)
124#define TDB_FREE_MAGIC (~TDB_MAGIC)
125#define TDB_DEAD_MAGIC (0xFEE1DEAD)
126#define TDB_RECOVERY_MAGIC (0xf53bc0e7U)
127#define TDB_ALIGNMENT 4
128#define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
129#define DEFAULT_HASH_SIZE 131
130#define FREELIST_TOP (sizeof(struct tdb_header))
131#define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
132#define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
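/* TDB_BYTEREV reverses the byte order of a 32 bit value,
   e.g. TDB_BYTEREV(0x11223344) == 0x44332211 */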
133#define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
134#define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
135#define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off_t))
136#define TDB_HASHTABLE_SIZE(tdb) ((tdb->header.hash_size+1)*sizeof(tdb_off_t))
137#define TDB_DATA_START(hash_size) TDB_HASH_TOP(hash_size-1)
138#define TDB_RECOVERY_HEAD offsetof(struct tdb_header, recovery_start)
139#define TDB_SEQNUM_OFS offsetof(struct tdb_header, sequence_number)
140#define TDB_PAD_BYTE 0x42
141#define TDB_PAD_U32 0x42424242
142
/* NB assumes there is a local variable called "tdb" that is the
 * current context; it also takes a doubly-parenthesized printf-style
 * argument list. */
146#define TDB_LOG(x) tdb->log.log_fn x
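/* Example (this is the pattern used at the call sites below):
     TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d (%s)\n",
              list, strerror(errno)));
   expands to:
     tdb->log.log_fn(tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d (%s)\n",
                     list, strerror(errno));
*/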
147
148/* lock offsets */
149#define GLOBAL_LOCK 0
150#define ACTIVE_LOCK 4
151#define TRANSACTION_LOCK 8
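/* These are byte offsets passed to tdb_brlock(); each lock covers a single
   byte. Hash chain locks are taken at FREELIST_TOP + 4*list (with list -1,
   the freelist, landing just below FREELIST_TOP), so they never collide
   with these header offsets. */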
152
153/* free memory if the pointer is valid and zero the pointer */
154#ifndef SAFE_FREE
155#define SAFE_FREE(x) do { if ((x) != NULL) {free(x); (x)=NULL;} } while(0)
156#endif
157
158#define BUCKET(hash) ((hash) % tdb->header.hash_size)
159
160#define DOCONV() (tdb->flags & TDB_CONVERT)
161#define CONVERT(x) (DOCONV() ? tdb_convert(&x, sizeof(x)) : &x)
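/* CONVERT(x) byte-swaps x in place when the database was created with the
   opposite endianness and always evaluates to &x, so its result can be
   passed straight to tdb_write() (see tdb_ofs_write() and tdb_rec_write()
   below). */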
162
163
164/* the body of the database is made of one list_struct for the free space
165 plus a separate data list for each hash value */
166struct list_struct {
167 tdb_off_t next; /* offset of the next record in the list */
168 tdb_len_t rec_len; /* total byte length of record */
169 tdb_len_t key_len; /* byte length of key */
170 tdb_len_t data_len; /* byte length of data */
171 u32 full_hash; /* the full 32 bit hash of the key */
172 u32 magic; /* try to catch errors */
173 /* the following union is implied:
174 union {
175 char record[rec_len];
176 struct {
177 char key[key_len];
178 char data[data_len];
179 }
180 u32 totalsize; (tailer)
181 }
182 */
183};
184
185
186/* this is stored at the front of every database */
187struct tdb_header {
188 char magic_food[32]; /* for /etc/magic */
189 u32 version; /* version of the code */
190 u32 hash_size; /* number of hash entries */
191 tdb_off_t rwlocks; /* obsolete - kept to detect old formats */
192 tdb_off_t recovery_start; /* offset of transaction recovery region */
193 tdb_off_t sequence_number; /* used when TDB_SEQNUM is set */
194 tdb_off_t reserved[29];
195};
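/* Immediately after this header the file holds the freelist head followed by
   hash_size hash chain heads, one tdb_off_t each - hence FREELIST_TOP,
   TDB_HASH_TOP() and the "+1" in TDB_HASHTABLE_SIZE() above. */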
196
197struct tdb_lock_type {
198 int list;
199 u32 count;
200 u32 ltype;
201};
202
203struct tdb_traverse_lock {
204 struct tdb_traverse_lock *next;
205 u32 off;
206 u32 hash;
207 int lock_rw;
208};
209
210
211struct tdb_methods {
212 int (*tdb_read)(struct tdb_context *, tdb_off_t , void *, tdb_len_t , int );
213 int (*tdb_write)(struct tdb_context *, tdb_off_t, const void *, tdb_len_t);
214 void (*next_hash_chain)(struct tdb_context *, u32 *);
215 int (*tdb_oob)(struct tdb_context *, tdb_off_t , int );
216 int (*tdb_expand_file)(struct tdb_context *, tdb_off_t , tdb_off_t );
217 int (*tdb_brlock)(struct tdb_context *, tdb_off_t , int, int, int, size_t);
218};
219
220struct tdb_context {
221 char *name; /* the name of the database */
222 void *map_ptr; /* where it is currently mapped */
223 int fd; /* open file descriptor for the database */
224 tdb_len_t map_size; /* how much space has been mapped */
225 int read_only; /* opened read-only */
226 int traverse_read; /* read-only traversal */
227 struct tdb_lock_type global_lock;
228 int num_lockrecs;
229 struct tdb_lock_type *lockrecs; /* only real locks, all with count>0 */
230 enum TDB_ERROR ecode; /* error code for last tdb error */
231 struct tdb_header header; /* a cached copy of the header */
232 u32 flags; /* the flags passed to tdb_open */
233 struct tdb_traverse_lock travlocks; /* current traversal locks */
234 struct tdb_context *next; /* all tdbs to avoid multiple opens */
235 dev_t device; /* uniquely identifies this tdb */
236 ino_t inode; /* uniquely identifies this tdb */
237 struct tdb_logging_context log;
238 unsigned int (*hash_fn)(TDB_DATA *key);
239 int open_flags; /* flags used in the open - needed by reopen */
240 unsigned int num_locks; /* number of chain locks held */
241 const struct tdb_methods *methods;
242 struct tdb_transaction *transaction;
243 int page_size;
244 int max_dead_records;
245};
246
247
248/*
249 internal prototypes
250*/
251static int tdb_munmap(struct tdb_context *tdb);
252static void tdb_mmap(struct tdb_context *tdb);
253static int tdb_lock(struct tdb_context *tdb, int list, int ltype);
254static int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
255static int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset, int rw_type, int lck_type, int probe, size_t len);
256static int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len);
257static int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off);
258static int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off);
259static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
260static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
261static void *tdb_convert(void *buf, u32 size);
262static int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
263static tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec);
264static int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
265static int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d);
266static int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off);
267static int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off);
268static int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
269static int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec);
270static int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec);
271static unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len);
272static int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
273 tdb_off_t offset, tdb_len_t len,
274 int (*parser)(TDB_DATA key, TDB_DATA data,
275 void *private_data),
276 void *private_data);
277static tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
278 struct list_struct *rec);
279static void tdb_io_init(struct tdb_context *tdb);
280static int tdb_expand(struct tdb_context *tdb, tdb_off_t size);
281
282
283/* file: error.c */
284
285enum TDB_ERROR tdb_error(struct tdb_context *tdb)
286{
287 return tdb->ecode;
288}
289
290static struct tdb_errname {
291 enum TDB_ERROR ecode; const char *estring;
292} emap[] = { {TDB_SUCCESS, "Success"},
293 {TDB_ERR_CORRUPT, "Corrupt database"},
294 {TDB_ERR_IO, "IO Error"},
295 {TDB_ERR_LOCK, "Locking error"},
296 {TDB_ERR_OOM, "Out of memory"},
297 {TDB_ERR_EXISTS, "Record exists"},
298 {TDB_ERR_NOLOCK, "Lock exists on other keys"},
299 {TDB_ERR_EINVAL, "Invalid parameter"},
300 {TDB_ERR_NOEXIST, "Record does not exist"},
301 {TDB_ERR_RDONLY, "write not permitted"} };
302
303/* Error string for the last tdb error */
304const char *tdb_errorstr(struct tdb_context *tdb)
305{
306 u32 i;
307 for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
308 if (tdb->ecode == emap[i].ecode)
309 return emap[i].estring;
310 return "Invalid error code";
311}
312
313/* file: lock.c */
314
315/* a byte range locking function - return 0 on success
316 this functions locks/unlocks 1 byte at the specified offset.
317
318 On error, errno is also set so that errors are passed back properly
319 through tdb_open().
320
321 note that a len of zero means lock to end of file
322*/
323int tdb_brlock(struct tdb_context *tdb, tdb_off_t offset,
324 int rw_type, int lck_type, int probe, size_t len)
325{
326 struct flock fl;
327 int ret;
328
329 if (tdb->flags & TDB_NOLOCK) {
330 return 0;
331 }
332
333 if ((rw_type == F_WRLCK) && (tdb->read_only || tdb->traverse_read)) {
334 tdb->ecode = TDB_ERR_RDONLY;
335 return -1;
336 }
337
338 fl.l_type = rw_type;
339 fl.l_whence = SEEK_SET;
340 fl.l_start = offset;
341 fl.l_len = len;
342 fl.l_pid = 0;
343
344 do {
345 ret = fcntl(tdb->fd,lck_type,&fl);
346 } while (ret == -1 && errno == EINTR);
347
348 if (ret == -1) {
349 /* Generic lock error. errno set by fcntl.
350 * EAGAIN is an expected return from non-blocking
351 * locks. */
352 if (!probe && lck_type != F_SETLK) {
353 /* Ensure error code is set for log fun to examine. */
354 tdb->ecode = TDB_ERR_LOCK;
355 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d len=%d\n",
356 tdb->fd, offset, rw_type, lck_type, (int)len));
357 }
358 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
359 }
360 return 0;
361}
362
363
364/*
365 upgrade a read lock to a write lock. This needs to be handled in a
366 special way as some OSes (such as solaris) have too conservative
367 deadlock detection and claim a deadlock when progress can be
368 made. For those OSes we may loop for a while.
369*/
370int tdb_brlock_upgrade(struct tdb_context *tdb, tdb_off_t offset, size_t len)
371{
372 int count = 1000;
373 while (count--) {
374 struct timeval tv;
375 if (tdb_brlock(tdb, offset, F_WRLCK, F_SETLKW, 1, len) == 0) {
376 return 0;
377 }
378 if (errno != EDEADLK) {
379 break;
380 }
381 /* sleep for as short a time as we can - more portable than usleep() */
382 tv.tv_sec = 0;
383 tv.tv_usec = 1;
384 select(0, NULL, NULL, NULL, &tv);
385 }
386 TDB_LOG((tdb, TDB_DEBUG_TRACE,"tdb_brlock_upgrade failed at offset %d\n", offset));
387 return -1;
388}
389
390
391/* lock a list in the database. list -1 is the alloc list */
392int tdb_lock(struct tdb_context *tdb, int list, int ltype)
393{
394 struct tdb_lock_type *new_lck;
395 int i;
396
397 /* a global lock allows us to avoid per chain locks */
398 if (tdb->global_lock.count &&
399 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
400 return 0;
401 }
402
403 if (tdb->global_lock.count) {
404 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
405 }
406
407 if (list < -1 || list >= (int)tdb->header.hash_size) {
408 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_lock: invalid list %d for ltype=%d\n",
409 list, ltype));
410 return -1;
411 }
412 if (tdb->flags & TDB_NOLOCK)
413 return 0;
414
415 for (i=0; i<tdb->num_lockrecs; i++) {
416 if (tdb->lockrecs[i].list == list) {
417 if (tdb->lockrecs[i].count == 0) {
418 /*
419 * Can't happen, see tdb_unlock(). It should
420 * be an assert.
421 */
422 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock: "
423 "lck->count == 0 for list %d", list));
424 }
425 /*
426 * Just increment the in-memory struct, posix locks
427 * don't stack.
428 */
429 tdb->lockrecs[i].count++;
430 return 0;
431 }
432 }
433
434 new_lck = (struct tdb_lock_type *)realloc(
435 tdb->lockrecs,
436 sizeof(*tdb->lockrecs) * (tdb->num_lockrecs+1));
437 if (new_lck == NULL) {
438 errno = ENOMEM;
439 return -1;
440 }
441 tdb->lockrecs = new_lck;
442
443 /* Since fcntl locks don't nest, we do a lock for the first one,
444 and simply bump the count for future ones */
445 if (tdb->methods->tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW,
446 0, 1)) {
447 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lock failed on list %d "
448 "ltype=%d (%s)\n", list, ltype, strerror(errno)));
449 return -1;
450 }
451
452 tdb->num_locks++;
453
454 tdb->lockrecs[tdb->num_lockrecs].list = list;
455 tdb->lockrecs[tdb->num_lockrecs].count = 1;
456 tdb->lockrecs[tdb->num_lockrecs].ltype = ltype;
457 tdb->num_lockrecs += 1;
458
459 return 0;
460}
461
/* unlock the database: originally returned void because it's too late for
   errors, but was changed to return int as it may be interesting to know
   that an error has occurred --simo */
465int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
466{
467 int ret = -1;
468 int i;
469 struct tdb_lock_type *lck = NULL;
470
471 /* a global lock allows us to avoid per chain locks */
472 if (tdb->global_lock.count &&
473 (ltype == tdb->global_lock.ltype || ltype == F_RDLCK)) {
474 return 0;
475 }
476
477 if (tdb->global_lock.count) {
478 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
479 }
480
481 if (tdb->flags & TDB_NOLOCK)
482 return 0;
483
484 /* Sanity checks */
485 if (list < -1 || list >= (int)tdb->header.hash_size) {
486 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
487 return ret;
488 }
489
490 for (i=0; i<tdb->num_lockrecs; i++) {
491 if (tdb->lockrecs[i].list == list) {
492 lck = &tdb->lockrecs[i];
493 break;
494 }
495 }
496
497 if ((lck == NULL) || (lck->count == 0)) {
498 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: count is 0\n"));
499 return -1;
500 }
501
502 if (lck->count > 1) {
503 lck->count--;
504 return 0;
505 }
506
507 /*
508 * This lock has count==1 left, so we need to unlock it in the
509 * kernel. We don't bother with decrementing the in-memory array
510 * element, we're about to overwrite it with the last array element
511 * anyway.
512 */
513
514 ret = tdb->methods->tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK,
515 F_SETLKW, 0, 1);
516 tdb->num_locks--;
517
518 /*
519 * Shrink the array by overwriting the element just unlocked with the
520 * last array element.
521 */
522
523 if (tdb->num_lockrecs > 1) {
524 *lck = tdb->lockrecs[tdb->num_lockrecs-1];
525 }
526 tdb->num_lockrecs -= 1;
527
528 /*
529 * We don't bother with realloc when the array shrinks, but if we have
530 * a completely idle tdb we should get rid of the locked array.
531 */
532
533 if (tdb->num_lockrecs == 0) {
534 SAFE_FREE(tdb->lockrecs);
535 }
536
537 if (ret)
538 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlock: An error occurred unlocking!\n"));
539 return ret;
540}
541
542
543
544/* lock/unlock entire database */
545static int _tdb_lockall(struct tdb_context *tdb, int ltype)
546{
547 /* There are no locks on read-only dbs */
548 if (tdb->read_only || tdb->traverse_read)
549 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
550
551 if (tdb->global_lock.count && tdb->global_lock.ltype == ltype) {
552 tdb->global_lock.count++;
553 return 0;
554 }
555
556 if (tdb->global_lock.count) {
557 /* a global lock of a different type exists */
558 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
559 }
560
561 if (tdb->num_locks != 0) {
562 /* can't combine global and chain locks */
563 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
564 }
565
566 if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, F_SETLKW,
567 0, 4*tdb->header.hash_size)) {
568 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
569 return -1;
570 }
571
572 tdb->global_lock.count = 1;
573 tdb->global_lock.ltype = ltype;
574
575 return 0;
576}
577
578/* unlock entire db */
579static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
580{
581 /* There are no locks on read-only dbs */
582 if (tdb->read_only || tdb->traverse_read) {
583 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
584 }
585
586 if (tdb->global_lock.ltype != ltype || tdb->global_lock.count == 0) {
587 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
588 }
589
590 if (tdb->global_lock.count > 1) {
591 tdb->global_lock.count--;
592 return 0;
593 }
594
595 if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW,
596 0, 4*tdb->header.hash_size)) {
597 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_unlockall failed (%s)\n", strerror(errno)));
598 return -1;
599 }
600
601 tdb->global_lock.count = 0;
602 tdb->global_lock.ltype = 0;
603
604 return 0;
605}
606
607/* lock entire database with write lock */
608int tdb_lockall(struct tdb_context *tdb)
609{
610 return _tdb_lockall(tdb, F_WRLCK);
611}
612
613/* unlock entire database with write lock */
614int tdb_unlockall(struct tdb_context *tdb)
615{
616 return _tdb_unlockall(tdb, F_WRLCK);
617}
618
619/* lock entire database with read lock */
620int tdb_lockall_read(struct tdb_context *tdb)
621{
622 return _tdb_lockall(tdb, F_RDLCK);
623}
624
625/* unlock entire database with read lock */
626int tdb_unlockall_read(struct tdb_context *tdb)
627{
628 return _tdb_unlockall(tdb, F_RDLCK);
629}
630
631/* lock/unlock one hash chain. This is meant to be used to reduce
632 contention - it cannot guarantee how many records will be locked */
633int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key)
634{
635 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
636}
637
638int tdb_chainunlock(struct tdb_context *tdb, TDB_DATA key)
639{
640 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
641}
642
643int tdb_chainlock_read(struct tdb_context *tdb, TDB_DATA key)
644{
645 return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
646}
647
648int tdb_chainunlock_read(struct tdb_context *tdb, TDB_DATA key)
649{
650 return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
651}
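/* Typical usage (a sketch, not code from this library): bracket a
   read-modify-write of one key so that other writers of the same hash
   chain are excluded, e.g.

	if (tdb_chainlock(tdb, key) == 0) {
		... fetch/update records that hash to this chain ...
		tdb_chainunlock(tdb, key);
	}
*/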
652
653
654
655/* record lock stops delete underneath */
656int tdb_lock_record(struct tdb_context *tdb, tdb_off_t off)
657{
658 return off ? tdb->methods->tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0, 1) : 0;
659}
660
661/*
662 Write locks override our own fcntl readlocks, so check it here.
663 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
664 an error to fail to get the lock here.
665*/
666int tdb_write_lock_record(struct tdb_context *tdb, tdb_off_t off)
667{
668 struct tdb_traverse_lock *i;
669 for (i = &tdb->travlocks; i; i = i->next)
670 if (i->off == off)
671 return -1;
672 return tdb->methods->tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1, 1);
673}
674
675/*
676 Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
677 an error to fail to get the lock here.
678*/
679int tdb_write_unlock_record(struct tdb_context *tdb, tdb_off_t off)
680{
681 return tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0, 1);
682}
683
684/* fcntl locks don't stack: avoid unlocking someone else's */
685int tdb_unlock_record(struct tdb_context *tdb, tdb_off_t off)
686{
687 struct tdb_traverse_lock *i;
688 u32 count = 0;
689
690 if (off == 0)
691 return 0;
692 for (i = &tdb->travlocks; i; i = i->next)
693 if (i->off == off)
694 count++;
695 return (count == 1 ? tdb->methods->tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0, 1) : 0);
696}
697
698/* file: io.c */
699
700/* check for an out of bounds access - if it is out of bounds then
701 see if the database has been expanded by someone else and expand
702 if necessary
703 note that "len" is the minimum length needed for the db
704*/
705static int tdb_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
706{
707 struct stat st;
708 if (len <= tdb->map_size)
709 return 0;
710 if (tdb->flags & TDB_INTERNAL) {
711 if (!probe) {
712 /* Ensure ecode is set for log fn. */
713 tdb->ecode = TDB_ERR_IO;
714 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond internal malloc size %d\n",
715 (int)len, (int)tdb->map_size));
716 }
717 return TDB_ERRCODE(TDB_ERR_IO, -1);
718 }
719
720 if (fstat(tdb->fd, &st) == -1) {
721 return TDB_ERRCODE(TDB_ERR_IO, -1);
722 }
723
724 if (st.st_size < (size_t)len) {
725 if (!probe) {
726 /* Ensure ecode is set for log fn. */
727 tdb->ecode = TDB_ERR_IO;
728 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_oob len %d beyond eof at %d\n",
729 (int)len, (int)st.st_size));
730 }
731 return TDB_ERRCODE(TDB_ERR_IO, -1);
732 }
733
734 /* Unmap, update size, remap */
735 if (tdb_munmap(tdb) == -1)
736 return TDB_ERRCODE(TDB_ERR_IO, -1);
737 tdb->map_size = st.st_size;
738 tdb_mmap(tdb);
739 return 0;
740}
741
742/* write a lump of data at a specified offset */
743static int tdb_write(struct tdb_context *tdb, tdb_off_t off,
744 const void *buf, tdb_len_t len)
745{
746 if (len == 0) {
747 return 0;
748 }
749
750 if (tdb->read_only || tdb->traverse_read) {
751 tdb->ecode = TDB_ERR_RDONLY;
752 return -1;
753 }
754
755 if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0)
756 return -1;
757
758 if (tdb->map_ptr) {
759 memcpy(off + (char *)tdb->map_ptr, buf, len);
760 } else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
761 /* Ensure ecode is set for log fn. */
762 tdb->ecode = TDB_ERR_IO;
763 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_write failed at %d len=%d (%s)\n",
764 off, len, strerror(errno)));
765 return TDB_ERRCODE(TDB_ERR_IO, -1);
766 }
767 return 0;
768}
769
770/* Endian conversion: we only ever deal with 4 byte quantities */
771void *tdb_convert(void *buf, u32 size)
772{
773 u32 i, *p = (u32 *)buf;
774 for (i = 0; i < size / 4; i++)
775 p[i] = TDB_BYTEREV(p[i]);
776 return buf;
777}
778
779
780/* read a lump of data at a specified offset, maybe convert */
781static int tdb_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
782 tdb_len_t len, int cv)
783{
784 if (tdb->methods->tdb_oob(tdb, off + len, 0) != 0) {
785 return -1;
786 }
787
788 if (tdb->map_ptr) {
789 memcpy(buf, off + (char *)tdb->map_ptr, len);
790 } else {
791 ssize_t ret = pread(tdb->fd, buf, len, off);
792 if (ret != (ssize_t)len) {
793 /* Ensure ecode is set for log fn. */
794 tdb->ecode = TDB_ERR_IO;
795 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_read failed at %d "
796 "len=%d ret=%d (%s) map_size=%d\n",
797 (int)off, (int)len, (int)ret, strerror(errno),
798 (int)tdb->map_size));
799 return TDB_ERRCODE(TDB_ERR_IO, -1);
800 }
801 }
802 if (cv) {
803 tdb_convert(buf, len);
804 }
805 return 0;
806}
807
808
809
810/*
811 do an unlocked scan of the hash table heads to find the next non-zero head. The value
812 will then be confirmed with the lock held
813*/
814static void tdb_next_hash_chain(struct tdb_context *tdb, u32 *chain)
815{
816 u32 h = *chain;
817 if (tdb->map_ptr) {
818 for (;h < tdb->header.hash_size;h++) {
819 if (0 != *(u32 *)(TDB_HASH_TOP(h) + (unsigned char *)tdb->map_ptr)) {
820 break;
821 }
822 }
823 } else {
824 u32 off=0;
825 for (;h < tdb->header.hash_size;h++) {
826 if (tdb_ofs_read(tdb, TDB_HASH_TOP(h), &off) != 0 || off != 0) {
827 break;
828 }
829 }
830 }
831 (*chain) = h;
832}
833
834
835int tdb_munmap(struct tdb_context *tdb)
836{
837 if (tdb->flags & TDB_INTERNAL)
838 return 0;
839
840#ifdef HAVE_MMAP
841 if (tdb->map_ptr) {
842 int ret = munmap(tdb->map_ptr, tdb->map_size);
843 if (ret != 0)
844 return ret;
845 }
846#endif
847 tdb->map_ptr = NULL;
848 return 0;
849}
850
851void tdb_mmap(struct tdb_context *tdb)
852{
853 if (tdb->flags & TDB_INTERNAL)
854 return;
855
856#ifdef HAVE_MMAP
857 if (!(tdb->flags & TDB_NOMMAP)) {
858 tdb->map_ptr = mmap(NULL, tdb->map_size,
859 PROT_READ|(tdb->read_only? 0:PROT_WRITE),
860 MAP_SHARED|MAP_FILE, tdb->fd, 0);
861
862 /*
863 * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
864 */
865
866 if (tdb->map_ptr == MAP_FAILED) {
867 tdb->map_ptr = NULL;
868 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_mmap failed for size %d (%s)\n",
869 tdb->map_size, strerror(errno)));
870 }
871 } else {
872 tdb->map_ptr = NULL;
873 }
874#else
875 tdb->map_ptr = NULL;
876#endif
877}
878
879/* expand a file. we prefer to use ftruncate, as that is what posix
880 says to use for mmap expansion */
881static int tdb_expand_file(struct tdb_context *tdb, tdb_off_t size, tdb_off_t addition)
882{
883 char buf[1024];
884
885 if (tdb->read_only || tdb->traverse_read) {
886 tdb->ecode = TDB_ERR_RDONLY;
887 return -1;
888 }
889
890 if (ftruncate(tdb->fd, size+addition) == -1) {
891 char b = 0;
892 if (pwrite(tdb->fd, &b, 1, (size+addition) - 1) != 1) {
893 TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file to %d failed (%s)\n",
894 size+addition, strerror(errno)));
895 return -1;
896 }
897 }
898
899 /* now fill the file with something. This ensures that the
900 file isn't sparse, which would be very bad if we ran out of
901 disk. This must be done with write, not via mmap */
902 memset(buf, TDB_PAD_BYTE, sizeof(buf));
903 while (addition) {
904 int n = addition>sizeof(buf)?sizeof(buf):addition;
905 int ret = pwrite(tdb->fd, buf, n, size);
906 if (ret != n) {
907 TDB_LOG((tdb, TDB_DEBUG_FATAL, "expand_file write of %d failed (%s)\n",
908 n, strerror(errno)));
909 return -1;
910 }
911 addition -= n;
912 size += n;
913 }
914 return 0;
915}
916
917
918/* expand the database at least size bytes by expanding the underlying
919 file and doing the mmap again if necessary */
920int tdb_expand(struct tdb_context *tdb, tdb_off_t size)
921{
922 struct list_struct rec;
923 tdb_off_t offset;
924
925 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
926 TDB_LOG((tdb, TDB_DEBUG_ERROR, "lock failed in tdb_expand\n"));
927 return -1;
928 }
929
930 /* must know about any previous expansions by another process */
931 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
932
933 /* always make room for at least 10 more records, and round
934 the database up to a multiple of the page size */
935 size = TDB_ALIGN(tdb->map_size + size*10, tdb->page_size) - tdb->map_size;
936
937 if (!(tdb->flags & TDB_INTERNAL))
938 tdb_munmap(tdb);
939
940 /*
941 * We must ensure the file is unmapped before doing this
942 * to ensure consistency with systems like OpenBSD where
943 * writes and mmaps are not consistent.
944 */
945
946 /* expand the file itself */
947 if (!(tdb->flags & TDB_INTERNAL)) {
948 if (tdb->methods->tdb_expand_file(tdb, tdb->map_size, size) != 0)
949 goto fail;
950 }
951
952 tdb->map_size += size;
953
954 if (tdb->flags & TDB_INTERNAL) {
955 char *new_map_ptr = (char *)realloc(tdb->map_ptr,
956 tdb->map_size);
957 if (!new_map_ptr) {
958 tdb->map_size -= size;
959 goto fail;
960 }
961 tdb->map_ptr = new_map_ptr;
962 } else {
963 /*
964 * We must ensure the file is remapped before adding the space
965 * to ensure consistency with systems like OpenBSD where
966 * writes and mmaps are not consistent.
967 */
968
969 /* We're ok if the mmap fails as we'll fallback to read/write */
970 tdb_mmap(tdb);
971 }
972
973 /* form a new freelist record */
974 memset(&rec,'\0',sizeof(rec));
975 rec.rec_len = size - sizeof(rec);
976
977 /* link it into the free list */
978 offset = tdb->map_size - size;
979 if (tdb_free(tdb, offset, &rec) == -1)
980 goto fail;
981
982 tdb_unlock(tdb, -1, F_WRLCK);
983 return 0;
984 fail:
985 tdb_unlock(tdb, -1, F_WRLCK);
986 return -1;
987}
988
989/* read/write a tdb_off_t */
990int tdb_ofs_read(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
991{
992 return tdb->methods->tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
993}
994
995int tdb_ofs_write(struct tdb_context *tdb, tdb_off_t offset, tdb_off_t *d)
996{
997 tdb_off_t off = *d;
998 return tdb->methods->tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
999}
1000
1001
1002/* read a lump of data, allocating the space for it */
1003unsigned char *tdb_alloc_read(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t len)
1004{
1005 unsigned char *buf;
1006
1007 /* some systems don't like zero length malloc */
1008 if (len == 0) {
1009 len = 1;
1010 }
1011
1012 if (!(buf = (unsigned char *)malloc(len))) {
1013 /* Ensure ecode is set for log fn. */
1014 tdb->ecode = TDB_ERR_OOM;
1015 TDB_LOG((tdb, TDB_DEBUG_ERROR,"tdb_alloc_read malloc failed len=%d (%s)\n",
1016 len, strerror(errno)));
1017 return TDB_ERRCODE(TDB_ERR_OOM, buf);
1018 }
1019 if (tdb->methods->tdb_read(tdb, offset, buf, len, 0) == -1) {
1020 SAFE_FREE(buf);
1021 return NULL;
1022 }
1023 return buf;
1024}
1025
1026/* Give a piece of tdb data to a parser */
1027
1028int tdb_parse_data(struct tdb_context *tdb, TDB_DATA key,
1029 tdb_off_t offset, tdb_len_t len,
1030 int (*parser)(TDB_DATA key, TDB_DATA data,
1031 void *private_data),
1032 void *private_data)
1033{
1034 TDB_DATA data;
1035 int result;
1036
1037 data.dsize = len;
1038
1039 if ((tdb->transaction == NULL) && (tdb->map_ptr != NULL)) {
1040 /*
1041 * Optimize by avoiding the malloc/memcpy/free, point the
1042 * parser directly at the mmap area.
1043 */
1044 if (tdb->methods->tdb_oob(tdb, offset+len, 0) != 0) {
1045 return -1;
1046 }
1047 data.dptr = offset + (unsigned char *)tdb->map_ptr;
1048 return parser(key, data, private_data);
1049 }
1050
1051 if (!(data.dptr = tdb_alloc_read(tdb, offset, len))) {
1052 return -1;
1053 }
1054
1055 result = parser(key, data, private_data);
1056 free(data.dptr);
1057 return result;
1058}
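/* Example parser callback (a sketch): report the record size without taking
   a private copy of the data. With the mmap fast path above, "data" may
   point directly into the mapped file and must not be used after returning.

	static int record_size_parser(TDB_DATA key, TDB_DATA data, void *private_data)
	{
		size_t *size = (size_t *)private_data;
		*size = data.dsize;
		return 0;
	}
*/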
1059
1060/* read/write a record */
1061int tdb_rec_read(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1062{
1063 if (tdb->methods->tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
1064 return -1;
1065 if (TDB_BAD_MAGIC(rec)) {
1066 /* Ensure ecode is set for log fn. */
1067 tdb->ecode = TDB_ERR_CORRUPT;
1068 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
1069 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
1070 }
1071 return tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0);
1072}
1073
1074int tdb_rec_write(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
1075{
1076 struct list_struct r = *rec;
1077 return tdb->methods->tdb_write(tdb, offset, CONVERT(r), sizeof(r));
1078}
1079
1080static const struct tdb_methods io_methods = {
1081 tdb_read,
1082 tdb_write,
1083 tdb_next_hash_chain,
1084 tdb_oob,
1085 tdb_expand_file,
1086 tdb_brlock
1087};
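/* During a transaction this table is swapped out for transaction_methods
   (see tdb_transaction_start() below), which is how reads and writes get
   redirected into the in-memory transaction element list. */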
1088
1089/*
1090 initialise the default methods table
1091*/
1092void tdb_io_init(struct tdb_context *tdb)
1093{
1094 tdb->methods = &io_methods;
1095}
1096
1097/* file: transaction.c */
1098
1099/*
1100 transaction design:
1101
1102 - only allow a single transaction at a time per database. This makes
1103 using the transaction API simpler, as otherwise the caller would
1104 have to cope with temporary failures in transactions that conflict
1105 with other current transactions
1106
1107 - keep the transaction recovery information in the same file as the
1108 database, using a special 'transaction recovery' record pointed at
1109 by the header. This removes the need for extra journal files as
1110 used by some other databases
1111
  - dynamically allocate the transaction recovery record, re-using it
    for subsequent transactions. If a larger record is needed then
    tdb_free() the old record to place it on the normal tdb freelist
    before allocating the new record
1116
  - during transactions, keep a linked list of all writes that have
    been performed by intercepting all tdb_write() calls. The hooked
    transaction versions of tdb_read() and tdb_write() check this
    linked list and try to use the elements of the list in preference
    to the real database.
1122
1123 - don't allow any locks to be held when a transaction starts,
1124 otherwise we can end up with deadlock (plus lack of lock nesting
1125 in posix locks would mean the lock is lost)
1126
1127 - if the caller gains a lock during the transaction but doesn't
1128 release it then fail the commit
1129
1130 - allow for nested calls to tdb_transaction_start(), re-using the
1131 existing transaction record. If the inner transaction is cancelled
1132 then a subsequent commit will fail
1133
1134 - keep a mirrored copy of the tdb hash chain heads to allow for the
1135 fast hash heads scan on traverse, updating the mirrored copy in
1136 the transaction version of tdb_write
1137
1138 - allow callers to mix transaction and non-transaction use of tdb,
1139 although once a transaction is started then an exclusive lock is
1140 gained until the transaction is committed or cancelled
1141
  - the commit strategy involves first saving away all modified data
    into a linearised buffer in the transaction recovery area, then
    marking the transaction recovery area with a magic value to
    indicate a valid recovery record. In total 4 fsync/msync calls are
    needed per commit to prevent race conditions. It might be possible
    to reduce this to 3 or even 2 with some more work.
1148
1149 - check for a valid recovery record on open of the tdb, while the
1150 global lock is held. Automatically recover from the transaction
1151 recovery area if needed, then continue with the open as
1152 usual. This allows for smooth crash recovery with no administrator
1153 intervention.
1154
1155 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
1156 still available, but no transaction recovery area is used and no
1157 fsync/msync calls are made.
1158
1159*/
1160
1161struct tdb_transaction_el {
1162 struct tdb_transaction_el *next, *prev;
1163 tdb_off_t offset;
1164 tdb_len_t length;
1165 unsigned char *data;
1166};
1167
1168/*
1169 hold the context of any current transaction
1170*/
1171struct tdb_transaction {
1172 /* we keep a mirrored copy of the tdb hash heads here so
1173 tdb_next_hash_chain() can operate efficiently */
1174 u32 *hash_heads;
1175
1176 /* the original io methods - used to do IOs to the real db */
1177 const struct tdb_methods *io_methods;
1178
1179 /* the list of transaction elements. We use a doubly linked
1180 list with a last pointer to allow us to keep the list
1181 ordered, with first element at the front of the list. It
1182 needs to be doubly linked as the read/write traversals need
1183 to be backwards, while the commit needs to be forwards */
1184 struct tdb_transaction_el *elements, *elements_last;
1185
1186 /* non-zero when an internal transaction error has
1187 occurred. All write operations will then fail until the
1188 transaction is ended */
1189 int transaction_error;
1190
1191 /* when inside a transaction we need to keep track of any
1192 nested tdb_transaction_start() calls, as these are allowed,
1193 but don't create a new transaction */
1194 int nesting;
1195
1196 /* old file size before transaction */
1197 tdb_len_t old_map_size;
1198};
1199
1200
1201/*
1202 read while in a transaction. We need to check first if the data is in our list
1203 of transaction elements, then if not do a real read
1204*/
1205static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
1206 tdb_len_t len, int cv)
1207{
1208 struct tdb_transaction_el *el;
1209
1210 /* we need to walk the list backwards to get the most recent data */
1211 for (el=tdb->transaction->elements_last;el;el=el->prev) {
1212 tdb_len_t partial;
1213
1214 if (off+len <= el->offset) {
1215 continue;
1216 }
1217 if (off >= el->offset + el->length) {
1218 continue;
1219 }
1220
1221 /* an overlapping read - needs to be split into up to
1222 2 reads and a memcpy */
1223 if (off < el->offset) {
1224 partial = el->offset - off;
1225 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
1226 goto fail;
1227 }
1228 len -= partial;
1229 off += partial;
1230 buf = (void *)(partial + (char *)buf);
1231 }
1232 if (off + len <= el->offset + el->length) {
1233 partial = len;
1234 } else {
1235 partial = el->offset + el->length - off;
1236 }
1237 memcpy(buf, el->data + (off - el->offset), partial);
1238 if (cv) {
1239 tdb_convert(buf, len);
1240 }
1241 len -= partial;
1242 off += partial;
1243 buf = (void *)(partial + (char *)buf);
1244
1245 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
1246 goto fail;
1247 }
1248
1249 return 0;
1250 }
1251
	/* it's not in the transaction elements - do a real read */
1253 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
1254
1255fail:
1256 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
1257 tdb->ecode = TDB_ERR_IO;
1258 tdb->transaction->transaction_error = 1;
1259 return -1;
1260}
1261
1262
1263/*
1264 write while in a transaction
1265*/
1266static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
1267 const void *buf, tdb_len_t len)
1268{
1269 struct tdb_transaction_el *el, *best_el=NULL;
1270
1271 if (len == 0) {
1272 return 0;
1273 }
1274
1275 /* if the write is to a hash head, then update the transaction
1276 hash heads */
1277 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
1278 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
1279 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
1280 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
1281 }
1282
1283 /* first see if we can replace an existing entry */
1284 for (el=tdb->transaction->elements_last;el;el=el->prev) {
1285 tdb_len_t partial;
1286
1287 if (best_el == NULL && off == el->offset+el->length) {
1288 best_el = el;
1289 }
1290
1291 if (off+len <= el->offset) {
1292 continue;
1293 }
1294 if (off >= el->offset + el->length) {
1295 continue;
1296 }
1297
1298 /* an overlapping write - needs to be split into up to
1299 2 writes and a memcpy */
1300 if (off < el->offset) {
1301 partial = el->offset - off;
1302 if (transaction_write(tdb, off, buf, partial) != 0) {
1303 goto fail;
1304 }
1305 len -= partial;
1306 off += partial;
1307 buf = (const void *)(partial + (const char *)buf);
1308 }
1309 if (off + len <= el->offset + el->length) {
1310 partial = len;
1311 } else {
1312 partial = el->offset + el->length - off;
1313 }
1314 memcpy(el->data + (off - el->offset), buf, partial);
1315 len -= partial;
1316 off += partial;
1317 buf = (const void *)(partial + (const char *)buf);
1318
1319 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
1320 goto fail;
1321 }
1322
1323 return 0;
1324 }
1325
1326 /* see if we can append the new entry to an existing entry */
1327 if (best_el && best_el->offset + best_el->length == off &&
1328 (off+len < tdb->transaction->old_map_size ||
1329 off > tdb->transaction->old_map_size)) {
1330 unsigned char *data = best_el->data;
1331 el = best_el;
1332 el->data = (unsigned char *)realloc(el->data,
1333 el->length + len);
1334 if (el->data == NULL) {
1335 tdb->ecode = TDB_ERR_OOM;
1336 tdb->transaction->transaction_error = 1;
1337 el->data = data;
1338 return -1;
1339 }
1340 if (buf) {
1341 memcpy(el->data + el->length, buf, len);
1342 } else {
1343 memset(el->data + el->length, TDB_PAD_BYTE, len);
1344 }
1345 el->length += len;
1346 return 0;
1347 }
1348
1349 /* add a new entry at the end of the list */
1350 el = (struct tdb_transaction_el *)malloc(sizeof(*el));
1351 if (el == NULL) {
1352 tdb->ecode = TDB_ERR_OOM;
1353 tdb->transaction->transaction_error = 1;
1354 return -1;
1355 }
1356 el->next = NULL;
1357 el->prev = tdb->transaction->elements_last;
1358 el->offset = off;
1359 el->length = len;
1360 el->data = (unsigned char *)malloc(len);
1361 if (el->data == NULL) {
1362 free(el);
1363 tdb->ecode = TDB_ERR_OOM;
1364 tdb->transaction->transaction_error = 1;
1365 return -1;
1366 }
1367 if (buf) {
1368 memcpy(el->data, buf, len);
1369 } else {
1370 memset(el->data, TDB_PAD_BYTE, len);
1371 }
1372 if (el->prev) {
1373 el->prev->next = el;
1374 } else {
1375 tdb->transaction->elements = el;
1376 }
1377 tdb->transaction->elements_last = el;
1378 return 0;
1379
1380fail:
1381 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
1382 tdb->ecode = TDB_ERR_IO;
1383 tdb->transaction->transaction_error = 1;
1384 return -1;
1385}
1386
1387/*
1388 accelerated hash chain head search, using the cached hash heads
1389*/
1390static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
1391{
1392 u32 h = *chain;
1393 for (;h < tdb->header.hash_size;h++) {
1394 /* the +1 takes account of the freelist */
1395 if (0 != tdb->transaction->hash_heads[h+1]) {
1396 break;
1397 }
1398 }
1399 (*chain) = h;
1400}
1401
1402/*
1403 out of bounds check during a transaction
1404*/
1405static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
1406{
1407 if (len <= tdb->map_size) {
1408 return 0;
1409 }
1410 return TDB_ERRCODE(TDB_ERR_IO, -1);
1411}
1412
1413/*
1414 transaction version of tdb_expand().
1415*/
1416static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
1417 tdb_off_t addition)
1418{
1419 /* add a write to the transaction elements, so subsequent
1420 reads see the zero data */
1421 if (transaction_write(tdb, size, NULL, addition) != 0) {
1422 return -1;
1423 }
1424
1425 return 0;
1426}
1427
1428/*
1429 brlock during a transaction - ignore them
1430*/
1431static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
1432 int rw_type, int lck_type, int probe, size_t len)
1433{
1434 return 0;
1435}
1436
1437static const struct tdb_methods transaction_methods = {
1438 transaction_read,
1439 transaction_write,
1440 transaction_next_hash_chain,
1441 transaction_oob,
1442 transaction_expand_file,
1443 transaction_brlock
1444};
1445
1446
1447/*
1448 start a tdb transaction. No token is returned, as only a single
1449 transaction is allowed to be pending per tdb_context
1450*/
1451int tdb_transaction_start(struct tdb_context *tdb)
1452{
1453 /* some sanity checks */
1454 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
1455 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
1456 tdb->ecode = TDB_ERR_EINVAL;
1457 return -1;
1458 }
1459
1460 /* cope with nested tdb_transaction_start() calls */
1461 if (tdb->transaction != NULL) {
1462 tdb->transaction->nesting++;
1463 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
1464 tdb->transaction->nesting));
1465 return 0;
1466 }
1467
1468 if (tdb->num_locks != 0 || tdb->global_lock.count) {
1469 /* the caller must not have any locks when starting a
1470 transaction as otherwise we'll be screwed by lack
1471 of nested locks in posix */
1472 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
1473 tdb->ecode = TDB_ERR_LOCK;
1474 return -1;
1475 }
1476
1477 if (tdb->travlocks.next != NULL) {
1478 /* you cannot use transactions inside a traverse (although you can use
1479 traverse inside a transaction) as otherwise you can end up with
1480 deadlock */
1481 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
1482 tdb->ecode = TDB_ERR_LOCK;
1483 return -1;
1484 }
1485
1486 tdb->transaction = (struct tdb_transaction *)
1487 calloc(sizeof(struct tdb_transaction), 1);
1488 if (tdb->transaction == NULL) {
1489 tdb->ecode = TDB_ERR_OOM;
1490 return -1;
1491 }
1492
1493 /* get the transaction write lock. This is a blocking lock. As
1494 discussed with Volker, there are a number of ways we could
1495 make this async, which we will probably do in the future */
1496 if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
1497 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
1498 tdb->ecode = TDB_ERR_LOCK;
1499 SAFE_FREE(tdb->transaction);
1500 return -1;
1501 }
1502
1503 /* get a read lock from the freelist to the end of file. This
1504 is upgraded to a write lock during the commit */
1505 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
1506 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
1507 tdb->ecode = TDB_ERR_LOCK;
1508 goto fail;
1509 }
1510
1511 /* setup a copy of the hash table heads so the hash scan in
1512 traverse can be fast */
1513 tdb->transaction->hash_heads = (u32 *)
1514 calloc(tdb->header.hash_size+1, sizeof(u32));
1515 if (tdb->transaction->hash_heads == NULL) {
1516 tdb->ecode = TDB_ERR_OOM;
1517 goto fail;
1518 }
1519 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1520 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
1521 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
1522 tdb->ecode = TDB_ERR_IO;
1523 goto fail;
1524 }
1525
1526 /* make sure we know about any file expansions already done by
1527 anyone else */
1528 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1529 tdb->transaction->old_map_size = tdb->map_size;
1530
1531 /* finally hook the io methods, replacing them with
1532 transaction specific methods */
1533 tdb->transaction->io_methods = tdb->methods;
1534 tdb->methods = &transaction_methods;
1535
1536 /* by calling this transaction write here, we ensure that we don't grow the
1537 transaction linked list due to hash table updates */
1538 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
1539 TDB_HASHTABLE_SIZE(tdb)) != 0) {
1540 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
1541 tdb->ecode = TDB_ERR_IO;
1542 goto fail;
1543 }
1544
1545 return 0;
1546
1547fail:
1548 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1549 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1550 SAFE_FREE(tdb->transaction->hash_heads);
1551 SAFE_FREE(tdb->transaction);
1552 return -1;
1553}
1554
1555
1556/*
1557 cancel the current transaction
1558*/
1559int tdb_transaction_cancel(struct tdb_context *tdb)
1560{
1561 if (tdb->transaction == NULL) {
1562 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
1563 return -1;
1564 }
1565
1566 if (tdb->transaction->nesting != 0) {
1567 tdb->transaction->transaction_error = 1;
1568 tdb->transaction->nesting--;
1569 return 0;
1570 }
1571
1572 tdb->map_size = tdb->transaction->old_map_size;
1573
1574 /* free all the transaction elements */
1575 while (tdb->transaction->elements) {
1576 struct tdb_transaction_el *el = tdb->transaction->elements;
1577 tdb->transaction->elements = el->next;
1578 free(el->data);
1579 free(el);
1580 }
1581
1582 /* remove any global lock created during the transaction */
1583 if (tdb->global_lock.count != 0) {
1584 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
1585 tdb->global_lock.count = 0;
1586 }
1587
1588 /* remove any locks created during the transaction */
1589 if (tdb->num_locks != 0) {
1590 int i;
1591 for (i=0;i<tdb->num_lockrecs;i++) {
1592 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
1593 F_UNLCK,F_SETLKW, 0, 1);
1594 }
1595 tdb->num_locks = 0;
1596 }
1597
1598 /* restore the normal io methods */
1599 tdb->methods = tdb->transaction->io_methods;
1600
1601 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
1602 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1603 SAFE_FREE(tdb->transaction->hash_heads);
1604 SAFE_FREE(tdb->transaction);
1605
1606 return 0;
1607}
1608
1609/*
1610 sync to disk
1611*/
1612static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
1613{
1614 if (fsync(tdb->fd) != 0) {
1615 tdb->ecode = TDB_ERR_IO;
1616 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
1617 return -1;
1618 }
1619#ifdef MS_SYNC
1620 if (tdb->map_ptr) {
1621 tdb_off_t moffset = offset & ~(tdb->page_size-1);
1622 if (msync(moffset + (char *)tdb->map_ptr,
1623 length + (offset - moffset), MS_SYNC) != 0) {
1624 tdb->ecode = TDB_ERR_IO;
1625 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
1626 strerror(errno)));
1627 return -1;
1628 }
1629 }
1630#endif
1631 return 0;
1632}
1633
1634
1635/*
1636 work out how much space the linearised recovery data will consume
1637*/
1638static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
1639{
1640 struct tdb_transaction_el *el;
1641 tdb_len_t recovery_size = 0;
1642
1643 recovery_size = sizeof(u32);
1644 for (el=tdb->transaction->elements;el;el=el->next) {
1645 if (el->offset >= tdb->transaction->old_map_size) {
1646 continue;
1647 }
1648 recovery_size += 2*sizeof(tdb_off_t) + el->length;
1649 }
1650
1651 return recovery_size;
1652}
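/* The linearised recovery blob built by transaction_setup_recovery() below
   is a list_struct header, then for every element that touches the old file
   region a (4 byte offset, 4 byte length, old data) triple, and finally a
   4 byte tailer - which is the extra sizeof(u32) counted above. */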
1653
1654/*
1655 allocate the recovery area, or use an existing recovery area if it is
1656 large enough
1657*/
1658static int tdb_recovery_allocate(struct tdb_context *tdb,
1659 tdb_len_t *recovery_size,
1660 tdb_off_t *recovery_offset,
1661 tdb_len_t *recovery_max_size)
1662{
1663 struct list_struct rec;
1664 const struct tdb_methods *methods = tdb->transaction->io_methods;
1665 tdb_off_t recovery_head;
1666
1667 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1668 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
1669 return -1;
1670 }
1671
1672 rec.rec_len = 0;
1673
1674 if (recovery_head != 0 &&
1675 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
1676 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
1677 return -1;
1678 }
1679
1680 *recovery_size = tdb_recovery_size(tdb);
1681
1682 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
1683 /* it fits in the existing area */
1684 *recovery_max_size = rec.rec_len;
1685 *recovery_offset = recovery_head;
1686 return 0;
1687 }
1688
1689 /* we need to free up the old recovery area, then allocate a
1690 new one at the end of the file. Note that we cannot use
1691 tdb_allocate() to allocate the new one as that might return
1692 us an area that is being currently used (as of the start of
1693 the transaction) */
1694 if (recovery_head != 0) {
1695 if (tdb_free(tdb, recovery_head, &rec) == -1) {
1696 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
1697 return -1;
1698 }
1699 }
1700
1701 /* the tdb_free() call might have increased the recovery size */
1702 *recovery_size = tdb_recovery_size(tdb);
1703
1704 /* round up to a multiple of page size */
1705 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
1706 *recovery_offset = tdb->map_size;
1707 recovery_head = *recovery_offset;
1708
1709 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1710 (tdb->map_size - tdb->transaction->old_map_size) +
1711 sizeof(rec) + *recovery_max_size) == -1) {
1712 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
1713 return -1;
1714 }
1715
1716 /* remap the file (if using mmap) */
1717 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1718
1719 /* we have to reset the old map size so that we don't try to expand the file
1720 again in the transaction commit, which would destroy the recovery area */
1721 tdb->transaction->old_map_size = tdb->map_size;
1722
1723 /* write the recovery header offset and sync - we can sync without a race here
1724 as the magic ptr in the recovery record has not been set */
1725 CONVERT(recovery_head);
1726 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
1727 &recovery_head, sizeof(tdb_off_t)) == -1) {
1728 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
1729 return -1;
1730 }
1731
1732 return 0;
1733}
1734
1735
1736/*
1737 setup the recovery data that will be used on a crash during commit
1738*/
1739static int transaction_setup_recovery(struct tdb_context *tdb,
1740 tdb_off_t *magic_offset)
1741{
1742 struct tdb_transaction_el *el;
1743 tdb_len_t recovery_size;
1744 unsigned char *data, *p;
1745 const struct tdb_methods *methods = tdb->transaction->io_methods;
1746 struct list_struct *rec;
1747 tdb_off_t recovery_offset, recovery_max_size;
1748 tdb_off_t old_map_size = tdb->transaction->old_map_size;
1749 u32 magic, tailer;
1750
1751 /*
1752 check that the recovery area has enough space
1753 */
1754 if (tdb_recovery_allocate(tdb, &recovery_size,
1755 &recovery_offset, &recovery_max_size) == -1) {
1756 return -1;
1757 }
1758
1759 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
1760 if (data == NULL) {
1761 tdb->ecode = TDB_ERR_OOM;
1762 return -1;
1763 }
1764
1765 rec = (struct list_struct *)data;
1766 memset(rec, 0, sizeof(*rec));
1767
1768 rec->magic = 0;
1769 rec->data_len = recovery_size;
1770 rec->rec_len = recovery_max_size;
1771 rec->key_len = old_map_size;
	CONVERT(*rec);
1773
1774 /* build the recovery data into a single blob to allow us to do a single
1775 large write, which should be more efficient */
1776 p = data + sizeof(*rec);
1777 for (el=tdb->transaction->elements;el;el=el->next) {
1778 if (el->offset >= old_map_size) {
1779 continue;
1780 }
1781 if (el->offset + el->length > tdb->transaction->old_map_size) {
1782 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
1783 free(data);
1784 tdb->ecode = TDB_ERR_CORRUPT;
1785 return -1;
1786 }
1787 memcpy(p, &el->offset, 4);
1788 memcpy(p+4, &el->length, 4);
1789 if (DOCONV()) {
1790 tdb_convert(p, 8);
1791 }
1792 /* the recovery area contains the old data, not the
1793 new data, so we have to call the original tdb_read
1794 method to get it */
1795 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
1796 free(data);
1797 tdb->ecode = TDB_ERR_IO;
1798 return -1;
1799 }
1800 p += 8 + el->length;
1801 }
1802
1803 /* and the tailer */
1804 tailer = sizeof(*rec) + recovery_max_size;
	memcpy(p, &tailer, 4);
	if (DOCONV()) {
		tdb_convert(p, 4);
	}
1807
1808 /* write the recovery data to the recovery area */
1809 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
1810 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
1811 free(data);
1812 tdb->ecode = TDB_ERR_IO;
1813 return -1;
1814 }
1815
1816 /* as we don't have ordered writes, we have to sync the recovery
1817 data before we update the magic to indicate that the recovery
1818 data is present */
1819 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
1820 free(data);
1821 return -1;
1822 }
1823
1824 free(data);
1825
1826 magic = TDB_RECOVERY_MAGIC;
1827 CONVERT(magic);
1828
1829 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
1830
1831 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
1832 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
1833 tdb->ecode = TDB_ERR_IO;
1834 return -1;
1835 }
1836
1837 /* ensure the recovery magic marker is on disk */
1838 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
1839 return -1;
1840 }
1841
1842 return 0;
1843}
1844
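/*
  a minimal caller sketch (not part of this file, error handling
  simplified) showing how the commit path below is normally driven;
  key and data are TDB_DATA values prepared by the caller:

	if (tdb_transaction_start(tdb) == 0) {
		if (tdb_store(tdb, key, data, TDB_REPLACE) == 0) {
			tdb_transaction_commit(tdb);
		} else {
			tdb_transaction_cancel(tdb);
		}
	}
*/
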
1845/*
1846 commit the current transaction
1847*/
1848int tdb_transaction_commit(struct tdb_context *tdb)
1849{
1850 const struct tdb_methods *methods;
1851 tdb_off_t magic_offset = 0;
1852 u32 zero = 0;
1853
1854 if (tdb->transaction == NULL) {
1855 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1856 return -1;
1857 }
1858
1859 if (tdb->transaction->transaction_error) {
1860 tdb->ecode = TDB_ERR_IO;
1861 tdb_transaction_cancel(tdb);
1862 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1863 return -1;
1864 }
1865
1866 if (tdb->transaction->nesting != 0) {
1867 tdb->transaction->nesting--;
1868 return 0;
1869 }
1870
1871 /* check for a null transaction */
1872 if (tdb->transaction->elements == NULL) {
1873 tdb_transaction_cancel(tdb);
1874 return 0;
1875 }
1876
1877 methods = tdb->transaction->io_methods;
1878
1879 /* if there are any locks pending then the caller has not
1880 nested their locks properly, so fail the transaction */
1881 if (tdb->num_locks || tdb->global_lock.count) {
1882 tdb->ecode = TDB_ERR_LOCK;
1883 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
1884 tdb_transaction_cancel(tdb);
1885 return -1;
1886 }
1887
1888 /* upgrade the main transaction lock region to a write lock */
1889 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
1890		TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to upgrade hash locks\n"));
1891 tdb->ecode = TDB_ERR_LOCK;
1892 tdb_transaction_cancel(tdb);
1893 return -1;
1894 }
1895
1896 /* get the global lock - this prevents new users attaching to the database
1897 during the commit */
1898 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
1899 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
1900 tdb->ecode = TDB_ERR_LOCK;
1901 tdb_transaction_cancel(tdb);
1902 return -1;
1903 }
1904
1905 if (!(tdb->flags & TDB_NOSYNC)) {
1906 /* write the recovery data to the end of the file */
1907 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
1908 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
1909 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1910 tdb_transaction_cancel(tdb);
1911 return -1;
1912 }
1913 }
1914
1915 /* expand the file to the new size if needed */
1916 if (tdb->map_size != tdb->transaction->old_map_size) {
1917 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1918 tdb->map_size -
1919 tdb->transaction->old_map_size) == -1) {
1920 tdb->ecode = TDB_ERR_IO;
1921 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
1922 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1923 tdb_transaction_cancel(tdb);
1924 return -1;
1925 }
1926 tdb->map_size = tdb->transaction->old_map_size;
1927 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
1928 }
1929
1930 /* perform all the writes */
1931 while (tdb->transaction->elements) {
1932 struct tdb_transaction_el *el = tdb->transaction->elements;
1933
1934 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
1935 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1936
1937 /* we've overwritten part of the data and
1938 possibly expanded the file, so we need to
1939 run the crash recovery code */
1940 tdb->methods = methods;
1941 tdb_transaction_recover(tdb);
1942
1943 tdb_transaction_cancel(tdb);
1944 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1945
1946 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1947 return -1;
1948 }
1949 tdb->transaction->elements = el->next;
1950 free(el->data);
1951 free(el);
1952 }
1953
1954 if (!(tdb->flags & TDB_NOSYNC)) {
1955 /* ensure the new data is on disk */
1956 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1957 return -1;
1958 }
1959
1960 /* remove the recovery marker */
1961 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
1962 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
1963 return -1;
1964 }
1965
1966 /* ensure the recovery marker has been removed on disk */
1967 if (transaction_sync(tdb, magic_offset, 4) == -1) {
1968 return -1;
1969 }
1970 }
1971
1972 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1973
1974 /*
1975 TODO: maybe write to some dummy hdr field, or write to magic
1976 offset without mmap, before the last sync, instead of the
1977 utime() call
1978 */
1979
1980 /* on some systems (like Linux 2.6.x) changes via mmap/msync
1981	   don't change the mtime of the file. This means the file may
1982 not be backed up (as tdb rounding to block sizes means that
1983 file size changes are quite rare too). The following forces
1984 mtime changes when a transaction completes */
1985#ifdef HAVE_UTIME
1986 utime(tdb->name, NULL);
1987#endif
1988
1989 /* use a transaction cancel to free memory and remove the
1990 transaction locks */
1991 tdb_transaction_cancel(tdb);
1992 return 0;
1993}
1994
1995
1996/*
1997 recover from an aborted transaction. Must be called with exclusive
1998 database write access already established (including the global
1999 lock to prevent new processes attaching)
2000*/
2001int tdb_transaction_recover(struct tdb_context *tdb)
2002{
2003 tdb_off_t recovery_head, recovery_eof;
2004 unsigned char *data, *p;
2005 u32 zero = 0;
2006 struct list_struct rec;
2007
2008 /* find the recovery area */
2009 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
2010 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
2011 tdb->ecode = TDB_ERR_IO;
2012 return -1;
2013 }
2014
2015 if (recovery_head == 0) {
2016 /* we have never allocated a recovery record */
2017 return 0;
2018 }
2019
2020 /* read the recovery record */
2021 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
2022 sizeof(rec), DOCONV()) == -1) {
2023 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
2024 tdb->ecode = TDB_ERR_IO;
2025 return -1;
2026 }
2027
2028 if (rec.magic != TDB_RECOVERY_MAGIC) {
2029 /* there is no valid recovery data */
2030 return 0;
2031 }
2032
2033 if (tdb->read_only) {
2034 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
2035 tdb->ecode = TDB_ERR_CORRUPT;
2036 return -1;
2037 }
2038
2039 recovery_eof = rec.key_len;
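	/* transaction_setup_recovery() stored the pre-transaction file
	   size in key_len; that is the size we truncate back to below */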
2040
2041 data = (unsigned char *)malloc(rec.data_len);
2042 if (data == NULL) {
2043 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
2044 tdb->ecode = TDB_ERR_OOM;
2045 return -1;
2046 }
2047
2048 /* read the full recovery data */
2049 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
2050 rec.data_len, 0) == -1) {
2051 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
2052 tdb->ecode = TDB_ERR_IO;
2053 return -1;
2054 }
2055
2056 /* recover the file data */
2057 p = data;
2058 while (p+8 < data + rec.data_len) {
2059 u32 ofs, len;
2060 if (DOCONV()) {
2061 tdb_convert(p, 8);
2062 }
2063 memcpy(&ofs, p, 4);
2064 memcpy(&len, p+4, 4);
2065
2066 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
2067 free(data);
2068 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
2069 tdb->ecode = TDB_ERR_IO;
2070 return -1;
2071 }
2072 p += 8 + len;
2073 }
2074
2075 free(data);
2076
2077 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
2078 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
2079 tdb->ecode = TDB_ERR_IO;
2080 return -1;
2081 }
2082
2083 /* if the recovery area is after the recovered eof then remove it */
2084 if (recovery_eof <= recovery_head) {
2085 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
2086 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
2087 tdb->ecode = TDB_ERR_IO;
2088 return -1;
2089 }
2090 }
2091
2092 /* remove the recovery magic */
2093 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
2094 &zero) == -1) {
2095 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
2096 tdb->ecode = TDB_ERR_IO;
2097 return -1;
2098 }
2099
2100 /* reduce the file size to the old size */
2101 tdb_munmap(tdb);
2102 if (ftruncate(tdb->fd, recovery_eof) != 0) {
2103 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
2104 tdb->ecode = TDB_ERR_IO;
2105 return -1;
2106 }
2107 tdb->map_size = recovery_eof;
2108 tdb_mmap(tdb);
2109
2110 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
2111 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
2112 tdb->ecode = TDB_ERR_IO;
2113 return -1;
2114 }
2115
2116 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
2117 recovery_eof));
2118
2119 /* all done */
2120 return 0;
2121}
2122
2123/* file: freelist.c */
2124
2125/* read a freelist record and check for simple errors */
2126static int rec_free_read(struct tdb_context *tdb, tdb_off_t off, struct list_struct *rec)
2127{
2128 if (tdb->methods->tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
2129 return -1;
2130
2131 if (rec->magic == TDB_MAGIC) {
2132		/* this happens when an app is shut down while deleting a record - we should
2133		   not completely fail when this happens */
2134 TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read non-free magic 0x%x at offset=%d - fixing\n",
2135 rec->magic, off));
2136 rec->magic = TDB_FREE_MAGIC;
2137 if (tdb->methods->tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
2138 return -1;
2139 }
2140
2141 if (rec->magic != TDB_FREE_MAGIC) {
2142 /* Ensure ecode is set for log fn. */
2143 tdb->ecode = TDB_ERR_CORRUPT;
2144 TDB_LOG((tdb, TDB_DEBUG_WARNING, "rec_free_read bad magic 0x%x at offset=%d\n",
2145 rec->magic, off));
2146 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2147 }
2148 if (tdb->methods->tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
2149 return -1;
2150 return 0;
2151}
2152
2153
2154
2155/* Remove an element from the freelist. Must have alloc lock. */
2156static int remove_from_freelist(struct tdb_context *tdb, tdb_off_t off, tdb_off_t next)
2157{
2158 tdb_off_t last_ptr, i;
2159
2160 /* read in the freelist top */
2161 last_ptr = FREELIST_TOP;
2162 while (tdb_ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
2163 if (i == off) {
2164 /* We've found it! */
2165 return tdb_ofs_write(tdb, last_ptr, &next);
2166 }
2167 /* Follow chain (next offset is at start of record) */
2168 last_ptr = i;
2169 }
2170 TDB_LOG((tdb, TDB_DEBUG_FATAL,"remove_from_freelist: not on list at off=%d\n", off));
2171 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2172}
2173
2174
2175/* update a record tailer (must hold allocation lock) */
2176static int update_tailer(struct tdb_context *tdb, tdb_off_t offset,
2177 const struct list_struct *rec)
2178{
2179 tdb_off_t totalsize;
2180
2181 /* Offset of tailer from record header */
2182 totalsize = sizeof(*rec) + rec->rec_len;
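	/* the tailer occupies the last tdb_off_t of the record and holds
	   this total size, which lets tdb_free() step backwards from a
	   record to the header of its left-hand neighbour when merging */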
2183 return tdb_ofs_write(tdb, offset + totalsize - sizeof(tdb_off_t),
2184 &totalsize);
2185}
2186
2187/* Add an element into the freelist. Merge adjacent records if
2188   necessary. */
2189int tdb_free(struct tdb_context *tdb, tdb_off_t offset, struct list_struct *rec)
2190{
2191 tdb_off_t right, left;
2192
2193 /* Allocation and tailer lock */
2194 if (tdb_lock(tdb, -1, F_WRLCK) != 0)
2195 return -1;
2196
2197 /* set an initial tailer, so if we fail we don't leave a bogus record */
2198 if (update_tailer(tdb, offset, rec) != 0) {
2199 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed!\n"));
2200 goto fail;
2201 }
2202
2203 /* Look right first (I'm an Australian, dammit) */
2204 right = offset + sizeof(*rec) + rec->rec_len;
2205 if (right + sizeof(*rec) <= tdb->map_size) {
2206 struct list_struct r;
2207
2208 if (tdb->methods->tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
2209 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right read failed at %u\n", right));
2210 goto left;
2211 }
2212
2213 /* If it's free, expand to include it. */
2214 if (r.magic == TDB_FREE_MAGIC) {
2215 if (remove_from_freelist(tdb, right, r.next) == -1) {
2216 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: right free failed at %u\n", right));
2217 goto left;
2218 }
2219 rec->rec_len += sizeof(r) + r.rec_len;
2220 }
2221 }
2222
2223left:
2224 /* Look left */
2225 left = offset - sizeof(tdb_off_t);
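	/* `left' now points at the tailer of the record to our left,
	   if such a record exists */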
2226 if (left > TDB_DATA_START(tdb->header.hash_size)) {
2227 struct list_struct l;
2228 tdb_off_t leftsize;
2229
2230 /* Read in tailer and jump back to header */
2231 if (tdb_ofs_read(tdb, left, &leftsize) == -1) {
2232 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left offset read failed at %u\n", left));
2233 goto update;
2234 }
2235
2236 /* it could be uninitialised data */
2237 if (leftsize == 0 || leftsize == TDB_PAD_U32) {
2238 goto update;
2239 }
2240
2241 left = offset - leftsize;
2242
2243 /* Now read in record */
2244 if (tdb->methods->tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
2245 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
2246 goto update;
2247 }
2248
2249 /* If it's free, expand to include it. */
2250 if (l.magic == TDB_FREE_MAGIC) {
2251 if (remove_from_freelist(tdb, left, l.next) == -1) {
2252 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: left free failed at %u\n", left));
2253 goto update;
2254 } else {
2255 offset = left;
2256 rec->rec_len += leftsize;
2257 }
2258 }
2259 }
2260
2261update:
2262 if (update_tailer(tdb, offset, rec) == -1) {
2263 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free: update_tailer failed at %u\n", offset));
2264 goto fail;
2265 }
2266
2267 /* Now, prepend to free list */
2268 rec->magic = TDB_FREE_MAGIC;
2269
2270 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
2271 tdb_rec_write(tdb, offset, rec) == -1 ||
2272 tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
2273 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_free record write failed at offset=%d\n", offset));
2274 goto fail;
2275 }
2276
2277 /* And we're done. */
2278 tdb_unlock(tdb, -1, F_WRLCK);
2279 return 0;
2280
2281 fail:
2282 tdb_unlock(tdb, -1, F_WRLCK);
2283 return -1;
2284}
2285
2286
2287/*
2288 the core of tdb_allocate - called when we have decided which
2289 free list entry to use
2290 */
2291static tdb_off_t tdb_allocate_ofs(struct tdb_context *tdb, tdb_len_t length, tdb_off_t rec_ptr,
2292 struct list_struct *rec, tdb_off_t last_ptr)
2293{
2294 struct list_struct newrec;
2295 tdb_off_t newrec_ptr;
2296
2297 memset(&newrec, '\0', sizeof(newrec));
2298
2299 /* found it - now possibly split it up */
2300 if (rec->rec_len > length + MIN_REC_SIZE) {
2301 /* Length of left piece */
2302 length = TDB_ALIGN(length, TDB_ALIGNMENT);
2303
2304 /* Right piece to go on free list */
2305 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
2306 newrec_ptr = rec_ptr + sizeof(*rec) + length;
2307
2308 /* And left record is shortened */
2309 rec->rec_len = length;
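		/* e.g. asking a 100 byte free record for 20 (aligned) bytes
		   keeps a 20 byte left piece here and returns the remaining
		   100 - (sizeof(*rec) + 20) bytes to the free list as the
		   new right-hand record */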
2310 } else {
2311 newrec_ptr = 0;
2312 }
2313
2314 /* Remove allocated record from the free list */
2315 if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1) {
2316 return 0;
2317 }
2318
2319 /* Update header: do this before we drop alloc
2320 lock, otherwise tdb_free() might try to
2321 merge with us, thinking we're free.
2322 (Thanks Jeremy Allison). */
2323 rec->magic = TDB_MAGIC;
2324 if (tdb_rec_write(tdb, rec_ptr, rec) == -1) {
2325 return 0;
2326 }
2327
2328 /* Did we create new block? */
2329 if (newrec_ptr) {
2330 /* Update allocated record tailer (we
2331 shortened it). */
2332 if (update_tailer(tdb, rec_ptr, rec) == -1) {
2333 return 0;
2334 }
2335
2336 /* Free new record */
2337 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
2338 return 0;
2339 }
2340 }
2341
2342 /* all done - return the new record offset */
2343 return rec_ptr;
2344}
2345
2346/* allocate some space from the free list. The offset returned points
2347   to an unconnected list_struct within the database with room for at
2348 least length bytes of total data
2349
2350 0 is returned if the space could not be allocated
2351 */
2352tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_struct *rec)
2353{
2354 tdb_off_t rec_ptr, last_ptr, newrec_ptr;
2355 struct {
2356 tdb_off_t rec_ptr, last_ptr;
2357 tdb_len_t rec_len;
2358 } bestfit;
2359
2360 if (tdb_lock(tdb, -1, F_WRLCK) == -1)
2361 return 0;
2362
2363 /* Extra bytes required for tailer */
2364 length += sizeof(tdb_off_t);
2365
2366 again:
2367 last_ptr = FREELIST_TOP;
2368
2369 /* read in the freelist top */
2370 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
2371 goto fail;
2372
2373 bestfit.rec_ptr = 0;
2374 bestfit.last_ptr = 0;
2375 bestfit.rec_len = 0;
2376
2377 /*
2378 this is a best fit allocation strategy. Originally we used
2379 a first fit strategy, but it suffered from massive fragmentation
2380 issues when faced with a slowly increasing record size.
2381 */
2382 while (rec_ptr) {
2383 if (rec_free_read(tdb, rec_ptr, rec) == -1) {
2384 goto fail;
2385 }
2386
2387 if (rec->rec_len >= length) {
2388 if (bestfit.rec_ptr == 0 ||
2389 rec->rec_len < bestfit.rec_len) {
2390 bestfit.rec_len = rec->rec_len;
2391 bestfit.rec_ptr = rec_ptr;
2392 bestfit.last_ptr = last_ptr;
2393 /* consider a fit to be good enough if
2394 we aren't wasting more than half
2395 the space */
2396 if (bestfit.rec_len < 2*length) {
2397 break;
2398 }
2399 }
2400 }
2401
2402 /* move to the next record */
2403 last_ptr = rec_ptr;
2404 rec_ptr = rec->next;
2405 }
2406
2407 if (bestfit.rec_ptr != 0) {
2408 if (rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
2409 goto fail;
2410 }
2411
2412 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
2413 tdb_unlock(tdb, -1, F_WRLCK);
2414 return newrec_ptr;
2415 }
2416
2417 /* we didn't find enough space. See if we can expand the
2418 database and if we can then try again */
2419 if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
2420 goto again;
2421 fail:
2422 tdb_unlock(tdb, -1, F_WRLCK);
2423 return 0;
2424}
2425
2426/* file: freelistcheck.c */
2427
2428/* Check the freelist is good and contains no loops.
2429 Very memory intensive - only do this as a consistency
2430 checker. Heh heh - uses an in memory tdb as the storage
2431 for the "seen" record list. For some reason this strikes
2432 me as extremely clever as I don't have to write another tree
2433 data structure implementation :-).
2434 */
2435
2436static int seen_insert(struct tdb_context *mem_tdb, tdb_off_t rec_ptr)
2437{
2438 TDB_DATA key, data;
2439
2440 memset(&data, '\0', sizeof(data));
2441 key.dptr = (unsigned char *)&rec_ptr;
2442 key.dsize = sizeof(rec_ptr);
2443 return tdb_store(mem_tdb, key, data, TDB_INSERT);
2444}
2445
2446int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries)
2447{
2448 struct tdb_context *mem_tdb = NULL;
2449 struct list_struct rec;
2450 tdb_off_t rec_ptr, last_ptr;
2451 int ret = -1;
2452
2453 *pnum_entries = 0;
2454
2455 mem_tdb = tdb_open("flval", tdb->header.hash_size,
2456 TDB_INTERNAL, O_RDWR, 0600);
2457 if (!mem_tdb) {
2458 return -1;
2459 }
2460
2461 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
2462 tdb_close(mem_tdb);
2463 return 0;
2464 }
2465
2466 last_ptr = FREELIST_TOP;
2467
2468 /* Store the FREELIST_TOP record. */
2469 if (seen_insert(mem_tdb, last_ptr) == -1) {
2470 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2471 goto fail;
2472 }
2473
2474 /* read in the freelist top */
2475 if (tdb_ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) {
2476 goto fail;
2477 }
2478
2479 while (rec_ptr) {
2480
2481 /* If we can't store this record (we've seen it
2482 before) then the free list has a loop and must
2483 be corrupt. */
2484
2485 if (seen_insert(mem_tdb, rec_ptr)) {
2486 ret = TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
2487 goto fail;
2488 }
2489
2490 if (rec_free_read(tdb, rec_ptr, &rec) == -1) {
2491 goto fail;
2492 }
2493
2494 /* move to the next record */
2495 last_ptr = rec_ptr;
2496 rec_ptr = rec.next;
2497 *pnum_entries += 1;
2498 }
2499
2500 ret = 0;
2501
2502 fail:
2503
2504 tdb_close(mem_tdb);
2505 tdb_unlock(tdb, -1, F_WRLCK);
2506 return ret;
2507}
2508
2509/* file: traverse.c */
2510
2511/* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
2512static int tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
2513 struct list_struct *rec)
2514{
2515 int want_next = (tlock->off != 0);
2516
2517 /* Lock each chain from the start one. */
2518 for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
2519 if (!tlock->off && tlock->hash != 0) {
2520 /* this is an optimisation for the common case where
2521 the hash chain is empty, which is particularly
2522 common for the use of tdb with ldb, where large
2523 hashes are used. In that case we spend most of our
2524 time in tdb_brlock(), locking empty hash chains.
2525
2526 To avoid this, we do an unlocked pre-check to see
2527 if the hash chain is empty before starting to look
2528 inside it. If it is empty then we can avoid that
2529 hash chain. If it isn't empty then we can't believe
2530 the value we get back, as we read it without a
2531 lock, so instead we get the lock and re-fetch the
2532 value below.
2533
2534 Notice that not doing this optimisation on the
2535 first hash chain is critical. We must guarantee
2536 that we have done at least one fcntl lock at the
2537 start of a search to guarantee that memory is
2538 coherent on SMP systems. If records are added by
2539			   others during the search then that's OK, and we
2540 could possibly miss those with this trick, but we
2541 could miss them anyway without this trick, so the
2542 semantics don't change.
2543
2544 With a non-indexed ldb search this trick gains us a
2545 factor of around 80 in speed on a linux 2.6.x
2546 system (testing using ldbtest).
2547 */
2548 tdb->methods->next_hash_chain(tdb, &tlock->hash);
2549 if (tlock->hash == tdb->header.hash_size) {
2550 continue;
2551 }
2552 }
2553
2554 if (tdb_lock(tdb, tlock->hash, tlock->lock_rw) == -1)
2555 return -1;
2556
2557 /* No previous record? Start at top of chain. */
2558 if (!tlock->off) {
2559 if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
2560 &tlock->off) == -1)
2561 goto fail;
2562 } else {
2563 /* Otherwise unlock the previous record. */
2564 if (tdb_unlock_record(tdb, tlock->off) != 0)
2565 goto fail;
2566 }
2567
2568 if (want_next) {
2569 /* We have offset of old record: grab next */
2570 if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2571 goto fail;
2572 tlock->off = rec->next;
2573 }
2574
2575 /* Iterate through chain */
2576 while( tlock->off) {
2577 tdb_off_t current;
2578 if (tdb_rec_read(tdb, tlock->off, rec) == -1)
2579 goto fail;
2580
2581 /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
2582 if (tlock->off == rec->next) {
2583 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
2584 goto fail;
2585 }
2586
2587 if (!TDB_DEAD(rec)) {
2588 /* Woohoo: we found one! */
2589 if (tdb_lock_record(tdb, tlock->off) != 0)
2590 goto fail;
2591 return tlock->off;
2592 }
2593
2594 /* Try to clean dead ones from old traverses */
2595 current = tlock->off;
2596 tlock->off = rec->next;
2597 if (!(tdb->read_only || tdb->traverse_read) &&
2598 tdb_do_delete(tdb, current, rec) != 0)
2599 goto fail;
2600 }
2601 tdb_unlock(tdb, tlock->hash, tlock->lock_rw);
2602 want_next = 0;
2603 }
2604 /* We finished iteration without finding anything */
2605 return TDB_ERRCODE(TDB_SUCCESS, 0);
2606
2607 fail:
2608 tlock->off = 0;
2609 if (tdb_unlock(tdb, tlock->hash, tlock->lock_rw) != 0)
2610 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
2611 return -1;
2612}
2613
2614/* traverse the entire database - calling fn(tdb, key, data) on each element.
2615 return -1 on error or the record count traversed
2616 if fn is NULL then it is not called
2617 a non-zero return value from fn() indicates that the traversal should stop
2618 */
2619static int tdb_traverse_internal(struct tdb_context *tdb,
2620 tdb_traverse_func fn, void *private_data,
2621 struct tdb_traverse_lock *tl)
2622{
2623 TDB_DATA key, dbuf;
2624 struct list_struct rec;
2625 int ret, count = 0;
2626
2627	/* This was in the initialization, above, but the IRIX compiler
2628 * did not like it. crh
2629 */
2630 tl->next = tdb->travlocks.next;
2631
2632 /* fcntl locks don't stack: beware traverse inside traverse */
2633 tdb->travlocks.next = tl;
2634
2635 /* tdb_next_lock places locks on the record returned, and its chain */
2636 while ((ret = tdb_next_lock(tdb, tl, &rec)) > 0) {
2637 count++;
2638 /* now read the full record */
2639 key.dptr = tdb_alloc_read(tdb, tl->off + sizeof(rec),
2640 rec.key_len + rec.data_len);
2641 if (!key.dptr) {
2642 ret = -1;
2643 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0)
2644 goto out;
2645 if (tdb_unlock_record(tdb, tl->off) != 0)
2646 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
2647 goto out;
2648 }
2649 key.dsize = rec.key_len;
2650 dbuf.dptr = key.dptr + rec.key_len;
2651 dbuf.dsize = rec.data_len;
2652
2653 /* Drop chain lock, call out */
2654 if (tdb_unlock(tdb, tl->hash, tl->lock_rw) != 0) {
2655 ret = -1;
2656 SAFE_FREE(key.dptr);
2657 goto out;
2658 }
2659 if (fn && fn(tdb, key, dbuf, private_data)) {
2660 /* They want us to terminate traversal */
2661 ret = count;
2662 if (tdb_unlock_record(tdb, tl->off) != 0) {
2663				TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));
2664 ret = -1;
2665 }
2666 SAFE_FREE(key.dptr);
2667 goto out;
2668 }
2669 SAFE_FREE(key.dptr);
2670 }
2671out:
2672 tdb->travlocks.next = tl->next;
2673 if (ret < 0)
2674 return -1;
2675 else
2676 return count;
2677}
2678
2679
2680/*
2681  a read style traverse - temporarily marks the db read only
2682*/
2683int tdb_traverse_read(struct tdb_context *tdb,
2684 tdb_traverse_func fn, void *private_data)
2685{
2686 struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
2687 int ret;
2688
2689 /* we need to get a read lock on the transaction lock here to
2690 cope with the lock ordering semantics of solaris10 */
2691 if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1) {
2692 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse_read: failed to get transaction lock\n"));
2693 tdb->ecode = TDB_ERR_LOCK;
2694 return -1;
2695 }
2696
2697 tdb->traverse_read++;
2698 ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2699 tdb->traverse_read--;
2700
2701 tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2702
2703 return ret;
2704}
2705
2706/*
2707 a write style traverse - needs to get the transaction lock to
2708 prevent deadlocks
2709*/
2710int tdb_traverse(struct tdb_context *tdb,
2711 tdb_traverse_func fn, void *private_data)
2712{
2713 struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
2714 int ret;
2715
2716 if (tdb->read_only || tdb->traverse_read) {
2717 return tdb_traverse_read(tdb, fn, private_data);
2718 }
2719
2720 if (tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
2721 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_traverse: failed to get transaction lock\n"));
2722 tdb->ecode = TDB_ERR_LOCK;
2723 return -1;
2724 }
2725
2726 ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
2727
2728 tdb->methods->tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
2729
2730 return ret;
2731}
2732
2733
2734/* find the first entry in the database and return its key */
2735TDB_DATA tdb_firstkey(struct tdb_context *tdb)
2736{
2737 TDB_DATA key;
2738 struct list_struct rec;
2739
2740 /* release any old lock */
2741 if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
2742 return tdb_null;
2743 tdb->travlocks.off = tdb->travlocks.hash = 0;
2744 tdb->travlocks.lock_rw = F_RDLCK;
2745
2746 if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
2747 return tdb_null;
2748 /* now read the key */
2749 key.dsize = rec.key_len;
2750	key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off + sizeof(rec), key.dsize);
2751 if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
2752 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
2753 return key;
2754}
2755
2756/* find the next entry in the database, returning its key */
2757TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
2758{
2759 u32 oldhash;
2760 TDB_DATA key = tdb_null;
2761 struct list_struct rec;
2762 unsigned char *k = NULL;
2763
2764 /* Is locked key the old key? If so, traverse will be reliable. */
2765 if (tdb->travlocks.off) {
2766 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
2767 return tdb_null;
2768 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
2769 || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
2770 rec.key_len))
2771 || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
2772 /* No, it wasn't: unlock it and start from scratch */
2773 if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
2774 SAFE_FREE(k);
2775 return tdb_null;
2776 }
2777 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0) {
2778 SAFE_FREE(k);
2779 return tdb_null;
2780 }
2781 tdb->travlocks.off = 0;
2782 }
2783
2784 SAFE_FREE(k);
2785 }
2786
2787 if (!tdb->travlocks.off) {
2788 /* No previous element: do normal find, and lock record */
2789 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
2790 if (!tdb->travlocks.off)
2791 return tdb_null;
2792 tdb->travlocks.hash = BUCKET(rec.full_hash);
2793 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
2794 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
2795 return tdb_null;
2796 }
2797 }
2798 oldhash = tdb->travlocks.hash;
2799
2800 /* Grab next record: locks chain and returned record,
2801 unlocks old record */
2802 if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
2803 key.dsize = rec.key_len;
2804 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
2805 key.dsize);
2806 /* Unlock the chain of this new record */
2807 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
2808 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2809 }
2810 /* Unlock the chain of old record */
2811 if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
2812 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
2813 return key;
2814}
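
/*
  a minimal iteration sketch (not part of this file): tdb_firstkey()
  and tdb_nextkey() return malloc()ed keys that the caller must free:

	TDB_DATA k, next;
	for (k = tdb_firstkey(tdb); k.dptr; k = next) {
		next = tdb_nextkey(tdb, k);
		free(k.dptr);
	}
*/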
2815
2816/* file: dump.c */
2817
2818static tdb_off_t tdb_dump_record(struct tdb_context *tdb, tdb_off_t offset)
2819{
2820 struct list_struct rec;
2821 tdb_off_t tailer_ofs, tailer;
2822
2823 if (tdb->methods->tdb_read(tdb, offset, (char *)&rec,
2824 sizeof(rec), DOCONV()) == -1) {
2825 printf("ERROR: failed to read record at %u\n", offset);
2826 return 0;
2827 }
2828
2829 printf(" rec: offset=0x%08x next=0x%08x rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
2830 offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
2831
2832 tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off_t);
2833
2834 if (tdb_ofs_read(tdb, tailer_ofs, &tailer) == -1) {
2835 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
2836 return rec.next;
2837 }
2838
2839 if (tailer != rec.rec_len + sizeof(rec)) {
2840 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
2841 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
2842 }
2843 return rec.next;
2844}
2845
2846static int tdb_dump_chain(struct tdb_context *tdb, int i)
2847{
2848 tdb_off_t rec_ptr, top;
2849
2850 top = TDB_HASH_TOP(i);
2851
2852 if (tdb_lock(tdb, i, F_WRLCK) != 0)
2853 return -1;
2854
2855 if (tdb_ofs_read(tdb, top, &rec_ptr) == -1)
2856 return tdb_unlock(tdb, i, F_WRLCK);
2857
2858 if (rec_ptr)
2859 printf("hash=%d\n", i);
2860
2861 while (rec_ptr) {
2862 rec_ptr = tdb_dump_record(tdb, rec_ptr);
2863 }
2864
2865 return tdb_unlock(tdb, i, F_WRLCK);
2866}
2867
2868void tdb_dump_all(struct tdb_context *tdb)
2869{
2870 int i;
2871 for (i=0;i<tdb->header.hash_size;i++) {
2872 tdb_dump_chain(tdb, i);
2873 }
2874 printf("freelist:\n");
2875 tdb_dump_chain(tdb, -1);
2876}
2877
2878int tdb_printfreelist(struct tdb_context *tdb)
2879{
2880 int ret;
2881 long total_free = 0;
2882 tdb_off_t offset, rec_ptr;
2883 struct list_struct rec;
2884
2885 if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
2886 return ret;
2887
2888 offset = FREELIST_TOP;
2889
2890 /* read in the freelist top */
2891 if (tdb_ofs_read(tdb, offset, &rec_ptr) == -1) {
2892 tdb_unlock(tdb, -1, F_WRLCK);
2893 return 0;
2894 }
2895
2896 printf("freelist top=[0x%08x]\n", rec_ptr );
2897 while (rec_ptr) {
2898 if (tdb->methods->tdb_read(tdb, rec_ptr, (char *)&rec,
2899 sizeof(rec), DOCONV()) == -1) {
2900 tdb_unlock(tdb, -1, F_WRLCK);
2901 return -1;
2902 }
2903
2904 if (rec.magic != TDB_FREE_MAGIC) {
2905 printf("bad magic 0x%08x in free list\n", rec.magic);
2906 tdb_unlock(tdb, -1, F_WRLCK);
2907 return -1;
2908 }
2909
2910 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n",
2911 rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
2912 total_free += rec.rec_len;
2913
2914 /* move to the next record */
2915 rec_ptr = rec.next;
2916 }
2917 printf("total rec_len = [0x%08x (%d)]\n", (int)total_free,
2918 (int)total_free);
2919
2920 return tdb_unlock(tdb, -1, F_WRLCK);
2921}
2922
2923/* file: tdb.c */
2924
2925TDB_DATA tdb_null;
2926
2927/*
2928 increment the tdb sequence number if the tdb has been opened using
2929 the TDB_SEQNUM flag
2930*/
2931static void tdb_increment_seqnum(struct tdb_context *tdb)
2932{
2933 tdb_off_t seqnum=0;
2934
2935 if (!(tdb->flags & TDB_SEQNUM)) {
2936 return;
2937 }
2938
2939 if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
2940 return;
2941 }
2942
2943 /* we ignore errors from this, as we have no sane way of
2944 dealing with them.
2945 */
2946 tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
2947 seqnum++;
2948 tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
2949
2950 tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
2951}
2952
2953static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
2954{
2955 return memcmp(data.dptr, key.dptr, data.dsize);
2956}
2957
2958/* Returns 0 on fail. On success, return offset of record, and fills
2959 in rec */
2960static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
2961 struct list_struct *r)
2962{
2963 tdb_off_t rec_ptr;
2964
2965 /* read in the hash top */
2966 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
2967 return 0;
2968
2969 /* keep looking until we find the right record */
2970 while (rec_ptr) {
2971 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
2972 return 0;
2973
2974 if (!TDB_DEAD(r) && hash==r->full_hash
2975 && key.dsize==r->key_len
2976 && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
2977 r->key_len, tdb_key_compare,
2978 NULL) == 0) {
2979 return rec_ptr;
2980 }
2981 rec_ptr = r->next;
2982 }
2983 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
2984}
2985
2986/* As tdb_find, but if you succeed, keep the lock */
2987tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
2988 struct list_struct *rec)
2989{
2990 u32 rec_ptr;
2991
2992 if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
2993 return 0;
2994 if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
2995 tdb_unlock(tdb, BUCKET(hash), locktype);
2996 return rec_ptr;
2997}
2998
2999
3000/* update an entry in place - this only works if the new data size
3001 is <= the old data size and the key exists.
3002 on failure return -1.
3003*/
3004static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
3005{
3006 struct list_struct rec;
3007 tdb_off_t rec_ptr;
3008
3009 /* find entry */
3010 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
3011 return -1;
3012
3013 /* must be long enough key, data and tailer */
3014 if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
3015 tdb->ecode = TDB_SUCCESS; /* Not really an error */
3016 return -1;
3017 }
3018
3019 if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3020 dbuf.dptr, dbuf.dsize) == -1)
3021 return -1;
3022
3023 if (dbuf.dsize != rec.data_len) {
3024 /* update size */
3025 rec.data_len = dbuf.dsize;
3026 return tdb_rec_write(tdb, rec_ptr, &rec);
3027 }
3028
3029 return 0;
3030}
3031
3032/* find an entry in the database given a key */
3033/* If an entry doesn't exist the tdb error code will be set to
3034 * TDB_ERR_NOEXIST. If a key has no data attached
3035 * then the TDB_DATA will have zero length but
3036 * a non-zero pointer
3037 */
3038TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
3039{
3040 tdb_off_t rec_ptr;
3041 struct list_struct rec;
3042 TDB_DATA ret;
3043 u32 hash;
3044
3045 /* find which hash bucket it is in */
3046 hash = tdb->hash_fn(&key);
3047 if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
3048 return tdb_null;
3049
3050 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
3051 rec.data_len);
3052 ret.dsize = rec.data_len;
3053 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3054 return ret;
3055}
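
/*
  fetch usage sketch (not part of this file); the returned dptr is
  allocated with malloc() and must be freed by the caller:

	TDB_DATA d = tdb_fetch(tdb, key);
	if (d.dptr != NULL) {
		... use d.dptr and d.dsize ...
		free(d.dptr);
	}
*/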
3056
3057/*
3058 * Find an entry in the database and hand the record's data to a parsing
3059 * function. The parsing function is executed under the chain read lock, so it
3060 * should be fast and should not block on other syscalls.
3061 *
3062 * DON'T MAKE OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
3063 *
3064 * For mmapped tdbs that do not have a transaction open, it points the parsing
3065 * function directly at the mmap area, avoiding the malloc/memcpy in this
3066 * case. If a transaction is open or no mmap is available, it has to do
3067 * malloc/read/parse/free.
3068 *
3069 * This is interesting for all readers of potentially large data structures in
3070 * the tdb records, ldb indexes being one example.
3071 */
3072
3073int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
3074 int (*parser)(TDB_DATA key, TDB_DATA data,
3075 void *private_data),
3076 void *private_data)
3077{
3078 tdb_off_t rec_ptr;
3079 struct list_struct rec;
3080 int ret;
3081 u32 hash;
3082
3083 /* find which hash bucket it is in */
3084 hash = tdb->hash_fn(&key);
3085
3086 if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
3087 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
3088 }
3089
3090 ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
3091 rec.data_len, parser, private_data);
3092
3093 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3094
3095 return ret;
3096}
3097
3098/* check if an entry in the database exists
3099
3100   note that 1 is returned if the key is found and 0 is returned if not found;
3101 this doesn't match the conventions in the rest of this module, but is
3102 compatible with gdbm
3103*/
3104static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3105{
3106 struct list_struct rec;
3107
3108 if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
3109 return 0;
3110 tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
3111 return 1;
3112}
3113
3114int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
3115{
3116 u32 hash = tdb->hash_fn(&key);
3117 return tdb_exists_hash(tdb, key, hash);
3118}
3119
3120/* actually delete an entry in the database given the offset */
3121int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec)
3122{
3123 tdb_off_t last_ptr, i;
3124 struct list_struct lastrec;
3125
3126 if (tdb->read_only || tdb->traverse_read) return -1;
3127
3128 if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
3129 /* Someone traversing here: mark it as dead */
3130 rec->magic = TDB_DEAD_MAGIC;
3131 return tdb_rec_write(tdb, rec_ptr, rec);
3132 }
3133 if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
3134 return -1;
3135
3136 /* find previous record in hash chain */
3137 if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
3138 return -1;
3139 for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
3140 if (tdb_rec_read(tdb, i, &lastrec) == -1)
3141 return -1;
3142
3143 /* unlink it: next ptr is at start of record. */
3144 if (last_ptr == 0)
3145 last_ptr = TDB_HASH_TOP(rec->full_hash);
3146 if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
3147 return -1;
3148
3149 /* recover the space */
3150 if (tdb_free(tdb, rec_ptr, rec) == -1)
3151 return -1;
3152 return 0;
3153}
3154
3155static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
3156{
3157 int res = 0;
3158 tdb_off_t rec_ptr;
3159 struct list_struct rec;
3160
3161 /* read in the hash top */
3162 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3163 return 0;
3164
3165 while (rec_ptr) {
3166 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
3167 return 0;
3168
3169 if (rec.magic == TDB_DEAD_MAGIC) {
3170 res += 1;
3171 }
3172 rec_ptr = rec.next;
3173 }
3174 return res;
3175}
3176
3177/*
3178 * Purge all DEAD records from a hash chain
3179 */
3180static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
3181{
3182 int res = -1;
3183 struct list_struct rec;
3184 tdb_off_t rec_ptr;
3185
3186 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3187 return -1;
3188 }
3189
3190 /* read in the hash top */
3191 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3192 goto fail;
3193
3194 while (rec_ptr) {
3195 tdb_off_t next;
3196
3197 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
3198 goto fail;
3199 }
3200
3201 next = rec.next;
3202
3203 if (rec.magic == TDB_DEAD_MAGIC
3204 && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
3205 goto fail;
3206 }
3207 rec_ptr = next;
3208 }
3209 res = 0;
3210 fail:
3211 tdb_unlock(tdb, -1, F_WRLCK);
3212 return res;
3213}
3214
3215/* delete an entry in the database given a key */
3216static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
3217{
3218 tdb_off_t rec_ptr;
3219 struct list_struct rec;
3220 int ret;
3221
3222 if (tdb->max_dead_records != 0) {
3223
3224 /*
3225 * Allow for some dead records per hash chain, mainly for
3226 * tdb's with a very high create/delete rate like locking.tdb.
3227 */
3228
3229 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3230 return -1;
3231
3232 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
3233 /*
3234 * Don't let the per-chain freelist grow too large,
3235 * delete all existing dead records
3236 */
3237 tdb_purge_dead(tdb, hash);
3238 }
3239
3240 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
3241 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3242 return -1;
3243 }
3244
3245 /*
3246 * Just mark the record as dead.
3247 */
3248 rec.magic = TDB_DEAD_MAGIC;
3249 ret = tdb_rec_write(tdb, rec_ptr, &rec);
3250 }
3251 else {
3252 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
3253 &rec)))
3254 return -1;
3255
3256 ret = tdb_do_delete(tdb, rec_ptr, &rec);
3257 }
3258
3259 if (ret == 0) {
3260 tdb_increment_seqnum(tdb);
3261 }
3262
3263 if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
3264 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
3265 return ret;
3266}
3267
3268int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
3269{
3270 u32 hash = tdb->hash_fn(&key);
3271 return tdb_delete_hash(tdb, key, hash);
3272}
3273
3274/*
3275 * See if we have a dead record around with enough space
3276 */
3277static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
3278 struct list_struct *r, tdb_len_t length)
3279{
3280 tdb_off_t rec_ptr;
3281
3282 /* read in the hash top */
3283 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
3284 return 0;
3285
3286 /* keep looking until we find the right record */
3287 while (rec_ptr) {
3288 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
3289 return 0;
3290
3291 if (TDB_DEAD(r) && r->rec_len >= length) {
3292 /*
3293 * First fit for simple coding, TODO: change to best
3294 * fit
3295 */
3296 return rec_ptr;
3297 }
3298 rec_ptr = r->next;
3299 }
3300 return 0;
3301}
3302
3303/* store an element in the database, replacing any existing element
3304 with the same key
3305
3306 return 0 on success, -1 on failure
3307*/
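/*
  flag is TDB_REPLACE (store unconditionally), TDB_INSERT (fail with
  TDB_ERR_EXISTS if the key already exists) or TDB_MODIFY (fail if the
  key does not already exist)
*/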
3308int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
3309{
3310 struct list_struct rec;
3311 u32 hash;
3312 tdb_off_t rec_ptr;
3313 char *p = NULL;
3314 int ret = -1;
3315
3316 if (tdb->read_only || tdb->traverse_read) {
3317 tdb->ecode = TDB_ERR_RDONLY;
3318 return -1;
3319 }
3320
3321 /* find which hash bucket it is in */
3322 hash = tdb->hash_fn(&key);
3323 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3324 return -1;
3325
3326 /* check for it existing, on insert. */
3327 if (flag == TDB_INSERT) {
3328 if (tdb_exists_hash(tdb, key, hash)) {
3329 tdb->ecode = TDB_ERR_EXISTS;
3330 goto fail;
3331 }
3332 } else {
3333 /* first try in-place update, on modify or replace. */
3334 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
3335 goto done;
3336 }
3337 if (tdb->ecode == TDB_ERR_NOEXIST &&
3338 flag == TDB_MODIFY) {
3339 /* if the record doesn't exist and we are in TDB_MODIFY mode then
3340 we should fail the store */
3341 goto fail;
3342 }
3343 }
3344	/* reset the error code potentially set by tdb_update_hash() */
3345 tdb->ecode = TDB_SUCCESS;
3346
3347 /* delete any existing record - if it doesn't exist we don't
3348 care. Doing this first reduces fragmentation, and avoids
3349 coalescing with `allocated' block before it's updated. */
3350 if (flag != TDB_INSERT)
3351 tdb_delete_hash(tdb, key, hash);
3352
3353 /* Copy key+value *before* allocating free space in case malloc
3354 fails and we are left with a dead spot in the tdb. */
3355
3356 if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
3357 tdb->ecode = TDB_ERR_OOM;
3358 goto fail;
3359 }
3360
3361 memcpy(p, key.dptr, key.dsize);
3362 if (dbuf.dsize)
3363 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
3364
3365 if (tdb->max_dead_records != 0) {
3366 /*
3367	 * Allow for some dead records per hash chain; see if we can
3368 * find one that can hold the new record. We need enough space
3369 * for key, data and tailer. If we find one, we don't have to
3370 * consult the central freelist.
3371 */
3372 rec_ptr = tdb_find_dead(
3373 tdb, hash, &rec,
3374 key.dsize + dbuf.dsize + sizeof(tdb_off_t));
3375
3376 if (rec_ptr != 0) {
3377 rec.key_len = key.dsize;
3378 rec.data_len = dbuf.dsize;
3379 rec.full_hash = hash;
3380 rec.magic = TDB_MAGIC;
3381 if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3382 || tdb->methods->tdb_write(
3383 tdb, rec_ptr + sizeof(rec),
3384 p, key.dsize + dbuf.dsize) == -1) {
3385 goto fail;
3386 }
3387 goto done;
3388 }
3389 }
3390
3391 /*
3392 * We have to allocate some space from the freelist, so this means we
3393 * have to lock it. Use the chance to purge all the DEAD records from
3394 * the hash chain under the freelist lock.
3395 */
3396
3397 if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
3398 goto fail;
3399 }
3400
3401 if ((tdb->max_dead_records != 0)
3402 && (tdb_purge_dead(tdb, hash) == -1)) {
3403 tdb_unlock(tdb, -1, F_WRLCK);
3404 goto fail;
3405 }
3406
3407 /* we have to allocate some space */
3408 rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
3409
3410 tdb_unlock(tdb, -1, F_WRLCK);
3411
3412 if (rec_ptr == 0) {
3413 goto fail;
3414 }
3415
3416 /* Read hash top into next ptr */
3417 if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
3418 goto fail;
3419
3420 rec.key_len = key.dsize;
3421 rec.data_len = dbuf.dsize;
3422 rec.full_hash = hash;
3423 rec.magic = TDB_MAGIC;
3424
3425 /* write out and point the top of the hash chain at it */
3426 if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
3427 || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
3428 || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
3429 /* Need to tdb_unallocate() here */
3430 goto fail;
3431 }
3432
3433 done:
3434 ret = 0;
3435 fail:
3436 if (ret == 0) {
3437 tdb_increment_seqnum(tdb);
3438 }
3439
3440 SAFE_FREE(p);
3441 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3442 return ret;
3443}
3444
3445
3446/* Append to an entry. Create if not exist. */
3447int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
3448{
3449 u32 hash;
3450 TDB_DATA dbuf;
3451 int ret = -1;
3452
3453 /* find which hash bucket it is in */
3454 hash = tdb->hash_fn(&key);
3455 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
3456 return -1;
3457
3458 dbuf = tdb_fetch(tdb, key);
3459
3460 if (dbuf.dptr == NULL) {
3461 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
3462 } else {
3463 dbuf.dptr = (unsigned char *)realloc(dbuf.dptr,
3464 dbuf.dsize + new_dbuf.dsize);
3465 }
3466
3467 if (dbuf.dptr == NULL) {
3468 tdb->ecode = TDB_ERR_OOM;
3469 goto failed;
3470 }
3471
3472 memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
3473 dbuf.dsize += new_dbuf.dsize;
3474
3475 ret = tdb_store(tdb, key, dbuf, 0);
3476
3477failed:
3478 tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
3479 SAFE_FREE(dbuf.dptr);
3480 return ret;
3481}
3482
3483
3484/*
3485 return the name of the current tdb file
3486 useful for external logging functions
3487*/
3488const char *tdb_name(struct tdb_context *tdb)
3489{
3490 return tdb->name;
3491}
3492
3493/*
3494 return the underlying file descriptor being used by tdb, or -1
3495 useful for external routines that want to check the device/inode
3496 of the fd
3497*/
3498int tdb_fd(struct tdb_context *tdb)
3499{
3500 return tdb->fd;
3501}
3502
3503/*
3504 return the current logging function
3505 useful for external tdb routines that wish to log tdb errors
3506*/
3507tdb_log_func tdb_log_fn(struct tdb_context *tdb)
3508{
3509 return tdb->log.log_fn;
3510}
3511
3512
3513/*
3514 get the tdb sequence number. Only makes sense if the writers opened
3515 with TDB_SEQNUM set. Note that this sequence number will wrap quite
3516 quickly, so it should only be used for a 'has something changed'
3517 test, not for code that relies on the count of the number of changes
3518 made. If you want a counter then use a tdb record.
3519
3520 The aim of this sequence number is to allow for a very lightweight
3521 test of a possible tdb change.
3522*/
3523int tdb_get_seqnum(struct tdb_context *tdb)
3524{
3525 tdb_off_t seqnum=0;
3526
3527 tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
3528 return seqnum;
3529}
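
/*
  change-detection sketch (not part of this file):

	int old_seq = tdb_get_seqnum(tdb);
	... later ...
	if (tdb_get_seqnum(tdb) != old_seq) {
		... something changed, re-read the records we care about ...
	}
*/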
3530
3531int tdb_hash_size(struct tdb_context *tdb)
3532{
3533 return tdb->header.hash_size;
3534}
3535
3536size_t tdb_map_size(struct tdb_context *tdb)
3537{
3538 return tdb->map_size;
3539}
3540
3541int tdb_get_flags(struct tdb_context *tdb)
3542{
3543 return tdb->flags;
3544}
3545
3546/* file: open.c */
3547
3548/* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
3549static struct tdb_context *tdbs = NULL;
3550
3551
3552/* This is based on the hash algorithm from gdbm */
3553static unsigned int default_tdb_hash(TDB_DATA *key)
3554{
3555 u32 value; /* Used to compute the hash value. */
3556 u32 i; /* Used to cycle through random values. */
3557
3558 /* Set the initial value from the key size. */
3559 for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
3560 value = (value + (key->dptr[i] << (i*5 % 24)));
3561
3562 return (1103515243 * value + 12345);
3563}
3564
3565
3566/* initialise a new database with a specified hash size */
3567static int tdb_new_database(struct tdb_context *tdb, int hash_size)
3568{
3569 struct tdb_header *newdb;
3570 int size, ret = -1;
3571
3572 /* We make it up in memory, then write it out if not internal */
3573 size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off_t);
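	/* a fresh database is just the header followed by hash_size+1
	   tdb_off_t chain heads (the freelist head plus one head per
	   hash bucket), all initially zero from the calloc() above */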
3574 if (!(newdb = (struct tdb_header *)calloc(size, 1)))
3575 return TDB_ERRCODE(TDB_ERR_OOM, -1);
3576
3577 /* Fill in the header */
3578 newdb->version = TDB_VERSION;
3579 newdb->hash_size = hash_size;
3580 if (tdb->flags & TDB_INTERNAL) {
3581 tdb->map_size = size;
3582 tdb->map_ptr = (char *)newdb;
3583 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3584 /* Convert the `ondisk' version if asked. */
3585 CONVERT(*newdb);
3586 return 0;
3587 }
3588 if (lseek(tdb->fd, 0, SEEK_SET) == -1)
3589 goto fail;
3590
3591 if (ftruncate(tdb->fd, 0) == -1)
3592 goto fail;
3593
3594 /* This creates an endian-converted header, as if read from disk */
3595 CONVERT(*newdb);
3596 memcpy(&tdb->header, newdb, sizeof(tdb->header));
3597 /* Don't endian-convert the magic food! */
3598 memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
3599 if (write(tdb->fd, newdb, size) != size) {
3600 ret = -1;
3601 } else {
3602 ret = 0;
3603 }
3604
3605 fail:
3606 SAFE_FREE(newdb);
3607 return ret;
3608}
3609
3610
3611
3612static int tdb_already_open(dev_t device,
3613 ino_t ino)
3614{
3615 struct tdb_context *i;
3616
3617 for (i = tdbs; i; i = i->next) {
3618 if (i->device == device && i->inode == ino) {
3619 return 1;
3620 }
3621 }
3622
3623 return 0;
3624}
3625
3626/* open the database, creating it if necessary
3627
3628 The open_flags and mode are passed straight to the open call on the
3629 database file. A flags value of O_WRONLY is invalid. The hash size
3630 is advisory, use zero for a default value.
3631
3632 Return is NULL on error, in which case errno is also set. Don't
3633 try to call tdb_error or tdb_errname, just do strerror(errno).
3634
3635 @param name may be NULL for internal databases. */
3636struct tdb_context *tdb_open(const char *name, int hash_size, int tdb_flags,
3637 int open_flags, mode_t mode)
3638{
3639 return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
3640}
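
/*
  open usage sketch (not part of this file), assuming the usual
  TDB_DEFAULT flag from tdb.h:

	struct tdb_context *db = tdb_open("example.tdb", 0, TDB_DEFAULT,
					  O_RDWR | O_CREAT, 0600);
	if (db == NULL) {
		... errno describes the failure, as noted above ...
	}
*/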
3641
3642/* a default logging function */
3643static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...) PRINTF_ATTRIBUTE(3, 4);
3644static void null_log_fn(struct tdb_context *tdb, enum tdb_debug_level level, const char *fmt, ...)
3645{
3646}
3647
3648
3649struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
3650 int open_flags, mode_t mode,
3651 const struct tdb_logging_context *log_ctx,
3652 tdb_hash_func hash_fn)
3653{
3654 struct tdb_context *tdb;
3655 struct stat st;
3656 int rev = 0, locked = 0;
3657 unsigned char *vp;
3658 u32 vertest;
3659
3660 if (!(tdb = (struct tdb_context *)calloc(1, sizeof *tdb))) {
3661 /* Can't log this */
3662 errno = ENOMEM;
3663 goto fail;
3664 }
3665 tdb_io_init(tdb);
3666 tdb->fd = -1;
3667 tdb->name = NULL;
3668 tdb->map_ptr = NULL;
3669 tdb->flags = tdb_flags;
3670 tdb->open_flags = open_flags;
3671 if (log_ctx) {
3672 tdb->log = *log_ctx;
3673 } else {
3674 tdb->log.log_fn = null_log_fn;
3675 tdb->log.log_private = NULL;
3676 }
3677 tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
3678
3679 /* cache the page size */
3680 tdb->page_size = getpagesize();
3681 if (tdb->page_size <= 0) {
3682 tdb->page_size = 0x2000;
3683 }
3684
3685 if ((open_flags & O_ACCMODE) == O_WRONLY) {
3686 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: can't open tdb %s write-only\n",
3687 name));
3688 errno = EINVAL;
3689 goto fail;
3690 }
3691
3692 if (hash_size == 0)
3693 hash_size = DEFAULT_HASH_SIZE;
3694 if ((open_flags & O_ACCMODE) == O_RDONLY) {
3695 tdb->read_only = 1;
3696 /* read only databases don't do locking or clear if first */
3697 tdb->flags |= TDB_NOLOCK;
3698 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3699 }
3700
3701 /* internal databases don't mmap or lock, and start off cleared */
3702 if (tdb->flags & TDB_INTERNAL) {
3703 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
3704 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3705 if (tdb_new_database(tdb, hash_size) != 0) {
3706 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: tdb_new_database failed!"));
3707 goto fail;
3708 }
3709 goto internal;
3710 }
3711
3712 if ((tdb->fd = open(name, open_flags, mode)) == -1) {
3713 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_open_ex: could not open file %s: %s\n",
3714 name, strerror(errno)));
3715 goto fail; /* errno set by open(2) */
3716 }
3717
3718 /* ensure there is only one process initialising at once */
3719 if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
3720 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to get global lock on %s: %s\n",
3721 name, strerror(errno)));
3722 goto fail; /* errno set by tdb_brlock */
3723 }
3724
3725 /* we need to zero database if we are the only one with it open */
3726 if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
3727 (locked = (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0, 1) == 0))) {
3728 open_flags |= O_CREAT;
3729 if (ftruncate(tdb->fd, 0) == -1) {
3730 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_open_ex: "
3731 "failed to truncate %s: %s\n",
3732 name, strerror(errno)));
3733 goto fail; /* errno set by ftruncate */
3734 }
3735 }
3736
3737 if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
3738 || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
3739 || (tdb->header.version != TDB_VERSION
3740 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
3741 /* its not a valid database - possibly initialise it */
3742 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
3743			errno = EIO; /* i.e. bad format or something */
3744 goto fail;
3745 }
3746 rev = (tdb->flags & TDB_CONVERT);
3747 }
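	/* Work out the on-disk byte order: read the stored version field
	   byte by byte as a big-endian number; if that matches TDB_VERSION
	   the file was written big-endian.  "rev" (set above when the
	   natively-read version is byte-reversed) means the file's byte
	   order differs from the host's, so TDB_CONVERT is set and the
	   header is byte-swapped below. */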
3748 vp = (unsigned char *)&tdb->header.version;
3749 vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
3750 (((u32)vp[2]) << 8) | (u32)vp[3];
3751 tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
3752 if (!rev)
3753 tdb->flags &= ~TDB_CONVERT;
3754 else {
3755 tdb->flags |= TDB_CONVERT;
3756 tdb_convert(&tdb->header, sizeof(tdb->header));
3757 }
3758 if (fstat(tdb->fd, &st) == -1)
3759 goto fail;
3760
3761 if (tdb->header.rwlocks != 0) {
3762 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: spinlocks no longer supported\n"));
3763 goto fail;
3764 }
3765
3766 /* Is it already in the open list? If so, fail. */
3767 if (tdb_already_open(st.st_dev, st.st_ino)) {
3768 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3769 "%s (%d,%d) is already open in this process\n",
3770 name, (int)st.st_dev, (int)st.st_ino));
3771 errno = EBUSY;
3772 goto fail;
3773 }
3774
3775 if (!(tdb->name = (char *)strdup(name))) {
3776 errno = ENOMEM;
3777 goto fail;
3778 }
3779
3780 tdb->map_size = st.st_size;
3781 tdb->device = st.st_dev;
3782 tdb->inode = st.st_ino;
3783 tdb->max_dead_records = 0;
3784 tdb_mmap(tdb);
3785 if (locked) {
3786 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
3787 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: "
3788				 "failed to release ACTIVE_LOCK on %s: %s\n",
3789 name, strerror(errno)));
3790 goto fail;
3791 }
3792
3793 }
3794
3795 /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
3796	   we didn't get the initial exclusive lock, as we need to let all other
3797 users know we're using it. */
3798
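	/* How CLEAR_IF_FIRST detects "first": the opener above tried a
	   non-blocking write lock on ACTIVE_LOCK, which can only succeed if
	   no other process holds that byte, i.e. nobody else has the file
	   open; only then is the file truncated.  Every CLEAR_IF_FIRST
	   opener then keeps a shared read lock on ACTIVE_LOCK for the
	   lifetime of the handle, which is what makes later write-lock
	   attempts fail. */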
3799 if (tdb_flags & TDB_CLEAR_IF_FIRST) {
3800 /* leave this lock in place to indicate it's in use */
3801 if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)
3802 goto fail;
3803 }
3804
3805 /* if needed, run recovery */
3806 if (tdb_transaction_recover(tdb) == -1) {
3807 goto fail;
3808 }
3809
3810 internal:
3811 /* Internal (memory-only) databases skip all the code above to
3812 * do with disk files, and resume here by releasing their
3813 * global lock and hooking into the active list. */
3814 if (tdb->methods->tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1) == -1)
3815 goto fail;
3816 tdb->next = tdbs;
3817 tdbs = tdb;
3818 return tdb;
3819
3820 fail:
3821 { int save_errno = errno;
3822
3823 if (!tdb)
3824 return NULL;
3825
3826 if (tdb->map_ptr) {
3827 if (tdb->flags & TDB_INTERNAL)
3828 SAFE_FREE(tdb->map_ptr);
3829 else
3830 tdb_munmap(tdb);
3831 }
3832 SAFE_FREE(tdb->name);
3833 if (tdb->fd != -1)
3834 if (close(tdb->fd) != 0)
3835 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_open_ex: failed to close tdb->fd on error!\n"));
3836 SAFE_FREE(tdb);
3837 errno = save_errno;
3838 return NULL;
3839 }
3840}
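
/*
 * Illustrative sketch (not part of the library): a typical caller opens a
 * database with the default hash size, logging and hash function, stores a
 * record and closes the handle.  The file name "example.tdb" and the key
 * and value used here are made up for the example.
 *
 *	struct tdb_context *db;
 *	TDB_DATA key, val;
 *
 *	db = tdb_open("example.tdb", 0, TDB_DEFAULT, O_RDWR | O_CREAT, 0600);
 *	if (db == NULL) {
 *		perror("tdb_open");
 *		return 1;
 *	}
 *	key.dptr = (unsigned char *)"hello";
 *	key.dsize = 5;
 *	val.dptr = (unsigned char *)"world";
 *	val.dsize = 5;
 *	if (tdb_store(db, key, val, TDB_REPLACE) != 0) {
 *		fprintf(stderr, "store failed: %s\n", tdb_errorstr(db));
 *	}
 *	tdb_close(db);
 */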
3841
3842/*
3843 * Set the maximum number of dead records per hash chain
3844 */
3845
3846void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
3847{
3848 tdb->max_dead_records = max_dead;
3849}
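
/* With a non-zero max_dead_records, deleted records in a hash chain are
   marked dead and reused in place instead of being returned to the
   freelist immediately; this cuts freelist lock traffic for workloads
   that delete and re-insert keys frequently, at the cost of some wasted
   space per chain. */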
3850
3851/**
3852 * Close a database.
3853 *
3854 * @returns -1 for error; 0 for success.
3855 **/
3856int tdb_close(struct tdb_context *tdb)
3857{
3858 struct tdb_context **i;
3859 int ret = 0;
3860
3861 if (tdb->transaction) {
3862 tdb_transaction_cancel(tdb);
3863 }
3864
3865 if (tdb->map_ptr) {
3866 if (tdb->flags & TDB_INTERNAL)
3867 SAFE_FREE(tdb->map_ptr);
3868 else
3869 tdb_munmap(tdb);
3870 }
3871 SAFE_FREE(tdb->name);
3872 if (tdb->fd != -1)
3873 ret = close(tdb->fd);
3874 SAFE_FREE(tdb->lockrecs);
3875
3876 /* Remove from contexts list */
3877 for (i = &tdbs; *i; i = &(*i)->next) {
3878 if (*i == tdb) {
3879 *i = tdb->next;
3880 break;
3881 }
3882 }
3883
3884 memset(tdb, 0, sizeof(*tdb));
3885 SAFE_FREE(tdb);
3886
3887 return ret;
3888}
3889
3890/* register a logging function */
3891void tdb_set_logging_function(struct tdb_context *tdb,
3892 const struct tdb_logging_context *log_ctx)
3893{
3894 tdb->log = *log_ctx;
3895}
3896
3897void *tdb_get_logging_private(struct tdb_context *tdb)
3898{
3899 return tdb->log.log_private;
3900}
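
/*
 * Illustrative sketch (not part of the library): installing a custom
 * logger.  The caller provides a function matching tdb_log_func and,
 * optionally, a private pointer that tdb_get_logging_private() returns
 * later (NULL is used here; it may be any caller-owned pointer).  The
 * name my_log_fn is invented for the example.
 *
 *	static void my_log_fn(struct tdb_context *t, enum tdb_debug_level level,
 *			      const char *fmt, ...)
 *	{
 *		va_list ap;
 *		va_start(ap, fmt);
 *		vfprintf(stderr, fmt, ap);
 *		va_end(ap);
 *	}
 *
 *	struct tdb_logging_context log_ctx;
 *	log_ctx.log_fn = my_log_fn;
 *	log_ctx.log_private = NULL;
 *	tdb_set_logging_function(tdb, &log_ctx);
 *
 * tdb_set_logging_function() copies the structure, so log_ctx does not
 * need to outlive the call.
 */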
3901
3902/* reopen a tdb - this can be used after a fork to ensure that we have an independent
3903 seek pointer from our parent and to re-establish locks */
3904int tdb_reopen(struct tdb_context *tdb)
3905{
3906 struct stat st;
3907
3908 if (tdb->flags & TDB_INTERNAL) {
3909 return 0; /* Nothing to do. */
3910 }
3911
3912 if (tdb->num_locks != 0 || tdb->global_lock.count) {
3913 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed with locks held\n"));
3914 goto fail;
3915 }
3916
3917 if (tdb->transaction != 0) {
3918 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_reopen: reopen not allowed inside a transaction\n"));
3919 goto fail;
3920 }
3921
3922 if (tdb_munmap(tdb) != 0) {
3923 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
3924 goto fail;
3925 }
3926 if (close(tdb->fd) != 0)
3927 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
3928 tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
3929 if (tdb->fd == -1) {
3930 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: open failed (%s)\n", strerror(errno)));
3931 goto fail;
3932 }
3933 if ((tdb->flags & TDB_CLEAR_IF_FIRST) &&
3934 (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0, 1) == -1)) {
3935 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: failed to obtain active lock\n"));
3936 goto fail;
3937 }
3938 if (fstat(tdb->fd, &st) != 0) {
3939 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
3940 goto fail;
3941 }
3942 if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
3943 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_reopen: file dev/inode has changed!\n"));
3944 goto fail;
3945 }
3946 tdb_mmap(tdb);
3947
3948 return 0;
3949
3950fail:
3951 tdb_close(tdb);
3952 return -1;
3953}
3954
3955/* reopen all tdbs */
3956int tdb_reopen_all(int parent_longlived)
3957{
3958 struct tdb_context *tdb;
3959
3960 for (tdb=tdbs; tdb; tdb = tdb->next) {
3961 /*
3962			 * If the parent is long-lived (i.e. a
3963			 * parent daemon architecture), we know
3964			 * it will keep its active lock on a
3965 * tdb opened with CLEAR_IF_FIRST. Thus
3966 * for child processes we don't have to
3967 * add an active lock. This is essential
3968 * to improve performance on systems that
3969 * keep POSIX locks as a non-scalable data
3970 * structure in the kernel.
3971 */
3972 if (parent_longlived) {
3973 /* Ensure no clear-if-first. */
3974 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
3975 }
3976
3977 if (tdb_reopen(tdb) != 0)
3978 return -1;
3979 }
3980
3981 return 0;
3982}
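
/*
 * Illustrative sketch (not part of the library): a forking server reopens
 * its databases in the child so that locks and the file offset are not
 * shared with the parent.  Passing 1 tells tdb_reopen_all() that the
 * parent stays alive and keeps its CLEAR_IF_FIRST active locks, so the
 * children do not have to take them again.
 *
 *	pid_t pid = fork();
 *	if (pid == 0) {
 *		if (tdb_reopen_all(1) != 0) {
 *			_exit(1);
 *		}
 *		... child continues using the reopened tdbs ...
 *	}
 */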