/* Origin: beanstalkd codebase, tree upstream/1.4.6, file beanstalkd.c.
 * (Scraped repository-browser navigation text -- "Download .tar.gz",
 * "raw / history / blame" -- is not part of the program.) */

/* beanstalk - fast, general-purpose work queue */

/* Copyright (C) 2007 Keith Rarick and Philotic Inc.

 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.

 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.

 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "config.h"

#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <unistd.h>
#include <pwd.h>
#include <event.h>
#include <limits.h>

#include "net.h"
#include "util.h"
#include "prot.h"
#include "binlog.h"

/* Command-line settings, filled in by opts() and consumed by main(). */

/* Account name given with -u; when non-NULL, main() calls su() with it. */
static char *user = NULL;
/* Set to 1 by -d; when non-zero, main() calls daemonize(). */
static int detach = 0;
/* Port string passed to make_server_socket(); overridden by -p. */
static char *port = "11300";
/* Address string passed to make_server_socket(); set by -l. It has no
 * initializer, so (static storage) it is NULL unless -l is given. */
static char *host_addr;

/* Re-point descriptor `fd` at /dev/null, opened with `flags`.
 *
 * The descriptor is closed and /dev/null is opened straight afterwards; the
 * new descriptor must come back with the same number as `fd`. Any other
 * result (including open() failing) is logged and the process exits with
 * status 1. */
static void
nullfd(int fd, int flags)
{
    int newfd;

    close(fd);
    newfd = open("/dev/null", flags);
    if (newfd == fd) {
        return;
    }

    twarn("open(\"/dev/null\")");
    exit(1);
}

/* Fork once and let only the child return to the caller.
 *
 * The parent exits with status 0; if fork() fails the process exits with
 * status 1. */
static void
dfork()
{
    switch (fork()) {
        case -1:
            /* fork failed */
            exit(1);
        case 0:
            /* child: carry on in the caller */
            return;
        default:
            /* parent: its job is done */
            exit(0);
    }
}

/* Detach the process from its controlling terminal.
 *
 * Steps, in order: chdir to "/", redirect stdin/stdout/stderr to /dev/null,
 * clear the umask, fork (parent exits), start a new session, and fork again
 * (parent exits). Only the final child returns to the caller.
 *
 * If chdir("/") fails, a warning is logged and the function returns without
 * performing any of the remaining steps, so the caller keeps running in the
 * foreground. */
static void
daemonize()
{
    int r;

    r = chdir("/");
    if (r) {
        /* A `return expr;` is not permitted in a void function in ISO C, so
         * warn and return as two separate statements. */
        twarn("chdir");
        return;
    }

    nullfd(0, O_RDONLY);
    nullfd(1, O_WRONLY);
    nullfd(2, O_WRONLY);
    umask(0);
    dfork();
    /* NOTE(review): the result of setsid() is not checked. */
    setsid();
    dfork();
}

/* Switch the process to the group and user of account `user`.
 *
 * The account is looked up with getpwnam(); the group id is changed first,
 * then the user id. Every failure is logged and terminates the process:
 *   exit 32 - getpwnam() reported an error
 *   exit 33 - no such user
 *   exit 34 - setgid() or setuid() failed
 *
 * NOTE(review): supplementary groups are left untouched here; consider
 * whether they should be reset as well when dropping privileges. */
static void
su(const char *user)
{
    int r;
    struct passwd *pwent;

    errno = 0;
    pwent = getpwnam(user);
    if (!pwent) {
        /* errno is only meaningful when the lookup returned NULL; a
         * successful call may still leave a stale non-zero value behind. */
        if (errno) {
            twarn("getpwnam(\"%s\")", user);
            exit(32);
        }
        twarnx("getpwnam(\"%s\"): no such user", user);
        exit(33);
    }

    /* The ids are cast to long because gid_t/uid_t need not be int. */
    r = setgid(pwent->pw_gid);
    if (r == -1) {
        twarn("setgid(%ld \"%s\")", (long) pwent->pw_gid, user);
        exit(34);
    }

    r = setuid(pwent->pw_uid);
    if (r == -1) {
        twarn("setuid(%ld \"%s\")", (long) pwent->pw_uid, user);
        exit(34);
    }
}

/* Signal handler (installed for SIGINT and SIGTERM by set_sig_handlers()):
 * shut the binlog down and terminate with status 0.
 *
 * NOTE(review): this runs in signal context; confirm that binlog_shutdown()
 * and exit() are acceptable to call from there. */
void
exit_cleanly(int sig)
{
    (void) sig; /* demanded by the handler signature, never inspected */

    binlog_shutdown();
    exit(0);
}


