/* mupdate.c -- cyrus murder database master
*
* Copyright (c) 1994-2008 Carnegie Mellon University. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The name "Carnegie Mellon University" must not be used to
* endorse or promote products derived from this software without
* prior written permission. For permission or any legal
* details, please contact
* Carnegie Mellon University
* Center for Technology Transfer and Enterprise Creation
* 4615 Forbes Avenue
* Suite 302
* Pittsburgh, PA 15213
* (412) 268-7393, fax: (412) 268-7395
* innovation@andrew.cmu.edu
*
* 4. Redistributions of any form whatsoever must retain the following
* acknowledgment:
* "This product includes software developed by Computing Services
* at Carnegie Mellon University (http://www.cmu.edu/computing/)."
*
* CARNEGIE MELLON UNIVERSITY DISCLAIMS ALL WARRANTIES WITH REGARD TO
* THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS, IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY BE LIABLE
* FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
* AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
* OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <config.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <signal.h>
#include <stdlib.h>
#include <sysexits.h>
#include <syslog.h>
#include <errno.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#if !defined(SIOCGIFCONF) && defined(HAVE_SYS_SOCKIO_H)
# include <sys/sockio.h>
#endif
#include <net/if.h>
#include <pthread.h>
#include <sasl/sasl.h>
#include <sasl/saslutil.h>
#include "mupdate.h"
#include "mupdate-client.h"
#include "telemetry.h"
#include "strarray.h"
#include "assert.h"
#include "global.h"
#include "mailbox.h"
#include "mboxlist.h"
#include "mpool.h"
#include "nonblock.h"
#include "prot.h"
#include "tls.h"
#include "tls_th-lock.h"
#include "util.h"
#include "version.h"
#include "xmalloc.h"
#include "xstrlcpy.h"
/* generated headers are not necessarily in current directory */
#include "imap/imap_err.h"
/* Sent to clients that we can't accept a connection for. */
static const char SERVER_UNABLE_STRING[] = "* BYE \"Server Unable\"\r\n";
static const int NO_NEW_CONNECTION = -1;
static int masterp = 0;
typedef enum {
DOCMD_OK = 0,
DOCMD_CONN_FINISHED = 1
} mupdate_docmd_result_t;
enum {
poll_interval = 1,
update_wait = 5
};
struct pending {
struct pending *next;
char mailbox[MAX_MAILBOX_BUFFER];
};
struct conn {
int fd;
int logfd;
struct protstream *pin;
struct protstream *pout;
sasl_conn_t *saslconn;
char *userid;
#ifdef HAVE_SSL
SSL *tlsconn;
#else
void *tlsconn;
#endif
void *tls_comp; /* TLS compression method, if any */
int compress_done; /* have we done a successful compress? */
int idle;
char clienthost[NI_MAXHOST*2+1];
struct saslprops_t saslprops;
/* UPDATE command handling */
const char *streaming; /* tag */
strarray_t *streaming_hosts; /* partial updates */
/* pending changes to send, in reverse order */
pthread_mutex_t m;
struct pending *plist;
struct pending *ptail;
struct conn *updatelist_next;
struct prot_waitevent *ev; /* invoked every 'update_wait' seconds
to send out updates */
/* Prefix for list commands */
const char *list_prefix;
size_t list_prefix_len;
/* For parsing */
struct buf tag, cmd, arg1, arg2, arg3;
/* For connection list management */
struct conn *next;
struct conn *next_idle;
};
static int ready_for_connections = 0;
static pthread_cond_t ready_for_connections_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t ready_for_connections_mutex = PTHREAD_MUTEX_INITIALIZER;
static int synced = 0;
static pthread_cond_t synced_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t synced_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t listener_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t listener_cond = PTHREAD_COND_INITIALIZER;
static int listener_lock = 0;
/* if you want to lock both listener and either of these two, you
* must lock listener first. You must have both listener_mutex and
* idle_connlist_mutex locked to remove anything from the idle_connlist */
static pthread_mutex_t idle_connlist_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct conn *idle_connlist = NULL; /* protected by listener_mutex */
static pthread_mutex_t connection_count_mutex = PTHREAD_MUTEX_INITIALIZER;
static int connection_count = 0;
static pthread_mutex_t idle_worker_mutex = PTHREAD_MUTEX_INITIALIZER;
static int idle_worker_count = 0;
static pthread_mutex_t worker_count_mutex = PTHREAD_MUTEX_INITIALIZER;
static int worker_count = 0;
static pthread_mutex_t connlist_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct conn *connlist = NULL;
static pthread_mutex_t clienthost_mutex = PTHREAD_MUTEX_INITIALIZER;
/* ---- connection signaling pipe */
static int conn_pipe[2];
/* ---- database access ---- */
static pthread_mutex_t mailboxes_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct conn *updatelist = NULL;
/* --- prototypes --- */
static void conn_free(struct conn *C);
static mupdate_docmd_result_t docmd(struct conn *c);
static void cmd_authenticate(struct conn *C,
const char *tag, const char *mech,
const char *clientstart);
static void cmd_set(struct conn *C,
const char *tag, const char *mailbox,
const char *location, const char *acl, enum settype t);
static void cmd_find(struct conn *C, const char *tag, const char *mailbox,
int send_ok, int send_delete);
static void cmd_list(struct conn *C, const char *tag, const char *host_prefix);
static void cmd_startupdate(struct conn *C, const char *tag,
strarray_t *partial);
static void cmd_starttls(struct conn *C, const char *tag);
#ifdef HAVE_ZLIB
static void cmd_compress(struct conn *C, const char *tag, const char *alg);
#endif
void shut_down(int code);
static int reset_saslconn(struct conn *c);
static void database_init(void);
static void sendupdates(struct conn *C, int flushnow);
extern int saslserver(sasl_conn_t *conn, const char *mech,
const char *init_resp, const char *resp_prefix,
const char *continuation, const char *empty_chal,
struct protstream *pin, struct protstream *pout,
int *sasl_result, char **success_data);
/* --- prototypes in mupdate-slave.c */
void *mupdate_client_start(void *rock);
void *mupdate_placebo_kick_start(void *rock);
/* --- main() for each thread */
static void *thread_main(void *rock);
/* --- for config.c */
const int config_need_data = 0;
static struct conn *conn_new(int fd)
{
struct conn *C = xzmalloc(sizeof(struct conn));
const char *clienthost, *localip, *remoteip;
int r;
C->fd = fd;
C->logfd = -1;
C->pin = prot_new(C->fd, 0);
C->pout = prot_new(C->fd, 1);
prot_setflushonread(C->pin, C->pout);
prot_settimeout(C->pin, 180*60);
C->pin->userdata = C->pout->userdata = C;
pthread_mutex_lock(&connlist_mutex); /* LOCK */
C->next = connlist;
connlist = C;
pthread_mutex_unlock(&connlist_mutex); /* UNLOCK */
pthread_mutex_lock(&connection_count_mutex); /* LOCK */
connection_count++;
pthread_mutex_unlock(&connection_count_mutex); /* UNLOCK */
/* Find out name of client host
*
* MUST do this inside a mutex because the values returned
* from get_clienthost are all static to that function.
*/
pthread_mutex_lock(&clienthost_mutex); /* LOCK */
clienthost = get_clienthost(C->fd, &localip, &remoteip);
strlcpy(C->clienthost, clienthost, sizeof(C->clienthost));
if (localip && remoteip) {
buf_setcstr(&C->saslprops.ipremoteport, remoteip);
buf_setcstr(&C->saslprops.iplocalport, localip);
}
pthread_mutex_unlock(&clienthost_mutex); /* UNLOCK */
/* create sasl connection */
r = sasl_server_new("mupdate",
config_servername, NULL,
buf_cstringnull_ifempty(&C->saslprops.iplocalport),
buf_cstringnull_ifempty(&C->saslprops.ipremoteport),
NULL, 0,
&C->saslconn);
if (r != SASL_OK) {
syslog(LOG_ERR, "failed to start sasl for connection: %s",
sasl_errstring(r, NULL, NULL));
prot_printf(C->pout, SERVER_UNABLE_STRING);
C->idle = 0;
conn_free(C);
return NULL;
}
/* set my allowable security properties */
sasl_setprop(C->saslconn, SASL_SEC_PROPS, mysasl_secprops(SASL_SEC_NOANONYMOUS));
return C;
}
static void conn_free(struct conn *C)
{
assert(!C->idle); /* Not allowed to free idle connections */
if (C->streaming) { /* remove from updatelist */
struct conn *upc;
pthread_mutex_lock(&mailboxes_mutex);
if (C == updatelist) {
/* first thing in updatelist */
updatelist = C->updatelist_next;
} else {
/* find in update list */
for (upc = updatelist; upc->updatelist_next != NULL;
upc = upc->updatelist_next) {
if (upc->updatelist_next == C) break;
}
/* must find it ! */
assert(upc->updatelist_next == C);
upc->updatelist_next = C->updatelist_next;
}
pthread_mutex_unlock(&mailboxes_mutex);
}
/* decrease connection counter */
pthread_mutex_lock(&connection_count_mutex);
connection_count--;
pthread_mutex_unlock(&connection_count_mutex);
/* remove from connlist */
pthread_mutex_lock(&connlist_mutex); /* LOCK */
if (C == connlist) {
connlist = connlist->next;
} else {
struct conn *t;
for (t = connlist; t->next != NULL; t = t->next) {
if (t->next == C) break;
}
assert(t != NULL);
t->next = C->next;
}
pthread_mutex_unlock(&connlist_mutex); /* UNLOCK */
if (C->ev) prot_removewaitevent(C->pin, C->ev);
prot_flush(C->pout);
if (C->pin) prot_free(C->pin);
if (C->pout) prot_free(C->pout);
#ifdef HAVE_SSL
if (C->tlsconn) tls_reset_servertls(&C->tlsconn);
tls_shutdown_serverengine();
#endif
cyrus_close_sock(C->fd);
if (C->logfd != -1) close(C->logfd);
if (C->saslconn) sasl_dispose(&C->saslconn);
saslprops_free(&C->saslprops);
/* free struct bufs */
buf_free(&(C->tag));
buf_free(&(C->cmd));
buf_free(&(C->arg1));
buf_free(&(C->arg2));
buf_free(&(C->arg3));
if (C->streaming_hosts) strarray_free(C->streaming_hosts);
free(C);
}
/*
* The auth_*.c backends called by mysasl_proxy_policy()
* use static variables which we need to protect with a mutex.
*/
static pthread_mutex_t proxy_policy_mutex = PTHREAD_MUTEX_INITIALIZER;
static int mupdate_proxy_policy(sasl_conn_t *conn,
void *context,
const char *requested_user, unsigned rlen,
const char *auth_identity, unsigned alen,
const char *def_realm,
unsigned urlen,
struct propctx *propctx)
{
int r;
pthread_mutex_lock(&proxy_policy_mutex); /* LOCK */
r = mysasl_proxy_policy(conn, context, requested_user, rlen,
auth_identity, alen, def_realm, urlen, propctx);
pthread_mutex_unlock(&proxy_policy_mutex); /* UNLOCK */
return r;
}
static struct sasl_callback mysasl_cb[] = {
{ SASL_CB_GETOPT, (mysasl_cb_ft *) &mysasl_config, NULL },
{ SASL_CB_PROXY_POLICY, (mysasl_cb_ft *) &mupdate_proxy_policy, NULL },
{ SASL_CB_LIST_END, NULL, NULL }
};
/*
* Is the IP address of the given hostname local?
* Returns 1 if local, 0 otherwise.
*/
static int islocalip(const char *hostname)
{
struct hostent *hp;
struct in_addr *haddr, *iaddr;
struct ifconf ifc;
struct ifreq *ifr;
char buf[8192]; /* XXX this limits us to 256 interfaces */
int sock, islocal = 0;
if ((hp = gethostbyname(hostname)) == NULL) {
fprintf(stderr, "unknown host: %s\n", hostname);
return 0;
}
haddr = (struct in_addr *) hp->h_addr;
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
fprintf(stderr, "socket() failed\n");
return 0;
}
ifc.ifc_buf = buf;
ifc.ifc_len = sizeof(buf);
if (ioctl(sock, SIOCGIFCONF, &ifc) != 0) {
fprintf(stderr, "ioctl(SIOCGIFCONF) failed: %d\n", errno);
close(sock);
return 0;
}
for (ifr = ifc.ifc_req; ifr - ifc.ifc_req < ifc.ifc_len; ifr++) {
if (ioctl(sock, SIOCGIFADDR, ifr) != 0) continue;
if (ioctl(sock, SIOCGIFFLAGS, ifr) != 0) continue;
/* skip any inactive or loopback interfaces */
if (!(ifr->ifr_flags & IFF_UP) || (ifr->ifr_flags & IFF_LOOPBACK))
continue;
iaddr = &(((struct sockaddr_in *) &ifr->ifr_addr)->sin_addr);
/* compare the host address to the interface address */
if (!memcmp(haddr, iaddr, sizeof(struct in_addr))) {
islocal = 1;
break;
}
}
close(sock);
return islocal;
}
/*
* run once when process is forked;
* MUST NOT exit directly; must return with non-zero error code
*/
int service_init(int argc, char **argv,
char **envp __attribute__((unused)))
{
int i, r, workers_to_start;
int opt, autoselect = 0;
pthread_t t;
if (geteuid() == 0) fatal("must run as the Cyrus user", EX_USAGE);
/* Do minor configuration checking */
workers_to_start = config_getint(IMAPOPT_MUPDATE_WORKERS_START);
if (config_getint(IMAPOPT_MUPDATE_WORKERS_MAX) < config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE)) {
syslog(LOG_CRIT, "Maximum total worker threads is less than minimum spare worker threads");
return EX_SOFTWARE;
}
if (workers_to_start < config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE)) {
syslog(LOG_CRIT, "Starting worker threads is less than minimum spare worker threads");
return EX_SOFTWARE;
}
if (config_getint(IMAPOPT_MUPDATE_WORKERS_MAXSPARE) < workers_to_start) {
syslog(LOG_CRIT, "Maximum spare worker threads is less than starting worker threads");
return EX_SOFTWARE;
}
if (config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE) > workers_to_start) {
syslog(LOG_CRIT, "Minimum spare worker threads is greater than starting worker threads");
return EX_SOFTWARE;
}
if (config_getint(IMAPOPT_MUPDATE_WORKERS_MAX) < workers_to_start) {
syslog(LOG_CRIT, "Maximum total worker threads is less than starting worker threads");
return EX_SOFTWARE;
}
/* set signal handlers */
signals_set_shutdown(&shut_down);
signal(SIGPIPE, SIG_IGN);
global_sasl_init(1, 1, mysasl_cb);
/* see if we're the master or a slave */
while ((opt = getopt(argc, argv, "ma")) != EOF) {
switch (opt) {
case 'm':
masterp = 1;
break;
case 'a':
autoselect = 1;
break;
default:
break;
}
}
if (!masterp && autoselect) masterp = islocalip(config_mupdate_server);
if (masterp &&
config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_UNIFIED) {
/* XXX We currently prohibit this because mailboxes created
* on the master will cause local mailbox entries to be propagated
* to the slave. We can probably fix this by prepending
* config_servername onto the entries before updating the slaves.
*/
fatal("cannot run mupdate master on a unified server", EX_USAGE);
}
if (pipe(conn_pipe) == -1) {
syslog(LOG_ERR, "could not setup connection signaling pipe %m");
return EX_OSERR;
}
database_init();
#ifdef HAVE_SSL
#if OPENSSL_VERSION_NUMBER < 0x10100000L
CRYPTO_thread_setup();
#endif
#endif
if (!masterp) {
r = pthread_create(&t, NULL, &mupdate_client_start, NULL);
if (r == 0) {
pthread_detach(t);
} else {
syslog(LOG_ERR, "could not start client thread");
return EX_SOFTWARE;
}
/* Wait until they sync the database */
pthread_mutex_lock(&synced_mutex);
if (!synced)
pthread_cond_wait(&synced_cond, &synced_mutex);
pthread_mutex_unlock(&synced_mutex);
} else {
pthread_t t;
r = pthread_create(&t, NULL, &mupdate_placebo_kick_start, NULL);
if (r == 0) {
pthread_detach(t);
} else {
syslog(LOG_ERR, "could not start placebo kick thread");
return EX_SOFTWARE;
}
mupdate_ready();
}
/* Now create the worker thread pool */
for(i=0; i < workers_to_start; i++) {
r = pthread_create(&t, NULL, &thread_main, NULL);
if (r == 0) {
pthread_detach(t);
} else {
syslog(LOG_ERR, "could not start client thread");
return EX_SOFTWARE;
}
}
return 0;
}
/* Called by service API to shut down the service */
void service_abort(int error)
{
#ifdef HAVE_SSL
#if OPENSSL_VERSION_NUMBER < 0x10100000L
CRYPTO_thread_cleanup();
#endif
#endif
shut_down(error);
}
EXPORTED void fatal(const char *s, int code)
{
static int recurse_code = 0;
if (recurse_code) exit(code);
else recurse_code = code;
syslog(LOG_ERR, "%s", s);
shut_down(code);
/* NOTREACHED */
exit(code); /* shut up GCC */
}
#define CHECKNEWLINE(c, ch) do { if ((ch) == '\r') (ch)=prot_getc((c)->pin); \
if ((ch) != '\n') goto extraargs; } while (0)
static mupdate_docmd_result_t docmd(struct conn *c)
{
mupdate_docmd_result_t ret = DOCMD_OK;
int ch;
int was_blocking = prot_IS_BLOCKING(c->pin);
char *p;
/* We know we have input, so skip the check below.
* Note that we MUST skip this nonblocking check in order to properly
* catch connections that have timed out.
*/
goto cmd;
nextcmd:
/* First we do a check for input */
prot_NONBLOCK(c->pin);
ch = prot_getc(c->pin);
if (ch == EOF && errno == EAGAIN) {
/* no input from client */
goto done;
} else if (ch == EOF) {
goto lost_conn;
} else {
/* there's input waiting, put back our character */
prot_ungetc(ch, c->pin);
}
/* Set it back to blocking so we don't get half a word */
prot_BLOCK(c->pin);
cmd:
ch = getword(c->pin, &(c->tag));
if (ch == EOF) goto lost_conn;
if (ch != ' ') {
prot_printf(c->pout, "* BAD \"Need command\"\r\n");
eatline(c->pin, ch);
goto nextcmd;
}
/* parse command name */
ch = getword(c->pin, &(c->cmd));
if (ch == EOF) {
goto lost_conn;
} else if (!c->cmd.s[0]) {
prot_printf(c->pout, "%s BAD \"Null command\"\r\n", c->tag.s);
eatline(c->pin, ch);
goto nextcmd;
}
if (Uislower(c->cmd.s[0])) {
c->cmd.s[0] = toupper((unsigned char) c->cmd.s[0]);
}
for (p = &(c->cmd.s[1]); *p; p++) {
if (Uisupper(*p)) *p = tolower((unsigned char) *p);
}
switch (c->cmd.s[0]) {
case 'A':
if (!strcmp(c->cmd.s, "Authenticate")) {
int opt = 0;
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg1));
if (ch == ' ') {
ch = getstring(c->pin, c->pout, &(c->arg2));
opt = 1;
}
CHECKNEWLINE(c, ch);
if (c->userid) {
prot_printf(c->pout,
"%s BAD \"already authenticated\"\r\n",
c->tag.s);
goto nextcmd;
}
cmd_authenticate(c, c->tag.s, c->arg1.s,
opt ? c->arg2.s : NULL);
}
else if (!c->userid) goto nologin;
else if (!strcmp(c->cmd.s, "Activate")) {
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg1));
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg2));
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg3));
CHECKNEWLINE(c, ch);
if (c->streaming) goto notwhenstreaming;
if (!masterp) goto masteronly;
cmd_set(c, c->tag.s, c->arg1.s, c->arg2.s,
c->arg3.s, SET_ACTIVE);
}
else goto badcmd;
break;
#ifdef HAVE_ZLIB
case 'C':
if (!strcmp(c->cmd.s, "Compress")) {
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg1));
CHECKNEWLINE(c, ch);
cmd_compress(c, c->tag.s, c->arg1.s);
}
else goto badcmd;
break;
#endif
case 'D':
if (!c->userid) goto nologin;
else if (!strcmp(c->cmd.s, "Deactivate")) {
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg1));
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg2));
CHECKNEWLINE(c, ch);
if (c->streaming) goto notwhenstreaming;
if (!masterp) goto masteronly;
cmd_set(c, c->tag.s, c->arg1.s, c->arg2.s,
NULL, SET_DEACTIVATE);
}
else if (!strcmp(c->cmd.s, "Delete")) {
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg1));
CHECKNEWLINE(c, ch);
if (c->streaming) goto notwhenstreaming;
if (!masterp) goto masteronly;
cmd_set(c, c->tag.s, c->arg1.s, NULL, NULL, SET_DELETE);
}
else goto badcmd;
break;
case 'F':
if (!c->userid) goto nologin;
else if (!strcmp(c->cmd.s, "Find")) {
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg1));
CHECKNEWLINE(c, ch);
if (c->streaming) goto notwhenstreaming;
cmd_find(c, c->tag.s, c->arg1.s, 1, 0);
}
else goto badcmd;
break;
case 'L':
if (!strcmp(c->cmd.s, "Logout")) {
CHECKNEWLINE(c, ch);
prot_printf(c->pout, "%s OK \"bye-bye\"\r\n", c->tag.s);
ret = DOCMD_CONN_FINISHED;
goto done;
}
else if (!c->userid) goto nologin;
else if (!strcmp(c->cmd.s, "List")) {
int opt = 0;
if (ch == ' ') {
/* Optional partition/host prefix parameter */
ch = getstring(c->pin, c->pout, &(c->arg1));
opt = 1;
}
CHECKNEWLINE(c, ch);
if (c->streaming) goto notwhenstreaming;
cmd_list(c, c->tag.s, opt ? c->arg1.s : NULL);
prot_printf(c->pout, "%s OK \"list complete\"\r\n", c->tag.s);
}
else goto badcmd;
break;
case 'N':
if (!c->userid) goto nologin;
else if (!strcmp(c->cmd.s, "Noop")) {
CHECKNEWLINE(c, ch);
if (c->streaming) {
/* Make *very* sure we are up-to-date */
kick_mupdate();
sendupdates(c, 0); /* don't flush pout though */
}
prot_printf(c->pout, "%s OK \"Noop done\"\r\n", c->tag.s);
}
else goto badcmd;
break;
case 'R':
if (!c->userid) goto nologin;
else if (!strcmp(c->cmd.s, "Reserve")) {
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg1));
if (ch != ' ') goto missingargs;
ch = getstring(c->pin, c->pout, &(c->arg2));
CHECKNEWLINE(c, ch);
if (c->streaming) goto notwhenstreaming;
if (!masterp) goto masteronly;
cmd_set(c, c->tag.s, c->arg1.s, c->arg2.s, NULL, SET_RESERVE);
}
else goto badcmd;
break;
case 'S':
if (!strcmp(c->cmd.s, "Starttls")) {
CHECKNEWLINE(c, ch);
/* XXX discard any input pipelined after STARTTLS */
prot_flush(c->pin);
if (!tls_enabled()) {
/* we don't support starttls */
goto badcmd;
}
/* if we've already done SASL fail */
if (c->userid) {
prot_printf(c->pout,
"%s BAD Can't Starttls after authentication\r\n",
c->tag.s);
goto nextcmd;
}
/* if we've already done COMPRESS fail */
if (c->compress_done) {
prot_printf(c->pout,
"%s BAD Can't Starttls after Compress\r\n",
c->tag.s);
goto nextcmd;
}
/* check if already did a successful tls */
if (c->tlsconn) {
prot_printf(c->pout,
"%s BAD Already did a successful Starttls\r\n",
c->tag.s);
goto nextcmd;
}
cmd_starttls(c, c->tag.s);
}
else goto badcmd;
break;
case 'U':
if (!c->userid) goto nologin;
else if (!strcmp(c->cmd.s, "Update")) {
strarray_t *arg = NULL;
int counter = 30; /* limit on number of processed hosts */
while(ch == ' ') {
/* Hey, look, more bits of a PARTIAL-UPDATE command */
ch = getstring(c->pin, c->pout, &(c->arg1));
if (c->arg1.s[0] == '\0') {
strarray_free(arg);
goto badargs;
}
if (counter-- == 0) {
strarray_free(arg);
goto extraargs;
}
if (!arg) arg = strarray_new();
strarray_append(arg, c->arg1.s);
}
CHECKNEWLINE(c, ch);
if (c->streaming) goto notwhenstreaming;
cmd_startupdate(c, c->tag.s, arg);
}
else goto badcmd;
break;
default:
badcmd:
prot_printf(c->pout, "%s BAD \"Unrecognized command\"\r\n",
c->tag.s);
eatline(c->pin, ch);
break;
extraargs:
prot_printf(c->pout, "%s BAD \"Extra arguments\"\r\n",
c->tag.s);
eatline(c->pin, ch);
break;
badargs:
prot_printf(c->pout, "%s BAD \"Badly formed arguments\"\r\n",
c->tag.s);
eatline(c->pin, ch);
break;
missingargs:
prot_printf(c->pout, "%s BAD \"Missing arguments\"\r\n",
c->tag.s);
eatline(c->pin, ch);
break;
notwhenstreaming:
prot_printf(c->pout, "%s BAD \"not legal when streaming\"\r\n",
c->tag.s);
break;
masteronly:
prot_printf(c->pout,
"%s BAD \"read-only session\"\r\n",
c->tag.s);
break;
nologin:
prot_printf(c->pout, "%s BAD Please login first\r\n", c->tag.s);
eatline(c->pin, ch);
break;
}
/* Check for more input */
goto nextcmd;
lost_conn:
{
const char *err;
if ((err = prot_error(c->pin)) != NULL
&& strcmp(err, PROT_EOF_STRING)) {
syslog(LOG_WARNING, "%s, closing connection", err);
prot_printf(c->pout, "* BYE \"%s\"\r\n", err);
}
ret = DOCMD_CONN_FINISHED;
}
done:
/* Restore the state of the input stream */
if (was_blocking)
prot_BLOCK(c->pin);
else
prot_NONBLOCK(c->pin);
/* Necessary since we don't ever do a prot_read on an idle connection
* in mupdate */
prot_flush(c->pout);
return ret;
}
/*
* run for each accepted connection
*/
int service_main_fd(int fd,
int argc __attribute__((unused)),
char **argv __attribute__((unused)),
char **envp __attribute__((unused)))
{
int flag;
int r;
/* First check that we can handle the new connection. */
pthread_mutex_lock(&connection_count_mutex); /* LOCK */
flag =
(connection_count >= config_getint(IMAPOPT_MUPDATE_CONNECTIONS_MAX));
pthread_mutex_unlock(&connection_count_mutex); /* UNLOCK */
if (flag) {
/* Do the nonblocking write, if it fails, too bad for them. */
nonblock(fd, 1);
r = write(fd,SERVER_UNABLE_STRING,sizeof(SERVER_UNABLE_STRING));
close(fd);
syslog(LOG_ERR,
"Server too busy, dropping connection.");
if (r) return 0; /* filthy hack to avoid warning on 'r' */
} else if (write(conn_pipe[1], &fd, sizeof(fd)) == -1) {
/* signal that a new file descriptor is available.
* If it fails... */
syslog(LOG_CRIT,
"write to conn_pipe to signal new connection failed: %m");
return EX_TEMPFAIL;
}
return 0;
}
/*
* Issue the capability banner
*/
static void dobanner(struct conn *c)
{
char slavebuf[4096];
const char *mechs;
int mechcount;
int ret;
/* send initial the banner + flush pout */
ret = sasl_listmech(c->saslconn, NULL,
"* AUTH \"", "\" \"", "\"",
&mechs, NULL, &mechcount);
/* Add mupdate:// tag if necessary */
if (!masterp) {
if (!config_mupdate_server)
fatal("mupdate server was not specified for slave",
EX_TEMPFAIL);
snprintf(slavebuf, sizeof(slavebuf), "mupdate://%s",
config_mupdate_server);
}
prot_printf(c->pout, "%s\r\n",
(ret == SASL_OK && mechcount > 0) ? mechs : "* AUTH");
if (tls_enabled() && !c->tlsconn) {
prot_printf(c->pout, "* STARTTLS\r\n");
}
#ifdef HAVE_ZLIB
if (!c->compress_done && !c->tls_comp) {
prot_printf(c->pout, "* COMPRESS \"DEFLATE\"\r\n");
}
#endif
prot_printf(c->pout, "* PARTIAL-UPDATE\r\n");
prot_printf(c->pout,
"* OK MUPDATE \"%s\" \"Cyrus IMAP\" \"%s\" \"%s\"\r\n",
config_servername,
CYRUS_VERSION, masterp ? "(master)" : slavebuf);
prot_flush(c->pout);
}
/*
* The main thread loop
*/
/* Note that You Must Lock Listen mutex before idle worker mutex,
* though you can lock them individually too */
static void *thread_main(void *rock __attribute__((unused)))
{
struct conn *C; /* used for loops */
struct conn *currConn = NULL; /* the connection we care about currently */
struct protgroup *protin = protgroup_new(PROTGROUP_SIZE_DEFAULT);
struct protgroup *protout = NULL;
struct timeval now;
struct timespec timeout;
int need_workers, too_many;
int max_worker_flag;
int do_a_command;
int send_a_banner;
int connflag;
int new_fd;
int ret = 0;
struct conn *ni;
/* Lock Worker Count Mutex */
pthread_mutex_lock(&worker_count_mutex); /* LOCK */
/* Change total number of workers */
worker_count++;
syslog(LOG_DEBUG,
"New worker thread started, for a total of %d", worker_count);
/* Unlock Worker Count Mutex */
pthread_mutex_unlock(&worker_count_mutex); /* UNLOCK */
/* This is a big infinite loop */
while (1) {
send_a_banner = do_a_command = 0;
pthread_mutex_lock(&idle_worker_mutex);
/* If we are over the limit on idle threads, die. */
max_worker_flag = (idle_worker_count >=
config_getint(IMAPOPT_MUPDATE_WORKERS_MAXSPARE));
/* Increment Idle Workers */
if (!max_worker_flag) idle_worker_count++;
pthread_mutex_unlock(&idle_worker_mutex);
if (max_worker_flag) goto worker_thread_done;
retry_lock:
/* Lock Listen Mutex - If locking takes more than 60 seconds,
* kill off this thread. Ideally this is a FILO queue */
pthread_mutex_lock(&listener_mutex); /* LOCK */
ret = 0;
while (listener_lock && ret != ETIMEDOUT) {
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec + 60;
timeout.tv_nsec = now.tv_usec * 1000;
ret = pthread_cond_timedwait(&listener_cond,
&listener_mutex,
&timeout);
}
if (!ret) {
/* Set listener lock until we decide what to do */
listener_lock = 1;
}
pthread_mutex_unlock(&listener_mutex); /* UNLOCK */
if (ret == ETIMEDOUT) {
pthread_mutex_lock(&idle_worker_mutex); /* LOCK */
if (idle_worker_count <= config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE)) {
pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */
/* below number of spare workers, try to get the lock again */
goto retry_lock;
} else {
/* Decrement Idle Worker Count */
idle_worker_count--;
pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */
syslog(LOG_DEBUG,
"Thread timed out waiting for listener_lock");
goto worker_thread_done;
}
}
signals_poll();
/* Check if we are ready for connections, if not, wait */
pthread_mutex_lock(&ready_for_connections_mutex); /* LOCK */
/* are we ready to take connections? */
while (!ready_for_connections) {
pthread_cond_wait(&ready_for_connections_cond,
&ready_for_connections_mutex);
}
pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */
connflag = 0;
/* Reset protin to all zeros (to preserve memory allocation) */
protgroup_reset(protin);
/* Clear protout if needed */
protgroup_free(protout);
protout = NULL;
/* Build list of idle protstreams */
pthread_mutex_lock(&idle_connlist_mutex); /* LOCK */
for (C=idle_connlist; C; C=C->next_idle) {
assert(C->idle);
protgroup_insert(protin, C->pin);
}
pthread_mutex_unlock(&idle_connlist_mutex); /* UNLOCK */
/* Select on Idle Conns + conn_pipe */
if (prot_select(protin, conn_pipe[0],
&protout, &connflag, NULL) == -1) {
syslog(LOG_ERR, "prot_select() failed in thread_main: %m");
fatal("prot_select() failed in thread_main", EX_TEMPFAIL);
}
/* we've got work to do */
pthread_mutex_lock(&idle_worker_mutex); /* LOCK */
idle_worker_count--;
pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */
/* If we've been signaled to be unready, drop all current connections
* in the idle list */
pthread_mutex_lock(&ready_for_connections_mutex); /* LOCK */
if (!ready_for_connections) {
pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */
/* Free all connections on idle_connlist. Note that
* any connection not currently on the idle_connlist will
* instead be freed when they drop out of their docmd() below */
pthread_mutex_lock(&idle_connlist_mutex); /* LOCK */
for (C=idle_connlist; C; C = ni) {
ni = C->next_idle;
prot_printf(C->pout,
"* BYE \"no longer ready for connections\"\r\n");
C->idle = 0;
conn_free(C);
}
idle_connlist = NULL;
pthread_mutex_unlock(&idle_connlist_mutex); /* UNLOCK */
goto nextlistener;
}
pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */
if (connflag) {
/* read the fd from the pipe, if needed */
if (read(conn_pipe[0], &new_fd, sizeof(new_fd)) == -1) {
syslog(LOG_CRIT,
"read from conn_pipe for new connection failed: %m");
fatal("conn_pipe read failed", EX_TEMPFAIL);
}
} else {
new_fd = NO_NEW_CONNECTION;
}
if (new_fd != NO_NEW_CONNECTION) {
/* new_fd indicates a new connection */
currConn = conn_new(new_fd);
if (currConn)
send_a_banner = 1;
} else if (protout) {
/* Handle existing connection, we'll need to pull it off
* the idle_connlist */
struct protstream *ptmp;
struct conn **prev;
pthread_mutex_lock(&idle_connlist_mutex); /* LOCK */
/* Grab the first connection out of the ready set, and use it */
ptmp = protgroup_getelement(protout, 0);
assert(ptmp);
currConn = ptmp->userdata;
assert(currConn);
assert(currConn->idle);
currConn->idle = 0;
for (C=idle_connlist, prev = &(idle_connlist); C;
prev = &(C->next_idle), C=C->next_idle) {
if (C == currConn) {
*prev = C->next_idle;
C->next_idle = NULL;
break;
}
}
pthread_mutex_unlock(&idle_connlist_mutex); /* UNLOCK */
do_a_command = 1;
}
/*
* If this worker will do any real work, we'll want to make sure
* there are sufficient additional workers while we're busy.
*/
if (send_a_banner || do_a_command) {
pthread_mutex_lock(&idle_worker_mutex); /* LOCK */
need_workers = config_getint(IMAPOPT_MUPDATE_WORKERS_MINSPARE)
- idle_worker_count;
pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */
pthread_mutex_lock(&worker_count_mutex); /* LOCK */
if (need_workers > 0) {
too_many = (need_workers + worker_count) -
config_getint(IMAPOPT_MUPDATE_WORKERS_MAX);
if (too_many > 0) need_workers -= too_many;
}
pthread_mutex_unlock(&worker_count_mutex); /* UNLOCK */
/* Do we need a new worker (or two, or three...)?
* (are we allowed to create one?) */
while (need_workers > 0) {
pthread_t t;
int r = pthread_create(&t, NULL, &thread_main, NULL);
if (r == 0) {
pthread_detach(t);
} else {
syslog(LOG_ERR,
"could not start a new worker thread (not fatal)");
}
/* Even if we fail to create the new thread, keep going */
need_workers--;
}
}
nextlistener:
/* Let another listener in */
pthread_mutex_lock(&listener_mutex);
assert(listener_lock);
listener_lock = 0;
pthread_cond_signal(&listener_cond);
pthread_mutex_unlock(&listener_mutex);
/* Do work in this thread, if needed */
if (send_a_banner) {
dobanner(currConn);
} else if (do_a_command) {
assert(currConn);
if (docmd(currConn) == DOCMD_CONN_FINISHED) {
conn_free(currConn);
/* continue to top of loop here since we won't be adding
* this back to the idle list */
continue;
}
/* Are we allowed to continue serving data? */
pthread_mutex_lock(&ready_for_connections_mutex); /* LOCK */
if (!ready_for_connections) {
pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */
prot_printf(C->pout,
"* BYE \"no longer ready for connections\"\r\n");
conn_free(currConn);
/* continue to top of loop here since we won't be adding
* this back to the idle list */
continue;
}
pthread_mutex_unlock(&ready_for_connections_mutex); /* UNLOCK */
} /* done handling command */
if (send_a_banner || do_a_command) {
/* We did work in this thread, so we need to [re-]add the
* connection to the idle list and signal the current listener */
pthread_mutex_lock(&idle_connlist_mutex); /* LOCK */
currConn->idle = 1;
currConn->next_idle = idle_connlist;
idle_connlist = currConn;
pthread_mutex_unlock(&idle_connlist_mutex); /* UNLOCK */
/* Signal to our caller that we should add something
* to select() on, since this connection is ready again */
if (write(conn_pipe[1], &NO_NEW_CONNECTION,
sizeof(NO_NEW_CONNECTION)) == -1) {
fatal("write to conn_pipe to signal docmd done failed",
EX_TEMPFAIL);
}
}
} /* while(1) */
worker_thread_done:
/* Remove this worker from the pool */
/* Note that workers exiting the loop above should NOT be counted
* in the idle_worker_count */
pthread_mutex_lock(&worker_count_mutex); /* LOCK */
worker_count--;
pthread_mutex_lock(&idle_worker_mutex); /* LOCK */
syslog(LOG_DEBUG,
"Worker thread finished, for a total of %d (%d spare)",
worker_count, idle_worker_count);
pthread_mutex_unlock(&idle_worker_mutex); /* UNLOCK */
pthread_mutex_unlock(&worker_count_mutex); /* UNLOCK */
protgroup_free(protin);
protgroup_free(protout);
return NULL;
}
/* read from disk database must be unlocked. */
static void database_init(void)
{
pthread_mutex_lock(&mailboxes_mutex); /* LOCK */
pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */
}
/* log change to database. database must be locked. */
static void database_log(const struct mbent *mb, struct txn **mytid)
{
char *c;
mbentry_t *mbentry = NULL;
mbentry = mboxlist_entry_create();
mbentry->name = xstrdupnull(mb->mailbox);
mbentry->server = xstrdupnull(mb->location);
c = strchr(mbentry->server, '!');
if (c) {
*c++ = '\0';
mbentry->partition = xstrdupnull(c);
}
mbentry->acl = xstrdupnull(mb->acl);
switch (mb->t) {
case SET_ACTIVE:
mbentry->mbtype = 0;
mboxlist_insertremote(mbentry, mytid);
break;
case SET_RESERVE:
mbentry->mbtype = MBTYPE_RESERVE;
mboxlist_insertremote(mbentry, mytid);
break;
case SET_DELETE:
mboxlist_deleteremote(mb->mailbox, mytid);
break;
case SET_DEACTIVATE:
/* SET_DEACTIVATE is not a real value that an actual
mailbox can have! */
abort();
}
mboxlist_entry_free(&mbentry);
}
/* lookup in database. database must be locked */
/* This could probably be more efficient and avoid some copies */
/* passing in a NULL pool implies that we should use regular xmalloc,
* a non-null pool implies we should use the mpool functionality */
static struct mbent *database_lookup(const char *name, const mbentry_t *mbentry,
struct mpool *pool)
{
mbentry_t *my_mbentry = NULL;
struct mbent *out;
char *location = NULL;
int r;
if (!name) return NULL;
if (!mbentry) {
r = mboxlist_lookup_allow_all(name, &my_mbentry, NULL);
if (r) return NULL;
mbentry = my_mbentry;
}
/* XXX - if mbtype & MBTYPE_DELETED, maybe set a delete */
if (mbentry->mbtype & MBTYPE_RESERVE) {
if (!pool) out = xmalloc(sizeof(struct mbent) + 1);
else out = mpool_malloc(pool, sizeof(struct mbent) + 1);
out->t = SET_RESERVE;
out->acl[0] = '\0';
}
else {
if (!pool) out = xmalloc(sizeof(struct mbent) + strlen(mbentry->acl));
else out = mpool_malloc(pool, sizeof(struct mbent) + strlen(mbentry->acl));
out->t = SET_ACTIVE;
strcpy(out->acl, mbentry->acl);
}
if (mbentry->server && mbentry->partition)
location = strconcat(mbentry->server, "!", mbentry->partition, NULL);
else if (mbentry->server)
location = xstrdup(mbentry->server);
else if (mbentry->partition)
location = xstrdup(mbentry->partition);
else
location = xstrdup("");
if (pool) {
out->mailbox = mpool_strdup(pool, name);
out->location = mpool_strdup(pool, location);
free(location);
}
else {
out->mailbox = xstrdup(name);
out->location = location;
}
if (my_mbentry) mboxlist_entry_free(&my_mbentry);
return out;
}
static void cmd_authenticate(struct conn *C,
const char *tag, const char *mech,
const char *clientstart)
{
int r, sasl_result;
const void *val;
int failedloginpause;
r = saslserver(C->saslconn, mech, clientstart, "", "", "",
C->pin, C->pout, &sasl_result, NULL);
if (r) {
const char *errorstring = NULL;
const char *userid = "-notset-";
switch (r) {
case IMAP_SASL_CANCEL:
prot_printf(C->pout,
"%s NO Client canceled authentication\r\n", tag);
break;
case IMAP_SASL_PROTERR:
errorstring = prot_error(C->pin);
prot_printf(C->pout,
"%s NO Error reading client response: %s\r\n",
tag, errorstring ? errorstring : "");
break;
default:
failedloginpause = config_getduration(IMAPOPT_FAILEDLOGINPAUSE, 's');
if (failedloginpause != 0) {
sleep(failedloginpause);
}
if (r != SASL_NOUSER)
sasl_getprop(C->saslconn, SASL_USERNAME, (const void **) &userid);
syslog(LOG_ERR, "badlogin: %s %s (%s) [%s]",
C->clienthost,
mech, userid, sasl_errdetail(C->saslconn));
prot_printf(C->pout, "%s NO \"%s\"\r\n", tag,
sasl_errstring((r == SASL_NOUSER ? SASL_BADAUTH : r),
NULL, NULL));
}
reset_saslconn(C);
return;
}
/* Successful Authentication */
r = sasl_getprop(C->saslconn, SASL_USERNAME, &val);
if (r != SASL_OK) {
prot_printf(C->pout, "%s NO \"SASL Error\"\r\n", tag);
reset_saslconn(C);
return;
}
C->userid = (char *) val;
syslog(LOG_NOTICE, "login: %s %s %s%s %s", C->clienthost, C->userid,
mech, C->tlsconn ? "+TLS" : "", "User logged in");
prot_printf(C->pout, "%s OK \"Authenticated\"\r\n", tag);
prot_setsasl(C->pin, C->saslconn);
prot_setsasl(C->pout, C->saslconn);
C->logfd = telemetry_log(C->userid, C->pin, C->pout, 1);
return;
}
/* Log the update out to anyone who is in our updatelist */
/* INVARIANT: caller MUST hold mailboxes_mutex */
/* oldlocation is the previous value of the location in this update,
thislocation is the current value of the mailbox's location */
static void log_update(const char *mailbox,
const char *oldlocation,
const char *thislocation)
{
struct conn *upc;
for (upc = updatelist; upc != NULL; upc = upc->updatelist_next) {
/* for each connection, add to pending list */
struct pending *p = (struct pending *) xmalloc(sizeof(struct pending));
p->next = NULL;
strlcpy(p->mailbox, mailbox, sizeof(p->mailbox));
/* this might need to be inside the mutex, but I doubt it */
if (upc->streaming_hosts
&& (!oldlocation || strarray_find(upc->streaming_hosts,
oldlocation, 0) < 0)
&& (!thislocation || strarray_find(upc->streaming_hosts,
thislocation, 0) < 0)) {
/* No Match! Continue! */
continue;
}
pthread_mutex_lock(&upc->m);
if ( upc->plist == NULL ) {
upc->plist = upc->ptail = p;
} else {
upc->ptail->next = p;
upc->ptail = p;
}
pthread_mutex_unlock(&upc->m);
}
}
static void cmd_set(struct conn *C,
const char *tag, const char *mailbox,
const char *location, const char *acl, enum settype t)
{
struct mbent *m;
char *oldlocation = NULL;
char *thislocation = NULL;
char *tmp;
/* Hold any output that we need to do */
enum {
EXISTS, NOTACTIVE, DOESNTEXIST, ISOK, NOOUTPUT
} msg = NOOUTPUT;
syslog(LOG_DEBUG, "cmd_set(fd:%d, %s)", C->fd, mailbox);
pthread_mutex_lock(&mailboxes_mutex); /* LOCK */
m = database_lookup(mailbox, NULL, NULL);
if (m && t == SET_RESERVE) {
/* Check if we run in a discrete murder topology */
if (config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_STANDARD) {
/* Replicated backends with the same server name issue
* reservations twice. Suppress bailing out on the second one
* (the replica).
*/
if (strcmp(m->location, location)) {
/* failed; mailbox already exists */
msg = EXISTS;
goto done;
}
}
/* otherwise do nothing (local create on master) */
}
if ((!m || m->t != SET_ACTIVE) && t == SET_DEACTIVATE) {
/* Check if we run in a discrete murder topology */
if (config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_STANDARD) {
/* Replicated backends with the same server name issue
* deactivation twice. Suppress bailing out on the second one
* (the replica).
*/
if (strcmp(m->location, location)) {
/* failed; mailbox not currently active */
msg = NOTACTIVE;
goto done;
}
}
} else if (t == SET_DEACTIVATE) {
t = SET_RESERVE;
}
if (t == SET_DELETE) {
if (!m) {
/* Check if we run in a discrete murder topology */
if (config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_STANDARD) {
/* Replicated backends with the same server name issue
* deletion twice. Suppress bailing out on the second one
* (the replica).
*/
if (strcmp(m->location, location)) {
/* failed; mailbox doesn't exist */
msg = DOESNTEXIST;
goto done;
}
}
/* otherwise do nothing (local delete on master) */
} else {
oldlocation = xstrdup(m->location);
/* do the deletion */
m->t = SET_DELETE;
}
} else {
if (m)
oldlocation = m->location;
if (m && (!acl || strlen(acl) < strlen(m->acl))) {
/* change what's already there -- the acl is smaller */
m->location = xstrdup(location);
if (acl) strcpy(m->acl, acl);
else m->acl[0] = '\0';
m->t = t;
} else {
char *thismailbox = m ? m->mailbox : xstrdup(mailbox);
struct mbent *newm;
/* allocate new mailbox */
if (acl) {
newm = xrealloc(m, sizeof(struct mbent) + strlen(acl));
} else {
newm = xrealloc(m, sizeof(struct mbent) + 1);
}
newm->mailbox = thismailbox;
newm->location = xstrdup(location);
if (acl) {
strcpy(newm->acl, acl);
} else {
newm->acl[0] = '\0';
}
newm->t = t;
/* re-scope */
m = newm;
}
}
/* write to disk */
if (m) database_log(m, NULL);
if (oldlocation) {
tmp = strchr(oldlocation, '!');
if (tmp) *tmp = '\0';
}
if (location) {
thislocation = xstrdup(location);
tmp = strchr(thislocation, '!');
if (tmp) *tmp = '\0';
}
/* post pending changes */
log_update(mailbox, oldlocation, thislocation);
msg = ISOK;
done:
if (thislocation) free(thislocation);
if (oldlocation) free(oldlocation);
free_mbent(m);
pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */
/* Delay output until here to avoid blocking while holding
* mailboxes_mutex */
switch(msg) {
case EXISTS:
prot_printf(C->pout, "%s NO \"mailbox already exists\"\r\n", tag);
break;
case NOTACTIVE:
prot_printf(C->pout, "%s NO \"mailbox not currently active\"\r\n",
tag);
break;
case DOESNTEXIST:
prot_printf(C->pout, "%s NO \"mailbox doesn't exist\"\r\n", tag);
break;
case ISOK:
prot_printf(C->pout, "%s OK \"done\"\r\n", tag);
break;
default:
break;
}
}
static void cmd_find(struct conn *C, const char *tag, const char *mailbox,
int send_ok, int send_delete)
{
struct mbent *m;
syslog(LOG_DEBUG, "cmd_find(fd:%d, %s)", C->fd, mailbox);
/* Only hold the mutex around database_lookup,
* since the mbent stays valid even if the database changes,
* and we don't want to block on network I/O */
pthread_mutex_lock(&mailboxes_mutex); /* LOCK */
m = database_lookup(mailbox, NULL, NULL);
pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */
if (m && m->t == SET_ACTIVE) {
prot_printf(C->pout,
"%s MAILBOX "
"{" SIZE_T_FMT "+}\r\n%s "
"{" SIZE_T_FMT "+}\r\n%s "
"{" SIZE_T_FMT "+}\r\n%s\r\n",
tag,
strlen(m->mailbox), m->mailbox,
strlen(m->location), m->location,
strlen(m->acl), m->acl
);
} else if (m && m->t == SET_RESERVE) {
prot_printf(C->pout,
"%s RESERVE "
"{" SIZE_T_FMT "+}\r\n%s "
"{" SIZE_T_FMT "+}\r\n%s\r\n",
tag,
strlen(m->mailbox), m->mailbox,
strlen(m->location), m->location
);
} else if (send_delete) {
/* not found, if needed, send a delete */
prot_printf(C->pout,
"%s DELETE "
"{" SIZE_T_FMT "+}\r\n%s\r\n",
tag,
strlen(mailbox), mailbox
);
}
free_mbent(m);
if (send_ok) {
prot_printf(C->pout, "%s OK \"Search completed\"\r\n", tag);
}
}
/* Callback for cmd_startupdate to be passed to mboxlist_allmbox. */
/* Requires that C->streaming be set to the tag to respond with */
static int sendupdate(const mbentry_t *mbentry, void *rock)
{
struct conn *C = (struct conn *)rock;
struct mbent *m;
if (!C) return -1;
m = database_lookup(mbentry->name, mbentry, NULL);
if (!m) return -1;
if (!C->list_prefix ||
!strncmp(m->location, C->list_prefix, C->list_prefix_len)) {
/* Either there is not a prefix to test, or we matched it */
if (!C->streaming_hosts ||
strarray_find(C->streaming_hosts, mbentry->server, 0) >= 0) {
switch (m->t) {
case SET_ACTIVE:
prot_printf(C->pout,
"%s MAILBOX "
"{" SIZE_T_FMT "+}\r\n%s "
"{" SIZE_T_FMT "+}\r\n%s "
"{" SIZE_T_FMT "+}\r\n%s\r\n",
C->streaming,
strlen(m->mailbox), m->mailbox,
strlen(m->location), m->location,
strlen(m->acl), m->acl
);
break;
case SET_RESERVE:
prot_printf(C->pout,
"%s RESERVE "
"{" SIZE_T_FMT "+}\r\n%s "
"{" SIZE_T_FMT "+}\r\n%s\r\n",
C->streaming,
strlen(m->mailbox), m->mailbox,
strlen(m->location), m->location
);
break;
case SET_DELETE:
/* deleted item in the list !?! */
case SET_DEACTIVATE:
/* SET_DEACTIVATE is not a real value! */
abort();
}
}
}
free_mbent(m);
return 0;
}
static void cmd_list(struct conn *C, const char *tag, const char *host_prefix)
{
/* List operations can result in a lot of output, let's do this
* with the prot layer nonblocking so we don't hold the mutex forever*/
prot_NONBLOCK(C->pout);
/* indicate interest in updates */
pthread_mutex_lock(&mailboxes_mutex); /* LOCK */
/* since this isn't valid when streaming, just use the same callback */
C->streaming = tag;
C->list_prefix = host_prefix;
if (C->list_prefix) C->list_prefix_len = strlen(C->list_prefix);
else C->list_prefix_len = 0;
mboxlist_allmbox("", sendupdate, (void*)C, /*flags*/0);
C->streaming = NULL;
C->list_prefix = NULL;
C->list_prefix_len = 0;
pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */
prot_BLOCK(C->pout);
prot_flush(C->pout);
}
/*
* we've registered this connection for streaming, and every X seconds
* this will be invoked. note that we always send out updates as soon
* as we get a noop: that resets this counter back */
static struct prot_waitevent *sendupdates_evt(struct protstream *s __attribute__((unused)),
struct prot_waitevent *ev,
void *rock)
{
struct conn *C = (struct conn *) rock;
sendupdates(C, 1);
/* 'sendupdates()' will update when we next trigger */
return ev;
}
static void cmd_startupdate(struct conn *C, const char *tag,
strarray_t *partial)
{
/* initialize my condition variable */
/* The inital dump of the database can result in a lot of data,
* let's do this nonblocking */
prot_NONBLOCK(C->pout);
/* indicate interest in updates */
pthread_mutex_lock(&mailboxes_mutex); /* LOCK */
C->updatelist_next = updatelist;
updatelist = C;
C->streaming = xstrdup(tag);
C->streaming_hosts = partial;
/* dump initial list */
mboxlist_allmbox("", sendupdate, (void*)C, /*flags*/0);
pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */
prot_printf(C->pout, "%s OK \"streaming starts\"\r\n", tag);
prot_BLOCK(C->pout);
prot_flush(C->pout);
/* schedule our first update */
C->ev = prot_addwaitevent(C->pin, time(NULL) + update_wait,
sendupdates_evt, C);
}
/* send out any pending updates.
if 'flushnow' is set, flush the output buffer */
static void sendupdates(struct conn *C, int flushnow)
{
struct pending *p, *q;
pthread_mutex_lock(&C->m);
/* just grab the update list and release the lock */
p = C->plist;
C->plist = NULL;
C->ptail = NULL;
pthread_mutex_unlock(&C->m);
while (p != NULL) {
/* send update */
q = p;
p = p->next;
/* notify just like a FIND - except enable sending of DELETE
* notifications */
cmd_find(C, C->streaming, q->mailbox, 0, 1);
free(q);
}
/* reschedule event for 'update_wait' seconds */
C->ev->mark = time(NULL) + update_wait;
if (flushnow) {
prot_flush(C->pout);
}
}
#ifdef HAVE_SSL
static void cmd_starttls(struct conn *C, const char *tag)
{
int result;
result=tls_init_serverengine("mupdate",
5, /* depth to verify */
1, /* can client auth? */
NULL);
if (result == -1) {
syslog(LOG_ERR, "error initializing TLS");
prot_printf(C->pout, "%s NO Error initializing TLS\r\n", tag);
return;
}
prot_printf(C->pout, "%s OK Begin TLS negotiation now\r\n", tag);
/* must flush our buffers before starting tls */
prot_flush(C->pout);
result=tls_start_servertls(C->pin->fd, /* read */
C->pout->fd, /* write */
180, /* 3 minutes */
&C->saslprops,
&C->tlsconn);
/* if error */
if (result==-1) {
prot_printf(C->pout, "%s NO Starttls negotiation failed\r\n",
tag);
syslog(LOG_NOTICE, "STARTTLS negotiation failed: %s",
C->clienthost);
return;
}
/* tell SASL about the negotiated layer */
result = saslprops_set_tls(&C->saslprops, C->saslconn);
if (result != SASL_OK) {
fatal("saslprops_set_tls() failed: cmd_starttls()", EX_TEMPFAIL);
}
/* tell the prot layer about our new layers */
prot_settls(C->pin, C->tlsconn);
prot_settls(C->pout, C->tlsconn);
#if (OPENSSL_VERSION_NUMBER >= 0x0090800fL)
C->tls_comp = (void *) SSL_get_current_compression(C->tlsconn);
#endif
/* Reissue capability banner */
dobanner(C);
}
#else
void cmd_starttls(struct conn *C __attribute__((unused)),
const char *tag __attribute__((unused)))
{
fatal("cmd_starttls() executed, but starttls isn't implemented!",
EX_SOFTWARE);
}
#endif /* HAVE_SSL */
#ifdef HAVE_ZLIB
static void cmd_compress(struct conn *C, const char *tag, const char *alg)
{
if (C->compress_done) {
prot_printf(C->pout,
"%s BAD DEFLATE active via COMPRESS\r\n", tag);
}
#if defined(HAVE_SSL) && (OPENSSL_VERSION_NUMBER >= 0x0090800fL)
else if (C->tls_comp) {
prot_printf(C->pout,
"%s NO %s active via TLS\r\n",
tag, SSL_COMP_get_name(C->tls_comp));
}
#endif
else if (strcasecmp(alg, "DEFLATE")) {
prot_printf(C->pout,
"%s NO Unknown COMPRESS algorithm: %s\r\n", tag, alg);
}
else if (ZLIB_VERSION[0] != zlibVersion()[0]) {
prot_printf(C->pout,
"%s NO Error initializing %s (incompatible zlib version)\r\n",
tag, alg);
}
else {
prot_printf(C->pout,
"%s OK %s active\r\n", tag, alg);
/* enable (de)compression for the prot layer */
prot_setcompress(C->pin);
prot_setcompress(C->pout);
C->compress_done = 1;
}
}
#else
void cmd_compress(struct conn *C __attribute__((unused)),
const char *tag __attribute__((unused)),
const char *alg __attribute__((unused)))
{
fatal("cmd_compress() executed, but COMPRESS isn't implemented!",
EX_SOFTWARE);
}
#endif /* HAVE_ZLIB */
void shut_down(int code) __attribute__((noreturn));
void shut_down(int code)
{
in_shutdown = 1;
cyrus_done();
exit(code);
}
/* Reset the given sasl_conn_t to a sane state */
static int reset_saslconn(struct conn *c)
{
int ret;
sasl_security_properties_t *secprops = NULL;
sasl_dispose(&c->saslconn);
/* do initialization typical of service_main */
ret = sasl_server_new("mupdate", config_servername, NULL,
buf_cstringnull_ifempty(&c->saslprops.iplocalport),
buf_cstringnull_ifempty(&c->saslprops.ipremoteport),
NULL, 0, &c->saslconn);
if (ret != SASL_OK) return ret;
secprops = mysasl_secprops(SASL_SEC_NOANONYMOUS);
ret = sasl_setprop(c->saslconn, SASL_SEC_PROPS, secprops);
if (ret != SASL_OK) return ret;
/* end of service_main initialization excepting SSF */
/* If we have TLS/SSL info, set it */
if (c->saslprops.ssf) {
ret = saslprops_set_tls(&c->saslprops, c->saslconn);
}
if (ret != SASL_OK) return ret;
/* End TLS/SSL Info */
return SASL_OK;
}
int cmd_change(struct mupdate_mailboxdata *mdata,
const char *rock, void *context __attribute__((unused)))
{
struct mbent *m = NULL;
char *oldlocation = NULL;
char *thislocation = NULL;
char *tmp;
enum settype t = -1;
int ret = 0;
if (!mdata || !rock || !mdata->mailbox) return 1;
pthread_mutex_lock(&mailboxes_mutex); /* LOCK */
if (!strncmp(rock, "DELETE", 6)) {
m = database_lookup(mdata->mailbox, NULL, NULL);
if (!m) {
syslog(LOG_DEBUG, "attempt to delete unknown mailbox %s",
mdata->mailbox);
/* Mailbox doesn't exist - this isn't as fatal as you might
* think. */
/* ret = -1; */
goto done;
}
m->t = t = SET_DELETE;
oldlocation = xstrdup(m->location);
} else {
m = database_lookup(mdata->mailbox, NULL, NULL);
if (m)
oldlocation = m->location;
if (m && (!mdata->acl || strlen(mdata->acl) < strlen(m->acl))) {
/* change what's already there */
/* old m->location freed when oldlocation is freed */
m->location = xstrdup(mdata->location);
if (mdata->acl) strcpy(m->acl, mdata->acl);
else m->acl[0] = '\0';
if (!strncmp(rock, "MAILBOX", 6)) {
m->t = t = SET_ACTIVE;
} else if (!strncmp(rock, "RESERVE", 7)) {
m->t = t = SET_RESERVE;
} else {
syslog(LOG_DEBUG,
"bad mupdate command in cmd_change: %s", rock);
ret = 1;
goto done;
}
} else {
struct mbent *newm;
if (m) {
free(m->mailbox);
/* m->location freed when oldlocation freed */
}
/* allocate new mailbox */
if (mdata->acl) {
newm = xrealloc(m, sizeof(struct mbent) + strlen(mdata->acl));
} else {
newm = xrealloc(m, sizeof(struct mbent) + 1);
}
newm->mailbox = xstrdup(mdata->mailbox);
newm->location = xstrdup(mdata->location);
if (mdata->acl) {
strcpy(newm->acl, mdata->acl);
} else {
newm->acl[0] = '\0';
}
if (!strncmp(rock, "MAILBOX", 6)) {
newm->t = t = SET_ACTIVE;
} else if (!strncmp(rock, "RESERVE", 7)) {
newm->t = t = SET_RESERVE;
} else {
syslog(LOG_DEBUG,
"bad mupdate command in cmd_change: %s", rock);
ret = 1;
goto done;
}
/* Bring it back into scope */
m = newm;
}
}
/* write to disk */
database_log(m, NULL);
if (oldlocation) {
tmp = strchr(oldlocation, '!');
if (tmp) *tmp = '\0';
}
if (mdata->location) {
thislocation = xstrdup(mdata->location);
tmp = strchr(thislocation, '!');
if (tmp) *tmp = '\0';
}
/* post pending changes to anyone we are talking to */
log_update(mdata->mailbox, oldlocation, thislocation);
done:
if (oldlocation) free(oldlocation);
if (thislocation) free(thislocation);
free_mbent(m);
pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */
return ret;
}
struct sync_rock
{
struct mpool *pool;
struct mbent_queue *boxes;
};
/* Read a series of MAILBOX and RESERVE commands and tack them onto a
* queue */
static int cmd_resync(struct mupdate_mailboxdata *mdata,
const char *rock, void *context)
{
struct sync_rock *r = (struct sync_rock *)context;
struct mbent_queue *remote_boxes = r->boxes;
struct mbent *newm = NULL;
if (!mdata || !rock || !mdata->mailbox || !remote_boxes) return 1;
/* allocate new mailbox */
if (mdata->acl) {
newm = mpool_malloc(r->pool,sizeof(struct mbent) + strlen(mdata->acl));
} else {
newm = mpool_malloc(r->pool,sizeof(struct mbent) + 1);
}
newm->mailbox = mpool_strdup(r->pool, mdata->mailbox);
newm->location = mpool_strdup(r->pool, mdata->location);
if (mdata->acl) {
strcpy(newm->acl, mdata->acl);
} else {
newm->acl[0] = '\0';
}
if (!strncmp(rock, "MAILBOX", 6)) {
newm->t = SET_ACTIVE;
} else if (!strncmp(rock, "RESERVE", 7)) {
newm->t = SET_RESERVE;
} else {
syslog(LOG_NOTICE,
"bad mupdate command in cmd_resync: %s", rock);
return 1;
}
/* Insert onto queue */
newm->next = NULL;
*(remote_boxes->tail) = newm;
remote_boxes->tail = &(newm->next);
return 0;
}
/* Callback for mupdate_synchronize to be passed to mboxlist_allmbox. */
static int sync_findall_cb(const mbentry_t *mbentry, void *rock)
{
struct sync_rock *r = (struct sync_rock *)rock;
struct mbent_queue *local_boxes = (struct mbent_queue *)r->boxes;
struct mbent *m;
if (!local_boxes) return 1;
m = database_lookup(mbentry->name, mbentry, r->pool);
/* If it doesn't exist, fine... */
if (!m) return 0;
m->next = NULL;
*(local_boxes->tail) = m;
local_boxes->tail = &(m->next);
return 0;
}
int mupdate_synchronize_remote(mupdate_handle *handle,
struct mbent_queue *remote_boxes,
struct mpool *pool)
{
struct sync_rock rock;
if (!handle || !handle->saslcompleted) return 1;
rock.pool = pool;
/* ask mupdate master for updates and set nonblocking */
prot_printf(handle->conn->out, "U01 UPDATE\r\n");
syslog(LOG_NOTICE,
"scarfing mailbox list from master mupdate server");
remote_boxes->head = NULL;
remote_boxes->tail = &(remote_boxes->head);
rock.boxes = remote_boxes;
/* If there is a fatal error, return, other errors ignore */
if (mupdate_scarf(handle, cmd_resync, &rock, 1, NULL) != 0) {
struct mbent *p=remote_boxes->head, *p_next=NULL;
while(p) {
p_next = p->next;
p = p_next;
}
return 1;
}
/* Make socket nonblocking now */
prot_NONBLOCK(handle->conn->in);
return 0;
}
int mupdate_synchronize(struct mbent_queue *remote_boxes, struct mpool *pool)
{
struct mbent_queue local_boxes;
struct mbent *l,*r;
struct sync_rock rock;
struct txn *tid = NULL;
int ret = 0;
int err = 0;
char *c;
rock.pool = pool;
/* Note that this prevents other people from running an UPDATE against
* us for the duration. this is a GOOD THING */
pthread_mutex_lock(&mailboxes_mutex); /* LOCK */
syslog(LOG_NOTICE,
"synchronizing mailbox list with master mupdate server");
local_boxes.head = NULL;
local_boxes.tail = &(local_boxes.head);
rock.boxes = &local_boxes;
mboxlist_allmbox("", sync_findall_cb, (void*)&rock, /*flags*/0);
/* Traverse both lists, compare the names */
/* If they match, ensure that location and acl are correct, if so,
move on, if not, fix them */
/* If the local is before the next remote, delete it */
/* If the next remote is before the local, insert it and try again */
for(l = local_boxes.head, r = remote_boxes->head; l && r;
l = local_boxes.head, r = remote_boxes->head)
{
int ret = strcmp(l->mailbox, r->mailbox);
if (!ret) {
/* Match */
if (l->t != r->t ||
strcmp(l->location, r->location) ||
strcmp(l->acl,r->acl)) {
/* Something didn't match, replace it */
/*
* If this is a locally hosted mailbox, don't make a
* change, just warn.
*/
if ((config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_UNIFIED) &&
(strchr( l->location, '!' ) == NULL )) {
syslog(LOG_ERR, "local mailbox %s wrong in mailbox list",
l->mailbox );
err++;
} else {
mbentry_t *mbentry = mboxlist_entry_create();
mbentry->name = xstrdupnull(r->mailbox);
mbentry->mbtype = (r->t == SET_RESERVE ? MBTYPE_RESERVE : 0);
mbentry->server = xstrdupnull(r->location);
c = strchr(mbentry->server, '!');
if (c) {
*c++ = '\0';
mbentry->partition = xstrdupnull(c);
}
mbentry->acl = xstrdupnull(r->acl);
mboxlist_insertremote(mbentry, &tid);
mboxlist_entry_free(&mbentry);
}
}
/* Okay, dump these two */
local_boxes.head = l->next;
remote_boxes->head = r->next;
} else if (ret < 0) {
/* Local without corresponding remote, delete it */
/*
* In a unified murder, we don't want to delete locally
* hosted mailboxes during mupdate's resync process.
* If that sort of operation appears necessary, it
* probably requires an operator to review it --
* ctl_mboxlist is the right place to fix the kind
* of configuration error implied.
*
* A similar problem exists when the location thinks
* it is locally hosting a mailbox, but mupdate master
* thinks it's somewhere else.
*/
if ((config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_UNIFIED) &&
(strchr( l->location, '!' ) == NULL )) {
syslog(LOG_ERR, "local mailbox %s not in mailbox list",
l->mailbox );
err++;
} else {
mboxlist_deleteremote(l->mailbox, &tid);
}
local_boxes.head = l->next;
} else /* (ret > 0) */ {
/* Remote without corresponding local, insert it */
mbentry_t *mbentry = mboxlist_entry_create();
mbentry->name = xstrdupnull(r->mailbox);
mbentry->mbtype = (r->t == SET_RESERVE ? MBTYPE_RESERVE : 0);
mbentry->server = xstrdupnull(r->location);
c = strchr(mbentry->server, '!');
if (c) {
*c++ = '\0';
mbentry->partition = xstrdupnull(c);
}
mbentry->acl = xstrdupnull(r->acl);
mboxlist_insertremote(mbentry, &tid);
mboxlist_entry_free(&mbentry);
remote_boxes->head = r->next;
}
}
if (l && !r) {
/* we have more deletes to do */
while(l) {
if ((config_mupdate_config == IMAP_ENUM_MUPDATE_CONFIG_UNIFIED) &&
(strchr( l->location, '!' ) == NULL )) {
syslog(LOG_ERR, "local mailbox %s not in mailbox list",
l->mailbox );
err++;
} else {
mboxlist_deleteremote(l->mailbox, &tid);
}
local_boxes.head = l->next;
l = local_boxes.head;
}
} else if (r && !l) {
/* we have more inserts to do */
while (r) {
mbentry_t *mbentry = mboxlist_entry_create();
mbentry->name = xstrdupnull(r->mailbox);
mbentry->mbtype = (r->t == SET_RESERVE ? MBTYPE_RESERVE : 0);
mbentry->server = xstrdupnull(r->location);
c = strchr(mbentry->server, '!');
if (c) {
*c++ = '\0';
mbentry->partition = xstrdupnull(c);
}
mbentry->acl = xstrdupnull(r->acl);
mboxlist_insertremote(mbentry, &tid);
mboxlist_entry_free(&mbentry);
remote_boxes->head = r->next;
r = remote_boxes->head;
}
}
if (tid) mboxlist_commit(tid);
/* All up to date! */
if ( err ) {
syslog(LOG_ERR, "mailbox list synchronization NOT complete (%d) errors",
err);
} else {
syslog(LOG_NOTICE, "mailbox list synchronization complete");
}
pthread_mutex_unlock(&mailboxes_mutex); /* UNLOCK */
return ret;
}
void mupdate_signal_db_synced(void)
{
pthread_mutex_lock(&synced_mutex);
synced = 1;
pthread_cond_broadcast(&synced_cond);
pthread_mutex_unlock(&synced_mutex);
}
void mupdate_ready(void)
{
pthread_mutex_lock(&ready_for_connections_mutex);
if (ready_for_connections) {
syslog(LOG_CRIT, "mupdate_ready called when already ready");
fatal("mupdate_ready called when already ready", EX_TEMPFAIL);
}
ready_for_connections = 1;
pthread_cond_broadcast(&ready_for_connections_cond);
pthread_mutex_unlock(&ready_for_connections_mutex);
}
/* Signal unreadyness. Next active worker will kill off all idle connections.
* any non-idle connection will die off when it leaves docmd() */
void mupdate_unready(void)
{
pthread_mutex_lock(&ready_for_connections_mutex);
syslog(LOG_NOTICE, "unready for connections");
ready_for_connections = 0;
pthread_mutex_unlock(&ready_for_connections_mutex);
}
/* Used to free malloc'd mbent's (not for mpool'd mbents) */
void free_mbent(struct mbent *p)
{
if (!p) return;
free(p->location);
free(p->mailbox);
free(p);
}