/* cyrusdb_skiplist.c -- cyrusdb skiplist implementation
*
* Copyright (c) 1994-2008 Carnegie Mellon University. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The name "Carnegie Mellon University" must not be used to
* endorse or promote products derived from this software without
* prior written permission. For permission or any legal
* details, please contact
* Carnegie Mellon University
* Center for Technology Transfer and Enterprise Creation
* 4615 Forbes Avenue
* Suite 302
* Pittsburgh, PA 15213
* (412) 268-7393, fax: (412) 268-7395
* innovation@andrew.cmu.edu
*
* 4. Redistributions of any form whatsoever must retain the following
* acknowledgment:
* "This product includes software developed by Computing Services
* at Carnegie Mellon University (http://www.cmu.edu/computing/)."
*
* CARNEGIE MELLON UNIVERSITY DISCLAIMS ALL WARRANTIES WITH REGARD TO
* THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS, IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY BE LIABLE
* FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
* AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
* OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
* $Id: cyrusdb_skiplist.c,v 1.71 2010/01/06 17:01:45 murch Exp $
*/
/* xxx check retry_xxx for failure */
/* xxx all offsets should be uint32_ts i think */
#include <config.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <errno.h>
#include <syslog.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <netinet/in.h>
#include "assert.h"
#include "bsearch.h"
#include "cyrusdb.h"
#include "libcyr_cfg.h"
#include "cyr_lock.h"
#include "map.h"
#include "retry.h"
#include "util.h"
#include "xmalloc.h"
#include "xstrlcpy.h"
#include "xstrlcat.h"
#define PROB (0.5)
/*
*
* disk format; all numbers in network byte order
*
* there's the data file, consisting of the
* multiple records of "key", "data", and "skip pointers", where skip
* pointers are the record number of the data pointer.
*
* on startup, recovery is performed. the last known good data file
* is taken and the intent log is replayed on it. the index file is
* regenerated from scratch.
*
* during operation checkpoints will compress the data. the data file
* is locked. then a checkpoint rewrites the data file in order,
* removing any unused records. this is written and fsync'd to
* dfile.NEW and stored for use during recovery.
*/
/*
header "skiplist file\0\0\0"
version (4 bytes)
version_minor (4 bytes)
maxlevel (4 bytes)
curlevel (4 bytes)
listsize (4 bytes)
in active items
log start (4 bytes)
offset where log records start, used mainly to tell when to compress
last recovery (4 bytes)
seconds since unix epoch
1 or more skipnodes, one of:
record type (4 bytes) [DUMMY, INORDER, ADD]
key size (4 bytes)
key string (bit string, rounded to up to 4 byte multiples w/ 0s)
data size (4 bytes)
data string (bit string, rounded to up to 4 byte multiples w/ 0s)
skip pointers (4 bytes each)
least to most
padding (4 bytes, must be -1)
record type (4 bytes) [DELETE]
record ptr (4 bytes; record to be deleted)
record type (4 bytes) [COMMIT]
record type is either
DUMMY (first node is of this type)
INORDER
ADD
DELETE
COMMIT (commit the previous records)
*/
/* Record type tags. The tag is the first 4 bytes of every record and is
 * stored in network byte order (see TYPE()). */
enum {
INORDER = 1, /* key/data node; NOTE(review): presumably written in sorted order by a checkpoint -- confirm */
ADD = 2, /* key/data node appended to the log by mystore() */
DELETE = 4, /* log record: tag + 4-byte offset of the record being deleted */
COMMIT = 255, /* log record: tag only; commits the previous records */
DUMMY = 257 /* the first node in the file; has empty key and data */
};
/* Values of db->lock_status. UNLOCKED is 0, so the field can be tested
 * as a boolean for "some lock is held". */
enum {
UNLOCKED = 0, /* no lock held */
READLOCKED = 1, /* set by read_lock() */
WRITELOCKED = 2, /* set by write_lock() */
};
/* State of one in-progress transaction on a database. */
struct txn {
/* descriptor used for log writes: db->fd, or a separate O_DSYNC
descriptor when use_osync is set; -1 when not yet obtained */
int syncfd;
/* logstart is where we start changes from on commit, where we truncate
to on abort */
unsigned logstart;
unsigned logend; /* where to write to continue this txn */
};
/* In-memory handle for one open skiplist file. */
struct db {
/* file data */
char *fname; /* heap copy of the path; freed by dispose_db() */
int fd; /* open descriptor, or -1 */
const char *map_base; /* start of the memory map of the file */
unsigned long map_len; /* mapped size */
unsigned long map_size; /* actual size */
ino_t map_ino; /* inode the current map belongs to; a change forces a remap */
/* header info */
uint32_t version;
uint32_t version_minor;
uint32_t maxlevel; /* number of forward pointers in the DUMMY node */
uint32_t curlevel; /* highest level currently in use */
uint32_t listsize;
uint32_t logstart; /* where the log starts from last chkpnt */
time_t last_recovery;
/* tracking info */
int lock_status; /* UNLOCKED, READLOCKED or WRITELOCKED */
int is_open; /* nonzero once the header may be (re)read on every lock */
struct txn *current_txn; /* the active transaction, or NULL */
/* comparator function to use for sorting */
int (*compar) (const char *s1, int l1, const char *s2, int l2);
};
/* Node of the singly linked list of open databases (head: open_db). */
struct db_list {
struct db *db; /* the shared handle */
struct db_list *next; /* next entry, or NULL */
int refcount; /* number of myopen() calls not yet matched by myclose() */
};
/* time of the last global recovery as written/read by myinit();
 * 0 means unknown, which makes myopen() run recovery */
static time_t global_recovery = 0;
/* head of the list of databases open in this process */
static struct db_list *open_db = NULL;
/* Perform an FSYNC/FDATASYNC if we are *not* operating in UNSAFE mode */
#define DO_FSYNC (!libcyrus_config_getswitch(CYRUSOPT_SKIPLIST_UNSAFE))
/* compile-time switches */
enum {
be_paranoid = 0, /* nonzero: assert myconsistent() around store/delete/commit */
use_osync = 0 /* nonzero: write the log through a separate O_DSYNC descriptor */
};
/* default key comparator; used by myopen() unless CYRUSDB_MBOXSORT is given */
static int compare(const char *s1, int l1, const char *s2, int l2);
/* Make sure t->syncfd refers to a descriptor suitable for log writes.
 * Without use_osync this is simply the database descriptor; with it, a
 * dedicated O_DSYNC descriptor is opened once per transaction. */
static void getsyncfd(struct db *db, struct txn *t)
{
    if (use_osync) {
        if (t->syncfd != -1) {
            /* already opened earlier in this transaction */
            return;
        }
        t->syncfd = open(db->fname, O_RDWR | O_DSYNC, 0666);
        assert(t->syncfd != -1); /* xxx do better error recovery */
        return;
    }

    t->syncfd = db->fd;
}
/* Forget the transaction's sync descriptor. It is only close()d when it
 * is a private O_DSYNC descriptor (use_osync); otherwise it aliases
 * db->fd and must stay open. */
static void closesyncfd(struct db *db __attribute__((unused)),
                        struct txn *t)
{
    int private_fd = use_osync && (t->syncfd != -1);

    if (private_fd) {
        close(t->syncfd);
    }
    t->syncfd = -1;
}
/* Initialise the skiplist backend.
 *
 * dbdir   - directory holding the "skipstamp" file
 * myflags - if CYRUSDB_RECOVER is set, the current time is recorded as the
 *           global recovery timestamp and written to dbdir/skipstamp;
 *           otherwise the timestamp is read back from that file.
 *
 * Returns 0 on success, or CYRUSDB_IOERROR if the stamp cannot be written.
 * A stamp that cannot be read is not an error: global_recovery is set to 0
 * so that every database runs recovery when opened.
 *
 * Also seeds rand() and resets the list of open databases.
 */
static int myinit(const char *dbdir, int myflags)
{
    char sfile[1024];
    int fd, n, r = 0;
    uint32_t net32_time;

    snprintf(sfile, sizeof(sfile), "%s/skipstamp", dbdir);

    if (myflags & CYRUSDB_RECOVER) {
        /* set the recovery timestamp; all databases earlier than this
           time need recovery run when opened */
        global_recovery = time(NULL);
        net32_time = htonl(global_recovery);

        fd = open(sfile, O_RDWR | O_CREAT, 0644);
        if (fd == -1) r = -1;
        if (r != -1) r = ftruncate(fd, 0);
        if (r != -1) {
            n = write(fd, &net32_time, 4);
            if (n != 4) {
                /* a short write leaves a useless stamp behind */
                if (n >= 0) errno = EIO;
                r = -1;
            }
        }
        if (r != -1) {
            r = close(fd);
            fd = -1; /* closed (even on failure): never close it twice */
        }

        if (r == -1) {
            syslog(LOG_ERR, "DBERROR: writing %s: %m", sfile);
            if (fd != -1) close(fd);
            return CYRUSDB_IOERROR;
        }
    } else {
        /* read the global recovery timestamp */
        fd = open(sfile, O_RDONLY, 0644);
        if (fd == -1) r = -1;
        if (r != -1) {
            n = read(fd, &net32_time, 4);
            if (n != 4) {
                /* empty or truncated stamp: net32_time is not usable */
                if (n >= 0) errno = EIO;
                r = -1;
            }
        }

        if (r == -1) {
            syslog(LOG_ERR, "DBERROR: reading %s, assuming the worst: %m",
                   sfile);
            /* don't leak the descriptor when only the read failed */
            if (fd != -1) close(fd);
            global_recovery = 0;
        } else if (close(fd) == -1) {
            syslog(LOG_ERR, "DBERROR: reading %s, assuming the worst: %m",
                   sfile);
            global_recovery = 0;
        } else {
            global_recovery = ntohl(net32_time);
        }
    }

    srand(time(NULL) * getpid());

    open_db = NULL;

    return 0;
}
/* Backend shutdown hook: nothing to release, always reports success. */
static int mydone(void)
{
    const int status = 0;

    return status;
}
/* Backend sync hook: does no work here, always reports success. */
static int mysync(void)
{
    const int status = 0;

    return status;
}
/* Copy each database file named in the NULL-terminated array 'fnames'
 * into the directory 'dirname', keeping its base name.
 *
 * Returns 0 on success, or CYRUSDB_IOERROR as soon as one copy fails.
 */