static void
set_sig_handlers()
{
    int r;
    struct sigaction sa;

    sa.sa_handler = SIG_IGN;
    sa.sa_flags = 0;
    r = sigemptyset(&sa.sa_mask);
    if (r == -1) twarn("sigemptyset()"), exit(111);

    r = sigaction(SIGPIPE, &sa, 0);
    if (r == -1) twarn("sigaction(SIGPIPE)"), exit(111);

    sa.sa_handler = enter_drain_mode;
    r = sigaction(SIGUSR1, &sa, 0);
    if (r == -1) twarn("sigaction(SIGUSR1)"), exit(111);

    sa.sa_handler = exit_cleanly;
    r = sigaction(SIGINT, &sa, 0);
    if (r == -1) twarn("sigaction(SIGINT)"), exit(111);

    sa.sa_handler = exit_cleanly;
    r = sigaction(SIGTERM, &sa, 0);
    if (r == -1) twarn("sigaction(SIGTERM)"), exit(111);
}

/* This is a workaround for a mystifying workaround in libevent's epoll
 * implementation. The epoll_init() function creates an epoll fd with space to
 * handle RLIMIT_NOFILE - 1 fds, accompanied by the following puzzling comment:
 * "Solaris is somewhat retarded - it's important to drop backwards
 * compatibility when making changes. So, don't dare to put rl.rlim_cur here."
 * This is presumably to work around a bug in Solaris, but it has the
 * unfortunate side-effect of causing epoll_ctl() (and, therefore, event_add())
 * to fail for a valid fd if we have hit the limit of open fds. That makes it
 * hard to provide reasonable behavior in that situation. So, let's reduce the
 * real value of RLIMIT_NOFILE by one, after epoll_init() has run. */
static void
nudge_fd_limit()
{
    struct rlimit lim;

    /* Read the current descriptor limits. */
    if (getrlimit(RLIMIT_NOFILE, &lim) != 0) {
        twarn("getrlimit(RLIMIT_NOFILE)");
        exit(2);
    }

    /* Lower only the soft limit, by exactly one. */
    lim.rlim_cur -= 1;

    if (setrlimit(RLIMIT_NOFILE, &lim) != 0) {
        twarn("setrlimit(RLIMIT_NOFILE)");
        exit(2);
    }
}

/* Print the option summary to stderr and exit; never returns.
 *
 * When `arg` is non-NULL the call reports an error: "<msg>: <arg>" is
 * emitted through warnx() first and the exit status is 5. With a NULL `arg`
 * only the summary is printed and the exit status is 0. */
static void
usage(char *msg, char *arg)
{
    const int status = arg ? 5 : 0;

    if (arg) {
        warnx("%s: %s", msg, arg);
    }

    fprintf(stderr, "Use: %s [OPTIONS]\n"
            "\n"
            "Options:\n"
            " -d       detach\n"
            " -b DIR   binlog directory (must be absolute path if used with -d)\n"
            " -f MS    fsync at most once every MS milliseconds"
                       " (use -f 0 for \"always fsync\")\n"
            " -F       never fsync (default)\n"
            " -l ADDR  listen on address (default is 0.0.0.0)\n"
            " -p PORT  listen on port (default is 11300)\n"
            " -u USER  become user and group\n"
            " -z BYTES set the maximum job size in bytes (default is %d)\n"
            " -s BYTES set the size of each binlog file (default is %d)\n"
#ifndef HAVE_POSIX_FALLOCATE
            "            (will be rounded up to a multiple of 512 bytes)\n"
#endif
            " -v       show version information\n"
            " -h       show this help\n",
            progname, JOB_DATA_SIZE_LIMIT_DEFAULT, BINLOG_SIZE_LIMIT_DEFAULT);

    exit(status);
}

/* Parse `str` as a non-negative decimal size.
 *
 * The whole string must be a number: optional leading whitespace, an
 * optional '+', then decimal digits and nothing else. Empty input, trailing
 * characters, a minus sign, or a value that does not fit in size_t are all
 * reported through usage("invalid size", str), which terminates the
 * process.
 *
 * Returns the parsed value. */
static size_t
parse_size_t(char *str)
{
    const char *p = str;
    char *end;
    unsigned long long v;

    /* strtoull() accepts "-N" and silently negates it, which would turn
     * "-1" into an enormous size. Skip the same leading whitespace it skips
     * and refuse a minus sign explicitly. */
    while (*p == ' ' || (*p >= '\t' && *p <= '\r')) p++;
    if (*p == '-') usage("invalid size", str);

    errno = 0;
    v = strtoull(str, &end, 10);
    if (end == str || *end != '\0') usage("invalid size", str);

    /* Reject values that overflow either the conversion or size_t. */
    if (errno == ERANGE || v > (size_t) -1) usage("invalid size", str);

    return (size_t) v;
}

