Codebase list mbuffer / 88832e1
New upstream version 20170921 Peter Pentchev 6 years ago
20 changed file(s) with 2022 addition(s) and 1441 deletion(s). Raw diff Collapse all Expand all
00 repo: 6e3b485d74645931e2408ed1f57e659029b5639a
1 node: 82afe6072c3fda541aad24062ceb272494d9a900
1 node: 85959504a0cec75ca0051e38107ac198a6641400
22 branch: default
3 latesttag: R20170806
3 latesttag: R20170921
44 latesttagdistance: 1
55 changessincelatesttag: 1
0 syntax:glob
1 *.o
9292 b1ffb4d4c54b907cbcd33dac4ed2290a5e67b172 R20170514
9393 506b90d6445b2472950e079ede2a200733727945 R20170515
9494 6b7a041f8df8ad8c6c961f087ddea56cb2da645c R20170806
95 42e5e9a3f9cc79319e3976955f3865841a934774 R20170921
0 20170921:
1 - added jumpbuffer reading mode for inconsistent block sizes
2 - code separation into more files for enhanced readability
3 - some cleanup work for global variables
4 - fixed regression in sanity checks
5 - fix: option -f should truncate output file
6 - fix: failed opening of network output should not redirect to stdout
7 - fix: summary printout should respect quiet options
8
09 20170806:
110 - add support for libgcrypt
211 - add support for tape aware out-of-space handling
1818 INSTALL = @INSTALL@
1919
2020 TARGET = mbuffer$(EXE)
21 SOURCES = log.c network.c mbuffer.c hashing.c
21 SOURCES = log.c network.c mbuffer.c hashing.c input.c common.c settings.c globals.c
2222 OBJECTS = $(SOURCES:.c=.o)
2323
24 TESTTREE = /usr/lib/gcc
24 TESTTREE = .
2525
2626 .PHONY: clean all distclean install check testcleanup
2727
5454 lint:
5555 lint $(DEFS) $(SOURCES)
5656
57 check: $(TARGET) test0 test1 test2 test3 test4 test5 testcleanup
57 check: $(TARGET) test0 test1 test2 test3 test4 test5
5858
5959 testcleanup:
6060 rm -f test0 test1 test2 test3 test4 test5 \
131131 diff $@.md5 test.md5
132132 touch $@
133133
134 test6: mbuffer idev.so
135 LD_PRELOAD=./idev.so BSIZE=317 IDEV=mbuffer ./mbuffer -s256 -i mbuffer -f -o mbuffer2
136
134137 tapetest.so: tapetest.c
135138 $(CC) -shared -fPIC $< -o $@ -ldl
139
140 idev.so: idev.c
141 $(CC) -shared -g -fPIC $< -o $@ -ldl
0 /*
1 * Copyright (C) 2000-2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common.h"
20 #include "log.h"
21 #include "globals.h"
22 #include "settings.h"
23
24 #include <assert.h>
25 #include <errno.h>
26 #include <string.h>
27 #include <time.h>
28 #include <unistd.h>
29
30 #ifdef __sun
31 #if defined(__SunOS_5_8) || defined(__SunOS_5_9)
32 int SemWait(sema_t *s)
33 {
34 int err;
35 do {
36 err = sema_wait(s);
37 } while (err == EINTR);
38 return err;
39 }
40 #endif
41 #endif
42
43
44 static inline long long timediff(struct timespec *restrict t1, struct timespec *restrict t2)
45 {
46 long long tdiff;
47 tdiff = (t1->tv_sec - t2->tv_sec) * 1000000;
48 tdiff += (t1->tv_nsec - t2->tv_nsec) / 1000;
49 if (tdiff < 0)
50 tdiff = 0;
51 return tdiff;
52 }
53
54
55 /* Thread-safe replacement for usleep. Argument must be a whole
56 * number of microseconds to sleep.
57 */
58 int mt_usleep(unsigned long long sleep_usecs)
59 {
60 struct timespec tv;
61 tv.tv_sec = sleep_usecs / 1000000;
62 tv.tv_nsec = (sleep_usecs % 1000000) * 1000;
63
64 do {
65 /* Sleep for the time specified in tv. If interrupted by a
66 * signal, place the remaining time left to sleep back into tv.
67 */
68 if (0 == nanosleep(&tv, &tv))
69 return 0;
70 } while (errno == EINTR);
71 return -1;
72 }
73
74
75 long long enforceSpeedLimit(unsigned long long limit, long long num, struct timespec *last)
76 {
77 struct timespec now;
78 long long tdiff;
79 double dt;
80 long self = (long) pthread_self();
81
82 num += Blocksize;
83 if (num < 0) {
84 debugmsg("enforceSpeedLimit(%lld,%lld): thread %ld\n",limit,num,self);
85 return num;
86 }
87 (void) clock_gettime(ClockSrc,&now);
88 tdiff = timediff(&now,last);
89 dt = (double)tdiff * 1E-6;
90 if (((double)num/dt) > (double)limit) {
91 double req = (double)num/limit - dt;
92 long long w = (long long) (req * 1E6);
93 if (w >= TickTime) {
94 long long slept, ret;
95 (void) mt_usleep(w);
96 (void) clock_gettime(ClockSrc,last);
97 slept = timediff(last,&now);
98 ret = -(long long)((double)limit * (double)(slept-w) * 1E-6);
99 debugmsg("thread %ld: slept for %lld usec (planned for %lld), ret = %lld\n",self,slept,w,ret);
100 return ret;
101 } else {
102 debugmsg("thread %ld: request for sleeping %lld usec delayed\n",self,w);
103 /*
104 * Sleeping now would cause too much of a slowdown. So
105 * we defer this sleep until the sleeping time is
106 * longer than the tick time. Like this we can stay as
107 * close to the speed limit as possible.
108 */
109 return num;
110 }
111 }
112 debugmsg("thread %ld: %lld/%g (%g) <= %g\n",self,num,dt,num/dt,(double)limit);
113 return num;
114 }
115
116
117 void releaseLock(void *l)
118 {
119 int err = pthread_mutex_unlock((pthread_mutex_t *)l);
120 assert(err == 0);
121 }
122
123
0 /*
1 * Copyright (C) 2000-2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #ifndef COMMON_H
20 #define COMMON_H
21
22 #include <sys/time.h>
23
24 #ifdef __sun
25 #include <synch.h>
26 #define sem_t sema_t
27 #define sem_init(a,b,c) sema_init(a,c,USYNC_THREAD,0)
28 #define sem_post sema_post
29 #define sem_getvalue(a,b) ((*(b) = (a)->count), 0)
30 #if defined(__SunOS_5_8) || defined(__SunOS_5_9)
31 #define sem_wait SemWait
32 #else
33 #define sem_wait sema_wait
34 #endif
35 #endif
36
37 int mt_usleep(unsigned long long sleep_usecs);
38 long long enforceSpeedLimit(unsigned long long limit, long long num, struct timespec *last);
39 void releaseLock(void *l);
40
41 #endif
8787 #undef inline
8888 #endif
8989
90
91 /* derived configuration defines */
92 #include <assert.h>
93 #include <fcntl.h>
94 #include <limits.h>
95 #include <semaphore.h>
96 #include <unistd.h>
97
98 #ifdef O_LARGEFILE
99 #define LARGEFILE O_LARGEFILE
100 #else
101 #define LARGEFILE 0
90102 #endif
103
104 #ifdef O_DIRECT
105 #define DIRECT O_DIRECT
106 #else
107 #define DIRECT 0
108 #endif
109
110 #ifndef PATH_MAX
111 #define PATH_MAX 1024
112 #endif
113
114 #ifndef S_SPLINT_S
115 #ifndef _POSIX_SEMAPHORES
116 #error posix sempahores are required
117 #endif
118 #endif
119
120 #ifdef __CYGWIN__
121 #include <malloc.h>
122 #undef assert
123 #define assert(x) ((x) || (*(char *) 0 = 1))
124 #endif
125
126 #ifdef __FreeBSD__
127 #include <sys/sysctl.h>
128 #endif
129
130 #define _GNU_SOURCE 1 /* needed for O_DIRECT */
131
132 #endif
00 #! /bin/sh
11 # Guess values for system-dependent variables and create Makefiles.
2 # Generated by GNU Autoconf 2.69 for mbuffer 20170806.
2 # Generated by GNU Autoconf 2.69 for mbuffer 20170921.
33 #
44 #
55 # Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
576576 # Identity of this package.
577577 PACKAGE_NAME='mbuffer'
578578 PACKAGE_TARNAME='mbuffer'
579 PACKAGE_VERSION='20170806'
580 PACKAGE_STRING='mbuffer 20170806'
579 PACKAGE_VERSION='20170921'
580 PACKAGE_STRING='mbuffer 20170921'
581581 PACKAGE_BUGREPORT=''
582582 PACKAGE_URL=''
583583
671671 docdir
672672 oldincludedir
673673 includedir
674 runstatedir
675674 localstatedir
676675 sharedstatedir
677676 sysconfdir
746745 sysconfdir='${prefix}/etc'
747746 sharedstatedir='${prefix}/com'
748747 localstatedir='${prefix}/var'
749 runstatedir='${localstatedir}/run'
750748 includedir='${prefix}/include'
751749 oldincludedir='/usr/include'
752750 docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
999997 | -silent | --silent | --silen | --sile | --sil)
1000998 silent=yes ;;
1001999
1002 -runstatedir | --runstatedir | --runstatedi | --runstated \
1003 | --runstate | --runstat | --runsta | --runst | --runs \
1004 | --run | --ru | --r)
1005 ac_prev=runstatedir ;;
1006 -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
1007 | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
1008 | --run=* | --ru=* | --r=*)
1009 runstatedir=$ac_optarg ;;
1010
10111000 -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
10121001 ac_prev=sbindir ;;
10131002 -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
11451134 for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
11461135 datadir sysconfdir sharedstatedir localstatedir includedir \
11471136 oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
1148 libdir localedir mandir runstatedir
1137 libdir localedir mandir
11491138 do
11501139 eval ac_val=\$$ac_var
11511140 # Remove trailing slashes.
12581247 # Omit some internal or obsolete options to make the list less imposing.
12591248 # This message is too long to be a string in the A/UX 3.1 sh.
12601249 cat <<_ACEOF
1261 \`configure' configures mbuffer 20170806 to adapt to many kinds of systems.
1250 \`configure' configures mbuffer 20170921 to adapt to many kinds of systems.
12621251
12631252 Usage: $0 [OPTION]... [VAR=VALUE]...
12641253
12981287 --sysconfdir=DIR read-only single-machine data [PREFIX/etc]
12991288 --sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
13001289 --localstatedir=DIR modifiable single-machine data [PREFIX/var]
1301 --runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
13021290 --libdir=DIR object code libraries [EPREFIX/lib]
13031291 --includedir=DIR C header files [PREFIX/include]
13041292 --oldincludedir=DIR C header files for non-gcc [/usr/include]
13251313
13261314 if test -n "$ac_init_help"; then
13271315 case $ac_init_help in
1328 short | recursive ) echo "Configuration of mbuffer 20170806:";;
1316 short | recursive ) echo "Configuration of mbuffer 20170921:";;
13291317 esac
13301318 cat <<\_ACEOF
13311319
14141402 test -n "$ac_init_help" && exit $ac_status
14151403 if $ac_init_version; then
14161404 cat <<\_ACEOF
1417 mbuffer configure 20170806
1405 mbuffer configure 20170921
14181406 generated by GNU Autoconf 2.69
14191407
14201408 Copyright (C) 2012 Free Software Foundation, Inc.
18901878 This file contains any messages produced by compilers while
18911879 running configure, to aid debugging if configure makes a mistake.
18921880
1893 It was created by mbuffer $as_me 20170806, which was
1881 It was created by mbuffer $as_me 20170921, which was
18941882 generated by GNU Autoconf 2.69. Invocation command line was
18951883
18961884 $ $0 $@
64796467 # report actual input values of CONFIG_FILES etc. instead of their
64806468 # values after options handling.
64816469 ac_log="
6482 This file was extended by mbuffer $as_me 20170806, which was
6470 This file was extended by mbuffer $as_me 20170921, which was
64836471 generated by GNU Autoconf 2.69. Invocation command line was
64846472
64856473 CONFIG_FILES = $CONFIG_FILES
65416529 cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
65426530 ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
65436531 ac_cs_version="\\
6544 mbuffer config.status 20170806
6532 mbuffer config.status 20170921
65456533 configured by $0, generated by GNU Autoconf 2.69,
65466534 with options \\"\$ac_cs_config\\"
65476535
0 AC_INIT([mbuffer],[20170806])
0 AC_INIT([mbuffer],[20170921])
11 AC_CONFIG_HEADER([config.h])
22 AC_CONFIG_SRCDIR([mbuffer.c])
33
0 /*
1 * Copyright (C) 2000-2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "globals.h"
20 #include <fcntl.h>
21 #include <sys/time.h>
22
23 dest_t *Dest = 0;
24
25 int
26 Hashers = 0, /* number of hashing threads */
27 In = -1,
28 OptMode = O_EXCL,
29 Terminal = 0, /* do we have a controling terminal? */
30 TermQ[2],
31 Tmp = -1;
32
33 volatile int
34 ActSenders = 0,
35 NumSenders = -1, /* number of sender threads */
36 SendSize = 0,
37 Terminate = 0, /* abort execution, because of error or signal */
38 Watchdog = 0; /* 0: off, 1: started, 2: raised */
39
40 volatile unsigned
41 Done = 0,
42 EmptyCount = 0, /* counter incremented when buffer runs empty */
43 FullCount = 0, /* counter incremented when buffer gets full */
44 MainOutOK = 1; /* is the main outputThread still writing or just coordinating senders */
45
46 volatile unsigned long long
47 Rest = 0,
48 Numin = 0,
49 Numout = 0,
50 InSize = 0;
51
52 char *volatile
53 SendAt = 0;
54
55 size_t
56 IDevBSize = 0,
57 PrefixLen = 0;
58
59 long
60 PgSz = 0,
61 Finish = -1, /* this is for graceful termination */
62 TickTime = 0;
63
64 char
65 *Prefix,
66 **Buffer;
67
68 pthread_mutex_t
69 TermMut = PTHREAD_MUTEX_INITIALIZER, /* prevents statusThread from interfering with request*Volume */
70 LowMut = PTHREAD_MUTEX_INITIALIZER,
71 HighMut = PTHREAD_MUTEX_INITIALIZER,
72 SendMut = PTHREAD_MUTEX_INITIALIZER;
73
74 sem_t
75 Dev2Buf,
76 Buf2Dev;
77
78 pthread_cond_t
79 PercLow = PTHREAD_COND_INITIALIZER, /* low watermark */
80 PercHigh = PTHREAD_COND_INITIALIZER, /* high watermark */
81 SendCond = PTHREAD_COND_INITIALIZER;
82
83 pthread_t
84 ReaderThr,
85 WatchdogThr;
86
87 struct timeval
88 Starttime;
0 /*
1 * Copyright (C) 2000-2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #ifndef GLOBALS_H
20 #define GLOBALS_H
21
22 #include <pthread.h>
23 #include <semaphore.h>
24
25 typedef struct destination dest_t;
26
27 extern dest_t *Dest;
28
29 #define OPTION_B 1
30 #define OPTION_M 2
31 #define OPTION_S 4
32
33 extern int
34 Hashers,
35 Terminal,
36 TermQ[2],
37 Tmp,
38 In,
39 OptMode;
40
41 extern volatile int
42 ActSenders,
43 NumSenders, /* number of sender threads */
44 SendSize,
45 Terminate, /* abort execution, because of error or signal */
46 Watchdog; /* 0: off, 1: started, 2: raised */
47
48 extern volatile unsigned
49 Done,
50 EmptyCount, /* counter incremented when buffer runs empty */
51 FullCount, /* counter incremented when buffer gets full */
52 MainOutOK; /* is the main outputThread still writing or just coordinating senders */
53
54 extern volatile unsigned long long
55 Rest,
56 Numin,
57 Numout,
58 InSize;
59
60 extern char *volatile
61 SendAt;
62
63 extern size_t
64 IDevBSize,
65 PrefixLen;
66
67 extern long
68 PgSz,
69 Finish, /* this is for graceful termination */
70 TickTime;
71
72 extern char
73 *Prefix,
74 **Buffer;
75
76 extern pthread_mutex_t
77 TermMut, /* prevents statusThread from interfering with request*Volume */
78 LowMut,
79 HighMut,
80 SendMut;
81
82 extern sem_t
83 Dev2Buf,
84 Buf2Dev;
85
86 extern pthread_cond_t
87 PercLow, /* low watermark */
88 PercHigh, /* high watermark */
89 SendCond;
90
91 extern pthread_t
92 ReaderThr,
93 WatchdogThr;
94
95 extern struct timeval
96 Starttime;
97
98 #endif
0 /*
1 * Copyright (C) 2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #define _GNU_SOURCE
20 #include <assert.h>
21 #include <dlfcn.h>
22 #include <errno.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <sys/stat.h>
27 #include <unistd.h>
28
29 ssize_t (*d_open)(const char *path, int oflag, int mode) = 0;
30 ssize_t (*d_read)(int filedes, void *buf, size_t nbyte) = 0;
31 int (*d_fstat)(int ver, int fd, struct stat *st) = 0;
32
33 ssize_t Fd = -1;
34 size_t BSize = 0;
35
36
37
38 int open(const char *path, int oflag, int mode)
39 {
40 if (d_open == 0) {
41 d_open = (ssize_t (*)(const char *,int,int)) dlsym(RTLD_NEXT, "open");
42 fprintf(stderr,"idev.so: d_open = %p\n",d_open);
43 fflush(stderr);
44 }
45 assert(d_open);
46 int fd = d_open(path, oflag, mode);
47 fprintf(stderr,"idev.so: open %s (%s)\n",path,getenv("IDEV"));
48 if (strcmp(path, getenv("IDEV")) == 0) {
49 fprintf(stderr,"idev.so: FD = %d\n",fd);
50 fflush(stderr);
51 Fd = fd;
52 }
53 return fd;
54 }
55
56
57
58 ssize_t read(int fd,void *buf, size_t s)
59 {
60 if (d_read == 0) {
61 d_read = (ssize_t (*)(int,void*,size_t)) dlsym(RTLD_NEXT, "read");
62 fprintf(stderr,"idev.so: d_read = %p\n",d_read);
63 }
64 assert(d_read);
65 if (fd != Fd)
66 return d_read(fd,buf,s);
67 if (BSize == 0)
68 BSize = strtol(getenv("BSIZE"),0,0);
69 if (s < BSize) {
70 fprintf(stderr,"idev.so: read(%d,%p,%u<%u) = ENOMEM\n",fd,buf,s,BSize);
71 fflush(stderr);
72 errno = ENOMEM;
73 return -1;
74 }
75 return d_read(fd,buf,s);
76 }
77
78
79 int __fxstat(int ver, int fd, struct stat *st)
80 {
81 fprintf(stderr,"idev.so: fstat(%d,%d,%p)\n",ver,fd,st);
82 if (d_fstat == 0) {
83 d_fstat = (int (*)(int,int,struct stat *)) dlsym(RTLD_NEXT, "__fxstat");
84 fprintf(stderr,"idev.so: d_fstat = %p\n",d_fstat);
85 }
86 assert(d_fstat);
87 int r = d_fstat(ver,fd,st);
88 if (fd == Fd) {
89 if (BSize == 0)
90 BSize = strtol(getenv("BSIZE"),0,0);
91 fprintf(stderr,"idev.so: blksize set to %lu\n",BSize);
92 fflush(stderr);
93 st->st_blksize = BSize;
94 st->st_mode &= ~S_IFMT;
95 st->st_mode |= S_IFCHR;
96 }
97 return r;
98 }
0 /*
1 * Copyright (C) 2000-2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "config.h"
20 #include "input.h"
21 #include "common.h"
22 #include "log.h"
23 #include "globals.h"
24 #include "settings.h"
25
26 #include <assert.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <float.h>
30 #include <math.h>
31 #include <pthread.h>
32 #include <stddef.h>
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <string.h>
36 #include <sys/select.h>
37 #include <sys/stat.h>
38 #include <sys/time.h>
39 #include <unistd.h>
40
41
42 #ifdef __sun
43 #define waitInput
44 #else
45 static void waitInput(void)
46 {
47 if (Status != 0) {
48 int maxfd = TermQ[0] > In ? TermQ[0] + 1 : In + 1;
49 int err;
50
51 fd_set readfds;
52 FD_ZERO(&readfds);
53 FD_SET(TermQ[0],&readfds);
54 FD_SET(In,&readfds);
55 do {
56 err = select(maxfd,&readfds,0,0,0);
57 debugiomsg("inputThread: select(%d, {%d,%d}, 0, 0, 0) = %d\n", maxfd,In,TermQ[0],err);
58 assert((err > 0) || (errno == EBADF || errno == EINTR));
59 } while ((err < 0) && (errno == EINTR));
60 if (FD_ISSET(TermQ[0],&readfds))
61 pthread_exit((void *)-1);
62 assert(FD_ISSET(In,&readfds));
63 }
64 }
65 #endif
66
67
68 int promptInteractive()
69 {
70 static const char prompt[] = "\nContinue with next volume? Press 'y' to continue or 'n' to finish...";
71 static const char contmsg[] = "\nyes - continuing with next volume...\n";
72 static const char donemsg[] = "\nno - input done, waiting for output to finish...\n";
73 int err;
74
75 err = pthread_mutex_lock(&TermMut);
76 assert(0 == err);
77 if (-1 == write(STDERR_FILENO,prompt,sizeof(prompt))) {
78 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno));
79 Terminate = 1;
80 pthread_exit((void *) -1);
81 }
82 for (;;) {
83 char c = 0;
84 if (-1 == read(STDERR_FILENO,&c,1) && (errno != EINTR)) {
85 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno));
86 Terminate = 1;
87 pthread_exit((void *) -1);
88 }
89 debugmsg("prompt input %c\n",c);
90 switch (c) {
91 case 'n':
92 case 'N':
93 (void) write(STDERR_FILENO,donemsg,sizeof(donemsg));
94 err = pthread_mutex_unlock(&TermMut);
95 assert(0 == err);
96 return 0;
97 case 'y':
98 case 'Y':
99 (void) write(STDERR_FILENO,contmsg,sizeof(contmsg));
100 err = pthread_mutex_unlock(&TermMut);
101 assert(0 == err);
102 return 1;
103 default:;
104 }
105 }
106 }
107
108
109 static int requestInputVolume()
110 {
111 static struct timeval volstart = {0,0};
112 const char *cmd;
113 struct timeval now;
114 double diff;
115 unsigned min,hr;
116 char cmd_buf[15+strlen(Infile)];
117
118 debugmsg("requesting new volume for input\n");
119 (void) gettimeofday(&now,0);
120 if (volstart.tv_sec)
121 diff = now.tv_sec - volstart.tv_sec + (double) (now.tv_usec - volstart.tv_usec) * 1E-6;
122 else
123 diff = now.tv_sec - Starttime.tv_sec + (double) (now.tv_usec - Starttime.tv_usec) * 1E-6;
124 if (diff > 3600) {
125 hr = (unsigned) (diff / 3600);
126 diff -= hr * 3600;
127 min = (unsigned) (diff / 60);
128 diff -= min * 60;
129 infomsg("time for reading volume: %u:%02u:%02f\n",hr,min,diff);
130 } else if (diff > 60) {
131 min = (unsigned) (diff / 60);
132 diff -= min * 60;
133 infomsg("time for reading volume: %02u:%02f\n",min,diff);
134 } else
135 infomsg("time for reading volume: %02fsec.\n",diff);
136 if (-1 == close(In))
137 errormsg("error closing input: %s\n",strerror(errno));
138 do {
139 if ((Autoloader) && (Infile)) {
140 int ret;
141 if (AutoloadCmd) {
142 cmd = AutoloadCmd;
143 } else {
144 (void) snprintf(cmd_buf, sizeof(cmd_buf), "mt -f %s offline", Infile);
145 cmd = cmd_buf;
146 }
147 infomsg("requesting new input volume with command '%s'\n",cmd);
148 ret = system(cmd);
149 if (0 < ret) {
150 warningmsg("error running \"%s\" to change volume in autoloader: exitcode %d\n",cmd,ret);
151 Terminate = 1;
152 pthread_exit((void *) 0);
153 } else if (0 > ret) {
154 errormsg("error starting \"%s\" to change volume in autoloader: %s\n", cmd, strerror(errno));
155 Terminate = 1;
156 pthread_exit((void *) -1);
157 }
158 if (AutoloadTime) {
159 infomsg("waiting for drive to get ready...\n");
160 (void) sleep(AutoloadTime);
161 }
162 } else {
163 if (0 == promptInteractive())
164 return 0;
165 }
166 In = open(Infile, O_RDONLY | LARGEFILE | Direct);
167 if ((-1 == In) && (errno == EINVAL))
168 In = open(Infile, O_RDONLY | Direct);
169 if (-1 == In)
170 errormsg("could not reopen input: %s\n",strerror(errno));
171 #ifdef __sun
172 if (-1 == directio(In,DIRECTIO_ON))
173 infomsg("direct I/O hinting failed for input: %s\n",strerror(errno));
174 #endif
175 } while (In == -1);
176 (void) gettimeofday(&volstart,0);
177 diff = volstart.tv_sec - now.tv_sec + (double) (volstart.tv_usec - now.tv_usec) * 1E-6;
178 infomsg("tape-change took %fsec. - continuing with next volume\n",diff);
179 NumVolumes--;
180 if (Terminal && ! Autoloader) {
181 char msg[] = "\nOK - continuing...\n";
182 (void) write(STDERR_FILENO,msg,sizeof(msg));
183 }
184 return 1;
185 }
186
187
188 void openInput()
189 {
190 if (Infile) {
191 debugmsg("opening input %s\n",Infile);
192 int flags = O_RDONLY | LARGEFILE | Direct;
193 In = open(Infile,flags);
194 if (-1 == In) {
195 if (errno == EINVAL) {
196 flags &= ~LARGEFILE;
197 In = open(Infile,flags);
198 }
199 if (-1 == In)
200 fatal("could not open input file: %s\n",strerror(errno));
201 }
202 struct stat st;
203 if ((0 == fstat(In,&st)) && ((st.st_mode & S_IFMT) == S_IFREG))
204 InSize = st.st_size;
205 } else if (In == -1) {
206 debugmsg("input is stdin\n");
207 In = STDIN_FILENO;
208 }
209 #ifdef __sun
210 if (0 == directio(In,DIRECTIO_ON))
211 infomsg("direct I/O hinting enabled for input\n");
212 else
213 infomsg("direct I/O hinting failed for input: %s\n",strerror(errno));
214 #endif
215 }
216
217
218 static int devread(unsigned at)
219 {
220 static char *DevBuf = 0;
221 static size_t IFill = 0, Off = 0;
222 int num = 0;
223 do {
224 if (IFill) {
225 size_t s = IFill;
226 if (IFill > (Blocksize-num))
227 s = Blocksize-num;
228 debugmsg("fillop %d, fill %d, off %d\n",s,IFill,Off);
229 memcpy(Buffer[at]+num,DevBuf+Off,s);
230 Off += s;
231 IFill -= s;
232 num += s;
233 if (num == Blocksize)
234 return num;
235 }
236 ssize_t in = read(In,Buffer[at] + num,Blocksize - num);
237 debugmsg("devread %d = %d\n",Blocksize-num,in);
238 if (in > 0)
239 num += in;
240 else if (in == 0)
241 return num;
242 else if (in == -1) {
243 if (errno != ENOMEM)
244 return -1;
245 if (DevBuf == 0) {
246 // devread is only called when IDevBSize > 0
247 assert(IDevBSize > 0);
248 DevBuf = malloc(IDevBSize);
249 assert(DevBuf);
250 }
251 assert(IFill == 0);
252 ssize_t i2 = read(In,DevBuf,IDevBSize);
253 debugmsg("devread2 %d = %d %d/%s\n",IDevBSize,i2,errno,strerror(errno));
254 if (i2 == -1) {
255 assert(errno != ENOMEM);
256 return -1;
257 }
258 if (i2 == 0)
259 return num;
260 assert(IFill == 0);
261 IFill = i2;
262 Off = 0;
263 }
264 } while (num != Blocksize);
265 return num;
266 }
267
268
269 int readBlock(unsigned at)
270 {
271 int err;
272 size_t num = 0;
273 waitInput();
274 do {
275 ssize_t in;
276 if (IDevBSize)
277 in = devread(at);
278 else
279 in = read(In,Buffer[at] + num,Blocksize - num);
280 debugiomsg("inputThread: read(In, Buffer[%d] + %llu, %llu) = %d\n", at, num, Blocksize - num, in);
281 if (in > 0) {
282 num += in;
283 } else if ((0 == in) && (Terminal||Autoloader) && (NumVolumes != 1)) {
284 if (0 == requestInputVolume()) {
285 Finish = at;
286 Rest = num;
287 err = pthread_mutex_lock(&HighMut);
288 assert(err == 0);
289 err = sem_post(&Buf2Dev);
290 assert(err == 0);
291 err = pthread_cond_signal(&PercHigh);
292 assert(err == 0);
293 err = pthread_mutex_unlock(&HighMut);
294 assert(err == 0);
295 if (Status)
296 pthread_exit(0);
297 return 0;
298 }
299 } else if ((-1 == in) && (errno == EIO) && (Terminal||Autoloader) && (NumVolumes != 1)) {
300 requestInputVolume(at,num);
301 } else if (in <= 0) {
302 /* error or end-of-file */
303 if ((-1 == in) && (errno == EINTR))
304 continue;
305 if ((-1 == in) && (Terminate == 0))
306 errormsg("inputThread: error reading at offset 0x%llx: %s\n",Numin*Blocksize,strerror(errno));
307 Rest = num;
308 Finish = at;
309 debugmsg("inputThread: last block has %llu bytes\n",num);
310 err = pthread_mutex_lock(&HighMut);
311 assert(err == 0);
312 err = sem_post(&Buf2Dev);
313 assert(err == 0);
314 err = pthread_cond_signal(&PercHigh);
315 assert(err == 0);
316 err = pthread_mutex_unlock(&HighMut);
317 assert(err == 0);
318 infomsg("inputThread: exiting...\n");
319 if (Status)
320 pthread_exit((void *)(ptrdiff_t) in);
321 return in;
322 }
323 } while (num < Blocksize);
324 return 1;
325 }
326
327
328 void *inputThread(void *ignored)
329 {
330 int fill = 0;
331 unsigned at = 0;
332 long long xfer = 0;
333 const double startread = StartRead, startwrite = StartWrite;
334 struct timespec last;
335
336 #ifndef __sun
337 if (Status != 0)
338 assert(TermQ[0] != -1);
339 #endif
340 (void) clock_gettime(ClockSrc,&last);
341 assert(ignored == 0);
342 infomsg("inputThread: starting with threadid %ld...\n",(long)pthread_self());
343 for (;;) {
344 int err;
345
346 if (startread < 1) {
347 err = pthread_mutex_lock(&LowMut);
348 assert(err == 0);
349 err = sem_getvalue(&Buf2Dev,&fill);
350 assert(err == 0);
351 if (fill == Numblocks - 1) {
352 debugmsg("inputThread: buffer full, waiting for it to drain.\n");
353 pthread_cleanup_push(releaseLock,&LowMut);
354 err = pthread_cond_wait(&PercLow,&LowMut);
355 assert(err == 0);
356 pthread_cleanup_pop(0);
357 ++FullCount;
358 debugmsg("inputThread: low watermark reached, continuing...\n");
359 }
360 err = pthread_mutex_unlock(&LowMut);
361 assert(err == 0);
362 }
363 if (Terminate) { /* for async termination requests */
364 debugmsg("inputThread: terminating early upon request...\n");
365 if (-1 == close(In))
366 errormsg("error closing input: %s\n",strerror(errno));
367 if (Status)
368 pthread_exit((void *)1);
369 return (void *) 1;
370 }
371 err = sem_wait(&Dev2Buf); /* Wait for one or more buffer blocks to be free */
372 assert(err == 0);
373 if (0 == readBlock(at))
374 return 0;
375 if (MaxReadSpeed)
376 xfer = enforceSpeedLimit(MaxReadSpeed,xfer,&last);
377 err = sem_post(&Buf2Dev);
378 assert(err == 0);
379 if (startwrite > 0) {
380 err = pthread_mutex_lock(&HighMut);
381 assert(err == 0);
382 err = sem_getvalue(&Buf2Dev,&fill);
383 assert(err == 0);
384 if (((double) fill / (double) Numblocks) + DBL_EPSILON >= startwrite) {
385 err = pthread_cond_signal(&PercHigh);
386 assert(err == 0);
387 }
388 err = pthread_mutex_unlock(&HighMut);
389 assert(err == 0);
390 }
391 if (++at == Numblocks)
392 at = 0;
393 Numin++;
394 }
395 }
396
397
0 /*
1 * Copyright (C) 2000-2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #ifndef INPUT_H
20 #define INPUT_H
21
22 void *inputThread(void *ignored);
23 void openInput();
24
25 #endif
2727 #endif
2828 #endif
2929
30 #define _GNU_SOURCE 1 /* needed for O_DIRECT */
31 #include <assert.h>
3230 #include <errno.h>
33 #include <fcntl.h>
34 #include <float.h>
3531 #include <libgen.h>
3632 #include <limits.h>
3733 #include <math.h>
38 #include <netdb.h>
39 #include <pthread.h>
40 #include <semaphore.h>
4134 #include <signal.h>
35 #include <stddef.h>
4236 #include <stdio.h>
4337 #include <stdlib.h>
4438 #include <strings.h>
4539 #include <string.h>
4640 #include <sys/mman.h>
47 #include <sys/socket.h>
4841 #include <sys/stat.h>
49 #include <sys/time.h>
5042 #include <termios.h>
51 #include <unistd.h>
52 #include <stddef.h>
53
54
55 #ifdef __FreeBSD__
56 #include <sys/sysctl.h>
57 #endif
43
5844
5945 #ifdef HAVE_SENDFILE
6046 #ifdef HAVE_SENDFILE_H
6652 #define EBADRQC EINVAL
6753 #endif
6854
69 #ifndef PATH_MAX
70 #define PATH_MAX 1024
71 #endif
72
73 /*
74 * _POSIX_THREADS is only defined if full thread support is available.
75 * We don't need full thread support, so we skip this test...
76 * #ifndef _POSIX_THREADS
77 * #error posix threads are required
78 * #endif
79 */
80
81 #ifndef S_SPLINT_S
82 #ifndef _POSIX_SEMAPHORES
83 #error posix sempahores are required
84 #endif
85 #endif
86
87 #ifdef O_LARGEFILE
88 #define LARGEFILE O_LARGEFILE
89 #else
90 #define LARGEFILE 0
91 #endif
92
93 #ifdef O_DIRECT
94 #define DIRECT O_DIRECT
95 #else
96 #define DIRECT 0
97 #endif
98
99
55
56 #include "common.h"
10057 #include "dest.h"
58 #include "globals.h"
10159 #include "hashing.h"
60 #include "input.h"
61 #include "log.h"
10262 #include "network.h"
103 #include "log.h"
63 #include "settings.h"
10464
10565 /* if this sendfile implementation does not support sending from buffers,
10666 disable sendfile support */
11171 #undef HAVE_SENDFILE
11272 #endif
11373
114 char
115 *Prefix;
116 int
117 In = -1, WatchdogRaised = 0;
118 volatile int
119 NumSenders = -1,/* number of sender threads */
120 SendSize = 0,
121 Terminate = 0; /* abort execution, because of error or signal */
122 size_t
123 PrefixLen = 0;
124 dest_t *Dest = 0;
125 char *volatile SendAt = 0;
126
127
128 static pthread_t
129 Reader, Watchdog;
130 static long
131 Tmp = -1,
132 OptSync = 0;
133 static unsigned long
134 Outsize = 10240, Timeout = 0;
135 static volatile int
136 EmptyCount = 0, /* counter incremented when buffer runs empty */
137 FullCount = 0, /* counter incremented when buffer gets full */
138 Done = 0,
139 MainOutOK = 1; /* is the main outputThread still writing or just coordinating senders */
140 static unsigned long long
141 Totalmem = 0, PgSz = 0, NumP = 0, Blocksize = 10240, Pause = 0,
142 MaxReadSpeed = 0, MaxWriteSpeed = 0, OutVolsize = 0;
143 static volatile unsigned long long
144 Rest = 0, Numin = 0, Numout = 0, InSize = 0;
145 static double
146 StartWrite = 0, StartRead = 1;
147 static char
148 *Tmpfile = 0, **Buffer;
149 static const char
150 *Infile = 0, *AutoloadCmd = 0;
151 static unsigned int
152 AutoloadTime = 0;
153 static int
154 Memlock = 0, TermQ[2],
155 Memmap = 0, Quiet = 0, Status = 1, StatusLog = 1,
156 Hashers = 0, Direct = 0, SetOutsize = 0, TapeAware = 0;
157 static long
158 NumVolumes = 1, /* number of input volumes, 0 for interactive prompting */
159 Finish = -1, /* this is for graceful termination */
160 Numblocks = 512; /* number of buffer blocks */
161 static long long
162 TickTime = 0;
163
164 static clockid_t
165 ClockSrc = CLOCK_REALTIME;
166
167 static float
168 StatusInterval = 0.5;
169
170 #ifdef __sun
171 #include <synch.h>
172 #define sem_t sema_t
173 #define sem_init(a,b,c) sema_init(a,c,USYNC_THREAD,0)
174 #define sem_post sema_post
175 #define sem_getvalue(a,b) ((*(b) = (a)->count), 0)
176 #if defined(__SunOS_5_8) || defined(__SunOS_5_9)
177 #define sem_wait SemWait
178 int SemWait(sema_t *s)
179 {
180 int err;
181 do {
182 err = sema_wait(s);
183 } while (err == EINTR);
184 return err;
185 }
186 #else
187 #define sem_wait sema_wait
188 #endif
189 #endif
190
191 static sem_t Dev2Buf, Buf2Dev;
192 static pthread_cond_t
193 PercLow = PTHREAD_COND_INITIALIZER, /* low watermark */
194 PercHigh = PTHREAD_COND_INITIALIZER, /* high watermark */
195 SendCond = PTHREAD_COND_INITIALIZER;
196 static pthread_mutex_t
197 TermMut = PTHREAD_MUTEX_INITIALIZER, /* prevents statusThread from interfering with request*Volume */
198 LowMut = PTHREAD_MUTEX_INITIALIZER,
199 HighMut = PTHREAD_MUTEX_INITIALIZER,
200 SendMut = PTHREAD_MUTEX_INITIALIZER;
201 static int Terminal = 1, Autoloader = 0;
202 static struct timeval Starttime;
203 static volatile int ActSenders = 0;
204
205 #ifdef __CYGWIN__
206 #include <malloc.h>
207 #undef assert
208 #define assert(x) ((x) || (*(char *) 0 = 1))
209 #endif
210
211 typedef enum { off, on, invalid } flag_t;
21274
21375
21476 static int kb2str(char *s, double v)
274136 msg += sprintf(msg,", %dx full",FullCount);
275137 *msg++ = '\n';
276138 *msg = '\0';
277 if (Log != STDERR_FILENO)
139 if ((Log != STDERR_FILENO) && (StatusLog != 0))
278140 (void) write(Log,buf,msg-buf);
279 if (Status)
141 if ((Status != 0) && (Quiet == 0))
280142 (void) write(STDERR_FILENO,buf,msg-buf);
281143 }
282144
291153 d = d->next;
292154 } while (d);
293155 if (Status)
294 (void) pthread_cancel(Reader);
156 (void) pthread_cancel(ReaderThr);
295157 }
296158
297159
303165 ErrorOccurred = 1;
304166 Terminate = 1;
305167 (void) close(In);
306 if (TermQ[1] != -1) {
168 if (TermQ[1] != -1)
307169 (void) write(TermQ[1],"0",1);
308 }
309170 if (StartWrite > 0)
310171 (void) pthread_cond_signal(&PercHigh);
311172 if (StartRead < 1)
317178 }
318179
319180
320
321 /* Thread-safe replacement for usleep. Argument must be a whole
322 * number of microseconds to sleep.
323 */
324 static int mt_usleep(unsigned long long sleep_usecs)
325 {
326 struct timespec tv;
327 tv.tv_sec = sleep_usecs / 1000000;
328 tv.tv_nsec = (sleep_usecs % 1000000) * 1000;
329
330 do {
331 /* Sleep for the time specified in tv. If interrupted by a
332 * signal, place the remaining time left to sleep back into tv.
333 */
334 if (0 == nanosleep(&tv, &tv))
335 return 0;
336 } while (errno == EINTR);
337 return -1;
338 }
339
340
341
342 static void *watchdogThread(void *ignored)
181 void *watchdogThread(void *ignored)
343182 {
344183 unsigned long ni = Numin, no = Numout;
345184 unsigned long long timeout = Timeout * 1000000LL;
346185 for (;;) {
347186 mt_usleep(timeout);
348 if (WatchdogRaised) {
187 if (Watchdog > 1) {
349188 errormsg("watchdog timeout: SIGINT had no effect; sending SIGKILL\n");
350189 kill(getpid(),SIGKILL);
351190 }
352191 if ((ni == Numin) && (Finish == -1)) {
353192 errormsg("watchdog timeout: input stalled; sending SIGINT\n");
354 WatchdogRaised = 1;
193 Watchdog = 2;
355194 kill(getpid(),SIGINT);
356195 }
357196 if (no == Numout) {
358197 errormsg("watchdog timeout: output stalled; sending SIGINT\n");
359 WatchdogRaised = 1;
198 Watchdog = 2;
360199 kill(getpid(),SIGINT);
361200 }
362201 ni = Numin;
479318 }
480319
481320
482
483 static inline long long timediff(struct timespec *restrict t1, struct timespec *restrict t2)
484 {
485 long long tdiff;
486 tdiff = (t1->tv_sec - t2->tv_sec) * 1000000;
487 tdiff += (t1->tv_nsec - t2->tv_nsec) / 1000;
488 if (tdiff < 0)
489 tdiff = 0;
490 return tdiff;
491 }
492
493
494
495 static long long enforceSpeedLimit(unsigned long long limit, long long num, struct timespec *last)
496 {
497 struct timespec now;
498 long long tdiff;
499 double dt;
500 long self = (long) pthread_self();
501
502 num += Blocksize;
503 if (num < 0) {
504 debugmsg("enforceSpeedLimit(%lld,%lld): thread %ld\n",limit,num,self);
505 return num;
506 }
507 (void) clock_gettime(ClockSrc,&now);
508 tdiff = timediff(&now,last);
509 dt = (double)tdiff * 1E-6;
510 if (((double)num/dt) > (double)limit) {
511 double req = (double)num/limit - dt;
512 long long w = (long long) (req * 1E6);
513 if (w >= TickTime) {
514 long long slept, ret;
515 (void) mt_usleep(w);
516 (void) clock_gettime(ClockSrc,last);
517 slept = timediff(last,&now);
518 ret = -(long long)((double)limit * (double)(slept-w) * 1E-6);
519 debugmsg("thread %ld: slept for %lld usec (planned for %lld), ret = %lld\n",self,slept,w,ret);
520 return ret;
521 } else {
522 debugmsg("thread %ld: request for sleeping %lld usec delayed\n",self,w);
523 /*
524 * Sleeping now would cause too much of a slowdown. So
525 * we defer this sleep until the sleeping time is
526 * longer than the tick time. Like this we can stay as
527 * close to the speed limit as possible.
528 */
529 return num;
530 }
531 }
532 debugmsg("thread %ld: %lld/%g (%g) <= %g\n",self,num,dt,num/dt,(double)limit);
533 return num;
534 }
535
536
537
538 static int promptInteractive(unsigned at, unsigned num)
539 {
540 static const char prompt[] = "\nContinue with next volume? Press 'y' to continue or 'n' to finish...";
541 static const char contmsg[] = "\nyes - continuing with next volume...\n";
542 static const char donemsg[] = "\nno - input done, waiting for output to finish...\n";
543 int err;
544
545 err = pthread_mutex_lock(&TermMut);
546 assert(0 == err);
547 if (-1 == write(STDERR_FILENO,prompt,sizeof(prompt))) {
548 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno));
549 Terminate = 1;
550 pthread_exit((void *) -1);
551 }
552 for (;;) {
553 char c = 0;
554 if (-1 == read(STDERR_FILENO,&c,1) && (errno != EINTR)) {
555 errormsg("error accessing controlling terminal for manual volume change request: %s\nConsider using autoload option, when running mbuffer without terminal.\n",strerror(errno));
556 Terminate = 1;
557 pthread_exit((void *) -1);
558 }
559 debugmsg("prompt input %c\n",c);
560 switch (c) {
561 case 'n':
562 case 'N':
563 Rest = num;
564 (void) write(STDERR_FILENO,donemsg,sizeof(donemsg));
565 err = pthread_mutex_lock(&HighMut);
566 assert(err == 0);
567 err = sem_post(&Buf2Dev);
568 assert(err == 0);
569 err = pthread_cond_signal(&PercHigh);
570 assert(err == 0);
571 err = pthread_mutex_unlock(&HighMut);
572 assert(err == 0);
573 err = pthread_mutex_unlock(&TermMut);
574 assert(0 == err);
575 Finish = at;
576 if (Status)
577 pthread_exit(0);
578 return 0;
579 case 'y':
580 case 'Y':
581 (void) write(STDERR_FILENO,contmsg,sizeof(contmsg));
582 err = pthread_mutex_unlock(&TermMut);
583 assert(0 == err);
584 return 1;
585 default:;
586 }
587 }
588 }
589
590
591
592 static int requestInputVolume(unsigned at, unsigned num)
593 {
594 static struct timeval volstart = {0,0};
595 const char *cmd;
596 struct timeval now;
597 double diff;
598 unsigned min,hr;
599 char cmd_buf[15+strlen(Infile)];
600
601 debugmsg("requesting new volume for input\n");
602 (void) gettimeofday(&now,0);
603 if (volstart.tv_sec)
604 diff = now.tv_sec - volstart.tv_sec + (double) (now.tv_usec - volstart.tv_usec) * 1E-6;
605 else
606 diff = now.tv_sec - Starttime.tv_sec + (double) (now.tv_usec - Starttime.tv_usec) * 1E-6;
607 if (diff > 3600) {
608 hr = (unsigned) (diff / 3600);
609 diff -= hr * 3600;
610 min = (unsigned) (diff / 60);
611 diff -= min * 60;
612 infomsg("time for reading volume: %u:%02u:%02f\n",hr,min,diff);
613 } else if (diff > 60) {
614 min = (unsigned) (diff / 60);
615 diff -= min * 60;
616 infomsg("time for reading volume: %02u:%02f\n",min,diff);
617 } else
618 infomsg("time for reading volume: %02fsec.\n",diff);
619 if (-1 == close(In))
620 errormsg("error closing input: %s\n",strerror(errno));
621 do {
622 if ((Autoloader) && (Infile)) {
623 int ret;
624 if (AutoloadCmd) {
625 cmd = AutoloadCmd;
626 } else {
627 (void) snprintf(cmd_buf, sizeof(cmd_buf), "mt -f %s offline", Infile);
628 cmd = cmd_buf;
629 }
630 infomsg("requesting new input volume with command '%s'\n",cmd);
631 ret = system(cmd);
632 if (0 < ret) {
633 warningmsg("error running \"%s\" to change volume in autoloader: exitcode %d\n",cmd,ret);
634 Terminate = 1;
635 pthread_exit((void *) 0);
636 } else if (0 > ret) {
637 errormsg("error starting \"%s\" to change volume in autoloader: %s\n", cmd, strerror(errno));
638 Terminate = 1;
639 pthread_exit((void *) -1);
640 }
641 if (AutoloadTime) {
642 infomsg("waiting for drive to get ready...\n");
643 (void) sleep(AutoloadTime);
644 }
645 } else {
646 if (0 == promptInteractive(at,num))
647 return 0;
648 }
649 In = open(Infile, O_RDONLY | LARGEFILE | Direct);
650 if ((-1 == In) && (errno == EINVAL))
651 In = open(Infile, O_RDONLY | Direct);
652 if (-1 == In)
653 errormsg("could not reopen input: %s\n",strerror(errno));
654 #ifdef __sun
655 if (-1 == directio(In,DIRECTIO_ON))
656 infomsg("direct I/O hinting failed for input: %s\n",strerror(errno));
657 #endif
658 } while (In == -1);
659 (void) gettimeofday(&volstart,0);
660 diff = volstart.tv_sec - now.tv_sec + (double) (volstart.tv_usec - now.tv_usec) * 1E-6;
661 infomsg("tape-change took %fsec. - continuing with next volume\n",diff);
662 NumVolumes--;
663 if (Terminal && ! Autoloader) {
664 char msg[] = "\nOK - continuing...\n";
665 (void) write(STDERR_FILENO,msg,sizeof(msg));
666 }
667 return 1;
668 }
669
670
671
672 static void releaseLock(void *l)
673 {
674 int err = pthread_mutex_unlock((pthread_mutex_t *)l);
675 assert(err == 0);
676 }
677
678
679
680 #ifndef __sun
681 static void waitInput(void)
682 {
683 if (Status != 0) {
684 int maxfd = TermQ[0] > In ? TermQ[0] + 1 : In + 1;
685 int err;
686
687 fd_set readfds;
688 FD_ZERO(&readfds);
689 FD_SET(TermQ[0],&readfds);
690 FD_SET(In,&readfds);
691 do {
692 err = select(maxfd,&readfds,0,0,0);
693 debugiomsg("inputThread: select(%d, {%d,%d}, 0, 0, 0) = %d\n", maxfd,In,TermQ[0],err);
694 assert((err > 0) || (errno == EBADF || errno == EINTR));
695 } while ((err < 0) && (errno == EINTR));
696 if (FD_ISSET(TermQ[0],&readfds))
697 pthread_exit((void *)-1);
698 assert(FD_ISSET(In,&readfds));
699 }
700 }
701 #endif
702
703
704
705 static void *inputThread(void *ignored)
706 {
707 int fill = 0;
708 unsigned long long num;
709 int at = 0;
710 long long xfer = 0;
711 const double startread = StartRead, startwrite = StartWrite;
712 struct timespec last;
713
714 #ifndef __sun
715 if (Status != 0)
716 assert(TermQ[0] != -1);
717 #endif
718 (void) clock_gettime(ClockSrc,&last);
719 assert(ignored == 0);
720 infomsg("inputThread: starting with threadid %ld...\n",(long)pthread_self());
721 for (;;) {
722 int err;
723
724 if (startread < 1) {
725 err = pthread_mutex_lock(&LowMut);
726 assert(err == 0);
727 err = sem_getvalue(&Buf2Dev,&fill);
728 assert(err == 0);
729 if (fill == Numblocks - 1) {
730 debugmsg("inputThread: buffer full, waiting for it to drain.\n");
731 pthread_cleanup_push(releaseLock,&LowMut);
732 err = pthread_cond_wait(&PercLow,&LowMut);
733 assert(err == 0);
734 pthread_cleanup_pop(0);
735 ++FullCount;
736 debugmsg("inputThread: low watermark reached, continuing...\n");
737 }
738 err = pthread_mutex_unlock(&LowMut);
739 assert(err == 0);
740 }
741 if (Terminate) { /* for async termination requests */
742 debugmsg("inputThread: terminating early upon request...\n");
743 if (-1 == close(In))
744 errormsg("error closing input: %s\n",strerror(errno));
745 if (Status)
746 pthread_exit((void *)1);
747 return (void *) 1;
748 }
749 err = sem_wait(&Dev2Buf); /* Wait for one or more buffer blocks to be free */
750 assert(err == 0);
751 num = 0;
752 do {
753 int in;
754 #ifndef __sun
755 waitInput();
756 #endif
757 in = read(In,Buffer[at] + num,Blocksize - num);
758 debugiomsg("inputThread: read(In, Buffer[%d] + %llu, %llu) = %d\n", at, num, Blocksize - num, in);
759 if (in > 0) {
760 num += in;
761 } else if ((0 == in) && (Terminal||Autoloader) && (NumVolumes != 1)) {
762 if (0 == requestInputVolume(at,num))
763 return 0;
764 } else if ((-1 == in) && (errno == EIO) && (Terminal||Autoloader) && (NumVolumes != 1)) {
765 requestInputVolume(at,num);
766 } else if (in <= 0) {
767 /* error or end-of-file */
768 if ((-1 == in) && (errno == EINTR))
769 continue;
770 if ((-1 == in) && (Terminate == 0))
771 errormsg("inputThread: error reading at offset 0x%llx: %s\n",Numin*Blocksize,strerror(errno));
772 Rest = num;
773 Finish = at;
774 debugmsg("inputThread: last block has %llu bytes\n",num);
775 err = pthread_mutex_lock(&HighMut);
776 assert(err == 0);
777 err = sem_post(&Buf2Dev);
778 assert(err == 0);
779 err = pthread_cond_signal(&PercHigh);
780 assert(err == 0);
781 err = pthread_mutex_unlock(&HighMut);
782 assert(err == 0);
783 infomsg("inputThread: exiting...\n");
784 if (Status)
785 pthread_exit((void *)(ptrdiff_t) in);
786 return (void *)(ptrdiff_t) in;
787 }
788 } while (num < Blocksize);
789 if (MaxReadSpeed)
790 xfer = enforceSpeedLimit(MaxReadSpeed,xfer,&last);
791 err = sem_post(&Buf2Dev);
792 assert(err == 0);
793 if (startwrite > 0) {
794 err = pthread_mutex_lock(&HighMut);
795 assert(err == 0);
796 err = sem_getvalue(&Buf2Dev,&fill);
797 assert(err == 0);
798 if (((double) fill / (double) Numblocks) + DBL_EPSILON >= startwrite) {
799 err = pthread_cond_signal(&PercHigh);
800 assert(err == 0);
801 }
802 err = pthread_mutex_unlock(&HighMut);
803 assert(err == 0);
804 }
805 if (++at == Numblocks)
806 at = 0;
807 Numin++;
808 }
809 }
810
811
812
813321 int syncSenders(char *b, int s)
814322 {
815323 static volatile int size = 0, skipped = 0;
1338846
1339847
1340848
1341 static void version(void)
1342 {
1343 (void) fprintf(stderr,
1344 "mbuffer version "PACKAGE_VERSION"\n"\
1345 "Copyright 2001-2017 - T. Maier-Komor\n"\
1346 "License: GPLv3 - see file LICENSE\n"\
1347 "This program comes with ABSOLUTELY NO WARRANTY!!!\n"
1348 "Donations via PayPal to thomas@maier-komor.de are welcome and support this work!\n"
1349 "\n"
1350 );
1351 exit(EXIT_SUCCESS);
1352 }
1353
1354
1355
1356 static void usage(void)
1357 {
1358 const char *dim = "bkMGTP";
1359 unsigned long long m = Numblocks * Blocksize;
1360 while (m >= 10000) {
1361 m >>= 10;
1362 ++dim;
1363 }
1364 (void) fprintf(stderr,
1365 "usage: mbuffer [Options]\n"
1366 "Options:\n"
1367 "-b <num> : use <num> blocks for buffer (default: %ld)\n"
1368 "-s <size> : use blocks of <size> bytes for processing (default: %llu)\n"
1369 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
1370 "-m <size> : memory <size> of buffer in b,k,M,G,%% (default: 2%% = %llu%c)\n"
1371 #else
1372 "-m <size> : memory <size> of buffer in b,k,M,G,%% (default: %llu%c)\n"
1373 #endif
1374 #ifdef _POSIX_MEMLOCK_RANGE
1375 "-L : lock buffer in memory (unusable with file based buffers)\n"
1376 #endif
1377 "-d : use blocksize of device for output\n"
1378 "-D <size> : assumed output device size (default: infinite/auto-detect)\n"
1379 "-P <num> : start writing after buffer has been filled more than <num>%%\n"
1380 "-p <num> : start reading after buffer has been filled less than <num>%%\n"
1381 "-i <file> : use <file> for input\n"
1382 "-o <file> : use <file> for output (this option can be passed MULTIPLE times)\n"
1383 "--append : append to output file (must be passed before -o)\n"
1384 "--truncate : truncate next file (must be passed before -o)\n"
1385 "-I <h>:<p> : use network port <port> as input, allow only host <h> to connect\n"
1386 "-I <p> : use network port <port> as input\n"
1387 "-O <h>:<p> : output data to host <h> and port <p> (MUTLIPLE outputs supported)\n"
1388 "-n <num> : <num> volumes for input, '0' to prompt interactively\n"
1389 "-t : use memory mapped temporary file (for huge buffer)\n"
1390 "-T <file> : as -t but uses <file> as buffer\n"
1391 "-l <file> : use <file> for logging messages\n"
1392 "-u <num> : pause <num> milliseconds after each write\n"
1393 "-r <rate> : limit read rate to <rate> B/s, where <rate> can be given in b,k,M,G\n"
1394 "-R <rate> : same as -r for writing; use either one, if your tape is too fast\n"
1395 "-f : overwrite existing files\n"
1396 "-a <time> : autoloader which needs <time> seconds to reload\n"
1397 "-A <cmd> : issue command <cmd> to request new volume\n"
1398 "-v <level> : set verbose level to <level> (valid values are 0..6)\n"
1399 "-q : quiet - do not display the status on stderr\n"
1400 "-Q : quiet - do not log the status\n"
1401 "-c : write with synchronous data integrity support\n"
1402 "-e : stop processing on any kind of error\n"
1403 #ifdef O_DIRECT
1404 "--direct : open input and output with O_DIRECT\n"
1405 #endif
1406 #if defined HAVE_LIBCRYPTO || defined HAVE_LIBMD5 || defined HAVE_LIBMHASH
1407 "-H\n"
1408 "--md5 : generate md5 hash of transfered data\n"
1409 "--hash <a> : use alogritm <a>, if <a> is 'list' possible algorithms are listed\n"
1410 "--pid : print PID of this instance\n"
1411 "-W <time> : set watchdog timeout to <time> seconds\n"
1412 #endif
1413 "-4 : force use of IPv4\n"
1414 "-6 : force use of IPv6\n"
1415 "-0 : use IPv4 or IPv6\n"
1416 "--tcpbuffer: size for TCP buffer\n"
1417 "--tapeaware: write to end of tape instead of stopping when the drive signals\n"
1418 " the media end is approaching (write until 2x ENOSPC errors)\n"
1419 "-V\n"
1420 "--version : print version information\n"
1421 "Unsupported buffer options: -t -Z -B\n"
1422 ,Numblocks
1423 ,Blocksize
1424 ,m
1425 ,*dim
1426 );
1427 exit(EXIT_SUCCESS);
1428 }
1429
1430
1431 static unsigned long long calcint(const char **argv, int c, unsigned long long def)
1432 {
1433 char ch;
1434 double d = (double)def;
1435
1436 switch (sscanf(argv[c],"%lf%c",&d,&ch)) {
1437 default:
1438 abort();
1439 break;
1440 case 2:
1441 if (d <= 0)
1442 fatal("invalid argument - must be > 0\n");
1443 switch (ch) {
1444 case 'k':
1445 case 'K':
1446 d *= 1024.0;
1447 return (unsigned long long) d;
1448 case 'm':
1449 case 'M':
1450 d *= 1024.0*1024.0;
1451 return (unsigned long long) d;
1452 case 'g':
1453 case 'G':
1454 d *= 1024.0*1024.0*1024.0;
1455 return (unsigned long long) d;
1456 case 't':
1457 case 'T':
1458 d *= 1024.0*1024.0*1024.0*1024.0;
1459 return (unsigned long long) d;
1460 case '%':
1461 if ((d >= 90) || (d <= 0))
1462 fatal("invalid value for percentage (must be 0..90)\n");
1463 return (unsigned long long) d;
1464 case 'b':
1465 case 'B':
1466 if (d < 128)
1467 fatal("invalid value for number of bytes\n");
1468 return (unsigned long long) d;
1469 default:
1470 if (argv[c][-2] == '-')
1471 fatal("unrecognized size charakter \"%c\" for option \"%s\"\n",ch,&argv[c][-2]);
1472 else
1473 fatal("unrecognized size charakter \"%c\" for option \"%s\"\n",ch,argv[c-1]);
1474 return d;
1475 }
1476 case 1:
1477 if (d <= 0)
1478 fatal("invalid argument - must be > 0\n");
1479 if (d <= 100) {
1480 if (argv[c][-2] == '-')
1481 fatal("invalid low value for option \"%s\" - missing suffix?\n",&argv[c][-2]);
1482 else
1483 fatal("invalid low value for option \"%s\" - missing suffix?\n",argv[c-1]);
1484 }
1485 return d;
1486 case 0:
1487 break;
1488 }
1489 errormsg("unrecognized argument \"%s\" for option \"%s\"\n",argv[c],argv[c-1]);
1490 return d;
1491 }
1492
1493
1494
1495 static int argcheck(const char *opt, const char **argv, int *c, int argc)
1496 {
1497 if (strncmp(opt,argv[*c],strlen(opt)))
1498 return 1;
1499 if (strlen(argv[*c]) > 2)
1500 argv[*c] += 2;
1501 else {
1502 (*c)++;
1503 if (*c == argc)
1504 fatal("missing argument to option %s\n",opt);
1505 }
1506 return 0;
1507 }
1508
1509
1510
1511849 static void openDestinationFiles(dest_t *d)
1512850 {
1513851 unsigned errs = ErrorOccurred;
1549887 fatal("unable to open all outputs\n");
1550888 }
1551889
1552
1553
1554 static const char *calcval(const char *arg, unsigned long long *res)
1555 {
1556 char ch;
1557 double d;
1558
1559 switch (sscanf(arg,"%lf%c",&d,&ch)) {
1560 default:
1561 abort();
1562 break;
1563 case 2:
1564 if (d <= 0)
1565 return "negative value out of range";
1566 switch (ch) {
1567 case 'k':
1568 case 'K':
1569 d *= 1024.0;
1570 *res = d;
1571 return 0;
1572 case 'm':
1573 case 'M':
1574 d *= 1024.0*1024.0;
1575 *res = d;
1576 return 0;
1577 case 'g':
1578 case 'G':
1579 d *= 1024.0*1024.0*1024.0;
1580 *res = d;
1581 return 0;
1582 case 't':
1583 case 'T':
1584 d *= 1024.0*1024.0*1024.0*1024.0;
1585 *res = d;
1586 return 0;
1587 case '%':
1588 if ((d >= 90) || (d <= 0))
1589 return "invalid value for percentage (must be 0..90)";
1590 *res = d;
1591 return 0;
1592 case 'b':
1593 case 'B':
1594 if (d < 128)
1595 return "invalid value for number of bytes";
1596 *res = d;
1597 return 0;
1598 default:
1599 return "invalid dimension";
1600 }
1601 case 1:
1602 if (d <= 0)
1603 return "value out of range";
1604 *res = d;
1605 return 0;
1606 case 0:
1607 break;
1608 }
1609 return "unrecognized argument";
1610 }
1611
1612
1613 static int isEmpty(const char *l)
1614 {
1615 while (*l) {
1616 if ((*l != ' ') && (*l != '\t'))
1617 return 0;
1618 ++l;
1619 }
1620 return 1;
1621 }
1622
1623
1624 flag_t parseFlag(const char *valuestr)
1625 {
1626 if ((strcasecmp(valuestr,"yes") == 0) || (strcasecmp(valuestr,"on") == 0) || (strcmp(valuestr,"1") == 0) || (strcmp(valuestr,"true") == 0))
1627 return on;
1628 else if ((strcasecmp(valuestr,"no") == 0) || (strcasecmp(valuestr,"off") == 0) || (strcmp(valuestr,"0") == 0) || (strcmp(valuestr,"false") == 0))
1629 return off;
1630 else
1631 return invalid;
1632 }
1633
1634
1635 static void readConfigFile(const char *cfname)
1636 {
1637 int df,lineno = 0;
1638 struct stat st;
1639 char *cfdata, *line;
1640
1641 df = open(cfname,O_RDONLY);
1642 if (df == -1) {
1643 if (errno == ENOENT)
1644 infomsg("no config file %s\n",cfname);
1645 else
1646 warningmsg("error opening config file %s: %s\n",cfname,strerror(errno));
1647 return;
1648 }
1649 if (-1 == fstat(df,&st)) {
1650 warningmsg("unable to stat config file %s: %s\n",cfname,strerror(errno));
1651 close(df);
1652 return;
1653 }
1654 if ((getuid() != st.st_uid) && (st.st_uid != 0)) {
1655 warningmsg("ignoring config file '%s' from different user\n",cfname);
1656 close(df);
1657 return;
1658 }
1659 infomsg("reading config file %s\n",cfname);
1660 cfdata = malloc(st.st_size+1);
1661 if (cfdata == 0)
1662 fatal("out of memory\n");
1663 int n = read(df,cfdata,st.st_size);
1664 close(df);
1665 if (n < 0) {
1666 warningmsg("error reading %s: %s\n",cfname,strerror(errno));
1667 free(cfdata);
1668 return;
1669 }
1670 cfdata[n] = 0;
1671 line = cfdata;
1672 while (line && *line) {
1673 char key[64],valuestr[64];
1674 int a;
1675 ++lineno;
1676 char *nl = strchr(line,'\n');
1677 if (nl) {
1678 *nl = 0;
1679 ++nl;
1680 }
1681 char *pound = strchr(line,'#');
1682 if (pound)
1683 *pound = 0;
1684 if (isEmpty(line)) {
1685 line = nl;
1686 continue;
1687 }
1688 a = sscanf(line,"%63[A-Za-z]%*[ \t=:]%63[0-9a-zA-Z.]",key,valuestr);
1689 if (a != 2) {
1690 warningmsg("config file %s, line %d: error parsing '%s'\n",cfname,lineno,line);
1691 line = nl;
1692 continue;
1693 }
1694 line = nl;
1695 debugmsg("parsing key/value pair %s=%s\n",key,valuestr);
1696 if (strcasecmp(key,"numblocks") == 0) {
1697 long nb = strtol(valuestr,0,0);
1698 if ((nb == 0) && (errno == EINVAL)) {
1699 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1700 } else {
1701 Numblocks = nb;
1702 debugmsg("Numblocks = %llu\n",Numblocks);
1703 }
1704 } else if (strcasecmp(key,"pause") == 0) {
1705 long long p = strtoll(valuestr,0,0);
1706 if ((p == 0) && (errno == EINVAL)) {
1707 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1708 } else {
1709 Pause = p;
1710 debugmsg("Pause = %lldusec\n",Pause);
1711 }
1712 } else if (strcasecmp(key,"autoloadtime") == 0) {
1713 long at = strtol(valuestr,0,0) - 1;
1714 if ((at == 0) && (errno == EINVAL))
1715 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1716 else {
1717 AutoloadTime = at;
1718 debugmsg("Autoloader time = %d\n",AutoloadTime);
1719 }
1720 } else if (strcasecmp(key,"startread") == 0) {
1721 double sr = 0;
1722 if (1 == sscanf(valuestr,"%lf",&sr))
1723 sr /= 100;
1724 if ((sr <= 1) && (sr > 0)) {
1725 StartRead = sr;
1726 debugmsg("StartRead = %1.2lf\n",StartRead);
1727 }
1728 } else if (strcasecmp(key,"startwrite") == 0) {
1729 double sw = 0;
1730 if (1 == sscanf(valuestr,"%lf",&sw))
1731 sw /= 100;
1732 if ((sw <= 1) && (sw > 0)) {
1733 StartWrite = sw;
1734 debugmsg("StartWrite = %1.2lf\n",StartWrite);
1735 }
1736 } else if (strcasecmp(key,"timeout") == 0) {
1737 long t = strtol(valuestr,0,0);
1738 if (((t == 0) && (errno == EINVAL)) || (t < 0))
1739 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1740 else {
1741 Timeout = t;
1742 debugmsg("Timeout = %lu sec.\n",Timeout);
1743 }
1744 } else if (strcasecmp(key,"showstatus") == 0) {
1745 switch (parseFlag(valuestr)) {
1746 case on:
1747 Quiet = 0;
1748 debugmsg("showstatus = yes\n");
1749 break;
1750 case off:
1751 Quiet = 1;
1752 debugmsg("showstatus = no\n");
1753 break;
1754 default:
1755 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1756 }
1757 } else if (strcasecmp(key,"tapeaware") == 0) {
1758 switch (parseFlag(valuestr)) {
1759 case on:
1760 TapeAware = 0;
1761 debugmsg("tapeaware = off\n");
1762 break;
1763 case off:
1764 TapeAware = 1;
1765 debugmsg("tapeaware = on\n");
1766 break;
1767 default:
1768 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1769 }
1770 } else if (strcasecmp(key,"logstatus") == 0) {
1771 switch (parseFlag(valuestr)) {
1772 case on:
1773 StatusLog = 1;
1774 debugmsg("logstatus = yes\n");
1775 break;
1776 case off:
1777 StatusLog = 0;
1778 debugmsg("logstatus = no\n");
1779 break;
1780 default:
1781 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1782 }
1783 } else if (strcasecmp(key,"memlock") == 0) {
1784 switch (parseFlag(valuestr)) {
1785 case on:
1786 Memlock = 1;
1787 debugmsg("Memlock = %lu\n",Memlock);
1788 break;
1789 case off:
1790 Memlock = 0;
1791 debugmsg("Memlock = %lu\n",Memlock);
1792 break;
1793 default:
1794 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1795 }
1796 } else if (strcasecmp(key,"printpid") == 0) {
1797 switch (parseFlag(valuestr)) {
1798 case on:
1799 printmsg("PID is %d\n",getpid());
1800 break;
1801 case off:
1802 /* don't do anything */
1803 break;
1804 default:
1805 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
1806 }
1807 } else if (strcasecmp(key,"StatusInterval") == 0) {
1808 StatusInterval = strtof(valuestr,0);
1809 debugmsg("StatusInterval = %f\n",StatusInterval);
1810 } else if (strcasecmp(key,"verbose") == 0) {
1811 setVerbose(valuestr);
1812 } else {
1813 unsigned long long value = 0;
1814 const char *argerror = calcval(valuestr,&value);
1815 if (argerror) {
1816 warningmsg("ignoring invalid key/value pair (%s = %s): %s\n",key,valuestr,argerror);
1817 } else if (strcasecmp(key,"blocksize") == 0) {
1818 Blocksize = value;
1819 debugmsg("Blocksize = %lu\n",Blocksize);
1820 } else if (strcasecmp(key,"maxwritespeed") == 0) {
1821 MaxWriteSpeed = value;
1822 debugmsg("MaxWriteSpeed = %lu\n",MaxWriteSpeed);
1823 } else if (strcasecmp(key,"maxreadspeed") == 0) {
1824 MaxReadSpeed = value;
1825 debugmsg("MaxReadSpeed = %lu\n",MaxReadSpeed);
1826 } else if (strcasecmp(key,"Totalmem") == 0) {
1827 if (value >= 100) {
1828 Totalmem = value;
1829 debugmsg("Totalmem = %lluk\n",Totalmem>>10);
1830 } else if (NumP && PgSz) {
1831 Totalmem = ((unsigned long long) NumP * PgSz * value) / 100 ;
1832 debugmsg("Totalmem = %lluk\n",Totalmem>>10);
1833 } else {
1834 warningmsg("Unable to determine page size or amount of available memory - please specify an absolute amount of memory.\n");
1835 }
1836 } else if (strcasecmp(key,"tcpbuffer") == 0) {
1837 TCPBufSize = value;
1838 debugmsg("TCPBufSize = %lu\n",TCPBufSize);
1839 } else {
1840 warningmsg("unknown parameter: %s\n",key);
1841 }
1842 }
1843 }
1844 free(cfdata);
1845 }
1846890
1847891
1848892 static int joinSenders()
1903947 {
1904948 /* gather system parameters */
1905949 TickTime = 1000000 / sysconf(_SC_CLK_TCK);
1906 #if defined(_SC_AVPHYS_PAGES) //&& !defined(__CYGWIN__)
950 #if defined(_SC_AVPHYS_PAGES)
1907951 NumP = sysconf(_SC_AVPHYS_PAGES);
1908952 if (NumP < 0) {
1909953 warningmsg("unable to determine number of available memory pages: %s\n",strerror(errno));
1954998 dfname[l++] = '/';
1955999 memcpy(dfname+l,".mbuffer.rc",12);
19561000 readConfigFile(dfname);
1957 }
1958
1959
1960 static void openInput()
1961 {
1962 debugmsg("opening input...\n");
1963 if (Infile) {
1964 int flags = O_RDONLY | LARGEFILE | Direct;
1965 In = open(Infile,flags);
1966 if (-1 == In) {
1967 if (errno == EINVAL) {
1968 flags &= ~LARGEFILE;
1969 In = open(Infile,flags);
1970 }
1971 if (-1 == In)
1972 fatal("could not open input file: %s\n",strerror(errno));
1973 }
1974 struct stat st;
1975 if ((0 == fstat(In,&st)) && ((st.st_mode & S_IFMT) == S_IFREG))
1976 InSize = st.st_size;
1977 } else if (In == -1) {
1978 In = STDIN_FILENO;
1979 }
1980 #ifdef __sun
1981 if (0 == directio(In,DIRECTIO_ON))
1982 infomsg("direct I/O hinting enabled for input\n");
1983 else
1984 infomsg("direct I/O hinting failed for input: %s\n",strerror(errno));
1985 #endif
1986 }
1987
1988
1989 static void initBuffer()
1990 {
1991 int c = 0;
1992
1993 if ((Blocksize * (long long)Numblocks) > (long long)SSIZE_MAX)
1994 fatal("Cannot address so much memory (%lld*%d=%lld>%lld).\n",Blocksize,Numblocks,Blocksize*(long long)Numblocks,(long long)SSIZE_MAX);
1995 /* create buffer */
1996 Buffer = (char **) valloc(Numblocks * sizeof(char *));
1997 if (!Buffer)
1998 fatal("Could not allocate enough memory (%d requested): %s\n",Numblocks * sizeof(char *),strerror(errno));
1999 if (Memmap) {
2000 infomsg("mapping temporary file to memory with %llu blocks with %llu byte (%llu kB total)...\n",(unsigned long long) Numblocks,(unsigned long long) Blocksize,(unsigned long long) ((Numblocks*Blocksize) >> 10));
2001 if (!Tmpfile) {
2002 char tmplname[] = "mbuffer-XXXXXX";
2003 char *tmpdir = getenv("TMPDIR") ? getenv("TMPDIR") : "/var/tmp";
2004 size_t tl = strlen(tmpdir);
2005 Tmpfile = malloc(sizeof(tmplname) + tl + 1);
2006 if (!Tmpfile)
2007 fatal("out of memory: %s\n",strerror(errno));
2008 (void) memcpy(Tmpfile,tmpdir,tl);
2009 Tmpfile[tl] = '/';
2010 (void) strncpy(Tmpfile+tl+1,tmplname,sizeof(tmplname));
2011 Tmp = mkstemp(Tmpfile);
2012 infomsg("tmpfile is %s\n",Tmpfile);
2013 } else {
2014 mode_t mode = O_RDWR | LARGEFILE;
2015 if (strncmp(Tmpfile,"/dev/",5))
2016 mode |= O_CREAT|O_EXCL;
2017 Tmp = open(Tmpfile,mode,0600);
2018 }
2019 if (-1 == Tmp)
2020 fatal("could not create temporary file (%s): %s\n",Tmpfile,strerror(errno));
2021 if (strncmp(Tmpfile,"/dev/",5))
2022 (void) unlink(Tmpfile);
2023 /* resize the file. Needed - at least under linux, who knows why? */
2024 if (-1 == lseek(Tmp,Numblocks * Blocksize - sizeof(int),SEEK_SET))
2025 fatal("could not resize temporary file: %s\n",strerror(errno));
2026 if (-1 == write(Tmp,&c,sizeof(c)))
2027 fatal("could not resize temporary file: %s\n",strerror(errno));
2028 Buffer[0] = mmap(0,Blocksize*Numblocks,PROT_READ|PROT_WRITE,MAP_SHARED,Tmp,0);
2029 if (MAP_FAILED == Buffer[0])
2030 fatal("could not map buffer-file to memory: %s\n",strerror(errno));
2031 debugmsg("temporary file mapped to address %p\n",Buffer[0]);
2032 } else {
2033 infomsg("allocating memory for %d blocks with %llu byte (%llu kB total)...\n",Numblocks,(unsigned long long) Blocksize,(unsigned long long) ((Numblocks*Blocksize) >> 10));
2034 Buffer[0] = (char *) valloc(Blocksize * Numblocks);
2035 if (Buffer[0] == 0)
2036 fatal("Could not allocate enough memory (%lld requested): %s\n",(unsigned long long)Blocksize * Numblocks,strerror(errno));
2037 #ifdef MADV_DONTFORK
2038 if (-1 == madvise(Buffer[0],Blocksize * Numblocks, MADV_DONTFORK))
2039 warningmsg("unable to advise memory handling of buffer: %s\n",strerror(errno));
2040 #endif
2041 }
2042 for (c = 1; c < Numblocks; c++) {
2043 Buffer[c] = Buffer[0] + Blocksize * c;
2044 *Buffer[c] = 0; /* touch every block before locking */
2045 }
2046
2047 #ifdef _POSIX_MEMLOCK_RANGE
2048 if (Memlock) {
2049 uid_t uid;
2050 #ifndef HAVE_SETEUID
2051 #define seteuid setuid
2052 #endif
2053 uid = geteuid();
2054 if (0 != seteuid(0))
2055 warningmsg("could not change to uid 0 to lock memory (is mbuffer setuid root?)\n");
2056 else if ((0 != mlock((char *)Buffer,Numblocks * sizeof(char *))) || (0 != mlock(Buffer[0],Blocksize * Numblocks)))
2057 warningmsg("could not lock buffer in memory: %s\n",strerror(errno));
2058 else
2059 infomsg("memory locked successfully\n");
2060 int err = seteuid(uid); /* don't give anyone a chance to attack this program, so giveup uid 0 after locking... */
2061 assert(err == 0);
2062 }
2063 #endif
2064 }
2065
2066
2067 static void searchOptionV(int argc, const char **argv)
2068 {
2069 int c;
2070 for (c = 1; c < argc; c++) {
2071 const char *arg = argv[c];
2072 if ((arg[0] == '-') && (arg[1] == 'v')) {
2073 if (arg[2]) {
2074 setVerbose(arg+2);
2075 } else if (++c < argc) {
2076 setVerbose(argv[c]);
2077 } else {
2078 fatal("missing argument to option -v\n");
2079 }
2080 }
2081 }
20821001 }
20831002
20841003
21101029 if (-1 == fstat(In,&st))
21111030 warningmsg("could not stat input: %s\n",strerror(errno));
21121031 else if (S_ISBLK(st.st_mode) || S_ISCHR(st.st_mode)) {
2113 if (Blocksize % st.st_blksize != 0) {
1032 IDevBSize = st.st_blksize;
1033 if ((st.st_blksize != 0) && (Blocksize % st.st_blksize != 0)) {
21141034 warningmsg("Block size is not a multiple of native input size.\n");
21151035 infomsg("input device's native block-size is %d bytes\n",st.st_blksize);
21161036 infomsg("transfer block size is %lld\n", Blocksize);
2117 infomsg("Adjust block size (option -s) if you get out-of-memory on input.\n");
21181037 } else {
21191038 infomsg("input device's native block-size is %d bytes\n",st.st_blksize);
21201039 }
21281047 }
21291048
21301049
2131 static int outputSet()
2132 {
2133 if (Dest == 0)
2134 return 0;
1050 static int outputIsSet()
1051 {
21351052 dest_t *d = Dest;
2136 do {
2137 debugmsg("outputSet: %d\n",d->fd);
2138 if (d->fd != -2)
1053 while (d) {
1054 if (d->fd != -2) {
1055 debugmsg("outputIsSet: %d\n",d->fd);
21391056 return 1;
1057 }
21401058 d = d->next;
2141 } while (d);
1059 }
1060 debugmsg("no output is set\n");
21421061 return 0;
21431062 }
21441063
21451064
21461065 int main(int argc, const char **argv)
21471066 {
2148 int optMode = O_EXCL;
2149 int optMset = 0, optSset = 0, optBset = 0, watchdogStarted = 0;
2150 int numstdout = 0;
2151 long mxnrsem;
21521067 int c, fl, err;
21531068 sigset_t signalSet;
2154 unsigned short netPortIn = 0;
2155 unsigned short netPortOut = 0;
21561069 char *argv0 = strdup(argv[0]), *progname, null;
2157 const char *outfile = 0;
21581070 struct sigaction sig;
21591071 dest_t *dest = 0;
21601072
21721084 /* setup parameters */
21731085 initDefaults();
21741086 debugmsg("default buffer set to %d blocks of %lld bytes\n",Numblocks,Blocksize);
2175 for (c = 1; c < argc; c++) {
2176 if (!argcheck("-s",argv,&c,argc)) {
2177 Blocksize = Outsize = calcint(argv,c,Blocksize);
2178 optSset = 1;
2179 debugmsg("Blocksize = %llu\n",Blocksize);
2180 if (Blocksize < 100)
2181 fatal("cannot set blocksize as percentage of total physical memory\n");
2182 } else if (!strcmp("--append",argv[c])) {
2183 optMode |= O_APPEND;
2184 optMode &= ~O_EXCL;
2185 debugmsg("append to next file\n");
2186 } else if (!strcmp("--truncate",argv[c])) {
2187 optMode |= O_TRUNC;
2188 debugmsg("truncate next file\n");
2189 } else if (!argcheck("-m",argv,&c,argc)) {
2190 Totalmem = calcint(argv,c,Totalmem);
2191 optMset = 1;
2192 if (Totalmem < 100) {
2193 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
2194 Totalmem = ((unsigned long long) NumP * PgSz * Totalmem) / 100 ;
2195 #else
2196 fatal("Unable to determine page size or amount of available memory - please specify an absolute amount of memory.\n");
2197 #endif
2198 }
2199 debugmsg("Totalmem = %lluk\n",Totalmem>>10);
2200 } else if (!argcheck("-b",argv,&c,argc)) {
2201 long nb = strtol(argv[c],0,0);
2202 if ((nb == 0) && (errno == EINVAL)) {
2203 errormsg("invalid argument to option -b: \"%s\"\n",argv[c]);
2204 } else {
2205 Numblocks = nb;
2206 optBset = 1;
2207 }
2208 debugmsg("Numblocks = %llu\n",Numblocks);
2209 } else if (!strcmp("--tcpbuffer",argv[c])) {
2210 TCPBufSize = calcint(argv,++c,TCPBufSize);
2211 debugmsg("TCPBufSize = %lu\n",TCPBufSize);
2212 } else if (!strcmp("--tapeaware",argv[c])) {
2213 TapeAware = 1;
2214 debugmsg("sensing early end-of-tape warning\n");
2215 } else if (!argcheck("-d",argv,&c,argc)) {
2216 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
2217 SetOutsize = 1;
2218 debugmsg("setting output size according to the blocksize of the device\n");
2219 #else
2220 fatal("cannot determine blocksize of device (unsupported by OS)\n");
2221 #endif
2222 } else if (!argcheck("-v",argv,&c,argc)) {
2223 /* option has to be checked again, so that the
2224 * command line can override a config file */
2225 setVerbose(argv[c]);
2226 } else if (!argcheck("-u",argv,&c,argc)) {
2227
2228 long long p = strtoll(argv[c],0,0);
2229 if ((p == 0) && (errno == EINVAL))
2230 errormsg("invalid argument to option -u: \"%s\"\n",argv[c]);
2231 else
2232 Pause = p;
2233 debugmsg("Pause = %lldusec\n",Pause);
2234 } else if (!argcheck("-r",argv,&c,argc)) {
2235 MaxReadSpeed = calcint(argv,c,0);
2236 debugmsg("MaxReadSpeed = %lld\n",MaxReadSpeed);
2237 } else if (!argcheck("-R",argv,&c,argc)) {
2238 MaxWriteSpeed = calcint(argv,c,0);
2239 debugmsg("MaxWriteSpeed = %lld\n",MaxWriteSpeed);
2240 } else if (!argcheck("-n",argv,&c,argc)) {
2241 long nv = strtol(argv[c],0,0);
2242 if ((nv < 0) || ((nv == 0) && (errno == EINVAL)))
2243 fatal("invalid argument to option -n: \"%s\"\n",argv[c]);
2244 else
2245 NumVolumes = nv;
2246 if (NumVolumes < 0)
2247 fatal("argument for number of volumes must be > 0\n");
2248 debugmsg("NumVolumes = %d\n",NumVolumes);
2249 } else if (!argcheck("-i",argv,&c,argc)) {
2250 if (strcmp(argv[c],"-")) {
2251 Infile = argv[c];
2252 debugmsg("Infile = %s\n",Infile);
2253 } else {
2254 Infile = STDIN_FILENO;
2255 debugmsg("Infile is stdin\n");
2256 }
2257 } else if (!argcheck("-o",argv,&c,argc)) {
2258 dest_t *dest = malloc(sizeof(dest_t));
2259 if (strcmp(argv[c],"-")) {
2260 debugmsg("output file: %s\n",argv[c]);
2261 dest->arg = argv[c];
2262 dest->name = argv[c];
2263 dest->fd = -1;
2264 dest->mode = O_CREAT|O_WRONLY|optMode|Direct|LARGEFILE|OptSync;
2265 } else {
2266 if (numstdout++)
2267 fatal("cannot output multiple times to stdout\n");
2268 debugmsg("output to stdout\n",argv[c]);
2269 dest->fd = dup(STDOUT_FILENO);
2270 err = dup2(STDERR_FILENO,STDOUT_FILENO);
2271 assert(err != -1);
2272 dest->arg = "<stdout>";
2273 dest->name = "<stdout>";
2274 dest->mode = 0;
2275 }
2276 optMode = O_EXCL;
2277 dest->port = 0;
2278 dest->result = 0;
2279 bzero(&dest->thread,sizeof(dest->thread));
2280 dest->next = Dest;
2281 Dest = dest;
2282 if (outfile == 0)
2283 outfile = argv[c];
2284 ++NumSenders;
2285 #ifdef AF_INET6
2286 } else if (!strcmp("-0",argv[c])) {
2287 AddrFam = AF_UNSPEC;
2288 } else if (!strcmp("-4",argv[c])) {
2289 AddrFam = AF_INET;
2290 } else if (!strcmp("-6",argv[c])) {
2291 AddrFam = AF_INET6;
2292 #endif
2293 } else if (!argcheck("-I",argv,&c,argc)) {
2294 initNetworkInput(argv[c]);
2295 } else if (!argcheck("-O",argv,&c,argc)) {
2296 dest_t *d = createNetworkOutput(argv[c]);
2297 if (d->fd == -1) {
2298 free(d);
2299 } else {
2300 d->next = Dest;
2301 Dest = d;
2302 ++NumSenders;
2303 }
2304 } else if (!argcheck("-T",argv,&c,argc)) {
2305 Tmpfile = strdup(argv[c]);
2306 if (!Tmpfile)
2307 fatal("out of memory\n");
2308 Memmap = 1;
2309 debugmsg("Tmpfile = %s\n",Tmpfile);
2310 } else if (!strcmp("-t",argv[c])) {
2311 Memmap = 1;
2312 debugmsg("Memmap = 1\n");
2313 } else if (!argcheck("-l",argv,&c,argc)) {
2314 Log = open(argv[c],O_WRONLY|O_APPEND|O_TRUNC|O_CREAT|LARGEFILE,0666);
2315 if (-1 == Log) {
2316 Log = STDERR_FILENO;
2317 errormsg("error opening log file: %s\n",strerror(errno));
2318 }
2319 debugmsg("logFile set to %s\n",argv[c]);
2320 } else if (!strcmp("-f",argv[c])) {
2321 optMode &= ~O_EXCL;
2322 debugmsg("overwrite = 1\n");
2323 } else if (!strcmp("-q",argv[c])) {
2324 debugmsg("disabling display of status\n");
2325 Quiet = 1;
2326 } else if (!strcmp("-Q",argv[c])) {
2327 debugmsg("disabling logging of status\n");
2328 StatusLog = 0;
2329 } else if (!strcmp("-c",argv[c])) {
2330 debugmsg("enabling full synchronous I/O\n");
2331 OptSync = O_SYNC;
2332 } else if (!strcmp("-e",argv[c])) {
2333 debugmsg("will terminate on any kind of error\n");
2334 ErrorsFatal = 1;
2335 } else if (!argcheck("-a",argv,&c,argc)) {
2336 long at = strtol(argv[c],0,0) - 1;
2337 if ((at == 0) && (errno == EINVAL))
2338 errormsg("invalid argument to option -a: \"%s\"\n",argv[c]);
2339 else {
2340 Autoloader = 1;
2341 AutoloadTime = at;
2342 }
2343 debugmsg("Autoloader time = %d\n",AutoloadTime);
2344 } else if (!argcheck("-A",argv,&c,argc)) {
2345 Autoloader = 1;
2346 AutoloadCmd = argv[c];
2347 debugmsg("Autoloader command = \"%s\"\n", AutoloadCmd);
2348 } else if (!argcheck("-P",argv,&c,argc)) {
2349 if (1 != sscanf(argv[c],"%lf",&StartWrite))
2350 StartWrite = 0;
2351 StartWrite /= 100;
2352 if ((StartWrite > 1) || (StartWrite <= 0))
2353 fatal("error in argument -P: must be bigger than 0 and less or equal 100\n");
2354 debugmsg("StartWrite = %1.2lf\n",StartWrite);
2355 } else if (!argcheck("-p",argv,&c,argc)) {
2356 if (1 == sscanf(argv[c],"%lf",&StartRead))
2357 StartRead /= 100;
2358 else
2359 StartRead = 1.0;
2360 if ((StartRead >= 1) || (StartRead < 0))
2361 fatal("error in argument -p: must be bigger or equal to 0 and less than 100\n");
2362 debugmsg("StartRead = %1.2lf\n",StartRead);
2363 } else if (!strcmp("-L",argv[c])) {
2364 #ifdef _POSIX_MEMLOCK_RANGE
2365 Memlock = 1;
2366 debugmsg("memory locking enabled\n");
2367 #else
2368 warning("POSIX memory locking is unsupported on this system.\n");
2369 #endif
2370 } else if (!argcheck("-W",argv,&c,argc)) {
2371 Timeout = strtol(argv[c],0,0);
2372 if (Timeout <= 0)
2373 fatal("invalid argument to option -W\n");
2374 if (Timeout <= AutoloadTime)
2375 fatal("timeout must be bigger than autoload time\n");
2376 err = pthread_create(&Watchdog,0,&watchdogThread,(void*)0);
2377 assert(0 == err);
2378 infomsg("started watchdog with Timeout = %lu sec.\n",Timeout);
2379 watchdogStarted = 1;
2380 } else if (!strcmp("--direct",argv[c])) {
2381 #ifdef O_DIRECT
2382 debugmsg("using O_DIRECT to open file descriptors\n");
2383 Direct = O_DIRECT;
2384 #else
2385 warningmsg("--direct is unsupported on this system\n");
2386 #endif
2387 } else if (!strcmp("--help",argv[c]) || !strcmp("-h",argv[c])) {
2388 usage();
2389 } else if (!strcmp("--version",argv[c]) || !strcmp("-V",argv[c])) {
2390 version();
2391 } else if (!strcmp("--md5",argv[c]) || !strcmp("-H",argv[c])) {
2392 #ifdef HAVE_MD5
2393 if (addHashAlgorithm("MD5")) {
2394 ++Hashers;
2395 ++NumSenders;
2396 }
2397 #else
2398 fatal("hash calculation support has not been compiled in!\n");
2399 #endif
2400 } else if (!strcmp("--hash",argv[c])) {
2401 ++c;
2402 if (c == argc)
2403 fatal("missing argument to option --hash\n");
2404 if (!strcmp(argv[c],"list")) {
2405 listHashAlgos();
2406 exit(EXIT_SUCCESS);
2407 }
2408 if (addHashAlgorithm(argv[c])) {
2409 ++Hashers;
2410 ++NumSenders;
2411 }
2412 } else if (!strcmp("--pid",argv[c])) {
2413 printmsg("PID is %d\n",getpid());
2414 } else if (!argcheck("-D",argv,&c,argc)) {
2415 OutVolsize = calcint(argv,c,0);
2416 debugmsg("OutVolsize = %llu\n",OutVolsize);
2417 } else
2418 fatal("unknown option \"%s\"\n",argv[c]);
2419 }
1087 for (c = 1; c < argc; c++)
1088 c = parseOption(c,argc,argv);
24201089
24211090 /* consistency check for options */
24221091 if (AutoloadTime && Timeout && Timeout <= AutoloadTime)
24231092 fatal("autoload time must be smaller than watchdog timeout\n");
2424 if (optBset&optSset&optMset) {
1093 if (Options == (OPTION_B|OPTION_M|OPTION_S)) {
1094 /* options -m -b -s set */
24251095 if (Numblocks * Blocksize != Totalmem)
24261096 fatal("inconsistent options: blocksize * number of blocks != totalsize!\n");
2427 } else if (((!optBset)&optSset&optMset) || (optMset&(!optBset)&(!optSset))) {
1097 } else if ((Options == (OPTION_S|OPTION_M)) || (Options == OPTION_M)) {
24281098 if (Totalmem <= Blocksize)
24291099 fatal("total memory must be larger than block size\n");
24301100 Numblocks = Totalmem / Blocksize;
24311101 infomsg("Numblocks = %llu, Blocksize = %llu, Totalmem = %llu\n",(unsigned long long)Numblocks,(unsigned long long)Blocksize,(unsigned long long)Totalmem);
2432 } else if (optBset&!optSset&optMset) {
1102 } else if (Options == (OPTION_B|OPTION_M)) {
24331103 if (Blocksize == 0)
24341104 fatal("blocksize must be greater than 0\n");
24351105 if (Totalmem <= Blocksize)
24421112 if ((NumSenders-Hashers > 0) && (Autoloader || OutVolsize))
24431113 fatal("multi-volume support is unsupported with multiple outputs\n");
24441114 if (Autoloader) {
2445 if ((!outfile) && (!Infile))
1115 if ((!OutFile) && (!Infile))
24461116 fatal("Setting autoloader time or command without using a device doesn't make any sense!\n");
2447 if (outfile && Infile) {
1117 if (OutFile && Infile) {
24481118 fatal("Which one is your autoloader? Input or output? Replace input or output with a pipe.\n");
24491119 }
24501120 }
2451 if (Infile && netPortIn)
2452 fatal("Setting both network input port and input file doesn't make sense!\n");
2453 if (outfile && netPortOut)
2454 fatal("Setting both network output and output file doesn't make sense!\n");
24551121
24561122 /* multi volume input consistency checking */
24571123 if ((NumVolumes != 1) && (!Infile))
24581124 fatal("multi volume support for input needs an explicit given input device (option -i)\n");
24591125
24601126 /* SPW: Volsize consistency checking */
2461 if (OutVolsize && !outfile)
1127 if (OutVolsize && !OutFile)
24621128 fatal("Setting OutVolsize without an output device doesn't make sense!\n");
24631129 if ((OutVolsize != 0) && (OutVolsize < Blocksize))
24641130 /* code assumes we can write at least one block */
24651131 fatal("If non-zero, OutVolsize must be at least as large as the buffer blocksize (%llu)!\n",Blocksize);
24661132 /* SPW END */
24671133
2468 /* check that we stay within system limits */
2469 #ifdef __FreeBSD__
2470 {
2471 size_t semvmx_size = sizeof(mxnrsem);
2472 if (sysctlbyname("kern.ipc.semvmx", &mxnrsem, &semvmx_size, 0, 0) == -1)
2473 mxnrsem = -1;
2474 }
2475 #else
2476 mxnrsem = sysconf(_SC_SEM_VALUE_MAX);
2477 #endif
2478 if (-1 == mxnrsem) {
2479 #ifdef SEM_MAX_VALUE
2480 mxnrsem = SEM_MAX_VALUE;
2481 #else
2482 mxnrsem = LONG_MAX;
2483 warningmsg("unable to determine maximum value of semaphores\n");
2484 #endif
2485 }
24861134 if (Numblocks < 5)
24871135 fatal("Minimum block count is 5.\n");
2488 if (Numblocks > mxnrsem) {
2489 fatal("cannot allocate more than %d blocks.\nThis is a system dependent limit, depending on the maximum semaphore value.\nPlease choose a bigger block size.\n",mxnrsem);
2490 }
24911136
24921137 initBuffer();
24931138
24981143 fatal("Error creating semaphore Dev2Buf: %s\n",strerror(errno));
24991144
25001145 openInput();
2501 if (!outputSet()) {
1146 if (!outputIsSet()) {
25021147 debugmsg("no output set - adding stdout as destination\n");
25031148 dest_t *d = malloc(sizeof(dest_t));
25041149 d->fd = dup(STDOUT_FILENO);
25141159 ++NumSenders;
25151160 }
25161161 openDestinationFiles(Dest);
2517 if (NumSenders == -1) {
1162 if (NumSenders == -1)
25181163 fatal("no output left - nothing to do\n");
2519 }
25201164
25211165 sig.sa_handler = SIG_IGN;
25221166 sigemptyset(&sig.sa_mask);
25261170 fl = fcntl(STDERR_FILENO,F_GETFL);
25271171 err = fcntl(STDERR_FILENO,F_SETFL,fl | O_NONBLOCK);
25281172 assert(err == 0);
2529 if ((read(STDERR_FILENO,&c,1) == -1) && (errno != EAGAIN)) {
1173 if ((read(STDERR_FILENO,&c,1) != -1) || (errno == EAGAIN)) {
1174 Terminal = 1;
1175 } else {
25301176 int tty = open("/dev/tty",O_RDWR);
25311177 if (-1 == tty) {
25321178 Terminal = 0;
2533 if ((Autoloader == 0) && (outfile))
1179 if ((Autoloader == 0) && (OutFile))
25341180 warningmsg("No controlling terminal and no autoloader command specified.\n");
25351181 } else {
25361182 Terminal = 1;
25381184 assert(err != -1);
25391185 }
25401186 }
2541 if (Terminal) {
2542 debugmsg("found controlling terminal\n");
2543 } else {
2544 debugmsg("no access to controlling terminal available\n");
2545 }
1187 debugmsg(Terminal ? "found controlling terminal\n" : "no access to controlling terminal available\n");
25461188 err = fcntl(STDERR_FILENO,F_SETFL,fl);
25471189 assert(err == 0);
25481190 if ((Terminal == 1) && (NumVolumes != 1)) {
26031245 TermQ[0] = -1;
26041246 TermQ[1] = -1;
26051247 }
2606 if (!watchdogStarted && Timeout) {
2607 err = pthread_create(&Watchdog,0,&watchdogThread,(void*)0);
1248 if ((Watchdog == 0) && (Timeout != 0)) {
1249 err = pthread_create(&WatchdogThr,0,&watchdogThread,(void*)0);
26081250 assert(0 == err);
26091251 infomsg("started watchdog with Timeout = %lu sec.\n",Timeout);
26101252 }
26151257 err = pthread_create(&dest->thread,0,&outputThread,dest);
26161258 assert(0 == err);
26171259 if (Status) {
2618 err = pthread_create(&Reader,0,&inputThread,0);
1260 err = pthread_create(&ReaderThr,0,&inputThread,0);
26191261 assert(0 == err);
26201262 (void) pthread_sigmask(SIG_UNBLOCK, &signalSet, NULL);
26211263 statusThread();
2622 err = pthread_join(Reader,0);
1264 err = pthread_join(ReaderThr,0);
26231265 if (err != 0)
26241266 errormsg("error joining reader: %s\n",strerror(errno));
26251267 } else {
3030 #endif
3131
3232
33 #include <assert.h>
3433 #include <errno.h>
3534 #include <stdio.h>
3635 #include <stdlib.h>
3938 #include <sys/types.h>
4039 #include <sys/socket.h>
4140 #include <sys/un.h>
42 #include <unistd.h>
4341 #include <netdb.h>
4442 #include <netinet/in.h>
4543 #include <arpa/inet.h>
4644
4745 #include "dest.h"
46 #include "globals.h"
4847 #include "network.h"
48 #include "settings.h"
4949 #include "log.h"
5050
51 extern int In;
5251 int32_t TCPBufSize = (int32_t)1 << 20;
5352 #if defined(PF_INET6) && defined(PF_UNSPEC)
5453 int AddrFam = PF_UNSPEC;
9796 int err, sock = -1, l;
9897
9998 debugmsg("initNetworkInput(\"%s\")\n",addr);
99 if (Infile != 0)
100 fatal("cannot initialize input from network - input from file already set\n");
101 if (In != -1)
102 fatal("cannot initialize input from network - input already set\n");
100103 l = strlen(addr) + 1;
101104 host = alloca(l);
102105 memcpy(host,addr,l);
1919 #ifndef NETWORK_H
2020 #define NETWORK_H
2121
22 #include <sys/types.h>
23
24 extern int32_t TCPBufSize;
25 extern int AddrFam;
26
2722 void initNetworkInput(const char *addr);
2823 struct destination *createNetworkOutput(const char *addr);
2924
0 /*
1 * Copyright (C) 2000-2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "config.h"
20 #include "dest.h"
21 #include "hashing.h"
22 #include "network.h"
23 #include "settings.h"
24 #include "globals.h"
25 #include "log.h"
26
27 #include <assert.h>
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <limits.h>
31 #include <stdint.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/mman.h>
36 #include <sys/socket.h> // for PF_INET6
37 #include <sys/stat.h>
38 #include <unistd.h>
39
40 #ifdef __FreeBSD__
41 #include <sys/sysctl.h>
42 #endif
43
44
45 typedef enum { off, on, invalid } flag_t;
46
47
48 clockid_t
49 ClockSrc;
50
51 int Autoloader = 0,
52 Direct = 0,
53 Status = 1,
54 Memlock = 0,
55 TapeAware = 0,
56 Memmap = 0,
57 Quiet = 0,
58 Options = 0,
59 OptSync = 0,
60 SetOutsize = 0,
61 StatusLog = 1;
62
63 unsigned int
64 NumVolumes = 1, /* number of input volumes, 0 for interactive prompting */
65 AutoloadTime = 0;
66
67 unsigned long
68 NumP = 0,
69 Timeout = 0,
70 Outsize = 10240;
71
72 unsigned long long
73 Blocksize = 10240, // fundamental I/O block size
74 MaxReadSpeed = 0,
75 MaxWriteSpeed = 0,
76 Totalmem = 0,
77 OutVolsize = 0,
78 Pause = 0;
79
80 signed long
81 Numblocks = 512; /* number of buffer blocks */
82
83 float StatusInterval = 0.5; /* status update interval time */
84
85 double StartWrite = 0, /* high watermark threshold */
86 StartRead = 1; /* low watermark threshold */
87
88 const char
89 *Infile = 0,
90 *OutFile = 0,
91 *AutoloadCmd = 0;
92 char
93 *Tmpfile = 0;
94
95 extern void *watchdogThread(void *ignored);
96
97 static const char *calcval(const char *arg, unsigned long long *res)
98 {
99 char ch;
100 double d;
101
102 switch (sscanf(arg,"%lf%c",&d,&ch)) {
103 default:
104 abort();
105 break;
106 case 2:
107 if (d <= 0)
108 return "negative value out of range";
109 switch (ch) {
110 case 'k':
111 case 'K':
112 d *= 1024.0;
113 *res = d;
114 return 0;
115 case 'm':
116 case 'M':
117 d *= 1024.0*1024.0;
118 *res = d;
119 return 0;
120 case 'g':
121 case 'G':
122 d *= 1024.0*1024.0*1024.0;
123 *res = d;
124 return 0;
125 case 't':
126 case 'T':
127 d *= 1024.0*1024.0*1024.0*1024.0;
128 *res = d;
129 return 0;
130 case '%':
131 if ((d >= 90) || (d <= 0))
132 return "invalid value for percentage (must be 0..90)";
133 *res = d;
134 return 0;
135 case 'b':
136 case 'B':
137 if (d < 128)
138 return "invalid value for number of bytes";
139 *res = d;
140 return 0;
141 default:
142 return "invalid dimension";
143 }
144 case 1:
145 if (d <= 0)
146 return "value out of range";
147 *res = d;
148 return 0;
149 case 0:
150 break;
151 }
152 return "unrecognized argument";
153 }
154
155
156 static int isEmpty(const char *l)
157 {
158 while (*l) {
159 if ((*l != ' ') && (*l != '\t'))
160 return 0;
161 ++l;
162 }
163 return 1;
164 }
165
166
167 static flag_t parseFlag(const char *valuestr)
168 {
169 if ((strcasecmp(valuestr,"yes") == 0) || (strcasecmp(valuestr,"on") == 0) || (strcmp(valuestr,"1") == 0) || (strcmp(valuestr,"true") == 0))
170 return on;
171 else if ((strcasecmp(valuestr,"no") == 0) || (strcasecmp(valuestr,"off") == 0) || (strcmp(valuestr,"0") == 0) || (strcmp(valuestr,"false") == 0))
172 return off;
173 else
174 return invalid;
175 }
176
177
178 void readConfigFile(const char *cfname)
179 {
180 int df,lineno = 0;
181 struct stat st;
182 char *cfdata, *line;
183
184 df = open(cfname,O_RDONLY);
185 if (df == -1) {
186 if (errno == ENOENT)
187 infomsg("no config file %s\n",cfname);
188 else
189 warningmsg("error opening config file %s: %s\n",cfname,strerror(errno));
190 return;
191 }
192 if (-1 == fstat(df,&st)) {
193 warningmsg("unable to stat config file %s: %s\n",cfname,strerror(errno));
194 close(df);
195 return;
196 }
197 if ((getuid() != st.st_uid) && (st.st_uid != 0)) {
198 warningmsg("ignoring config file '%s' from different user\n",cfname);
199 close(df);
200 return;
201 }
202 infomsg("reading config file %s\n",cfname);
203 cfdata = malloc(st.st_size+1);
204 if (cfdata == 0)
205 fatal("out of memory\n");
206 int n = read(df,cfdata,st.st_size);
207 close(df);
208 if (n < 0) {
209 warningmsg("error reading %s: %s\n",cfname,strerror(errno));
210 free(cfdata);
211 return;
212 }
213 cfdata[n] = 0;
214 line = cfdata;
215 while (line && *line) {
216 char key[64],valuestr[64];
217 int a;
218 ++lineno;
219 char *nl = strchr(line,'\n');
220 if (nl) {
221 *nl = 0;
222 ++nl;
223 }
224 char *pound = strchr(line,'#');
225 if (pound)
226 *pound = 0;
227 if (isEmpty(line)) {
228 line = nl;
229 continue;
230 }
231 a = sscanf(line,"%63[A-Za-z]%*[ \t=:]%63[0-9a-zA-Z.]",key,valuestr);
232 if (a != 2) {
233 warningmsg("config file %s, line %d: error parsing '%s'\n",cfname,lineno,line);
234 line = nl;
235 continue;
236 }
237 line = nl;
238 debugmsg("parsing key/value pair %s=%s\n",key,valuestr);
239 if (strcasecmp(key,"numblocks") == 0) {
240 long nb = strtol(valuestr,0,0);
241 if ((nb == 0) && (errno == EINVAL)) {
242 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
243 } else {
244 Numblocks = nb;
245 debugmsg("Numblocks = %llu\n",Numblocks);
246 }
247 } else if (strcasecmp(key,"pause") == 0) {
248 long long p = strtoll(valuestr,0,0);
249 if ((p == 0) && (errno == EINVAL)) {
250 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
251 } else {
252 Pause = p;
253 debugmsg("Pause = %lldusec\n",Pause);
254 }
255 } else if (strcasecmp(key,"autoloadtime") == 0) {
256 long at = strtol(valuestr,0,0) - 1;
257 if ((at == 0) && (errno == EINVAL)) {
258 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
259 } else if ((at < 0) || (at > UINT_MAX)) {
260 warningmsg("ignoring invalid value for %s: \"%s\"\n",key,valuestr);
261 } else {
262 AutoloadTime = at;
263 debugmsg("Autoloader time = %d\n",AutoloadTime);
264 }
265 } else if (strcasecmp(key,"startread") == 0) {
266 double sr = 0;
267 if (1 == sscanf(valuestr,"%lf",&sr))
268 sr /= 100;
269 if ((sr <= 1) && (sr > 0)) {
270 StartRead = sr;
271 debugmsg("StartRead = %1.2lf\n",StartRead);
272 }
273 } else if (strcasecmp(key,"startwrite") == 0) {
274 double sw = 0;
275 if (1 == sscanf(valuestr,"%lf",&sw))
276 sw /= 100;
277 if ((sw <= 1) && (sw > 0)) {
278 StartWrite = sw;
279 debugmsg("StartWrite = %1.2lf\n",StartWrite);
280 }
281 } else if (strcasecmp(key,"timeout") == 0) {
282 long t = strtol(valuestr,0,0);
283 if (((t == 0) && (errno == EINVAL)) || (t < 0))
284 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
285 else {
286 Timeout = t;
287 debugmsg("Timeout = %lu sec.\n",Timeout);
288 }
289 } else if (strcasecmp(key,"showstatus") == 0) {
290 switch (parseFlag(valuestr)) {
291 case on:
292 Quiet = 0;
293 debugmsg("showstatus = yes\n");
294 break;
295 case off:
296 Quiet = 1;
297 debugmsg("showstatus = no\n");
298 break;
299 default:
300 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
301 }
302 } else if (strcasecmp(key,"tapeaware") == 0) {
303 switch (parseFlag(valuestr)) {
304 case on:
305 TapeAware = 0;
306 debugmsg("tapeaware = off\n");
307 break;
308 case off:
309 TapeAware = 1;
310 debugmsg("tapeaware = on\n");
311 break;
312 default:
313 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
314 }
315 } else if (strcasecmp(key,"logstatus") == 0) {
316 switch (parseFlag(valuestr)) {
317 case on:
318 StatusLog = 1;
319 debugmsg("logstatus = yes\n");
320 break;
321 case off:
322 StatusLog = 0;
323 debugmsg("logstatus = no\n");
324 break;
325 default:
326 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
327 }
328 } else if (strcasecmp(key,"memlock") == 0) {
329 switch (parseFlag(valuestr)) {
330 case on:
331 Memlock = 1;
332 debugmsg("Memlock = %lu\n",Memlock);
333 break;
334 case off:
335 Memlock = 0;
336 debugmsg("Memlock = %lu\n",Memlock);
337 break;
338 default:
339 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
340 }
341 } else if (strcasecmp(key,"printpid") == 0) {
342 switch (parseFlag(valuestr)) {
343 case on:
344 printmsg("PID is %d\n",getpid());
345 break;
346 case off:
347 /* don't do anything */
348 break;
349 default:
350 warningmsg("invalid argument for %s: \"%s\"\n",key,valuestr);
351 }
352 } else if (strcasecmp(key,"StatusInterval") == 0) {
353 StatusInterval = strtof(valuestr,0);
354 debugmsg("StatusInterval = %f\n",StatusInterval);
355 } else if (strcasecmp(key,"verbose") == 0) {
356 setVerbose(valuestr);
357 } else {
358 unsigned long long value = 0;
359 const char *argerror = calcval(valuestr,&value);
360 if (argerror) {
361 warningmsg("ignoring invalid key/value pair (%s = %s): %s\n",key,valuestr,argerror);
362 } else if (strcasecmp(key,"blocksize") == 0) {
363 Blocksize = value;
364 debugmsg("Blocksize = %lu\n",Blocksize);
365 } else if (strcasecmp(key,"maxwritespeed") == 0) {
366 MaxWriteSpeed = value;
367 debugmsg("MaxWriteSpeed = %lu\n",MaxWriteSpeed);
368 } else if (strcasecmp(key,"maxreadspeed") == 0) {
369 MaxReadSpeed = value;
370 debugmsg("MaxReadSpeed = %lu\n",MaxReadSpeed);
371 } else if (strcasecmp(key,"Totalmem") == 0) {
372 if (value >= 100) {
373 Totalmem = value;
374 debugmsg("Totalmem = %lluk\n",Totalmem>>10);
375 } else if (NumP && PgSz) {
376 Totalmem = ((unsigned long long) NumP * PgSz * value) / 100 ;
377 debugmsg("Totalmem = %lluk\n",Totalmem>>10);
378 } else {
379 warningmsg("Unable to determine page size or amount of available memory - please specify an absolute amount of memory.\n");
380 }
381 } else if (strcasecmp(key,"tcpbuffer") == 0) {
382 TCPBufSize = value;
383 debugmsg("TCPBufSize = %lu\n",TCPBufSize);
384 } else {
385 warningmsg("unknown parameter: %s\n",key);
386 }
387 }
388 }
389 free(cfdata);
390 }
391
392
393 void initBuffer()
394 {
395 int c = 0;
396 long mxnrsem;
397
398 /* check that we stay within system limits */
399 #ifdef __FreeBSD__
400 size_t semvmx_size = sizeof(mxnrsem);
401 if (sysctlbyname("kern.ipc.semvmx", &mxnrsem, &semvmx_size, 0, 0) == -1)
402 mxnrsem = -1;
403 #else
404 mxnrsem = sysconf(_SC_SEM_VALUE_MAX);
405 #endif
406 if (-1 == mxnrsem) {
407 #ifdef SEM_MAX_VALUE
408 mxnrsem = SEM_MAX_VALUE;
409 #else
410 mxnrsem = LONG_MAX;
411 warningmsg("unable to determine maximum value of semaphores\n");
412 #endif
413 }
414 if (Numblocks > mxnrsem)
415 fatal("cannot allocate more than %d blocks.\nThis is a system dependent limit, depending on the maximum semaphore value.\nPlease choose a bigger block size.\n",mxnrsem);
416
417 if ((Blocksize * (long long)Numblocks) > (long long)SSIZE_MAX)
418 fatal("Cannot address so much memory (%lld*%d=%lld>%lld).\n",Blocksize,Numblocks,Blocksize*(long long)Numblocks,(long long)SSIZE_MAX);
419 /* create buffer */
420 Buffer = (char **) valloc(Numblocks * sizeof(char *));
421 if (!Buffer)
422 fatal("Could not allocate enough memory (%d requested): %s\n",Numblocks * sizeof(char *),strerror(errno));
423 if (Memmap) {
424 infomsg("mapping temporary file to memory with %llu blocks with %llu byte (%llu kB total)...\n",(unsigned long long) Numblocks,(unsigned long long) Blocksize,(unsigned long long) ((Numblocks*Blocksize) >> 10));
425 if (!Tmpfile) {
426 char tmplname[] = "mbuffer-XXXXXX";
427 char *tmpdir = getenv("TMPDIR") ? getenv("TMPDIR") : "/var/tmp";
428 size_t tl = strlen(tmpdir);
429 Tmpfile = malloc(sizeof(tmplname) + tl + 1);
430 if (!Tmpfile)
431 fatal("out of memory: %s\n",strerror(errno));
432 (void) memcpy(Tmpfile,tmpdir,tl);
433 Tmpfile[tl] = '/';
434 (void) strncpy(Tmpfile+tl+1,tmplname,sizeof(tmplname));
435 Tmp = mkstemp(Tmpfile);
436 infomsg("tmpfile is %s\n",Tmpfile);
437 } else {
438 mode_t mode = O_RDWR | LARGEFILE;
439 if (strncmp(Tmpfile,"/dev/",5))
440 mode |= O_CREAT|O_EXCL;
441 Tmp = open(Tmpfile,mode,0600);
442 }
443 if (-1 == Tmp)
444 fatal("could not create temporary file (%s): %s\n",Tmpfile,strerror(errno));
445 if (strncmp(Tmpfile,"/dev/",5))
446 (void) unlink(Tmpfile);
447 /* resize the file. Needed - at least under linux, who knows why? */
448 if (-1 == lseek(Tmp,Numblocks * Blocksize - sizeof(int),SEEK_SET))
449 fatal("could not resize temporary file: %s\n",strerror(errno));
450 if (-1 == write(Tmp,&c,sizeof(c)))
451 fatal("could not resize temporary file: %s\n",strerror(errno));
452 Buffer[0] = mmap(0,Blocksize*Numblocks,PROT_READ|PROT_WRITE,MAP_SHARED,Tmp,0);
453 if (MAP_FAILED == Buffer[0])
454 fatal("could not map buffer-file to memory: %s\n",strerror(errno));
455 debugmsg("temporary file mapped to address %p\n",Buffer[0]);
456 } else {
457 infomsg("allocating memory for %d blocks with %llu byte (%llu kB total)...\n",Numblocks,(unsigned long long) Blocksize,(unsigned long long) ((Numblocks*Blocksize) >> 10));
458 Buffer[0] = (char *) valloc(Blocksize * Numblocks);
459 if (Buffer[0] == 0)
460 fatal("Could not allocate enough memory (%lld requested): %s\n",(unsigned long long)Blocksize * Numblocks,strerror(errno));
461 #ifdef MADV_DONTFORK
462 if (-1 == madvise(Buffer[0],Blocksize * Numblocks, MADV_DONTFORK))
463 warningmsg("unable to advise memory handling of buffer: %s\n",strerror(errno));
464 #endif
465 }
466 for (c = 1; c < Numblocks; c++) {
467 Buffer[c] = Buffer[0] + Blocksize * c;
468 *Buffer[c] = 0; /* touch every block before locking */
469 }
470
471 #ifdef _POSIX_MEMLOCK_RANGE
472 if (Memlock) {
473 uid_t uid;
474 #ifndef HAVE_SETEUID
475 #define seteuid setuid
476 #endif
477 uid = geteuid();
478 if (0 != seteuid(0))
479 warningmsg("could not change to uid 0 to lock memory (is mbuffer setuid root?)\n");
480 else if ((0 != mlock((char *)Buffer,Numblocks * sizeof(char *))) || (0 != mlock(Buffer[0],Blocksize * Numblocks)))
481 warningmsg("could not lock buffer in memory: %s\n",strerror(errno));
482 else
483 infomsg("memory locked successfully\n");
484 int err = seteuid(uid); /* don't give anyone a chance to attack this program, so giveup uid 0 after locking... */
485 assert(err == 0);
486 }
487 #endif
488 }
489
490
491 void searchOptionV(int argc, const char **argv)
492 {
493 int c;
494 for (c = 1; c < argc; c++) {
495 const char *arg = argv[c];
496 if ((arg[0] == '-') && (arg[1] == 'v')) {
497 if (arg[2]) {
498 setVerbose(arg+2);
499 } else if (++c < argc) {
500 setVerbose(argv[c]);
501 } else {
502 fatal("missing argument to option -v\n");
503 }
504 }
505 }
506 }
507
508
509 static void version(void)
510 {
511 (void) fprintf(stderr,
512 "mbuffer version "PACKAGE_VERSION"\n"\
513 "Copyright 2001-2017 - T. Maier-Komor\n"\
514 "License: GPLv3 - see file LICENSE\n"\
515 "This program comes with ABSOLUTELY NO WARRANTY!!!\n"
516 "Donations via PayPal to thomas@maier-komor.de are welcome and support this work!\n"
517 "\n"
518 );
519 exit(EXIT_SUCCESS);
520 }
521
522
523
524 static void usage(void)
525 {
526 const char *dim = "bkMGTP";
527 unsigned long long m = Numblocks * Blocksize;
528 while (m >= 10000) {
529 m >>= 10;
530 ++dim;
531 }
532 (void) fprintf(stderr,
533 "usage: mbuffer [Options]\n"
534 "Options:\n"
535 "-b <num> : use <num> blocks for buffer (default: %ld)\n"
536 "-s <size> : use blocks of <size> bytes for processing (default: %llu)\n"
537 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
538 "-m <size> : memory <size> of buffer in b,k,M,G,%% (default: 2%% = %llu%c)\n"
539 #else
540 "-m <size> : memory <size> of buffer in b,k,M,G,%% (default: %llu%c)\n"
541 #endif
542 #ifdef _POSIX_MEMLOCK_RANGE
543 "-L : lock buffer in memory (unusable with file based buffers)\n"
544 #endif
545 "-d : use blocksize of device for output\n"
546 "-D <size> : assumed output device size (default: infinite/auto-detect)\n"
547 "-P <num> : start writing after buffer has been filled more than <num>%%\n"
548 "-p <num> : start reading after buffer has been filled less than <num>%%\n"
549 "-i <file> : use <file> for input\n"
550 "-o <file> : use <file> for output (this option can be passed MULTIPLE times)\n"
551 "--append : append to output file (must be passed before -o)\n"
552 "--truncate : truncate next file (must be passed before -o)\n"
553 "-I <h>:<p> : use network port <port> as input, allow only host <h> to connect\n"
554 "-I <p> : use network port <port> as input\n"
555 "-O <h>:<p> : output data to host <h> and port <p> (MUTLIPLE outputs supported)\n"
556 "-n <num> : <num> volumes for input, '0' to prompt interactively\n"
557 "-t : use memory mapped temporary file (for huge buffer)\n"
558 "-T <file> : as -t but uses <file> as buffer\n"
559 "-l <file> : use <file> for logging messages\n"
560 "-u <num> : pause <num> milliseconds after each write\n"
561 "-r <rate> : limit read rate to <rate> B/s, where <rate> can be given in b,k,M,G\n"
562 "-R <rate> : same as -r for writing; use either one, if your tape is too fast\n"
563 "-f : overwrite existing files\n"
564 "-a <time> : autoloader which needs <time> seconds to reload\n"
565 "-A <cmd> : issue command <cmd> to request new volume\n"
566 "-v <level> : set verbose level to <level> (valid values are 0..6)\n"
567 "-q : quiet - do not display the status on stderr\n"
568 "-Q : quiet - do not log the status\n"
569 "-c : write with synchronous data integrity support\n"
570 "-e : stop processing on any kind of error\n"
571 #ifdef O_DIRECT
572 "--direct : open input and output with O_DIRECT\n"
573 #endif
574 #if defined HAVE_LIBCRYPTO || defined HAVE_LIBMD5 || defined HAVE_LIBMHASH
575 "-H\n"
576 "--md5 : generate md5 hash of transfered data\n"
577 "--hash <a> : use alogritm <a>, if <a> is 'list' possible algorithms are listed\n"
578 "--pid : print PID of this instance\n"
579 "-W <time> : set watchdog timeout to <time> seconds\n"
580 #endif
581 "-4 : force use of IPv4\n"
582 "-6 : force use of IPv6\n"
583 "-0 : use IPv4 or IPv6\n"
584 "--tcpbuffer: size for TCP buffer\n"
585 "--tapeaware: write to end of tape instead of stopping when the drive signals\n"
586 " the media end is approaching (write until 2x ENOSPC errors)\n"
587 "-V\n"
588 "--version : print version information\n"
589 "Unsupported buffer options: -t -Z -B\n"
590 ,Numblocks
591 ,Blocksize
592 ,m
593 ,*dim
594 );
595 exit(EXIT_SUCCESS);
596 }
597
598
599 static unsigned long long calcint(const char **argv, int c, unsigned long long def)
600 {
601 char ch;
602 double d = (double)def;
603
604 switch (sscanf(argv[c],"%lf%c",&d,&ch)) {
605 default:
606 abort();
607 break;
608 case 2:
609 if (d <= 0)
610 fatal("invalid argument - must be > 0\n");
611 switch (ch) {
612 case 'k':
613 case 'K':
614 d *= 1024.0;
615 return (unsigned long long) d;
616 case 'm':
617 case 'M':
618 d *= 1024.0*1024.0;
619 return (unsigned long long) d;
620 case 'g':
621 case 'G':
622 d *= 1024.0*1024.0*1024.0;
623 return (unsigned long long) d;
624 case 't':
625 case 'T':
626 d *= 1024.0*1024.0*1024.0*1024.0;
627 return (unsigned long long) d;
628 case '%':
629 if ((d >= 90) || (d <= 0))
630 fatal("invalid value for percentage (must be 0..90)\n");
631 return (unsigned long long) d;
632 case 'b':
633 case 'B':
634 if (d < 128)
635 fatal("invalid value for number of bytes\n");
636 return (unsigned long long) d;
637 default:
638 if (argv[c][-2] == '-')
639 fatal("unrecognized size charakter \"%c\" for option \"%s\"\n",ch,&argv[c][-2]);
640 else
641 fatal("unrecognized size charakter \"%c\" for option \"%s\"\n",ch,argv[c-1]);
642 return d;
643 }
644 case 1:
645 if (d <= 0)
646 fatal("invalid argument - must be > 0\n");
647 if (d <= 100) {
648 if (argv[c][-2] == '-')
649 fatal("invalid low value for option \"%s\" - missing suffix?\n",&argv[c][-2]);
650 else
651 fatal("invalid low value for option \"%s\" - missing suffix?\n",argv[c-1]);
652 }
653 return d;
654 case 0:
655 break;
656 }
657 errormsg("unrecognized argument \"%s\" for option \"%s\"\n",argv[c],argv[c-1]);
658 return d;
659 }
660
661
662 static int argcheck(const char *opt, const char **argv, int *c, int argc)
663 {
664 if (strncmp(opt,argv[*c],strlen(opt)))
665 return 1;
666 if (strlen(argv[*c]) > 2)
667 argv[*c] += 2;
668 else {
669 (*c)++;
670 if (*c == argc)
671 fatal("missing argument to option %s\n",opt);
672 }
673 return 0;
674 }
675
676
677 int parseOption(int c, int argc, const char **argv)
678 {
679 if (!argcheck("-s",argv,&c,argc)) {
680 Blocksize = Outsize = calcint(argv,c,Blocksize);
681 Options |= OPTION_S;
682 debugmsg("Blocksize = %llu\n",Blocksize);
683 if (Blocksize < 100)
684 fatal("cannot set blocksize as percentage of total physical memory\n");
685 } else if (!strcmp("--append",argv[c])) {
686 OptMode |= O_APPEND;
687 OptMode &= ~O_EXCL;
688 debugmsg("append to next file\n");
689 } else if (!strcmp("--truncate",argv[c])) {
690 OptMode |= O_TRUNC;
691 debugmsg("truncate next file\n");
692 } else if (!argcheck("-m",argv,&c,argc)) {
693 Totalmem = calcint(argv,c,Totalmem);
694 Options |= OPTION_M;
695 if (Totalmem < 100) {
696 #if defined(_SC_AVPHYS_PAGES) && defined(_SC_PAGESIZE) && !defined(__CYGWIN__) || defined(__FreeBSD__)
697 Totalmem = ((unsigned long long) NumP * PgSz * Totalmem) / 100 ;
698 #else
699 fatal("Unable to determine page size or amount of available memory - please specify an absolute amount of memory.\n");
700 #endif
701 }
702 debugmsg("Totalmem = %lluk\n",Totalmem>>10);
703 } else if (!argcheck("-b",argv,&c,argc)) {
704 long nb = strtol(argv[c],0,0);
705 if ((nb == 0) && (errno == EINVAL)) {
706 errormsg("invalid argument to option -b: \"%s\"\n",argv[c]);
707 } else {
708 Numblocks = nb;
709 Options |= OPTION_B;
710 }
711 debugmsg("Numblocks = %llu\n",Numblocks);
712 } else if (!strcmp("--tcpbuffer",argv[c])) {
713 TCPBufSize = calcint(argv,++c,TCPBufSize);
714 debugmsg("TCPBufSize = %lu\n",TCPBufSize);
715 } else if (!strcmp("--tapeaware",argv[c])) {
716 TapeAware = 1;
717 debugmsg("sensing early end-of-tape warning\n");
718 } else if (!argcheck("-d",argv,&c,argc)) {
719 #ifdef HAVE_STRUCT_STAT_ST_BLKSIZE
720 SetOutsize = 1;
721 debugmsg("setting output size according to the blocksize of the device\n");
722 #else
723 fatal("cannot determine blocksize of device (unsupported by OS)\n");
724 #endif
725 } else if (!argcheck("-v",argv,&c,argc)) {
726 /* option has to be checked again, so that the
727 * command line can override a config file */
728 setVerbose(argv[c]);
729 } else if (!argcheck("-u",argv,&c,argc)) {
730
731 long long p = strtoll(argv[c],0,0);
732 if ((p == 0) && (errno == EINVAL))
733 errormsg("invalid argument to option -u: \"%s\"\n",argv[c]);
734 else
735 Pause = p;
736 debugmsg("Pause = %lldusec\n",Pause);
737 } else if (!argcheck("-r",argv,&c,argc)) {
738 MaxReadSpeed = calcint(argv,c,0);
739 debugmsg("MaxReadSpeed = %lld\n",MaxReadSpeed);
740 } else if (!argcheck("-R",argv,&c,argc)) {
741 MaxWriteSpeed = calcint(argv,c,0);
742 debugmsg("MaxWriteSpeed = %lld\n",MaxWriteSpeed);
743 } else if (!argcheck("-n",argv,&c,argc)) {
744 long nv = strtol(argv[c],0,0);
745 if ((nv < 0) || ((nv == 0) && (errno == EINVAL)))
746 fatal("invalid argument to option -n: \"%s\"\n",argv[c]);
747 else if ((nv < 0) || (nv > UINT_MAX))
748 errormsg("argument for number of volumes out of range: %ld\n",nv);
749 else
750 NumVolumes = nv;
751 debugmsg("NumVolumes = %u\n",NumVolumes);
752 } else if (!argcheck("-i",argv,&c,argc)) {
753 if (Infile)
754 fatal("cannot set input file: file already set\n");
755 if (In != -1)
756 fatal("cannot initialize input - input already set\n");
757 if (strcmp(argv[c],"-")) {
758 Infile = argv[c];
759 debugmsg("Infile = %s\n",Infile);
760 } else {
761 In = STDIN_FILENO;
762 debugmsg("Infile is stdin\n");
763 }
764 } else if (!argcheck("-o",argv,&c,argc)) {
765 dest_t *dest = malloc(sizeof(dest_t));
766 if (strcmp(argv[c],"-")) {
767 debugmsg("output file: %s\n",argv[c]);
768 dest->arg = argv[c];
769 dest->name = argv[c];
770 dest->fd = -1;
771 dest->mode = O_CREAT|O_WRONLY|OptMode|Direct|LARGEFILE|OptSync;
772 } else {
773 dest_t *d = Dest;
774 while (d) {
775 if (0 == strcmp(d->name,"<stdout>"))
776 fatal("cannot output multiple times to stdout\n");
777 d = d->next;
778 }
779 debugmsg("output to stdout\n",argv[c]);
780 dest->fd = dup(STDOUT_FILENO);
781 int err = dup2(STDERR_FILENO,STDOUT_FILENO);
782 assert(err != -1);
783 dest->arg = "<stdout>";
784 dest->name = "<stdout>";
785 dest->mode = 0;
786 }
787 OptMode = O_EXCL;
788 dest->port = 0;
789 dest->result = 0;
790 bzero(&dest->thread,sizeof(dest->thread));
791 dest->next = Dest;
792 Dest = dest;
793 if (OutFile == 0)
794 OutFile = argv[c];
795 ++NumSenders;
796 #ifdef AF_INET6
797 } else if (!strcmp("-0",argv[c])) {
798 AddrFam = AF_UNSPEC;
799 } else if (!strcmp("-4",argv[c])) {
800 AddrFam = AF_INET;
801 } else if (!strcmp("-6",argv[c])) {
802 AddrFam = AF_INET6;
803 #endif
804 } else if (!argcheck("-I",argv,&c,argc)) {
805 initNetworkInput(argv[c]);
806 } else if (!argcheck("-O",argv,&c,argc)) {
807 dest_t *d = createNetworkOutput(argv[c]);
808 d->next = Dest;
809 Dest = d;
810 ++NumSenders;
811 } else if (!argcheck("-T",argv,&c,argc)) {
812 Tmpfile = strdup(argv[c]);
813 if (!Tmpfile)
814 fatal("out of memory\n");
815 Memmap = 1;
816 debugmsg("Tmpfile = %s\n",Tmpfile);
817 } else if (!strcmp("-t",argv[c])) {
818 Memmap = 1;
819 debugmsg("Memmap = 1\n");
820 } else if (!argcheck("-l",argv,&c,argc)) {
821 Log = open(argv[c],O_WRONLY|O_APPEND|O_TRUNC|O_CREAT|LARGEFILE,0666);
822 if (-1 == Log) {
823 Log = STDERR_FILENO;
824 errormsg("error opening log file: %s\n",strerror(errno));
825 }
826 debugmsg("logFile set to %s\n",argv[c]);
827 } else if (!strcmp("-f",argv[c])) {
828 OptMode &= ~O_EXCL;
829 OptMode |= O_TRUNC;
830 debugmsg("overwrite = 1\n");
831 } else if (!strcmp("-q",argv[c])) {
832 debugmsg("disabling display of status\n");
833 Quiet = 1;
834 } else if (!strcmp("-Q",argv[c])) {
835 debugmsg("disabling logging of status\n");
836 StatusLog = 0;
837 } else if (!strcmp("-c",argv[c])) {
838 debugmsg("enabling full synchronous I/O\n");
839 OptSync = O_SYNC;
840 } else if (!strcmp("-e",argv[c])) {
841 debugmsg("will terminate on any kind of error\n");
842 ErrorsFatal = 1;
843 } else if (!argcheck("-a",argv,&c,argc)) {
844 long at = strtol(argv[c],0,0) - 1;
845 if ((at == 0) && (errno == EINVAL)) {
846 errormsg("invalid argument to option -a: \"%s\"\n",argv[c]);
847 } else if ((at < 0) || (at > UINT_MAX)) {
848 warningmsg("ignoring invalid value for autoload time: %ld\n",at);
849 } else {
850 Autoloader = 1;
851 AutoloadTime = at;
852 }
853 debugmsg("Autoloader time = %d\n",AutoloadTime);
854 } else if (!argcheck("-A",argv,&c,argc)) {
855 Autoloader = 1;
856 AutoloadCmd = argv[c];
857 debugmsg("Autoloader command = \"%s\"\n", AutoloadCmd);
858 } else if (!argcheck("-P",argv,&c,argc)) {
859 if (1 != sscanf(argv[c],"%lf",&StartWrite))
860 StartWrite = 0;
861 StartWrite /= 100;
862 if ((StartWrite > 1) || (StartWrite <= 0))
863 fatal("error in argument -P: must be bigger than 0 and less or equal 100\n");
864 debugmsg("StartWrite = %1.2lf\n",StartWrite);
865 } else if (!argcheck("-p",argv,&c,argc)) {
866 if (1 == sscanf(argv[c],"%lf",&StartRead))
867 StartRead /= 100;
868 else
869 StartRead = 1.0;
870 if ((StartRead >= 1) || (StartRead < 0))
871 fatal("error in argument -p: must be bigger or equal to 0 and less than 100\n");
872 debugmsg("StartRead = %1.2lf\n",StartRead);
873 } else if (!strcmp("-L",argv[c])) {
874 #ifdef _POSIX_MEMLOCK_RANGE
875 Memlock = 1;
876 debugmsg("memory locking enabled\n");
877 #else
878 warning("POSIX memory locking is unsupported on this system.\n");
879 #endif
880 } else if (!argcheck("-W",argv,&c,argc)) {
881 Timeout = strtol(argv[c],0,0);
882 if (Timeout <= 0)
883 fatal("invalid argument to option -W\n");
884 if (Timeout < (AutoloadTime*2))
885 fatal("timeout must be at least 2x autoload time\n");
886 int err = pthread_create(&WatchdogThr,0,&watchdogThread,(void*)0);
887 assert(0 == err);
888 infomsg("started watchdog with Timeout = %lu sec.\n",Timeout);
889 Watchdog = 1;
890 } else if (!strcmp("--direct",argv[c])) {
891 #ifdef O_DIRECT
892 debugmsg("using O_DIRECT to open file descriptors\n");
893 Direct = O_DIRECT;
894 #else
895 warningmsg("--direct is unsupported on this system\n");
896 #endif
897 } else if (!strcmp("--help",argv[c]) || !strcmp("-h",argv[c])) {
898 usage();
899 } else if (!strcmp("--version",argv[c]) || !strcmp("-V",argv[c])) {
900 version();
901 } else if (!strcmp("--md5",argv[c]) || !strcmp("-H",argv[c])) {
902 #ifdef HAVE_MD5
903 if (addHashAlgorithm("MD5")) {
904 ++Hashers;
905 ++NumSenders;
906 }
907 #else
908 fatal("hash calculation support has not been compiled in!\n");
909 #endif
910 } else if (!strcmp("--hash",argv[c])) {
911 ++c;
912 if (c == argc)
913 fatal("missing argument to option --hash\n");
914 if (!strcmp(argv[c],"list")) {
915 listHashAlgos();
916 exit(EXIT_SUCCESS);
917 }
918 if (addHashAlgorithm(argv[c])) {
919 ++Hashers;
920 ++NumSenders;
921 }
922 } else if (!strcmp("--pid",argv[c])) {
923 printmsg("PID is %d\n",getpid());
924 } else if (!argcheck("-D",argv,&c,argc)) {
925 OutVolsize = calcint(argv,c,0);
926 debugmsg("OutVolsize = %llu\n",OutVolsize);
927 } else
928 fatal("unknown option \"%s\"\n",argv[c]);
929 return c;
930 }
0 /*
1 * Copyright (C) 2000-2017, Thomas Maier-Komor
2 *
3 * This is the source code of mbuffer.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #ifndef SETTINGS_H
20 #define SETTINGS_H
21
22 #include <time.h>
23 #include <sys/types.h>
24
25 extern int32_t TCPBufSize;
26
27 extern clockid_t
28 ClockSrc;
29
30 extern int
31 Autoloader, /* use autoloader for tape change */
32 AddrFam, /* address family - in network.c */
33 Direct,
34 Memlock, /* protoect buffer in memory against swapping */
35 TapeAware,
36 Memmap,
37 Options,
38 OptSync,
39 Quiet, /* quiet mode */
40 SetOutsize,
41 Status,
42 StatusLog;
43
44 extern unsigned int
45 NumVolumes, /* number of volumes to expect while reading */
46 AutoloadTime; /* time to wait after an autoload command */
47
48 extern unsigned long
49 NumP,
50 Timeout,
51 Outsize;
52
53 extern signed long
54 Numblocks; /* number of buffer blocks */
55
56 extern unsigned long long
57 Blocksize, /* fundamental I/O block size */
58 MaxReadSpeed,
59 MaxWriteSpeed,
60 Totalmem,
61 Pause,
62 OutVolsize;
63
64 extern const char
65 *Infile,
66 *OutFile,
67 *AutoloadCmd;
68
69 extern char
70 *Tmpfile;
71
72 extern float
73 StatusInterval; /* status update interval time */
74
75 extern double
76 StartWrite, /* high watermark threshold */
77 StartRead; /* low watermark threshold */
78
79 void readConfigFile(const char *cfname);
80 void initBuffer();
81 void searchOptionV(int argc, const char **argv);
82 int parseOption(int c, int argc, const char **argv);
83
84 #endif