static int myarchive(const char **fnames, const char *dirname)
{
    int r;
    const char **fname;
    char dstname[1024], *dp;
    int length, rest;

    strlcpy(dstname, dirname, sizeof(dstname));
    length = strlen(dstname);
    dp = dstname + length;          /* where "/basename" gets appended */
    rest = sizeof(dstname) - length;

    /* archive those files specified by the app */
    for (fname = fnames; *fname != NULL; ++fname) {
        const char *slash = strrchr(*fname, '/');

        syslog(LOG_DEBUG, "archiving database file: %s", *fname);

        if (slash) {
            /* copy the final "/component" of the source path */
            strlcpy(dp, slash, rest);
        } else {
            /* bare file name: strrchr() returned NULL, so supply the
               separator ourselves instead of dereferencing NULL */
            strlcpy(dp, "/", rest);
            strlcat(dstname, *fname, sizeof(dstname));
        }

        r = cyrusdb_copyfile(*fname, dstname);
        if (r) {
            syslog(LOG_ERR,
                   "DBERROR: error archiving database file: %s", *fname);
            return CYRUSDB_IOERROR;
        }
    }

    return 0;
}
/* Format version written into new files, and tuning limits. */
enum {
SKIPLIST_VERSION = 1,
SKIPLIST_VERSION_MINOR = 2,
SKIPLIST_MAXLEVEL = 20, /* upper bound accepted for the header's maxlevel */
/* NOTE(review): 16834 looks like a transposition of 16384 -- confirm before changing */
SKIPLIST_MINREWRITE = 16834 /* don't rewrite logs smaller than this */
};
/* magic bytes at the very start of the file; compared over HEADER_MAGIC_SIZE bytes */
#define HEADER_MAGIC ("\241\002\213\015skiplist file\0\0\0")
#define HEADER_MAGIC_SIZE (20)
/* byte offsets of the header fields (each field is 4 bytes after the magic) */
enum {
OFFSET_HEADER = 0,
OFFSET_VERSION = 20,
OFFSET_VERSION_MINOR = 24,
OFFSET_MAXLEVEL = 28,
OFFSET_CURLEVEL = 32,
OFFSET_LISTSIZE = 36,
OFFSET_LOGSTART = 40,
OFFSET_LASTRECOVERY = 44
};
/* total header length: the last field's offset plus its 4 bytes */
enum {
HEADER_SIZE = OFFSET_LASTRECOVERY + 4
};
/* forward declarations of routines used before their definitions */
static int mycommit(struct db *db, struct txn *tid);
static int myabort(struct db *db, struct txn *tid);
static int mycheckpoint(struct db *db, int locked);
static int myconsistent(struct db *db, struct txn *tid, int locked);
static int recovery(struct db *db, int flags);
/* bit flags for the 'flags' argument of recovery() */
enum {
/* Force recovery regardless of timestamp on database */
RECOVERY_FORCE = 1,
/* Caller already has a write lock on the database. In the case
* of successful recovery, the database will still be locked on return.
*
* If the recovery fails, then the database will be unlocked and an
* error will be returned */
RECOVERY_CALLER_LOCKED = 2
};
/* file looks like:
struct header {
...
}
struct dummy {
uint32_t t = htonl(DUMMY);
uint32_t ks = 0;
uint32_t ds = 0;
uint32_t forward[db->maxlevel];
uint32_t pad = -1;
} */
/* The DUMMY node sits immediately after the file header. */
#define DUMMY_OFFSET(db) (HEADER_SIZE)
#define DUMMY_PTR(db) ((db)->map_base + HEADER_SIZE)
/* type + keylen + datalen words, maxlevel forward pointers, one padding word */
#define DUMMY_SIZE(db) (4 * (3 + db->maxlevel + 1))
/* bump to the next multiple of 4 bytes */
#define ROUNDUP(num) (((num) + 3) & 0xFFFFFFFC)
/* Accessors for a key/data record starting at 'ptr'. All integers are read
 * from network byte order. Note that the arguments are evaluated more than
 * once, so they must be free of side effects. */
#define TYPE(ptr) (ntohl(*((uint32_t *)(ptr))))
#define KEY(ptr) ((ptr) + 8)
#define KEYLEN(ptr) (ntohl(*((uint32_t *)((ptr) + 4))))
#define DATA(ptr) ((ptr) + 8 + ROUNDUP(KEYLEN(ptr)) + 4)
#define DATALEN(ptr) (ntohl(*((uint32_t *)((ptr) + 8 + ROUNDUP(KEYLEN(ptr))))))
/* address of the first forward pointer, right after the padded data */
#define FIRSTPTR(ptr) ((ptr) + 8 + ROUNDUP(KEYLEN(ptr)) + 4 + ROUNDUP(DATALEN(ptr)))
/* return a pointer to the pointer */
#define PTR(ptr, x) (FIRSTPTR(ptr) + 4 * (x))
/* FORWARD(ptr, x)
* given a pointer to the start of the record, return the offset
* corresponding to the xth pointer
*/
#define FORWARD(ptr, x) (ntohl(*((uint32_t *)(FIRSTPTR(ptr) + 4 * (x)))))
/* Count the forward pointers of the key/data record at 'ptr'.
 * The pointer array is scanned until the (uint32_t)-1 padding word that
 * terminates it. Only valid for DUMMY, INORDER and ADD records. */
static unsigned LEVEL(const char *ptr)
{
    const uint32_t *first, *cur;

    assert(TYPE(ptr) == DUMMY || TYPE(ptr) == INORDER || TYPE(ptr) == ADD);

    first = (uint32_t *) FIRSTPTR(ptr);
    for (cur = first; *cur != (uint32_t)-1; cur++) {
        /* keep walking until the terminator */
    }

    return (cur - first);
}
/* Size in bytes of the record at 'ptr', derived from its type tag.
 * Unknown tags yield 0. */
static unsigned RECSIZE(const char *ptr)
{
    const uint32_t rectype = TYPE(ptr);

    if (rectype == DELETE) {
        /* tag + offset of the deleted record */
        return 8;
    }
    if (rectype == COMMIT) {
        /* tag only */
        return 4;
    }
    if (rectype == DUMMY || rectype == INORDER || rectype == ADD) {
        int size = 0;

        size += 4 + 4;                  /* tag, keylen */
        size += ROUNDUP(KEYLEN(ptr));   /* padded key */
        size += 4;                      /* datalen */
        size += ROUNDUP(DATALEN(ptr));  /* padded data */
        size += 4 * LEVEL(ptr);         /* forward pointers */
        size += 4;                      /* terminating padding word */
        return size;
    }

    return 0;
}
/* Check whether the file tail allows appending without recovery.
 *
 * NOTE: despite the name, this returns 0 when appending IS safe and 1 when
 * it is NOT (the caller then runs recovery).
 *
 * Appending is safe when the size is a multiple of 4 and either
 *  - the file ends exactly at the start of the log and its last word is the
 *    -1 padding of the last INORDER (or DUMMY) record, or
 *  - the last word is a COMMIT record that directly follows either a -1
 *    padding word or a DELETE record (tag + 4 bytes).
 */
static int SAFE_TO_APPEND(struct db *db)
{
    /* check it's a multiple of 4 */
    if (db->map_size % 4) return 1;

    /* is it the beginning of the log? */
    if (db->map_size == db->logstart) {
        if (*((uint32_t *)(db->map_base + db->map_size - 4)) == htonl(-1)) {
            return 0;
        }
        return 1;
    }

    /* in the middle of the log somewhere: must end in a commit ... */
    if (*((uint32_t *)(db->map_base + db->map_size - 4)) != htonl(COMMIT)) {
        return 1;
    }

    /* ... preceded by the end of a record ... */
    if (*((uint32_t *)(db->map_base + db->map_size - 8)) == htonl(-1)) {
        return 0;
    }

    /* ... or by a delete */
    if (*((uint32_t *)(db->map_base + db->map_size - 12)) == htonl(DELETE)) {
        return 0;
    }

    return 1;
}
/* Start a transaction on 'db' and hand it back through *tidptr.
 * If the file tail is not in an appendable state, forced recovery is run
 * first (the caller is expected to hold the write lock); a recovery error
 * is returned unchanged. The new transaction starts and ends at the
 * current file size and becomes db->current_txn. Returns 0 on success. */
static int newtxn(struct db *db, struct txn **tidptr)
{
    struct txn *fresh;

    /* SAFE_TO_APPEND() is nonzero when the file is NOT safe to append to */
    if (SAFE_TO_APPEND(db)) {
        int rc = recovery(db, RECOVERY_FORCE | RECOVERY_CALLER_LOCKED);

        if (rc != 0) return rc;
    }

    fresh = xmalloc(sizeof(struct txn));
    fresh->syncfd = -1;
    fresh->logend = fresh->logstart = db->map_size;

    /* publish it: to the caller and on the handle */
    *tidptr = fresh;
    db->current_txn = fresh;

    return 0;
}
/* last 4-byte word of the record at 'ptr', converted from network byte order */
#define PADDING(ptr) (ntohl(*((uint32_t *)((ptr) + RECSIZE(ptr) - 4))))
/* given an open, mapped db, read in the header information
 *
 * Fills in version, version_minor, maxlevel, curlevel, listsize, logstart
 * and last_recovery from the mapped file, then validates the DUMMY node
 * that follows the header.
 *
 * Returns 0 on success, or CYRUSDB_IOERROR when the file is too small, the
 * magic or version is wrong, a level field is out of range, or the DUMMY
 * node is malformed.
 */
static int read_header(struct db *db)
{
    const char *dptr;
    int r;

    assert(db && db->map_len && db->fname && db->map_base
           && db->is_open && db->lock_status);

    if (db->map_len < HEADER_SIZE) {
        syslog(LOG_ERR,
               "skiplist: file not large enough for header: %s", db->fname);
        /* parsing any further would read past the end of the mapping */
        return CYRUSDB_IOERROR;
    }

    if (memcmp(db->map_base, HEADER_MAGIC, HEADER_MAGIC_SIZE)) {
        syslog(LOG_ERR, "skiplist: invalid magic header: %s", db->fname);
        return CYRUSDB_IOERROR;
    }

    db->version = ntohl(*((uint32_t *)(db->map_base + OFFSET_VERSION)));
    db->version_minor =
        ntohl(*((uint32_t *)(db->map_base + OFFSET_VERSION_MINOR)));
    if (db->version != SKIPLIST_VERSION) {
        syslog(LOG_ERR, "skiplist: version mismatch: %s has version %d.%d",
               db->fname, db->version, db->version_minor);
        return CYRUSDB_IOERROR;
    }

    db->maxlevel = ntohl(*((uint32_t *)(db->map_base + OFFSET_MAXLEVEL)));
    if (db->maxlevel > SKIPLIST_MAXLEVEL) {
        syslog(LOG_ERR,
               "skiplist %s: MAXLEVEL %d in database beyond maximum %d\n",
               db->fname, db->maxlevel, SKIPLIST_MAXLEVEL);
        return CYRUSDB_IOERROR;
    }

    db->curlevel = ntohl(*((uint32_t *)(db->map_base + OFFSET_CURLEVEL)));
    if (db->curlevel > db->maxlevel) {
        syslog(LOG_ERR,
               "skiplist %s: CURLEVEL %d in database beyond maximum %d\n",
               db->fname, db->curlevel, db->maxlevel);
        return CYRUSDB_IOERROR;
    }

    db->listsize = ntohl(*((uint32_t *)(db->map_base + OFFSET_LISTSIZE)));
    db->logstart = ntohl(*((uint32_t *)(db->map_base + OFFSET_LOGSTART)));
    db->last_recovery =
        ntohl(*((uint32_t *)(db->map_base + OFFSET_LASTRECOVERY)));

    /* the DUMMY node must fit inside the mapping before we inspect it;
       LEVEL() scans forward for the terminator and has no bound of its own */
    if (db->map_len < (unsigned long) (DUMMY_OFFSET(db) + DUMMY_SIZE(db))) {
        syslog(LOG_ERR, "DBERROR: %s: file too small for DUMMY node",
               db->fname);
        return CYRUSDB_IOERROR;
    }

    /* verify dummy node */
    dptr = DUMMY_PTR(db);
    r = 0;

    if (!r && TYPE(dptr) != DUMMY) {
        syslog(LOG_ERR, "DBERROR: %s: first node not type DUMMY",
               db->fname);
        r = CYRUSDB_IOERROR;
    }
    if (!r && KEYLEN(dptr) != 0) {
        syslog(LOG_ERR, "DBERROR: %s: DUMMY has non-zero KEYLEN",
               db->fname);
        r = CYRUSDB_IOERROR;
    }
    if (!r && DATALEN(dptr) != 0) {
        syslog(LOG_ERR, "DBERROR: %s: DUMMY has non-zero DATALEN",
               db->fname);
        r = CYRUSDB_IOERROR;
    }
    if (!r && LEVEL(dptr) != db->maxlevel) {
        syslog(LOG_ERR, "DBERROR: %s: DUMMY level(%d) != db->maxlevel(%d)",
               db->fname, LEVEL(dptr), db->maxlevel);
        r = CYRUSDB_IOERROR;
    }

    return r;
}
/* Store 'value' at 'dst' in network byte order. memcpy() is used because
 * 'dst' points into a char buffer that need not be 4-byte aligned. */
