Codebase list fdm / 97c633e
Maximum/minimum limits to avoid filling up TMPDIR or fds. Nicholas Marriott 17 years ago
4 changed file(s) with 59 addition(s) and 27 deletion(s). Raw diff Collapse all Expand all
181181 case FETCH_AGAIN:
182182 /* delivering mail, queue for delivery */
183183 log_debug3("%s: adding to deliver queue", a->name);
184 TAILQ_REMOVE(&matchq, mctx, match_entry);
185 TAILQ_INSERT_TAIL(&deliverq, mctx, deliver_entry);
184 TAILQ_REMOVE(&matchq, mctx, entry);
185 TAILQ_INSERT_TAIL(&deliverq, mctx, entry);
186186 break;
187187 case FETCH_COMPLETE:
188188 /* finished with mail, queue on done queue */
189189 log_debug3("%s: adding to done queue", a->name);
190 TAILQ_REMOVE(&matchq, mctx, match_entry);
191 TAILQ_INSERT_TAIL(&doneq, mctx, done_entry);
190 TAILQ_REMOVE(&matchq, mctx, entry);
191 TAILQ_INSERT_TAIL(&doneq, mctx, entry);
192
193 /*
194 * XXX Destroy mail data now it is finished, just keep the
195 * struct mail.
196 */
197 shm_destroy(&mctx->mail->shm);
192198 break;
193199 }
194200
212218 if (TAILQ_EMPTY(&mctx->dqueue)) {
213219 /* delivery done. return to match queue */
214220 log_debug3("%s: returning to match queue", a->name);
215 TAILQ_REMOVE(&deliverq, mctx, deliver_entry);
216 TAILQ_INSERT_TAIL(&matchq, mctx, match_entry);
221 TAILQ_REMOVE(&deliverq, mctx, entry);
222 TAILQ_INSERT_TAIL(&matchq, mctx, entry);
217223 return (0);
218224 }
219225
252258
253259 remove:
254260 TAILQ_REMOVE(&mctx->dqueue, dctx, entry);
261 log_debug("%s: message %u delivered (rule %u) after %.3f seconds",
262 a->name, mctx->mail->idx, dctx->rule->idx, get_time() - dctx->tim);
255263 xfree(dctx);
256264 return (0);
257265 }
271279 m = mctx->mail;
272280 log_debug3("%s: running done queue", a->name);
273281
274 TAILQ_REMOVE(&doneq, mctx, done_entry);
282 TAILQ_REMOVE(&doneq, mctx, entry);
275283 ARRAY_FREE(&mctx->stack);
284 log_debug("%s: message %u done after %.3f seconds", a->name, m->idx,
285 get_time() - mctx->tim);
276286 xfree(mctx);
277287
278288 if (mctx->account->fetch->done != NULL) {
312322 mctx = TAILQ_FIRST(mq);
313323 m = mctx->mail;
314324
315 TAILQ_REMOVE(mq, mctx, done_entry);
325 TAILQ_REMOVE(mq, mctx, entry);
316326 ARRAY_FREE(&mctx->stack);
317327 xfree(mctx);
318328
406416 fetch_account(struct io *pio, struct account *a, struct ios *ios, double tim)
407417 {
408418 struct mail *m;
409 u_int n, dropped, kept;
410 int error, blocked;
419 u_int n, dropped, kept, total;
420 int error, blocked, holding;
411421 const char *cause = NULL;
412422 struct match_ctx *mctx;
413423 struct io *rio;
430440 /* fetch a message */
431441 error = FETCH_AGAIN;
432442 rio = NULL;
443 holding = 0;
433444 while (error == FETCH_AGAIN) {
434445 log_debug3("%s: queue lengths: match %u, deliver %u, "
435 "done %u; blocked=%d", a->name,
446 "done %u; blocked=%d; holding=%d", a->name,
436447 queue_length(&matchq), queue_length(&deliverq),
437 queue_length(&doneq), blocked);
438
439 if (rio != pio) {
440 error = a->fetch->fetch(a, m);
441 switch (error) {
442 case FETCH_ERROR:
443 if (rio == pio)
448 queue_length(&doneq), blocked, holding);
449
450 total = queue_length(&matchq) + queue_length(&deliverq);
451 if (total >= MAXMAILQUEUED)
452 holding = 1;
453 if (total < MINMAILQUEUED)
454 holding = 0;
455
456 if (!holding) {
457 if (rio != pio) {
458 error = a->fetch->fetch(a, m);
459 switch (error) {
460 case FETCH_ERROR:
461 if (rio != pio) {
462 cause = "fetching";
463 goto out;
464 }
444465 fatalx("child: lost parent");
445 cause = "fetching";
446 goto out;
447 case FETCH_COMPLETE:
448 goto out;
466 case FETCH_COMPLETE:
467 goto out;
468 }
449469 }
450470 }
451471 if (error == FETCH_AGAIN) {
452472 if (fetch_poll(a, blocked, ios, &rio) != 0)
453473 goto out;
454474 }
455
475
456476 if (run_match(a, &cause) != 0)
457477 goto out;
458478 if (run_deliver(a, pio, &blocked, &cause) != 0)
485505
486506 /* construct mctx */
487507 mctx = xcalloc(1, sizeof *mctx);
508 mctx->tim = get_time();
488509 mctx->io = pio;
489510 mctx->account = a;
490511 mctx->mail = m;
496517
497518 /* and queue it */
498519 log_debug3("%s: adding to match queue", a->name);
499 TAILQ_INSERT_TAIL(&matchq, mctx, match_entry);
520 TAILQ_INSERT_TAIL(&matchq, mctx, entry);
500521
501522 /* finish up a done mail */
502523 if (run_done(a, &dropped, &kept, &cause) != 0)
503524 goto out;
525 if (queue_length(&doneq) > MAXMAILQUEUED) {
526 while (queue_length(&doneq) > MINMAILQUEUED) {
527 if (run_done(a, &dropped, &kept, &cause) != 0)
528 goto out;
529 }
530 }
504531
505532 if (conf.purge_after == 0 || a->fetch->purge == NULL)
506533 continue;
872899 struct mail *m = dctx->mail;
873900 struct msg msg;
874901
902 dctx->tim = get_time();
875903 if (t->deliver->deliver == NULL)
876904 return (0);
877905
2424
2525 /* Deliver context. */
2626 struct deliver_ctx {
27 double tim;
28
2729 struct action *action;
2830 struct rule *rule;
2931
4242 #define LOCKFILE ".fdm.lock"
4343 #define SYSLOCKFILE "/var/run/fdm.lock"
4444 #define MAXMAILSIZE INT_MAX
45 #define MAXMAILQUEUED 5
46 #define MINMAILQUEUED 3
4547 #define DEFMAILSIZE (1 * 1024 * 1024 * 1024) /* 1 GB */
4648 #define DEFTIMEOUT (900 * 1000)
4749 #define LOCKSLEEPTIME 10000
2727
2828 /* Match context. */
2929 struct match_ctx {
30 double tim;
31
3032 struct io *io;
3133 struct account *account;
3234 struct mail *mail;
4042 struct deliver_queue dqueue;
4143
4244 TAILQ_ENTRY(match_ctx) entry;
43 #define match_entry entry
44 #define deliver_entry entry
45 #define done_entry entry
4645 };
46 /* XXX should this be an array since we need to know the length? */
4747 TAILQ_HEAD(match_queue, match_ctx);
4848
4949 /* Match functions. */