/* Return `arg` unchanged, insisting that it is present.
 *
 * A NULL `arg` means option `opt` was given without its value; that is
 * reported through usage(), which terminates the process. */
static char *
require_arg(char *opt, char *arg)
{
    if (arg == NULL) {
        usage("option requires an argument", opt);
    }
    return arg;
}

/* Walk the command line and store each recognised option in the matching
 * setting.
 *
 * Every argument must be exactly a dash followed by one option letter.
 * Options that take a value consume the following argv entry; a missing
 * value is caught by require_arg(). Anything unrecognised goes to usage(),
 * which terminates the process. -h prints the help and -v prints the
 * version; both exit. */
static void
opts(int argc, char **argv)
{
    int i;

    for (i = 1; i < argc; ++i) {
        char *opt = argv[i];

        /* Accept only "-X": a dash, one letter, end of string. */
        if (opt[0] != '-' || opt[1] == 0 || opt[2] != 0) {
            usage("unknown option", opt);
        }

        switch (opt[1]) {
            case 'd':
                detach = 1;
                break;
            case 'p':
                i++;
                port = require_arg("-p", argv[i]);
                break;
            case 'l':
                i++;
                host_addr = require_arg("-l", argv[i]);
                break;
            case 'z':
                i++;
                job_data_size_limit = parse_size_t(require_arg("-z", argv[i]));
                break;
            case 's':
                i++;
                binlog_size_limit = parse_size_t(require_arg("-s", argv[i]));
                break;
            case 'f':
                i++;
                fsync_throttle_ms = parse_size_t(require_arg("-f", argv[i]));
                enable_fsync = 1;
                break;
            case 'F':
                enable_fsync = 0;
                break;
            case 'u':
                i++;
                user = require_arg("-u", argv[i]);
                break;
            case 'b':
                i++;
                binlog_dir = require_arg("-b", argv[i]);
                break;
            case 'h':
                /* usage() exits, so the next case is never entered from
                 * here. */
                usage(NULL, NULL);
            case 'v':
                printf("beanstalkd %s\n", VERSION);
                exit(0);
            default:
                usage("unknown option", opt);
        }
    }
}

/* Program entry point.
 *
 * Parses the command line, initialises the job and protocol layers, locks
 * the binlog directory (when one is configured), opens the listening
 * socket, optionally switches user, sets up the event base and signal
 * handlers, replays the binlog, optionally detaches, and finally runs the
 * event loop.
 *
 * Returns 0 if event_dispatch() ever returns; the startup failures handled
 * here exit directly with their own status codes. */
int
main(int argc, char **argv)
{
    int r;
    struct event_base *ev_base;
    /* Empty-brace initializer zero-fills the struct (a GNU extension before
     * C23). */
    struct job binlog_jobs = {};

    progname = argv[0];
    opts(argc, argv);

    /* daemonize() changes the working directory to "/", so a relative
     * binlog path would resolve differently once detached. */
    if (detach && binlog_dir) {
        if (binlog_dir[0] != '/') {
            warnx("The -b option requires an absolute path when used with -d.");
            usage("Path is not absolute", binlog_dir);
        }
    }

    job_init();
    prot_init();

    /* We want to make sure that only one beanstalkd tries to use the binlog
     * directory at a time. So acquire a lock now and never release it. */
    if (binlog_dir) {
        r = binlog_lock();
        if (!r) twarnx("failed to lock binlog dir %s", binlog_dir), exit(10);
    }

    r = make_server_socket(host_addr, port);
    if (r == -1) twarnx("make_server_socket()"), exit(111);

    /* The user switch happens only after the socket has been created --
     * presumably so a privileged port can be bound first; confirm. */
    if (user) su(user);
    ev_base = event_init();
    set_sig_handlers();
    /* Must run after event_init(); see the comment on nudge_fd_limit(). */
    nudge_fd_limit();

    /* NOTE(review): presumably this starts accepting connections with
     * h_accept as the handler -- confirm against unbrake()'s definition. */
    unbrake((evh) h_accept);

    /* Make binlog_jobs a list head that points at itself before handing it
     * to the binlog code. */
    binlog_jobs.prev = binlog_jobs.next = &binlog_jobs;
    binlog_init(&binlog_jobs);
    prot_replay_binlog(&binlog_jobs);

    /* Detaching forks, so the event base created above is reinitialised in
     * the surviving process. NOTE(review): if daemonize() fails at chdir it
     * returns without forking and event_reinit() is still called. */
    if (detach) {
        daemonize();
        event_reinit(ev_base);
    }

    /* The code treats any return from event_dispatch() as an error: it is
     * logged and the binlog is shut down, but the exit status is still 0. */
    event_dispatch();
    twarnx("event_dispatch error");
    binlog_shutdown();
    return 0;
}