static void header_put32(char *dst, uint32_t value)
{
    uint32_t net = htonl(value);

    memcpy(dst, &net, sizeof(net));
}

/* given an open, mapped and write-locked db, write the header information
 *
 * Serialises the in-memory header fields of 'db' and writes them at offset
 * 0 of db->fd. Returns 0 on success, or CYRUSDB_IOERROR if the seek fails
 * or the write is short.
 */
static int write_header(struct db *db)
{
    char buf[HEADER_SIZE];
    int n;

    assert (db->lock_status == WRITELOCKED);

    memcpy(buf + 0, HEADER_MAGIC, HEADER_MAGIC_SIZE);
    header_put32(buf + OFFSET_VERSION, db->version);
    header_put32(buf + OFFSET_VERSION_MINOR, db->version_minor);
    header_put32(buf + OFFSET_MAXLEVEL, db->maxlevel);
    header_put32(buf + OFFSET_CURLEVEL, db->curlevel);
    header_put32(buf + OFFSET_LISTSIZE, db->listsize);
    header_put32(buf + OFFSET_LOGSTART, db->logstart);
    header_put32(buf + OFFSET_LASTRECOVERY, db->last_recovery);

    /* write it out */
    if (lseek(db->fd, 0, SEEK_SET) == (off_t) -1) {
        /* writing at an unknown position would corrupt the file */
        syslog(LOG_ERR, "DBERROR: seeking to skiplist header for %s: %m",
               db->fname);
        return CYRUSDB_IOERROR;
    }
    n = retry_write(db->fd, buf, HEADER_SIZE);
    if (n != HEADER_SIZE) {
        syslog(LOG_ERR, "DBERROR: writing skiplist header for %s: %m",
               db->fname);
        return CYRUSDB_IOERROR;
    }

    return 0;
}
/* Refresh the mapping of a write-locked db so that it covers everything
 * the transaction has written so far, and record that extent as the file
 * size. Always returns 0. */
static int update_lock(struct db *db, struct txn *txn)
{
    assert (db->is_open && db->lock_status == WRITELOCKED);

    /* txn->logend is the current size of the file */
    map_refresh(db->fd, 0, &db->map_base, &db->map_len, txn->logend,
                db->fname, 0);
    db->map_size = txn->logend;

    return 0;
}
/* Take the write lock on an unlocked db (via lock_reopen) and bring the
 * mapping up to date with the locked file.
 *
 * altname, when non-NULL, replaces db->fname as the path handed to
 * lock_reopen and used in messages. If the locked file has a different
 * inode than the current mapping, the old mapping is dropped first. Once
 * the db is fully open the header is re-read (its result is not checked).
 *
 * Returns 0 on success or CYRUSDB_IOERROR if the lock cannot be obtained.
 */
static int write_lock(struct db *db, const char *altname)
{
    struct stat st;
    const char *failed_step;
    const char *path = db->fname;

    if (altname) path = altname;

    assert(db->lock_status == UNLOCKED);

    if (lock_reopen(db->fd, path, &st, &failed_step) < 0) {
        syslog(LOG_ERR, "IOERROR: %s %s: %m", failed_step, path);
        return CYRUSDB_IOERROR;
    }

    /* a different inode means our mapping belongs to a replaced file */
    if (st.st_ino != db->map_ino) {
        map_free(&db->map_base, &db->map_len);
    }

    db->lock_status = WRITELOCKED;
    db->map_ino = st.st_ino;
    db->map_size = st.st_size;

    map_refresh(db->fd, 0, &db->map_base, &db->map_len, st.st_size,
                path, 0);

    if (!db->is_open) {
        return 0;
    }

    /* reread header */
    read_header(db);

    return 0;
}
/* Take a shared lock on an unlocked db and bring the mapping up to date.
 *
 * After locking, the inode behind db->fd is compared with the inode the
 * path currently names. If they differ, the path is reopened, the new
 * descriptor is dup2()'d onto db->fd, and the loop locks again.
 *
 * Returns 0 on success or CYRUSDB_IOERROR on a lock/fstat/stat/open failure
 * (the lock is released before returning in the latter three cases).
 */
static int read_lock(struct db *db)
{
struct stat sbuf, sbuffile;
int newfd = -1;
assert(db->lock_status == UNLOCKED);
for (;;) {
if (lock_shared(db->fd) < 0) {
syslog(LOG_ERR, "IOERROR: lock_shared %s: %m", db->fname);
return CYRUSDB_IOERROR;
}
/* inode of the descriptor we hold ... */
if (fstat(db->fd, &sbuf) == -1) {
syslog(LOG_ERR, "IOERROR: fstat %s: %m", db->fname);
lock_unlock(db->fd);
return CYRUSDB_IOERROR;
}
/* ... versus the inode currently behind the path */
if (stat(db->fname, &sbuffile) == -1) {
syslog(LOG_ERR, "IOERROR: stat %s: %m", db->fname);
lock_unlock(db->fd);
return CYRUSDB_IOERROR;
}
/* same file: the lock we hold is on the right one */
if (sbuf.st_ino == sbuffile.st_ino) break;
/* the path names a different file now: reopen it and retry */
newfd = open(db->fname, O_RDWR, 0644);
if (newfd == -1) {
syslog(LOG_ERR, "IOERROR: open %s: %m", db->fname);
lock_unlock(db->fd);
return CYRUSDB_IOERROR;
}
/* keep the descriptor number stable for the rest of the code */
/* NOTE(review): the dup2() result is not checked */
dup2(newfd, db->fd);
close(newfd);
}
/* a different inode means the existing mapping is for another file */
if (db->map_ino != sbuf.st_ino) {
map_free(&db->map_base, &db->map_len);
}
db->map_size = sbuf.st_size;
db->map_ino = sbuf.st_ino;
db->lock_status = READLOCKED;
/* printf("%d: read lock: %d\n", getpid(), db->map_ino); */
map_refresh(db->fd, 0, &db->map_base, &db->map_len, sbuf.st_size,
db->fname, 0);
if (db->is_open) {
/* reread header */
/* NOTE(review): the result of read_header() is ignored here */
read_header(db);
}
return 0;
}
/* Release whatever lock is held on db->fd and mark the handle UNLOCKED.
 * Calling it on an unlocked handle is logged but still attempts the
 * unlock. Returns 0, or CYRUSDB_IOERROR if lock_unlock() fails (in which
 * case lock_status is left untouched). */
static int unlock(struct db *db)
{
    if (db->lock_status == UNLOCKED)
        syslog(LOG_NOTICE, "skiplist: unlock while not locked");

    if (lock_unlock(db->fd) >= 0) {
        db->lock_status = UNLOCKED;
        /* printf("%d: unlock: %d\n", getpid(), db->map_ino); */
        return 0;
    }

    syslog(LOG_ERR, "IOERROR: lock_unlock %s: %m", db->fname);
    return CYRUSDB_IOERROR;
}
/* Ensure the caller is write locked inside a transaction.
 *
 * With an existing transaction (*tidptr != NULL) the mapping is merely
 * refreshed. Otherwise the write lock is taken and a new transaction is
 * created and stored in *tidptr.
 *
 * Returns 0 on success, a negative write_lock() error, or whatever
 * newtxn() reports.
 */
static int lock_or_refresh(struct db *db, struct txn **tidptr)
{
    int rc;

    assert(db != NULL && tidptr != NULL);

    if (*tidptr != NULL) {
        /* check that the DB agrees that we're in this transaction */
        assert(db->current_txn == *tidptr);

        /* just update the active transaction */
        update_lock(db, *tidptr);
        return 0;
    }

    /* check that the DB isn't in a transaction */
    assert(db->current_txn == NULL);

    /* grab a r/w lock */
    rc = write_lock(db, NULL);
    if (rc < 0) return rc;

    /* start the transaction; 0 on success, its error otherwise */
    return newtxn(db, tidptr);
}
static int dispose_db(struct db *db)
{
if (!db) return 0;
if (db->lock_status) {
syslog(LOG_ERR, "skiplist: closed while still locked");
unlock(db);
}
if (db->fname) {
free(db->fname);
}
if (db->map_base) {
map_free(&db->map_base, &db->map_len);
}
if (db->fd != -1) {
close(db->fd);
}
free(db);
return 0;
}
/* Open the skiplist database stored at 'fname' and return it in *ret.
 *
 * If the same path is already in the open_db list, that handle is returned
 * and its refcount is incremented. Otherwise the file is opened (and, with
 * CYRUSDB_CREATE, created together with its directories), an empty file is
 * given a header and DUMMY node, the header is read, recovery is run when
 * the database predates the global recovery stamp, and the new handle is
 * pushed on the open_db list.
 *
 * flags: CYRUSDB_CREATE  - create the file if it does not exist
 *        CYRUSDB_MBOXSORT - use bsearch_ncompare instead of compare()
 *
 * Returns 0 on success, CYRUSDB_NOTFOUND if the file is missing and
 * CYRUSDB_CREATE was not given, CYRUSDB_IOERROR on open/mkdir failure, or
 * the error from locking, read_header() or recovery().
 */
static int myopen(const char *fname, int flags, struct db **ret)
{
struct db *db;
struct db_list *list_ent = open_db;
int r;
int new = 0; /* set when the file is created; not read in the code visible here */
/* look for an existing handle on exactly this path */
while (list_ent && strcmp(list_ent->db->fname, fname)) {
list_ent = list_ent->next;
}
if (list_ent) {
/* we already have this DB open! */
syslog(LOG_NOTICE, "skiplist: %s is already open %d time%s, returning object",
fname, list_ent->refcount, list_ent->refcount == 1 ? "" : "s");
*ret = list_ent->db;
++list_ent->refcount;
return 0;
}
/* zero-filled handle; fd is set to -1 so dispose_db() won't close(0) */
db = (struct db *) xzmalloc(sizeof(struct db));
db->fd = -1;
db->fname = xstrdup(fname);
db->compar = (flags & CYRUSDB_MBOXSORT) ? bsearch_ncompare : compare;
db->fd = open(fname, O_RDWR, 0644);
if (db->fd == -1 && errno == ENOENT) {
if (!(flags & CYRUSDB_CREATE)) {
dispose_db(db);
return CYRUSDB_NOTFOUND;
}
if (cyrus_mkdir(fname, 0755) == -1) {
dispose_db(db);
return CYRUSDB_IOERROR;
}
db->fd = open(fname, O_RDWR | O_CREAT, 0644);
new = 1;
}
if (db->fd == -1) {
syslog(LOG_ERR, "IOERROR: opening %s: %m", fname);
dispose_db(db);
return CYRUSDB_IOERROR;
}
db->curlevel = 0;
/* is_open stays 0 for now so the lock routines don't read the header */
db->is_open = 0;
db->lock_status = UNLOCKED;
/* grab a read lock, only reading the header */
r = read_lock(db);
if (r < 0) {
dispose_db(db);
return r;
}
/* if the file is empty, then the header needs to be created first */
if (db->map_size == 0) {
unlock(db);
r = write_lock(db, NULL);
if (r < 0) {
dispose_db(db);
return r;
}
}
/* race condition. Another process may have already got the write
* lock and created the header. Only go ahead if the map_size is
* still zero (read/write_lock updates map_size). */
if (db->map_size == 0) {
/* initialize in memory structure */
db->version = SKIPLIST_VERSION;
db->version_minor = SKIPLIST_VERSION_MINOR;
db->maxlevel = SKIPLIST_MAXLEVEL;
db->curlevel = 1;
db->listsize = 0;
/* where do we start writing new entries? */
db->logstart = DUMMY_OFFSET(db) + DUMMY_SIZE(db);
db->last_recovery = time(NULL);
/* create the header */
r = write_header(db);
if (!r) {
int n;
int dsize = DUMMY_SIZE(db);
/* zero-filled node: key length, data length and all forward
pointers are 0; only the type and terminator are set */
uint32_t *buf = (uint32_t *) xzmalloc(dsize);
buf[0] = htonl(DUMMY);
buf[(dsize / 4) - 1] = htonl(-1);
lseek(db->fd, DUMMY_OFFSET(db), SEEK_SET);
n = retry_write(db->fd, (char *) buf, dsize);
if (n != dsize) {
syslog(LOG_ERR, "DBERROR: writing dummy node for %s: %m",
db->fname);
r = CYRUSDB_IOERROR;
}
free(buf);
}
/* sync the db */
if (!r && DO_FSYNC && (fsync(db->fd) < 0)) {
syslog(LOG_ERR, "DBERROR: fsync(%s): %m", db->fname);
r = CYRUSDB_IOERROR;
}
/* map the new file */
db->map_size = db->logstart;
map_refresh(db->fd, 0, &db->map_base, &db->map_len, db->logstart,
db->fname, 0);
}
db->is_open = 1;
/* NOTE(review): this assignment overwrites any error recorded in 'r' by
the creation steps above; a failed creation is only reported if
read_header() then fails too */
r = read_header(db);
if (r) {
/* the lock is still held here; dispose_db() logs that and unlocks */
dispose_db(db);
return r;
}
/* unlock the db */
unlock(db);
if (!global_recovery || db->last_recovery < global_recovery) {
/* run recovery; we rebooted since the last time recovery
was run */
r = recovery(db, 0);
if (r) {
dispose_db(db);
return r;
}
}
*ret = db;
/* track this database in the open list */
list_ent = (struct db_list *) xzmalloc(sizeof(struct db_list));
list_ent->db = db;
list_ent->next = open_db;
list_ent->refcount = 1;
open_db = list_ent;
return 0;
}
/* Drop one reference to 'db'. The handle must be on the open_db list.
 * When the last reference goes away the list entry is unlinked and freed
 * and the handle is disposed of. Returns 0, or dispose_db()'s result. */
int myclose(struct db *db)
{
    struct db_list **link = &open_db;
    struct db_list *entry;

    /* walk the links until the one pointing at our entry */
    while (*link && (*link)->db != db) {
        link = &(*link)->next;
    }
    entry = *link;
    assert(entry);

    if (--entry->refcount > 0) {
        /* still in use by another opener */
        return 0;
    }

    /* remove this DB from the open list */
    *link = entry->next;
    free(entry);

    return dispose_db(db);
}
/* Default key comparator.
 * Compares the first min(l1, l2) bytes as plain 'char' values and returns
 * their difference at the first mismatch. If that common prefix is equal,
 * the longer string sorts later: returns 1, -1, or 0 for equal lengths. */
static int compare(const char *s1, int l1, const char *s2, int l2)
{
    int common = (l2 <= l1) ? l2 : l1;
    int idx;

    for (idx = 0; idx < common; idx++) {
        int diff = s1[idx] - s2[idx];

        if (diff != 0) return diff;
    }

    /* equal over the common prefix: decide on length */
    if (l1 == l2) return 0;
    return (l1 > l2) ? 1 : -1;
}
/* returns a pointer to the node asked for, or to the node after it
   if it doesn't exist (db->map_base itself when there is no such node).
   if updateoffsets is non-NULL, entry i receives the offset of the last
   node at level i whose key is < key; levels that are not searched keep
   the offset of the DUMMY node */
static const char *find_node(struct db *db,
                             const char *key, int keylen,
                             unsigned *updateoffsets)
{
    const char *cur = db->map_base + DUMMY_OFFSET(db);
    unsigned next;
    int level;

    if (updateoffsets) {
        unsigned n;

        for (n = 0; n < db->maxlevel; n++) {
            updateoffsets[n] = DUMMY_OFFSET(db);
        }
    }

    level = db->curlevel - 1;
    while (level >= 0) {
        /* move forward at this level while the next key is still smaller */
        for (;;) {
            const char *candidate;

            next = FORWARD(cur, level);
            if (!next) break;

            candidate = db->map_base + next;
            if (db->compar(KEY(candidate), KEYLEN(candidate),
                           key, keylen) >= 0) break;

            cur = candidate;
        }

        if (updateoffsets) updateoffsets[level] = cur - db->map_base;
        level--;
    }

    return db->map_base + FORWARD(cur, 0);
}
/* Look up 'key' and report its value.
 *
 * data / datalen - optional out parameters; cleared first, then pointed at
 *                  the value inside the mapping when the key is found
 * tidptr         - when non-NULL (or when the db already has an active
 *                  transaction) the lookup runs write locked inside that
 *                  transaction; otherwise a read lock is taken and released
 *
 * Returns CYRUSDB_NOTFOUND if the key is absent, a negative lock/unlock
 * error, or the lock routine's result otherwise (0 on plain success).
 */
int myfetch(struct db *db,
            const char *key, int keylen,
            const char **data, int *datalen,
            struct txn **tidptr)
{
    const char *node;
    int r = 0;

    assert(db != NULL && key != NULL);

    if (data) *data = NULL;
    if (datalen) *datalen = 0;

    /* Hacky workaround:
     *
     * If no transaction was passed, but we're in a transaction,
     * then just do the read within that transaction.
     */
    if (!tidptr && db->current_txn != NULL) {
        tidptr = &(db->current_txn);
    }

    /* write locked and up to date inside a txn, plain read lock otherwise */
    r = tidptr ? lock_or_refresh(db, tidptr) : read_lock(db);
    if (r < 0) {
        return r;
    }

    node = find_node(db, key, keylen, 0);

    if (node != db->map_base &&
        !db->compar(KEY(node), KEYLEN(node), key, keylen)) {
        if (datalen) *datalen = DATALEN(node);
        if (data) *data = DATA(node);
    } else {
        /* failed to find key/keylen */
        r = CYRUSDB_NOTFOUND;
    }

    if (!tidptr) {
        /* release read lock */
        int unlock_rc = unlock(db);

        if (unlock_rc < 0) {
            return unlock_rc;
        }
    }

    return r;
}
/* Thin wrapper: forwards every argument unchanged to myfetch() and
 * returns its result. */
static int fetch(struct db *mydb,
                 const char *key, int keylen,
                 const char **data, int *datalen,
                 struct txn **tidptr)
{
    int result = myfetch(mydb, key, keylen, data, datalen, tidptr);

    return result;
}
/* Thin wrapper around myfetch(); in this backend it behaves exactly like
 * fetch() and returns myfetch()'s result. */
static int fetchlock(struct db *db,
                     const char *key, int keylen,
                     const char **data, int *datalen,
                     struct txn **tidptr)
{
    int result = myfetch(db, key, keylen, data, datalen, tidptr);

    return result;
}
/* foreach allows for subsidiary mailbox operations in 'cb'.
if there is a txn, 'cb' must make use of it.

Walks the records whose key starts with 'prefix' (all records when
prefixlen is 0), in list order, starting at the node find_node() returns
for the prefix.
goodp - optional filter; when given, 'cb' is only called for records
for which it returns nonzero
cb - called for each selected record; a nonzero result stops the walk
and becomes the return value
rock - passed through to goodp and cb
tidptr - transaction to run in; when NULL and the db has no active
transaction, a read lock is used and is released around each call to 'cb'

Returns 0, the nonzero result of 'cb', or a lock/unlock error.
*/
int myforeach(struct db *db,
char *prefix, int prefixlen,
foreach_p *goodp,
foreach_cb *cb, void *rock,
struct txn **tidptr)
{
const char *ptr;
char *savebuf = NULL; /* copy of the current key, for repositioning */
size_t savebuflen = 0; /* allocated size of savebuf */
size_t savebufsize; /* length of the key held in savebuf */
int r = 0, cb_r = 0;
int need_unlock = 0; /* nonzero while we hold a read lock of our own */
assert(db != NULL);
assert(prefixlen >= 0);
/* Hacky workaround:
*
* If no transaction was passed, but we're in a transaction,
* then just do the read within that transaction.
*/
if (!tidptr && db->current_txn != NULL) {
tidptr = &(db->current_txn);
}
if (tidptr) {
/* make sure we're write locked and up to date */
if ((r = lock_or_refresh(db, tidptr)) < 0) {
return r;
}
} else {
/* grab a r lock */
if ((r = read_lock(db)) < 0) {
return r;
}
need_unlock = 1;
}
ptr = find_node(db, prefix, prefixlen, 0);
/* find_node() yields db->map_base (offset 0) when the list is exhausted */
while (ptr != db->map_base) {
/* does it match prefix? */
if (KEYLEN(ptr) < (uint32_t) prefixlen) break;
if (prefixlen && db->compar(KEY(ptr), prefixlen, prefix, prefixlen)) break;
if (!goodp ||
goodp(rock, KEY(ptr), KEYLEN(ptr), DATA(ptr), DATALEN(ptr))) {
/* remember which file/size we looked at, to detect changes
made while the callback runs */
ino_t ino = db->map_ino;
unsigned long sz = db->map_size;
if (!tidptr) {
/* release read lock */
if ((r = unlock(db)) < 0) {
/* NOTE(review): savebuf is not freed on this return; it may
hold an allocation from an earlier iteration */
return r;
}
need_unlock = 0;
}
/* save KEY, KEYLEN */
if (!savebuf || KEYLEN(ptr) > savebuflen) {
savebuflen = KEYLEN(ptr) + 1024;
savebuf = xrealloc(savebuf, savebuflen);
}
memcpy(savebuf, KEY(ptr), KEYLEN(ptr));
savebufsize = KEYLEN(ptr);
/* make callback */
cb_r = cb(rock, KEY(ptr), KEYLEN(ptr), DATA(ptr), DATALEN(ptr));
/* without a txn the lock is already released here and
need_unlock is 0, so the exit path below won't unlock again */
if (cb_r) break;
if (!tidptr) {
/* grab a r lock */
if ((r = read_lock(db)) < 0) {
free(savebuf);
return r;
}
need_unlock = 1;
} else {
/* make sure we're up to date */
update_lock(db, *tidptr);
}
/* reposition */
if (!(ino == db->map_ino && sz == db->map_size)) {
/* something changed in the file; reseek */
ptr = find_node(db, savebuf, savebufsize, 0);
/* 'ptr' might not equal 'savebuf'. if it's different,
we want to stay where we are. if it's the same, we
should move on to the next one */
/* NOTE(review): ptr may be db->map_base here (end of list), in
which case KEYLEN/KEY read header bytes -- confirm this is benign */
if (savebufsize == KEYLEN(ptr) &&
!memcmp(savebuf, KEY(ptr), savebufsize)) {
ptr = db->map_base + FORWARD(ptr, 0);
} else {
/* 'savebuf' got deleted, so we're now pointing at the
right thing */
}
} else {
/* move to the next one */
ptr = db->map_base + FORWARD(ptr, 0);
}
} else {
/* we didn't make the callback; keep going */
ptr = db->map_base + FORWARD(ptr, 0);
}
}
free(savebuf);
if (need_unlock) {
/* release read lock */
if ((r = unlock(db)) < 0) {
return r;
}
}
/* a locking error takes precedence over the callback's result */
return r ? r : cb_r;
}
/* Pick the height for a new node: start at 1 and keep adding a level
 * while a rand() draw scaled to [0,1] falls below PROB, never exceeding
 * db->maxlevel. One draw is consumed per loop test, including the test
 * that ends the loop. */
unsigned int randlvl(struct db *db)
{
    unsigned int height;

    for (height = 1; ; height++) {
        /* the draw comes first so rand() is consumed even at the cap */
        if (!(((float) rand() / (float) (RAND_MAX)) < PROB)) break;
        if (!(height < db->maxlevel)) break;
    }
    /* syslog(LOG_DEBUG, "picked level %d", lvl); */

    return height;
}
/* Insert or replace the record for 'key'.
 *
 * tidptr    - transaction to use; when NULL a transaction local to this
 *             call is created and committed before returning
 * overwrite - when zero and the key already exists, the transaction is
 *             aborted and CYRUSDB_EXISTS is returned
 *
 * An ADD record (preceded by a DELETE record when replacing) is appended at
 * the transaction's log end; afterwards the forward pointers of the
 * predecessor nodes are rewritten in place to point at the new record.
 *
 * Returns 0 on success, CYRUSDB_EXISTS, CYRUSDB_IOERROR if the log write
 * fails (the transaction is aborted), a lock error, or mycommit()'s error
 * for a local transaction.
 */
int mystore(struct db *db,
const char *key, int keylen,
const char *data, int datalen,
struct txn **tidptr, int overwrite)
{
const char *ptr;
uint32_t klen;
uint32_t dlen;
struct iovec iov[50];
unsigned lvl;
unsigned i;
unsigned num_iov;
struct txn *tid;
struct txn *localtid = NULL;
uint32_t endpadding = htonl(-1);
uint32_t zeropadding[4] = { 0, 0, 0, 0 };
unsigned updateoffsets[SKIPLIST_MAXLEVEL+1];
unsigned newoffsets[SKIPLIST_MAXLEVEL+1];
uint32_t addrectype = htonl(ADD);
uint32_t delrectype = htonl(DELETE);
uint32_t todelete;
unsigned newoffset;
uint32_t netnewoffset;
int r;
assert(db != NULL);
assert(key && keylen);
/* not keeping the transaction, just create one local to
* this function */
if (!tidptr) {
tidptr = &localtid;
}
/* make sure we're write locked and up to date */
if ((r = lock_or_refresh(db, tidptr)) < 0) {
return r;
}
tid = *tidptr; /* consistent naming is nice */
if (be_paranoid) {
assert(myconsistent(db, tid, 1) == 0);
}
num_iov = 0;
/* offset at which the new ADD record will start */
newoffset = tid->logend;
ptr = find_node(db, key, keylen, updateoffsets);
if (ptr != db->map_base &&
!db->compar(KEY(ptr), KEYLEN(ptr), key, keylen)) {
if (!overwrite) {
myabort(db, tid); /* releases lock */
return CYRUSDB_EXISTS;
} else {
/* replace with an equal height node */
lvl = LEVEL(ptr);
/* log a removal */
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &delrectype, 4);
todelete = htonl(ptr - db->map_base);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &todelete, 4);
/* now we write at newoffset */
/* (the 8-byte DELETE record comes first) */
newoffset += 8;
/* our pointers are whatever the old node pointed to */
for (i = 0; i < lvl; i++) {
newoffsets[i] = htonl(FORWARD(ptr, i));
}
}
} else {
/* pick a size for the new node */
lvl = randlvl(db);
/* do we need to update the header ? */
if (lvl > db->curlevel) {
/* levels not yet in use hang off the DUMMY node */
for (i = db->curlevel; i < lvl; i++) {
updateoffsets[i] = DUMMY_OFFSET(db);
}
db->curlevel = lvl;
/* write out that change */
write_header(db); /* xxx errors? */
}
/* we point to what we're updating used to point to */
/* newoffsets is written in the iovec later */
for (i = 0; i < lvl; i++) {
/* written in the iovec */
newoffsets[i] =
htonl(FORWARD(db->map_base + updateoffsets[i], i));
}
}
klen = htonl(keylen);
dlen = htonl(datalen);
netnewoffset = htonl(newoffset);
/* assemble the ADD record: type, keylen, padded key, datalen, padded
data, forward pointers, terminating -1 */
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &addrectype, 4);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &klen, 4);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) key, keylen);
if (ROUNDUP(keylen) - keylen > 0) {
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) zeropadding,
ROUNDUP(keylen) - keylen);
}
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &dlen, 4);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) data, datalen);
if (ROUNDUP(datalen) - datalen > 0) {
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) zeropadding,
ROUNDUP(datalen) - datalen);
}
/* NOTE(review): newoffsets is declared 'unsigned' but written as 4-byte
words; this assumes sizeof(unsigned) == 4 -- confirm */
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) newoffsets, 4 * lvl);
WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &endpadding, 4);
getsyncfd(db, tid);
lseek(tid->syncfd, tid->logend, SEEK_SET);
r = retry_writev(tid->syncfd, iov, num_iov);
if (r < 0) {
syslog(LOG_ERR, "DBERROR: retry_writev(): %m");
myabort(db, tid);
return CYRUSDB_IOERROR;
}
tid->logend += r; /* update where to write next */
/* update pointers after writing record so abort is guaranteed to
* see which records need reverting */
for (i = 0; i < lvl; i++) {
/* write pointer updates */
/* FORWARD(updates[i], i) = newoffset; */
/* NOTE(review): lseek()/retry_write() results are not checked here */
lseek(db->fd,
PTR(db->map_base + updateoffsets[i], i) - db->map_base,
SEEK_SET);
retry_write(db->fd, (char *) &netnewoffset, 4);
}
if (be_paranoid) {
assert(myconsistent(db, tid, 1) == 0);
}
if (localtid) {
/* commit the store, which releases the write lock */
r = mycommit(db, tid);
if (r) return r;
}
return 0;
}
/* Store a record that must not exist yet: calls mystore() with overwrite
 * disabled, so an existing key makes it return CYRUSDB_EXISTS. */
static int create(struct db *db,
                  const char *key, int keylen,
                  const char *data, int datalen,
                  struct txn **tid)
{
    const int overwrite = 0;

    return mystore(db, key, keylen, data, datalen, tid, overwrite);
}
/* Store a record, replacing any existing one for the same key: calls
 * mystore() with overwrite enabled. */
static int store(struct db *db,
                 const char *key, int keylen,
                 const char *data, int datalen,
                 struct txn **tid)
{
    const int overwrite = 1;

    return mystore(db, key, keylen, data, datalen, tid, overwrite);
}
/* Delete the record for 'key', if it exists.
 *
 * tidptr - transaction to use; when NULL a transaction local to this call
 *          is created and committed before returning
 * force  - unused
 *
 * A DELETE record naming the node's offset is appended to the log, then the
 * forward pointers that referenced the node are rewritten in place to skip
 * it. Deleting a key that does not exist is not an error.
 *
 * Returns 0 on success, a lock error, CYRUSDB_IOERROR if the log write
 * fails (the transaction is aborted), or mycommit()'s error for a local
 * transaction.
 */
int mydelete(struct db *db,
             const char *key, int keylen,
             struct txn **tidptr, int force __attribute__((unused)))
{
    const char *ptr;
    uint32_t delrectype = htonl(DELETE);
    unsigned updateoffsets[SKIPLIST_MAXLEVEL+1];
    uint32_t offset;
    uint32_t writebuf[2];
    struct txn *tid, *localtid = NULL;
    unsigned i;
    int r;

    /* not keeping the transaction, just create one local to
     * this function */
    if (!tidptr) {
        tidptr = &localtid;
    }

    /* make sure we're write locked and up to date */
    if ((r = lock_or_refresh(db, tidptr)) < 0) {
        return r;
    }

    tid = *tidptr; /* consistent naming is nice */

    if (be_paranoid) {
        assert(myconsistent(db, tid, 1) == 0);
    }

    ptr = find_node(db, key, keylen, updateoffsets);
    if (ptr != db->map_base &&
        !db->compar(KEY(ptr), KEYLEN(ptr), key, keylen)) {
        /* gotcha */
        offset = ptr - db->map_base;

        /* log the deletion */
        getsyncfd(db, tid);
        lseek(tid->syncfd, tid->logend, SEEK_SET);
        writebuf[0] = delrectype;
        writebuf[1] = htonl(offset);

        /* update end-of-log */
        r = retry_write(tid->syncfd, (char *) writebuf, 8);
        if (r < 0) {
            syslog(LOG_ERR, "DBERROR: retry_write(): %m");
            myabort(db, tid);
            return CYRUSDB_IOERROR;
        }
        tid->logend += r;

        /* update pointers after writing record so abort is guaranteed to
         * see which records need reverting */
        for (i = 0; i < db->curlevel; i++) {
            uint32_t netnewoffset;

            /* the node is linked at levels 0..LEVEL-1 only; the first
               level that doesn't point at it ends the work */
            if (FORWARD(db->map_base + updateoffsets[i], i) != offset) {
                break;
            }
            netnewoffset = htonl(FORWARD(ptr, i));
            lseek(db->fd,
                  PTR(db->map_base + updateoffsets[i], i) - db->map_base,
                  SEEK_SET);
            retry_write(db->fd, (char *) &netnewoffset, 4);
        }
    }

    if (be_paranoid) {
        assert(myconsistent(db, tid, 1) == 0);
    }

    if (localtid) {
        /* commit the delete, which releases the write lock; a failed
           commit must be reported to the caller, as mystore() does */
        r = mycommit(db, tid);
        if (r) return r;
    }

    return 0;
}
/*
 * Commit transaction 'tid', which must be the current transaction of 'db'.
 *
 * If the transaction wrote anything (logstart != logend) the data is
 * synced, a COMMIT record is appended at tid->logend and synced as well.
 * Afterwards the database may be checkpointed, the write lock is released
 * and 'tid' is freed.
 *
 * Returns 0 on success.  If the commit itself fails, the transaction is
 * rolled back through myabort() and CYRUSDB_IOERROR is returned.  A
 * failing checkpoint is only logged: the commit record is already on disk
 * at that point, so the transaction must not be aborted.  A negative
 * unlock() result is returned after the transaction has been cleaned up.
 */
int mycommit(struct db *db, struct txn *tid)
{
    uint32_t commitrectype = htonl(COMMIT);
    int r = 0;

    assert(db && tid);
    assert(db->current_txn == tid);

    update_lock(db, tid);

    if (be_paranoid) {
        assert(myconsistent(db, tid, 1) == 0);
    }

    /* verify that we did something this txn */
    if (tid->logstart == tid->logend) {
        /* empty txn, done */
        goto done;
    }

    /* fsync if we're not using O_SYNC writes */
    if (!use_osync && DO_FSYNC && (fdatasync(db->fd) < 0)) {
        syslog(LOG_ERR, "IOERROR: writing %s: %m", db->fname);
        r = CYRUSDB_IOERROR;
        goto done;
    }

    /* xxx consider unlocking the database here: the transaction isn't
       yet durable but the file is in a form that is consistent for
       other transactions to use. releasing the lock here would give
       ACI properties. */

    /* write a commit record; without it the transaction is not committed,
     * so a failed write has to fail the commit */
    assert(tid->syncfd != -1);
    if (lseek(tid->syncfd, tid->logend, SEEK_SET) < 0 ||
        retry_write(tid->syncfd, (char *) &commitrectype, 4) < 0) {
        syslog(LOG_ERR, "DBERROR: skiplist %s: writing commit record: %m",
               db->fname);
        r = CYRUSDB_IOERROR;
        goto done;
    }

    /* fsync if we're not using O_SYNC writes */
    if (!use_osync && DO_FSYNC && (fdatasync(db->fd) < 0)) {
        syslog(LOG_ERR, "IOERROR: writing %s: %m", db->fname);
        r = CYRUSDB_IOERROR;
        goto done;
    }

 done:
    if (r) {
        int r2;

        if (be_paranoid) {
            assert(myconsistent(db, db->current_txn, 1) == 0);
        }

        /* error during commit; we must abort.  db->current_txn is still
         * 'tid' here, which myabort() asserts. */
        r2 = myabort(db, tid);
        if (r2) {
            syslog(LOG_ERR, "DBERROR: skiplist %s: commit AND abort failed",
                   db->fname);
        }
        return r;
    }

    db->current_txn = NULL;

    /* consider checkpointing */
    if (tid->logend > (2 * db->logstart + SKIPLIST_MINREWRITE)) {
        /* the transaction is already committed, so a checkpoint failure
         * is reported but does not undo or fail the commit */
        if (mycheckpoint(db, 1)) {
            syslog(LOG_ERR,
                   "DBERROR: skiplist %s: checkpoint after commit failed",
                   db->fname);
        }
    }

    if (be_paranoid) {
        assert(myconsistent(db, db->current_txn, 1) == 0);
    }

    /* release the write lock */
    r = unlock(db);

    /* must close this after releasing the lock */
    closesyncfd(db, tid);

    /* free tid, even when unlock() failed: the caller cannot reuse it */
    free(tid);

    return r;
}
/*
 * Roll back transaction 'tid', which must be the current transaction of
 * 'db'.
 *
 * The log entries between tid->logstart and tid->logend are undone one at
 * a time, newest first: an ADD entry is unlinked from the list again, and
 * the node named by a DELETE entry is linked back in.  The file is then
 * truncated to tid->logstart, the write lock is released and 'tid' is
 * freed.
 *
 * Returns 0 on success, CYRUSDB_IOERROR if the truncation fails, or the
 * negative result of unlock().
 */
int myabort(struct db *db, struct txn *tid)
{
    unsigned updateoffsets[SKIPLIST_MAXLEVEL+1];
    const char *rec;
    unsigned recoff;
    unsigned lvl;
    int r = 0;

    assert(db && tid);
    assert(db->current_txn == tid);

    /* update the mmap so we can see the log entries we need to remove */
    update_lock(db, tid);

    /* look at the log entries we've written, and undo their effects */
    while (tid->logstart != tid->logend) {
        /* records only chain forwards, so walk from the start of the log
         * until the record that ends exactly at logend */
        recoff = tid->logstart;
        rec = db->map_base + recoff;
        while (recoff + RECSIZE(rec) != (uint32_t) tid->logend) {
            recoff += RECSIZE(rec);
            rec = db->map_base + recoff;
        }
        recoff = rec - db->map_base;

        assert(TYPE(rec) == ADD || TYPE(rec) == DELETE);

        switch (TYPE(rec)) {
        case DUMMY:
        case INORDER:
        case COMMIT:
            abort();

        case ADD:
            /* remove this record: every predecessor that points at it is
             * redirected to the record's own successor */
            (void) find_node(db, KEY(rec), KEYLEN(rec), updateoffsets);
            for (lvl = 0; lvl < db->curlevel; lvl++) {
                uint32_t successor_net;

                if (FORWARD(db->map_base + updateoffsets[lvl], lvl) != recoff) {
                    break;
                }

                successor_net = htonl(FORWARD(rec, lvl));
                lseek(db->fd,
                      PTR(db->map_base + updateoffsets[lvl], lvl) - db->map_base,
                      SEEK_SET);
                retry_write(db->fd, (char *) &successor_net, 4);
            }
            break;

        case DELETE:
            {
                /* re-add this record. it can't exist right now. */
                uint32_t victim_net = *((uint32_t *)(rec + 4));
                const char *victim = db->map_base + ntohl(victim_net);
                unsigned height = LEVEL(victim);

                (void) find_node(db, KEY(victim), KEYLEN(victim),
                                 updateoffsets);
                for (lvl = 0; lvl < height; lvl++) {
                    /* the current pointers FROM this node are correct,
                       so we just have to update 'updateoffsets' */
                    lseek(db->fd,
                          PTR(db->map_base + updateoffsets[lvl], lvl)
                              - db->map_base,
                          SEEK_SET);
                    retry_write(db->fd, (char *) &victim_net, 4);
                }
            }
            break;
        }

        /* this entry is dealt with; stop looking at it */
        tid->logend -= RECSIZE(rec);
    }

    /* truncate the file to remove log entries */
    if (ftruncate(db->fd, tid->logstart) < 0) {
        syslog(LOG_ERR,
               "DBERROR: skiplist abort %s: ftruncate: %m",
               db->fname);
        r = CYRUSDB_IOERROR;
        unlock(db);
        return r;
    }

    db->map_size = tid->logstart;

    /* release the write lock */
    r = unlock(db);
    if (r < 0) {
        return r;
    }

    /* must close this after releasing the lock */
    closesyncfd(db, tid);

    /* free the tid */
    free(tid);
    db->current_txn = NULL;

    return 0;
}
/* compress 'db'. if 'locked != 0', the database is already R/W locked and
   will be returned as such.

   The live records are copied in list order into "<fname>.NEW" as INORDER
   records, a fresh header is written, and the new file is renamed over
   the original.

   Returns 0 on success and a negative/CYRUSDB_* error otherwise.  If the
   failure happens before the rename, the new file is discarded and 'db'
   keeps using the original file; the first error is the one returned.
   When 'locked' is 0 the lock taken here is released on every return. */
static int mycheckpoint(struct db *db, int locked)
{
    char fname[1024];
    int oldfd;
    struct iovec iov[50];
    unsigned num_iov;
    unsigned updateoffsets[SKIPLIST_MAXLEVEL+1];
    const char *ptr;
    unsigned offset;
    int r = 0;
    int renamed = 0;
    uint32_t iorectype = htonl(INORDER);
    uint32_t saved_logstart, saved_listsize;
    time_t saved_last_recovery;
    struct stat sbuf;
    unsigned i;
    time_t start = time(NULL);

    /* grab write lock (could be read but this prevents multiple checkpoints
       simultaneously) */
    if (!locked) {
        r = write_lock(db, NULL);
        if (r < 0) return r;
    } else {
        /* we need the latest and greatest data */
        assert(db->is_open && db->lock_status == WRITELOCKED);
        map_refresh(db->fd, 0, &db->map_base, &db->map_len, MAP_UNKNOWN_LEN,
                    db->fname, 0);
    }

    /* can't be in a transaction */
    assert(db->current_txn == NULL);

    if ((r = myconsistent(db, NULL, 1)) < 0) {
        syslog(LOG_ERR, "db %s, inconsistent pre-checkpoint, bailing out",
               db->fname);
        if (!locked) unlock(db);
        return r;
    }

    /* open fname.NEW */
    snprintf(fname, sizeof(fname), "%s.NEW", db->fname);
    oldfd = db->fd;
    db->fd = open(fname, O_RDWR | O_CREAT, 0644);
    if (db->fd < 0) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint: open(%s): %m", fname);
        /* put the real descriptor back before unlock() looks at it */
        db->fd = oldfd;
        if (!locked) unlock(db);
        return CYRUSDB_IOERROR;
    }

    /* these header fields are overwritten below, before write_header() is
     * called; remember them so a failed checkpoint can put them back */
    saved_logstart = db->logstart;
    saved_listsize = db->listsize;
    saved_last_recovery = db->last_recovery;

    /* truncate it just in case! */
    if (ftruncate(db->fd, 0) < 0) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint %s: ftruncate %m", fname);
        r = CYRUSDB_IOERROR;
    }

    /* write dummy record */
    if (!r) {
        int dsize = DUMMY_SIZE(db);
        uint32_t *buf = (uint32_t *) xzmalloc(dsize);

        buf[0] = htonl(DUMMY);
        buf[(dsize / 4) - 1] = htonl(-1);

        lseek(db->fd, DUMMY_OFFSET(db), SEEK_SET);
        r = retry_write(db->fd, (char *) buf, dsize);
        r = (r != dsize) ? CYRUSDB_IOERROR : 0;
        free(buf);

        /* initialize the updateoffsets array so when we append records
           we know where to set the pointers */
        for (i = 0; i < db->maxlevel; i++) {
            /* header_size + 4 (rectype) + 4 (ksize) + 4 (dsize)
               + 4 * i */
            updateoffsets[i] = DUMMY_OFFSET(db) + 12 + 4 * i;
        }
    }

    /* write records to new file; db->map_base still maps the old file */
    offset = FORWARD(db->map_base + DUMMY_OFFSET(db), 0);
    db->listsize = 0;
    while (!r && offset != 0) {
        unsigned int lvl;
        off_t end;
        unsigned newoffset;
        uint32_t netnewoffset;

        ptr = db->map_base + offset;
        lvl = LEVEL(ptr);
        db->listsize++;

        num_iov = 0;
        WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) &iorectype, 4);
        /* copy all but the rectype from the record */
        WRITEV_ADD_TO_IOVEC(iov, num_iov, (char *) ptr + 4, RECSIZE(ptr) - 4);

        end = lseek(db->fd, 0, SEEK_END);
        if (end < 0) {
            r = CYRUSDB_IOERROR;
            break;
        }
        newoffset = end;
        netnewoffset = htonl(newoffset);

        if (retry_writev(db->fd, iov, num_iov) < 0) {
            r = CYRUSDB_IOERROR;
            break;
        }

        for (i = 0; i < lvl; i++) {
            /* update pointers; the lseek() result is compared directly
             * because a large offset does not fit into an int */
            if (lseek(db->fd, updateoffsets[i], SEEK_SET) < 0 ||
                retry_write(db->fd, (char *) &netnewoffset, 4) < 0) {
                r = CYRUSDB_IOERROR;
                break;
            }

            /* PTR(ptr, i) - ptr is the offset relative to me
               to my ith pointer */
            updateoffsets[i] = newoffset + (PTR(ptr, i) - ptr);
        }

        offset = FORWARD(ptr, 0);
    }

    /* set any dangling pointers to zero */
    for (i = 0; !r && i < db->maxlevel; i++) {
        uint32_t netnewoffset = htonl(0);

        if (lseek(db->fd, updateoffsets[i], SEEK_SET) < 0 ||
            retry_write(db->fd, (char *) &netnewoffset, 4) < 0) {
            r = CYRUSDB_IOERROR;
        }
    }

    /* create the header -- only for a completely copied file, so that an
     * earlier error is never replaced by write_header()'s result */
    if (!r) {
        db->logstart = lseek(db->fd, 0, SEEK_END);
        db->last_recovery = time(NULL);
        r = write_header(db);
    }

    /* sync new file */
    if (!r && DO_FSYNC && (fdatasync(db->fd) < 0)) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint: fdatasync(%s): %m", fname);
        r = CYRUSDB_IOERROR;
    }

    if (!r) {
        /* get new lock */
        db->lock_status = UNLOCKED; /* well, the new file is... */
        r = write_lock(db, fname);
    }

    /* move new file to original file name */
    if (!r) {
        if (rename(fname, db->fname) < 0) {
            syslog(LOG_ERR, "DBERROR: skiplist checkpoint: rename(%s, %s): %m",
                   fname, db->fname);
            r = CYRUSDB_IOERROR;
        } else {
            renamed = 1;
        }
    }

    /* force the new file name to disk */
    if (!r && DO_FSYNC && (fsync(db->fd) < 0)) {
        syslog(LOG_ERR, "DBERROR: skiplist checkpoint: fsync(%s): %m", fname);
        r = CYRUSDB_IOERROR;
    }

    if (r && !renamed) {
        /* clean up: drop the new file and fall back to the original one,
         * which is still open and still write locked */
        close(db->fd);
        unlink(fname);
        db->fd = oldfd;
        db->lock_status = WRITELOCKED;
        db->logstart = saved_logstart;
        db->listsize = saved_listsize;
        db->last_recovery = saved_last_recovery;
    } else {
        /* the new file is in place (even if the final fsync failed);
         * closing the old descriptor releases the old write lock */
        close(oldfd);
    }

    /* let's make sure we're up to date with whichever file db->fd is now */
    map_free(&db->map_base, &db->map_len);
    if (fstat(db->fd, &sbuf) == -1) {
        syslog(LOG_ERR, "IOERROR: fstat %s: %m", db->fname);
        if (!locked) unlock(db);
        return CYRUSDB_IOERROR;
    }
    db->map_size = sbuf.st_size;
    db->map_ino = sbuf.st_ino;
    map_refresh(db->fd, 0, &db->map_base, &db->map_len, sbuf.st_size,
                db->fname, 0);

    if (r) {
        /* report the checkpoint failure instead of letting the consistency
         * check below overwrite it */
        if (!locked) unlock(db);
        return r;
    }

    if ((r = myconsistent(db, NULL, 1)) < 0) {
        syslog(LOG_ERR, "db %s, inconsistent post-checkpoint, bailing out",
               db->fname);
        if (!locked) unlock(db);
        return r;
    }

    if (!locked) {
        /* unlock the new db files */
        unlock(db);
    }

    {
        int diff = time(NULL) - start;
        syslog(LOG_INFO,
               "skiplist: checkpointed %s (%d record%s, %d bytes) in %d second%s",
               db->fname, db->listsize, db->listsize == 1 ? "" : "s",
               db->logstart, diff, diff == 1 ? "" : "s");
    }

    return r;
}
/* dump the database.
   if detail == 1, dump all records.
   if detail == 2, also dump pointers for active records.
   if detail == 3, dump all records/all pointers.

   NOTE: 'detail' is currently unused; every record from the dummy node up
   to db->map_size is printed to stdout, with pointers for DUMMY, INORDER
   and ADD records.  The dump stops early at a record whose RECSIZE is 0,
   because the walk could not advance past it.  Always returns 0.
*/
static int dump(struct db *db, int detail __attribute__((unused)))
{
    const char *ptr, *end;
    unsigned i;

    read_lock(db);

    ptr = db->map_base + DUMMY_OFFSET(db);
    end = db->map_base + db->map_size;
    while (ptr < end) {
        printf("%04lX: ", (unsigned long) (ptr - db->map_base));

        /* record type */
        switch (TYPE(ptr)) {
        case DUMMY:
            printf("DUMMY ");
            break;
        case INORDER:
            printf("INORDER ");
            break;
        case ADD:
            printf("ADD ");
            break;
        case DELETE:
            printf("DELETE ");
            break;
        case COMMIT:
            printf("COMMIT ");
            break;
        default:
            printf("UNKNOWN ");
            break;
        }

        /* record details */
        switch (TYPE(ptr)) {
        case DUMMY:
        case INORDER:
        case ADD:
            printf("kl=%d dl=%d lvl=%d\n",
                   KEYLEN(ptr), DATALEN(ptr), LEVEL(ptr));
            printf("\t");
            for (i = 0; i < LEVEL(ptr); i++) {
                printf("%04X ", FORWARD(ptr, i));
            }
            printf("\n");
            break;

        case DELETE:
            printf("offset=%04X\n", ntohl(*((uint32_t *)(ptr + 4))));
            break;

        case COMMIT:
            printf("\n");
            break;

        default:
            printf("\n");
            break;
        }

        /* a record of size 0 would keep 'ptr' in place forever */
        if (RECSIZE(ptr) == 0) {
            printf("found a RECSIZE of 0, stopping dump\n");
            break;
        }

        ptr += RECSIZE(ptr);
    }

    unlock(db);
    return 0;
}
/* Consistency check entry point: runs myconsistent() on 'db' with no
 * transaction and with locked == 0, so that myconsistent() takes and
 * releases the read lock itself.  Returns myconsistent()'s result. */
static int consistent(struct db *db)
{
    struct txn *no_txn = NULL;
    int already_locked = 0;

    return myconsistent(db, no_txn, already_locked);
}
/* perform some basic consistency checks
 *
 * Walks the level-0 chain starting at the dummy node and, for every
 * forward pointer of every node, checks that
 *   - the pointer does not lie beyond db->map_size, and
 *   - a non-zero pointer leads to a node whose key compares strictly
 *     greater (according to db->compar) than the current node's key.
 * Violations are reported on stdout.
 *
 * tid:    must equal db->current_txn (both may be NULL).
 * locked: if 0, the database is read locked for the duration of the
 *         check; otherwise the caller holds the lock and, when 'tid' is
 *         set, update_lock() is called first.
 *
 * Returns 0 if no problem was found, CYRUSDB_INTERNAL otherwise.
 */
static int myconsistent(struct db *db, struct txn *tid, int locked)
{
    uint32_t cur;
    int status = 0;

    assert(db->current_txn == tid); /* could both be null */

    if (locked) {
        if (tid) update_lock(db, tid);
    } else {
        read_lock(db);
    }

    cur = FORWARD(db->map_base + DUMMY_OFFSET(db), 0);
    while (cur != 0) {
        const char *node = db->map_base + cur;
        unsigned lvl;

        for (lvl = 0; lvl < LEVEL(node); lvl++) {
            uint32_t next = FORWARD(node, lvl);
            const char *successor;
            int cmp;

            if (next > db->map_size) {
                fprintf(stdout,
                        "skiplist inconsistent: %04X: ptr %d is %04X; "
                        "eof is %04X\n",
                        (unsigned int) (node - db->map_base),
                        lvl, next, (unsigned int) db->map_size);
                status = CYRUSDB_INTERNAL;
                goto done;
            }

            if (next == 0) {
                /* end of the list at this level; nothing to compare */
                continue;
            }

            /* check to see that node < node -> next */
            successor = db->map_base + next;
            cmp = db->compar(KEY(node), KEYLEN(node),
                             KEY(successor), KEYLEN(successor));
            if (cmp >= 0) {
                fprintf(stdout,
                        "skiplist inconsistent: %04X: ptr %d is %04X; "
                        "db->compar() = %d\n",
                        (unsigned int) (node - db->map_base),
                        lvl,
                        next, cmp);
                status = CYRUSDB_INTERNAL;
                goto done;
            }
        }

        cur = FORWARD(node, 0);
    }

 done:
    if (!locked) unlock(db);

    return status;
}
/* run recovery on this file
 *
 * Rebuilds the pointers of the skiplist from what is on disk: the INORDER
 * records written by the last checkpoint are re-linked in file order, then
 * every committed transaction in the log is replayed.  A trailing
 * transaction without a COMMIT record is truncated away.
 *
 * flags: RECOVERY_CALLER_LOCKED - the caller already holds the write lock;
 *                                 it is kept on success and released on
 *                                 failure.
 *        RECOVERY_FORCE         - run even if db->last_recovery is not
 *                                 older than global_recovery.
 *
 * Returns 0 on success, otherwise a negative/CYRUSDB_* error code.
 */
static int recovery(struct db *db, int flags)
{
    const char *ptr, *keyptr;
    unsigned updateoffsets[SKIPLIST_MAXLEVEL+1];
    uint32_t offset, offsetnet, myoff = 0;
    int r = 0, need_checkpoint = 0;
    time_t start = time(NULL);
    unsigned i;

    if (!(flags & RECOVERY_CALLER_LOCKED) && (r = write_lock(db, NULL)) < 0) {
        return r;
    }

    assert(db->is_open && db->lock_status == WRITELOCKED);

    if ((r = read_header(db)) < 0) {
        unlock(db);
        return r;
    }

    if (!(flags & RECOVERY_FORCE)
        && global_recovery
        && db->last_recovery >= global_recovery) {
        /* someone beat us to it; as on the normal success path, only drop
         * the lock if it is ours */
        if (!(flags & RECOVERY_CALLER_LOCKED)) {
            unlock(db);
        }
        return 0;
    }

    /* can't run recovery inside a txn */
    assert(db->current_txn == NULL);

    db->listsize = 0;

    ptr = DUMMY_PTR(db);
    r = 0;

    /* verify this is DUMMY */
    if (!r && TYPE(ptr) != DUMMY) {
        r = CYRUSDB_IOERROR;
        syslog(LOG_ERR, "DBERROR: skiplist recovery %s: no dummy node?",
               db->fname);
    }

    /* zero key */
    if (!r && KEYLEN(ptr) != 0) {
        r = CYRUSDB_IOERROR;
        syslog(LOG_ERR,
               "DBERROR: skiplist recovery %s: dummy node KEYLEN != 0",
               db->fname);
    }

    /* zero data */
    if (!r && DATALEN(ptr) != 0) {
        r = CYRUSDB_IOERROR;
        syslog(LOG_ERR,
               "DBERROR: skiplist recovery %s: dummy node DATALEN != 0",
               db->fname);
    }

    /* pointers for db->maxlevel */
    if (!r && LEVEL(ptr) != db->maxlevel) {
        r = CYRUSDB_IOERROR;
        syslog(LOG_ERR,
               "DBERROR: skiplist recovery %s: dummy node level: %d != %d",
               db->fname, LEVEL(ptr), db->maxlevel);
    }

    for (i = 0; i < db->maxlevel; i++) {
        /* header_size + 4 (rectype) + 4 (ksize) + 4 (dsize)
           + 4 * i */
        updateoffsets[i] = DUMMY_OFFSET(db) + 12 + 4 * i;
    }

    /* reset the data that was written INORDER by the last checkpoint */
    offset = DUMMY_OFFSET(db) + DUMMY_SIZE(db);
    while (!r && (offset < db->map_size)
           && TYPE(db->map_base + offset) == INORDER) {
        ptr = db->map_base + offset;
        offsetnet = htonl(offset);

        db->listsize++;

        /* xxx check \0 fill on key */
        /* xxx check \0 fill on data */

        /* a bogus level would index past the end of 'updateoffsets' */
        if (LEVEL(ptr) > SKIPLIST_MAXLEVEL) {
            syslog(LOG_ERR,
                   "DBERROR: skiplist recovery %s: node claims level %d (greater than max %d)",
                   db->fname, LEVEL(ptr), SKIPLIST_MAXLEVEL);
            r = CYRUSDB_IOERROR;
            break;
        }

        /* update previous pointers, record these for updating */
        for (i = 0; !r && i < LEVEL(ptr); i++) {
            /* compare the lseek() result directly: stored in an int, a
             * large offset would look like a failure */
            if (lseek(db->fd, updateoffsets[i], SEEK_SET) < 0) {
                syslog(LOG_ERR, "DBERROR: lseek %s: %m", db->fname);
                r = CYRUSDB_IOERROR;
                break;
            }

            if (retry_write(db->fd, (char *) &offsetnet, 4) < 0) {
                r = CYRUSDB_IOERROR;
                break;
            }

            /* PTR(ptr, i) - ptr is the offset relative to me
               to my ith pointer */
            updateoffsets[i] = offset + (PTR(ptr, i) - ptr);
        }

        /* check padding */
        if (!r && PADDING(ptr) != (uint32_t) -1) {
            syslog(LOG_ERR, "DBERROR: %s: offset %04X padding not -1",
                   db->fname, offset);
            r = CYRUSDB_IOERROR;
        }

        if (!r) {
            offset += RECSIZE(ptr);
        }
    }

    if (offset != db->logstart) {
        syslog(LOG_NOTICE, "skiplist recovery %s: incorrect logstart %04X changed to %04X",
               db->fname, db->logstart, offset);
        db->logstart = offset; /* header will be committed later */
    }

    /* zero out the remaining pointers */
    if (!r) {
        for (i = 0; !r && i < db->maxlevel; i++) {
            int zerooffset = 0;

            if (lseek(db->fd, updateoffsets[i], SEEK_SET) < 0) {
                syslog(LOG_ERR, "DBERROR: lseek %s: %m", db->fname);
                r = CYRUSDB_IOERROR;
                break;
            }

            if (retry_write(db->fd, (char *) &zerooffset, 4) < 0) {
                r = CYRUSDB_IOERROR;
                break;
            }
        }
    }

    /* replay the log */
    while (!r && offset < db->map_size) {
        const char *p, *q;

        /* refresh map, so we see the writes we've just done */
        map_refresh(db->fd, 0, &db->map_base, &db->map_len, db->map_size,
                    db->fname, 0);

        ptr = db->map_base + offset;

        /* bugs in recovery truncates could have left some bogus zeros here */
        if (TYPE(ptr) == 0) {
            int orig = offset;

            /* test the bound first so nothing is read past the file end */
            while (offset < db->map_size && TYPE(ptr) == 0) {
                offset += 4;
                ptr = db->map_base + offset;
            }
            syslog(LOG_ERR, "skiplist recovery %s: skipped %d bytes of zeros at %04X",
                   db->fname, offset - orig, orig);
            need_checkpoint = 1;

            /* nothing but zeros up to the end of the file */
            if (offset >= db->map_size) break;
        }

        offsetnet = htonl(offset);

        /* if this is a commit, we've processed everything in this txn */
        if (TYPE(ptr) == COMMIT) {
            offset += RECSIZE(ptr);
            continue;
        }

        /* make sure this is ADD or DELETE */
        if (TYPE(ptr) != ADD && TYPE(ptr) != DELETE) {
            syslog(LOG_ERR,
                   "DBERROR: skiplist recovery %s: %04X should be ADD or DELETE",
                   db->fname, offset);
            r = CYRUSDB_IOERROR;
            break;
        }

        /* look ahead for a commit */
        q = db->map_base + db->map_size;
        p = ptr;
        for (;;) {
            if (RECSIZE(p) <= 0) {
                /* hmm, we can't trust this transaction */
                syslog(LOG_ERR,
                       "DBERROR: skiplist recovery %s: found a RECSIZE of 0, "
                       "truncating corrupted file instead of looping forever...",
                       db->fname);
                p = q;
                break;
            }
            p += RECSIZE(p);
            if (p >= q) break;
            if (TYPE(p) == COMMIT) break;
        }

        if (p >= q) {
            syslog(LOG_NOTICE,
                   "skiplist recovery %s: found partial txn, not replaying",
                   db->fname);

            /* no commit, we should truncate */
            if (ftruncate(db->fd, offset) < 0) {
                syslog(LOG_ERR,
                       "DBERROR: skiplist recovery %s: ftruncate: %m",
                       db->fname);
                r = CYRUSDB_IOERROR;
            }

            /* set the map size back as well */
            db->map_size = offset;

            break;
        }

        keyptr = NULL;

        /* look for the key */
        if (TYPE(ptr) == ADD) {
            keyptr = find_node(db, KEY(ptr), KEYLEN(ptr), updateoffsets);
            if (keyptr == db->map_base ||
                db->compar(KEY(ptr), KEYLEN(ptr), KEY(keyptr), KEYLEN(keyptr))) {
                /* didn't find exactly this node */
                keyptr = NULL;
            }
        } else { /* type == DELETE */
            const char *victim;

            myoff = ntohl(*((uint32_t *)(ptr + 4)));
            victim = db->map_base + myoff;
            keyptr = find_node(db, KEY(victim), KEYLEN(victim), updateoffsets);
            if (keyptr == db->map_base ||
                db->compar(KEY(victim), KEYLEN(victim),
                           KEY(keyptr), KEYLEN(keyptr))) {
                /* didn't find exactly this node */
                keyptr = NULL;
            }
        }

        /* if DELETE & found key, skip over it */
        if (TYPE(ptr) == DELETE && keyptr) {
            db->listsize--;

            for (i = 0; i < db->curlevel; i++) {
                int newoffset;

                if (FORWARD(db->map_base + updateoffsets[i], i) != myoff) {
                    break;
                }
                newoffset = htonl(FORWARD(db->map_base + myoff, i));
                lseek(db->fd,
                      PTR(db->map_base + updateoffsets[i], i) - db->map_base,
                      SEEK_SET);
                retry_write(db->fd, (char *) &newoffset, 4);
            }

        /* otherwise if DELETE, throw an error */
        } else if (TYPE(ptr) == DELETE) {
            syslog(LOG_ERR,
                   "DBERROR: skiplist recovery %s: DELETE at %04X doesn't exist, skipping",
                   db->fname, offset);
            need_checkpoint = 1;

        /* otherwise insert it */
        } else if (TYPE(ptr) == ADD) {
            unsigned int lvl;
            uint32_t newoffsets[SKIPLIST_MAXLEVEL+1];

            if (keyptr) {
                syslog(LOG_ERR,
                       "DBERROR: skiplist recovery %s: ADD at %04X exists, replacing",
                       db->fname, offset);
                need_checkpoint = 1;
            } else {
                db->listsize++;
            }

            offsetnet = htonl(offset);

            lvl = LEVEL(ptr);
            if (lvl > SKIPLIST_MAXLEVEL) {
                syslog(LOG_ERR,
                       "DBERROR: skiplist recovery %s: node claims level %d (greater than max %d)",
                       db->fname, lvl, SKIPLIST_MAXLEVEL);
                r = CYRUSDB_IOERROR;
            } else {
                /* NOTE - in the bogus case where a record with the same key already
                 * exists, there are three possible cases:
                 * lvl == LEVEL(keyptr)
                 * * trivial: all to me, all mine to keyptr's FORWARD
                 * lvl > LEVEL(keyptr) -
                 * * all updateoffsets values should point to me
                 * * up until LEVEL(keyptr) set to keyptr's next values
                 * (updateoffsets[i] should be keyptr in these cases)
                 * then point all my higher pointers are updateoffsets[i]'s
                 * FORWARD instead.
                 * lvl < LEVEL(keyptr)
                 * * updateoffsets values up to lvl should point to me
                 * * all mine should point to keyptr's next values
                 * * from lvl up, all updateoffsets[i] should point to
                 * FORWARD(keyptr, i) instead.
                 *
                 * All of this fully unstitches keyptr from the chain and stitches
                 * the current node in, regardless of height difference. Man what
                 * a pain!
                 */
                for (i = 0; i < lvl; i++) {
                    /* set our next pointers */
                    if (keyptr && i < LEVEL(keyptr)) {
                        /* need to replace the matching record key */
                        newoffsets[i] =
                            htonl(FORWARD(keyptr, i));
                    } else {
                        newoffsets[i] =
                            htonl(FORWARD(db->map_base + updateoffsets[i], i));
                    }

                    /* replace 'updateoffsets' to point to me */
                    lseek(db->fd,
                          PTR(db->map_base + updateoffsets[i], i) - db->map_base,
                          SEEK_SET);
                    retry_write(db->fd, (char *) &offsetnet, 4);
                }

                /* write out newoffsets */
                lseek(db->fd, FIRSTPTR(ptr) - db->map_base, SEEK_SET);
                retry_write(db->fd, (char *) newoffsets, 4 * lvl);

                if (keyptr && lvl < LEVEL(keyptr)) {
                    uint32_t newoffsetnet;

                    for (i = lvl; i < LEVEL(keyptr); i++) {
                        newoffsetnet = htonl(FORWARD(keyptr, i));
                        /* replace 'updateoffsets' to point onwards */
                        lseek(db->fd,
                              PTR(db->map_base + updateoffsets[i], i) - db->map_base,
                              SEEK_SET);
                        retry_write(db->fd, (char *) &newoffsetnet, 4);
                    }
                }
            }

        /* can't happen */
        } else {
            abort();
        }

        /* move to next record */
        offset += RECSIZE(ptr);
    }

    /* only checkpoint a successfully recovered file; otherwise the
     * checkpoint result would replace the recovery error */
    if (!r && libcyrus_config_getswitch(CYRUSOPT_SKIPLIST_ALWAYS_CHECKPOINT)) {
        /* refresh map, so we see the writes we've just done */
        map_refresh(db->fd, 0, &db->map_base, &db->map_len, db->map_size,
                    db->fname, 0);

        r = mycheckpoint(db, 1);

        if (r || !(flags & RECOVERY_CALLER_LOCKED)) {
            unlock(db);
        }

        return r;
    }

    /* fsync the recovered database */
    if (!r && DO_FSYNC && (fdatasync(db->fd) < 0)) {
        syslog(LOG_ERR,
               "DBERROR: skiplist recovery %s: fdatasync: %m", db->fname);
        r = CYRUSDB_IOERROR;
    }

    /* set the last recovery timestamp */
    if (!r) {
        db->last_recovery = time(NULL);
        r = write_header(db);
    }

    /* fsync the new header */
    if (!r && DO_FSYNC && (fdatasync(db->fd) < 0)) {
        syslog(LOG_ERR,
               "DBERROR: skiplist recovery %s: fdatasync: %m", db->fname);
        r = CYRUSDB_IOERROR;
    }

    if (!r) {
        int diff = time(NULL) - start;

        syslog(LOG_NOTICE,
               "skiplist: recovered %s (%d record%s, %ld bytes) in %d second%s",
               db->fname, db->listsize, db->listsize == 1 ? "" : "s",
               db->map_size, diff, diff == 1 ? "" : "s");
    }

    if (!r && need_checkpoint) {
        r = mycheckpoint(db, 1);
    }

    if (r || !(flags & RECOVERY_CALLER_LOCKED)) {
        unlock(db);
    }

    return r;
}
/* Backend descriptor that exports this skiplist implementation: the
 * backend name followed by pointers to the functions defined in this
 * file.  The initializer is positional, so the entries must stay in the
 * order in which struct cyrusdb_backend (declared outside this part of
 * the file) lists its members. */
struct cyrusdb_backend cyrusdb_skiplist =
{
"skiplist", /* name */
&myinit,
&mydone,
&mysync,
&myarchive,
&myopen,
&myclose,
&fetch,
&fetchlock,
&myforeach,
&create,
&store,
&mydelete,
&mycommit,
&myabort,
&dump,
&consistent
};