/* -*- c-basic-offset: 2 -*- */
/*
Copyright(C) 2010-2014 Brazil
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License version 2.1 as published by the Free Software Foundation.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif /* HAVE_CONFIG_H */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifdef HAVE_SYS_WAIT_H
# include <sys/wait.h>
#endif /* HAVE_SYS_WAIT_H */
#ifdef HAVE_SYS_SOCKET_H
# include <sys/socket.h>
#endif /* HAVE_SYS_SOCKET_H */
#ifndef WIN32
# include <netinet/in.h>
#endif /* WIN32 */
#include <grn_str.h>
#include <grn_com.h>
#include <grn_db.h>
#ifdef WIN32
#include <windows.h>
#include <stddef.h>
#else
#include <sys/param.h>
#include <sys/utsname.h>
#include <sys/statvfs.h>
#include <libgen.h>
#endif /* WIN32 */
/*
#define DEBUG_FTP
#define DEBUG_HTTP
*/
#define FTPUSER "anonymous"
#define FTPPASSWD "grntest"
#define FTPSERVER "ftp.groonga.org"
#define FTPBUF 20000
#define DEFAULT_PORT 10041
#define DEFAULT_DEST "localhost"
#define OUT_JSON 0
#define OUT_TSV 1
static int grntest_outtype = OUT_JSON;
static grn_critical_section grntest_cs;
static int grntest_stop_flag = 0;
static int grntest_detail_on = 0;
static int grntest_remote_mode = 0;
static int grntest_localonly_mode = 0;
static int grntest_owndb_mode = 0;
static int grntest_onmemory_mode = 0;
static grn_bool grntest_ftp_mode = GRN_FALSE;
#define TMPFILE "_grntest.tmp"
static grn_ctx grntest_server_context;
static FILE *grntest_log_file;
#define OS_LINUX64 "LINUX64"
#define OS_LINUX32 "LINUX32"
#define OS_WINDOWS64 "WINDOWS64"
#define OS_WINDOWS32 "WINDOWS32"
#ifdef WIN32
typedef SOCKET socket_t;
#define SOCKETERROR INVALID_SOCKET
#define socketclose closesocket
static const char *groonga_path = "groonga.exe";
static PROCESS_INFORMATION grntest_pi;
#else
static pid_t grntest_server_id = 0;
typedef int socket_t;
#define socketclose close
#define SOCKETERROR -1
static const char *groonga_path = "groonga";
#endif /* WIN32 */
static const char *groonga_protocol = "gqtp";
static const char *grntest_osinfo;
static int grntest_sigint = 0;
static grn_obj *grntest_db = NULL;
#define MAX_CON_JOB 10
#define MAX_CON 64
#define BUF_LEN 1024
#define MAX_PATH_LEN 256
#define J_DO_LOCAL 1 /* do_local */
#define J_DO_GQTP 2 /* do_gqtp */
#define J_DO_HTTP 3 /* do_http */
#define J_REP_LOCAL 4 /* rep_local */
#define J_REP_GQTP 5 /* rep_gqtp */
#define J_REP_HTTP 6 /* rep_http */
#define J_OUT_LOCAL 7 /* out_local */
#define J_OUT_GQTP 8 /* out_gqtp */
#define J_OUT_HTTP 9 /* out_http */
#define J_TEST_LOCAL 10 /* test_local */
#define J_TEST_GQTP 11 /* test_gqtp */
#define J_TEST_HTTP 12 /* test_http */
static char grntest_username[BUF_LEN];
static char grntest_scriptname[BUF_LEN];
static char grntest_date[BUF_LEN];
static char grntest_serverhost[BUF_LEN];
static int grntest_serverport;
static const char *grntest_dbpath;
struct job {
char jobname[BUF_LEN];
char commandfile[BUF_LEN];
int qnum;
int jobtype;
int concurrency;
int ntimes;
int done;
long long int max;
long long int min;
FILE *outputlog;
grn_file_reader *inputlog;
char logfile[BUF_LEN];
};
struct task {
char *file;
grn_obj *commands;
int jobtype;
int ntimes;
int qnum;
int job_id;
long long int max;
long long int min;
socket_t http_socket;
grn_obj http_response;
};
static struct task grntest_task[MAX_CON];
static struct job grntest_job[MAX_CON];
static int grntest_jobdone;
static int grntest_jobnum;
static grn_ctx grntest_ctx[MAX_CON];
static grn_obj *grntest_owndb[MAX_CON];
static grn_obj grntest_starttime, grntest_jobs_start;
static int
grntest_atoi(const char *str, const char *end, const char **rest)
{
while (grn_isspace(str, GRN_ENC_UTF8) == 1) {
str++;
}
return grn_atoi(str, end, rest);
}
static int
out_p(int jobtype)
{
if (jobtype == J_OUT_LOCAL) {
return 1;
}
if (jobtype == J_OUT_GQTP) {
return 1;
}
if (jobtype == J_OUT_HTTP) {
return 1;
}
return 0;
}
static int
test_p(int jobtype)
{
if (jobtype == J_TEST_LOCAL) {
return 1;
}
if (jobtype == J_TEST_GQTP) {
return 1;
}
if (jobtype == J_TEST_HTTP) {
return 1;
}
return 0;
}
static int
report_p(int jobtype)
{
if (jobtype == J_REP_LOCAL) {
return 1;
}
if (jobtype == J_REP_GQTP) {
return 1;
}
if (jobtype == J_REP_HTTP) {
return 1;
}
return 0;
}
static int
gqtp_p(int jobtype)
{
if (jobtype == J_DO_GQTP) {
return 1;
}
if (jobtype == J_REP_GQTP) {
return 1;
}
if (jobtype == J_OUT_GQTP) {
return 1;
}
if (jobtype == J_TEST_GQTP) {
return 1;
}
return 0;
}
static int
http_p(int jobtype)
{
if (jobtype == J_DO_HTTP) {
return 1;
}
if (jobtype == J_REP_HTTP) {
return 1;
}
if (jobtype == J_OUT_HTTP) {
return 1;
}
if (jobtype == J_TEST_HTTP) {
return 1;
}
return 0;
}
static int
error_exit_in_thread(intptr_t code)
{
fprintf(stderr,
"Fatal error! Check script file or database!: %ld\n", (long)code);
fflush(stderr);
CRITICAL_SECTION_ENTER(grntest_cs);
grntest_stop_flag = 1;
CRITICAL_SECTION_LEAVE(grntest_cs);
#ifdef WIN32
_endthreadex(code);
#else
pthread_exit((void *)code);
#endif /* WIN32 */
return 0;
}
static void
escape_command(grn_ctx *ctx, const char *in, int ilen, grn_obj *escaped_command)
{
int i = 0;
while (i < ilen) {
if ((in[i] == '\\') || (in[i] == '\"') || (in[i] == '/')) {
GRN_TEXT_PUTC(ctx, escaped_command, '\\');
GRN_TEXT_PUTC(ctx, escaped_command, in[i]);
i++;
} else {
switch (in[i]) {
case '\b':
GRN_TEXT_PUTS(ctx, escaped_command, "\\b");
i++;
break;
case '\f':
GRN_TEXT_PUTS(ctx, escaped_command, "\\f");
i++;
break;
case '\n':
GRN_TEXT_PUTS(ctx, escaped_command, "\\n");
i++;
break;
case '\r':
GRN_TEXT_PUTS(ctx, escaped_command, "\\r");
i++;
break;
case '\t':
GRN_TEXT_PUTS(ctx, escaped_command, "\\t");
i++;
break;
default:
GRN_TEXT_PUTC(ctx, escaped_command, in[i]);
i++;
break;
}
}
}
GRN_TEXT_PUTC(ctx, escaped_command, '\0');
}
static int
report_command(grn_ctx *ctx, const char *command, const char *ret, int task_id,
grn_obj *start_time, grn_obj *end_time)
{
int i, len, clen;
long long int start, end;
grn_obj result, escaped_command;
GRN_TEXT_INIT(&result, 0);
if (strncmp(ret, "[[", 2) == 0) {
i = 2;
len = 1;
while (ret[i] != ']') {
i++;
len++;
if (ret[i] == '\0') {
fprintf(stderr, "Error results:command=[%s]\n", command);
error_exit_in_thread(3);
}
}
len++;
grn_text_esc(ctx, &result, ret + 1, len);
} else {
grn_text_esc(ctx, &result, ret, strlen(ret));
}
start = GRN_TIME_VALUE(start_time) - GRN_TIME_VALUE(&grntest_starttime);
end = GRN_TIME_VALUE(end_time) - GRN_TIME_VALUE(&grntest_starttime);
clen = strlen(command);
GRN_TEXT_INIT(&escaped_command, 0);
escape_command(ctx, command, clen, &escaped_command);
if (grntest_outtype == OUT_TSV) {
fprintf(grntest_log_file, "report\t%d\t%s\t%" GRN_FMT_LLD "\t%" GRN_FMT_LLD "\t%.*s\n",
task_id, GRN_TEXT_VALUE(&escaped_command), start, end,
(int)GRN_TEXT_LEN(&result), GRN_TEXT_VALUE(&result));
} else {
fprintf(grntest_log_file, "[%d, \"%s\", %" GRN_FMT_LLD ", %" GRN_FMT_LLD ", %.*s],\n",
task_id, GRN_TEXT_VALUE(&escaped_command), start, end,
(int)GRN_TEXT_LEN(&result), GRN_TEXT_VALUE(&result));
}
fflush(grntest_log_file);
GRN_OBJ_FIN(ctx, &escaped_command);
GRN_OBJ_FIN(ctx, &result);
return 0;
}
static int
output_result_final(grn_ctx *ctx, int qnum)
{
grn_obj end_time;
long long int latency, self;
double sec, qps;
GRN_TIME_INIT(&end_time, 0);
GRN_TIME_NOW(ctx, &end_time);
latency = GRN_TIME_VALUE(&end_time) - GRN_TIME_VALUE(&grntest_starttime);
self = latency;
sec = self / (double)1000000;
qps = (double)qnum / sec;
if (grntest_outtype == OUT_TSV) {
fprintf(grntest_log_file, "total\t%" GRN_FMT_LLD "\t%f\t%d\n", latency, qps, qnum);
} else {
fprintf(grntest_log_file,
"{\"total\": %" GRN_FMT_LLD ", \"qps\": %f, \"queries\": %d}]\n", latency, qps, qnum);
}
grn_obj_close(ctx, &end_time);
return 0;
}
static int
output_sysinfo(char *sysinfo)
{
if (grntest_outtype == OUT_TSV) {
fprintf(grntest_log_file, "%s", sysinfo);
} else {
fprintf(grntest_log_file, "[%s\n", sysinfo);
}
return 0;
}
/* #define ENABLE_ERROR_REPORT 1 */
#ifdef ENABLE_ERROR_REPORT
static int
error_command(grn_ctx *ctx, char *command, int task_id)
{
fprintf(stderr, "error!:command=[%s] task_id = %d\n", command, task_id);
fflush(stderr);
error_exit_in_thread(1);
return 0;
}
#endif
static void
normalize_output(char *output, int length,
char **normalized_output, int *normalized_length)
{
int i;
*normalized_output = NULL;
*normalized_length = length;
for (i = 0; i < length; i++) {
if (!strncmp(output + i, "],", 2)) {
*normalized_output = output + i + 2;
*normalized_length -= i + 2;
break;
}
}
if (!*normalized_output) {
if (length > 2 && strncmp(output + length - 2, "]]", 2)) {
*normalized_output = output + length;
*normalized_length = 0;
} else {
*normalized_output = output;
}
}
}
static grn_bool
same_result_p(char *expect, int expected_length, char *result, int result_length)
{
char *normalized_expected, *normalized_result;
int normalized_expected_length, normalized_result_length;
normalize_output(expect, expected_length,
&normalized_expected, &normalized_expected_length);
normalize_output(result, result_length,
&normalized_result, &normalized_result_length);
return((normalized_expected_length == normalized_result_length) &&
strncmp(normalized_expected, normalized_result,
normalized_expected_length) == 0);
}
static socket_t
open_socket(const char *host, int port)
{
socket_t sock;
struct hostent *servhost;
struct sockaddr_in server;
u_long inaddr;
int ret;
servhost = gethostbyname(host);
if (servhost == NULL){
fprintf(stderr, "Bad hostname [%s]\n", host);
return -1;
}
inaddr = *(u_long*)(servhost->h_addr_list[0]);
memset(&server, 0, sizeof(struct sockaddr_in));
server.sin_family = AF_INET;
server.sin_port = htons(port);
server.sin_addr = *(struct in_addr*)&inaddr;
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == -1) {
fprintf(stderr, "socket error\n");
return -1;
}
ret = connect(sock, (struct sockaddr *)&server, sizeof(struct sockaddr_in));
if (ret == -1) {
fprintf(stderr, "connect error\n");
return -1;
}
return sock;
}
static int
write_to_server(socket_t socket, const char *buf)
{
#ifdef DEBUG_FTP
fprintf(stderr, "send:%s", buf);
#endif
send(socket, buf, strlen(buf), 0);
return 0;
}
#define OUTPUT_TYPE "output_type"
#define OUTPUT_TYPE_LEN (sizeof(OUTPUT_TYPE) - 1)
static void
command_line_to_uri_path(grn_ctx *ctx, grn_obj *uri, const char *command)
{
char tok_type;
int offset = 0, have_key = 0;
const char *p, *e, *v;
grn_obj buf, *expr = NULL;
grn_expr_var *vars;
unsigned int nvars;
GRN_TEXT_INIT(&buf, 0);
p = command;
e = command + strlen(command);
p = grn_text_unesc_tok(ctx, &buf, p, e, &tok_type);
if ((expr = grn_ctx_get(ctx, GRN_TEXT_VALUE(&buf), GRN_TEXT_LEN(&buf)))) {
grn_obj params, output_type;
GRN_TEXT_INIT(¶ms, 0);
GRN_TEXT_INIT(&output_type, 0);
vars = ((grn_proc *)expr)->vars;
nvars = ((grn_proc *)expr)->nvars;
GRN_TEXT_PUTS(ctx, uri, "/d/");
GRN_TEXT_PUT(ctx, uri, GRN_TEXT_VALUE(&buf), GRN_TEXT_LEN(&buf));
while (p < e) {
GRN_BULK_REWIND(&buf);
p = grn_text_unesc_tok(ctx, &buf, p, e, &tok_type);
v = GRN_TEXT_VALUE(&buf);
switch (tok_type) {
case GRN_TOK_VOID :
p = e;
break;
case GRN_TOK_SYMBOL :
if (GRN_TEXT_LEN(&buf) > 2 && v[0] == '-' && v[1] == '-') {
int l = GRN_TEXT_LEN(&buf) - 2;
v += 2;
if (l == OUTPUT_TYPE_LEN && !memcmp(v, OUTPUT_TYPE, OUTPUT_TYPE_LEN)) {
GRN_BULK_REWIND(&output_type);
p = grn_text_unesc_tok(ctx, &output_type, p, e, &tok_type);
break;
}
if (GRN_TEXT_LEN(¶ms)) {
GRN_TEXT_PUTS(ctx, ¶ms, "&");
}
grn_text_urlenc(ctx, ¶ms, v, l);
have_key = 1;
break;
}
/* fallthru */
case GRN_TOK_STRING :
case GRN_TOK_QUOTE :
if (!have_key) {
if (offset < nvars) {
if (GRN_TEXT_LEN(¶ms)) {
GRN_TEXT_PUTS(ctx, ¶ms, "&");
}
grn_text_urlenc(ctx, ¶ms,
vars[offset].name, vars[offset].name_size);
offset++;
}
}
GRN_TEXT_PUTS(ctx, ¶ms, "=");
grn_text_urlenc(ctx, ¶ms, GRN_TEXT_VALUE(&buf), GRN_TEXT_LEN(&buf));
have_key = 0;
break;
}
}
GRN_TEXT_PUTS(ctx, uri, ".");
if (GRN_TEXT_LEN(&output_type)) {
GRN_TEXT_PUT(ctx, uri,
GRN_TEXT_VALUE(&output_type), GRN_TEXT_LEN(&output_type));
} else {
GRN_TEXT_PUTS(ctx, uri, "json");
}
if (GRN_TEXT_LEN(¶ms) > 0) {
GRN_TEXT_PUTS(ctx, uri, "?");
GRN_TEXT_PUT(ctx, uri, GRN_TEXT_VALUE(¶ms), GRN_TEXT_LEN(¶ms));
}
GRN_OBJ_FIN(ctx, ¶ms);
GRN_OBJ_FIN(ctx, &output_type);
}
GRN_OBJ_FIN(ctx, &buf);
}
static void
command_send_http(grn_ctx *ctx, const char *command, int type, int task_id)
{
socket_t http_socket;
grn_obj buf;
http_socket = open_socket(grntest_serverhost, grntest_serverport);
if (http_socket == SOCKETERROR) {
fprintf(stderr, "failed to connect to groonga at %s:%d via HTTP: ",
grntest_serverhost, grntest_serverport);
#ifdef WIN32
fprintf(stderr, "%lu\n", GetLastError());
#else
fprintf(stderr, "%s\n", strerror(errno));
#endif
error_exit_in_thread(100);
}
grntest_task[task_id].http_socket = http_socket;
GRN_BULK_REWIND(&grntest_task[task_id].http_response);
GRN_TEXT_INIT(&buf, 0);
GRN_TEXT_PUTS(ctx, &buf, "GET ");
if (strncmp(command, "/d/", 3) == 0) {
GRN_TEXT_PUTS(ctx, &buf, command);
} else {
command_line_to_uri_path(ctx, &buf, command);
}
#ifdef DEBUG_HTTP
fprintf(stderr, "command: <%s>\n", command);
fprintf(stderr, "path: <%.*s>\n",
(int)GRN_TEXT_LEN(&buf), GRN_TEXT_VALUE(&buf));
#endif
GRN_TEXT_PUTS(ctx, &buf, " HTTP/1.1\r\n");
GRN_TEXT_PUTS(ctx, &buf, "Host: ");
GRN_TEXT_PUTS(ctx, &buf, grntest_serverhost);
GRN_TEXT_PUTS(ctx, &buf, "\r\n");
GRN_TEXT_PUTS(ctx, &buf, "User-Agent: grntest/");
GRN_TEXT_PUTS(ctx, &buf, grn_get_version());
GRN_TEXT_PUTS(ctx, &buf, "\r\n");
GRN_TEXT_PUTS(ctx, &buf, "Connection: close\r\n");
GRN_TEXT_PUTS(ctx, &buf, "\r\n");
GRN_TEXT_PUTC(ctx, &buf, '\0');
write_to_server(http_socket, GRN_TEXT_VALUE(&buf));
GRN_OBJ_FIN(ctx, &buf);
}
static void
command_send_ctx(grn_ctx *ctx, const char *command, int type, int task_id)
{
grn_ctx_send(ctx, command, strlen(command), 0);
/* fix me.
when command fails, ctx->rc is not 0 in local mode!
if (ctx->rc) {
fprintf(stderr, "ctx_send:rc=%d:command:%s\n", ctx->rc, command);
error_exit_in_thread(1);
}
*/
}
static void
command_send(grn_ctx *ctx, const char *command, int type, int task_id)
{
if (http_p(type)) {
command_send_http(ctx, command, type, task_id);
} else {
command_send_ctx(ctx, command, type, task_id);
}
}
static void
command_recv_http(grn_ctx *ctx, int type, int task_id,
char **res, unsigned int *res_len, int *flags)
{
int len;
char buf[BUF_LEN];
char *p, *e;
socket_t http_socket;
grn_obj *http_response;
http_socket = grntest_task[task_id].http_socket;
http_response = &grntest_task[task_id].http_response;
while ((len = recv(http_socket, buf, BUF_LEN - 1, 0))) {
#ifdef DEBUG_HTTP
fprintf(stderr, "receive: <%.*s>\n", len, buf);
#endif
GRN_TEXT_PUT(ctx, http_response, buf, len);
}
p = GRN_TEXT_VALUE(http_response);
e = p + GRN_TEXT_LEN(http_response);
while (p < e) {
if (p[0] != '\r') {
p++;
continue;
}
if (e - p >= 4) {
if (!memcmp(p, "\r\n\r\n", 4)) {
*res = p + 4;
*res_len = e - *res;
#ifdef DEBUG_HTTP
fprintf(stderr, "body: <%.*s>\n", *res_len, *res);
#endif
break;
}
p += 4;
} else {
*res = NULL;
*res_len = 0;
break;
}
}
socketclose(http_socket);
grntest_task[task_id].http_socket = 0;
}
static void
command_recv_ctx(grn_ctx *ctx, int type, int task_id,
char **res, unsigned int *res_len, int *flags)
{
grn_ctx_recv(ctx, res, res_len, flags);
if (ctx->rc) {
fprintf(stderr, "ctx_recv:rc=%d\n", ctx->rc);
error_exit_in_thread(1);
}
}
static void
command_recv(grn_ctx *ctx, int type, int task_id,
char **res, unsigned int *res_len, int *flags)
{
if (http_p(type)) {
command_recv_http(ctx, type, task_id, res, res_len, flags);
} else {
command_recv_ctx(ctx, type, task_id, res, res_len, flags);
}
}
static int
shutdown_server(void)
{
char *res;
int flags;
unsigned int res_len;
int job_type;
int task_id = 0;
if (grntest_remote_mode) {
return 0;
}
job_type = grntest_task[task_id].jobtype;
command_send(&grntest_server_context, "shutdown", job_type, task_id);
if (grntest_server_context.rc) {
fprintf(stderr, "ctx_send:rc=%d\n", grntest_server_context.rc);
exit(1);
}
command_recv(&grntest_server_context, job_type, task_id,
&res, &res_len, &flags);
return 0;
}
static int
do_load_command(grn_ctx *ctx, char *command, int type, int task_id,
long long int *load_start)
{
char *res;
unsigned int res_len;
int flags, ret;
grn_obj start_time, end_time;
GRN_TIME_INIT(&start_time, 0);
if (*load_start == 0) {
GRN_TIME_NOW(ctx, &start_time);
*load_start = GRN_TIME_VALUE(&start_time);
} else {
GRN_TIME_SET(ctx, &start_time, *load_start);
}
command_send(ctx, command, type, task_id);
do {
command_recv(ctx, type, task_id, &res, &res_len, &flags);
if (res_len) {
long long int self;
GRN_TIME_INIT(&end_time, 0);
GRN_TIME_NOW(ctx, &end_time);
self = GRN_TIME_VALUE(&end_time) - *load_start;
if (grntest_task[task_id].max < self) {
grntest_task[task_id].max = self;
}
if (grntest_task[task_id].min > self) {
grntest_task[task_id].min = self;
}
if (report_p(grntest_task[task_id].jobtype)) {
char tmpbuf[BUF_LEN];
if (res_len < BUF_LEN) {
strncpy(tmpbuf, res, res_len);
tmpbuf[res_len] = '\0';
} else {
strncpy(tmpbuf, res, BUF_LEN - 2);
tmpbuf[BUF_LEN -2] = '\0';
}
report_command(ctx, "load", tmpbuf, task_id, &start_time, &end_time);
}
if (out_p(grntest_task[task_id].jobtype)) {
fwrite(res, 1, res_len, grntest_job[grntest_task[task_id].job_id].outputlog);
fputc('\n', grntest_job[grntest_task[task_id].job_id].outputlog);
fflush(grntest_job[grntest_task[task_id].job_id].outputlog);
}
if (test_p(grntest_task[task_id].jobtype)) {
grn_obj log;
grn_file_reader *input;
FILE *output;
GRN_TEXT_INIT(&log, 0);
input = grntest_job[grntest_task[task_id].job_id].inputlog;
output = grntest_job[grntest_task[task_id].job_id].outputlog;
if (grn_file_reader_read_line(ctx, input, &log) != GRN_SUCCESS) {
GRN_LOG(ctx, GRN_ERROR, "Cannot get input-log");
error_exit_in_thread(55);
}
if (GRN_TEXT_VALUE(&log)[GRN_TEXT_LEN(&log) - 1] == '\n') {
grn_bulk_truncate(ctx, &log, GRN_TEXT_LEN(&log) - 1);
}
if (!same_result_p(GRN_TEXT_VALUE(&log), GRN_TEXT_LEN(&log),
res, res_len)) {
fprintf(output, "DIFF:command:%s\n", command);
fprintf(output, "DIFF:result:");
fwrite(res, 1, res_len, output);
fputc('\n', output);
fprintf(output, "DIFF:expect:%.*s\n",
(int)GRN_TEXT_LEN(&log), GRN_TEXT_VALUE(&log));
fflush(output);
}
GRN_OBJ_FIN(ctx, &log);
}
grn_obj_close(ctx, &end_time);
ret = 1;
break;
} else {
ret = 0;
break;
}
} while ((flags & GRN_CTX_MORE));
grn_obj_close(ctx, &start_time);
return ret;
}
static int
do_command(grn_ctx *ctx, char *command, int type, int task_id)
{
char *res;
unsigned int res_len;
int flags;
grn_obj start_time, end_time;
GRN_TIME_INIT(&start_time, 0);
GRN_TIME_NOW(ctx, &start_time);
command_send(ctx, command, type, task_id);
do {
command_recv(ctx, type, task_id, &res, &res_len, &flags);
if (res_len) {
long long int self;
GRN_TIME_INIT(&end_time, 0);
GRN_TIME_NOW(ctx, &end_time);
self = GRN_TIME_VALUE(&end_time) - GRN_TIME_VALUE(&start_time);
if (grntest_task[task_id].max < self) {
grntest_task[task_id].max = self;
}
if (grntest_task[task_id].min > self) {
grntest_task[task_id].min = self;
}
if (report_p(grntest_task[task_id].jobtype)) {
char tmpbuf[BUF_LEN];
if (res_len < BUF_LEN) {
strncpy(tmpbuf, res, res_len);
tmpbuf[res_len] = '\0';
} else {
strncpy(tmpbuf, res, BUF_LEN - 2);
tmpbuf[BUF_LEN -2] = '\0';
}
report_command(ctx, command, tmpbuf, task_id, &start_time, &end_time);
}
if (out_p(grntest_task[task_id].jobtype)) {
fwrite(res, 1, res_len, grntest_job[grntest_task[task_id].job_id].outputlog);
fputc('\n', grntest_job[grntest_task[task_id].job_id].outputlog);
fflush(grntest_job[grntest_task[task_id].job_id].outputlog);
}
if (test_p(grntest_task[task_id].jobtype)) {
grn_obj log;
grn_file_reader *input;
FILE *output;
GRN_TEXT_INIT(&log, 0);
input = grntest_job[grntest_task[task_id].job_id].inputlog;
output = grntest_job[grntest_task[task_id].job_id].outputlog;
if (grn_file_reader_read_line(ctx, input, &log) != GRN_SUCCESS) {
GRN_LOG(ctx, GRN_ERROR, "Cannot get input-log");
error_exit_in_thread(55);
}
if (GRN_TEXT_VALUE(&log)[GRN_TEXT_LEN(&log) - 1] == '\n') {
grn_bulk_truncate(ctx, &log, GRN_TEXT_LEN(&log) - 1);
}
if (!same_result_p(GRN_TEXT_VALUE(&log), GRN_TEXT_LEN(&log),
res, res_len)) {
fprintf(output, "DIFF:command:%s\n", command);
fprintf(output, "DIFF:result:");
fwrite(res, 1, res_len, output);
fputc('\n', output);
fprintf(output, "DIFF:expect:%.*s\n",
(int)GRN_TEXT_LEN(&log), GRN_TEXT_VALUE(&log));
fflush(output);
}
GRN_OBJ_FIN(ctx, &log);
}
grn_obj_close(ctx, &end_time);
break;
} else {
#ifdef ENABLE_ERROR_REPORT
error_command(ctx, command, task_id);
#endif
}
} while ((flags & GRN_CTX_MORE));
grn_obj_close(ctx, &start_time);
return 0;
}
static int
comment_p(char *command)
{
if (command[0] == '#') {
return 1;
}
return 0;
}
static int
load_command_p(char *command)
{
int i = 0;
while (grn_isspace(&command[i], GRN_ENC_UTF8) == 1) {
i++;
}
if (command[i] == '\0') {
return 0;
}
if (!strncmp(&command[i], "load", 4)) {
return 1;
}
return 0;
}
static int
worker_sub(grn_ctx *ctx, grn_obj *log, int task_id)
{
int i, load_mode, load_count;
grn_obj end_time;
long long int total_elapsed_time, job_elapsed_time;
double sec, qps;
long long int load_start;
struct task *task;
struct job *job;
task = &(grntest_task[task_id]);
task->max = 0LL;
task->min = 9223372036854775807LL;
task->qnum = 0;
for (i = 0; i < task->ntimes; i++) {
if (task->file != NULL) {
grn_file_reader *reader;
grn_obj line;
reader = grn_file_reader_open(ctx, task->file);
if (!reader) {
fprintf(stderr, "Cannot open %s\n",grntest_task[task_id].file);
error_exit_in_thread(1);
}
load_mode = 0;
load_count = 0;
load_start = 0LL;
GRN_TEXT_INIT(&line, 0);
while (grn_file_reader_read_line(ctx, reader, &line) == GRN_SUCCESS) {
if (GRN_TEXT_VALUE(&line)[GRN_TEXT_LEN(&line) - 1] == '\n') {
grn_bulk_truncate(ctx, &line, GRN_TEXT_LEN(&line) - 1);
}
if (GRN_TEXT_LEN(&line) == 0) {
GRN_BULK_REWIND(&line);
continue;
}
GRN_TEXT_PUTC(ctx, &line, '\0');
if (comment_p(GRN_TEXT_VALUE(&line))) {
GRN_BULK_REWIND(&line);
continue;
}
if (load_command_p(GRN_TEXT_VALUE(&line))) {
load_mode = 1;
load_count = 1;
}
if (load_mode == 1) {
if (do_load_command(&grntest_ctx[task_id], GRN_TEXT_VALUE(&line),
task->jobtype,
task_id, &load_start)) {
task->qnum += load_count;
load_mode = 0;
load_count = 0;
load_start = 0LL;
}
load_count++;
GRN_BULK_REWIND(&line);
continue;
}
do_command(&grntest_ctx[task_id], GRN_TEXT_VALUE(&line),
task->jobtype,
task_id);
task->qnum++;
GRN_BULK_REWIND(&line);
if (grntest_sigint) {
goto exit;
}
}
GRN_OBJ_FIN(ctx, &line);
grn_file_reader_close(ctx, reader);
} else {
int i, n_commands;
grn_obj *commands;
commands = task->commands;
if (!commands) {
error_exit_in_thread(1);
}
load_mode = 0;
n_commands = GRN_BULK_VSIZE(commands) / sizeof(grn_obj *);
for (i = 0; i < n_commands; i++) {
grn_obj *command;
command = GRN_PTR_VALUE_AT(commands, i);
if (load_command_p(GRN_TEXT_VALUE(command))) {
load_mode = 1;
}
if (load_mode == 1) {
if (do_load_command(&grntest_ctx[task_id],
GRN_TEXT_VALUE(command),
task->jobtype, task_id, &load_start)) {
load_mode = 0;
load_start = 0LL;
task->qnum++;
}
continue;
}
do_command(&grntest_ctx[task_id],
GRN_TEXT_VALUE(command),
task->jobtype, task_id);
task->qnum++;
if (grntest_sigint) {
goto exit;
}
}
}
}
exit:
GRN_TIME_INIT(&end_time, 0);
GRN_TIME_NOW(&grntest_ctx[task_id], &end_time);
total_elapsed_time = GRN_TIME_VALUE(&end_time) - GRN_TIME_VALUE(&grntest_starttime);
job_elapsed_time = GRN_TIME_VALUE(&end_time) - GRN_TIME_VALUE(&grntest_jobs_start);
CRITICAL_SECTION_ENTER(grntest_cs);
job = &(grntest_job[task->job_id]);
if (job->max < task->max) {
job->max = task->max;
}
if (job->min > task->min) {
job->min = task->min;
}
job->qnum += task->qnum;
job->done++;
if (job->done == job->concurrency) {
char tmpbuf[BUF_LEN];
sec = job_elapsed_time / (double)1000000;
qps = (double)job->qnum/ sec;
grntest_jobdone++;
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf,
"job\t"
"%s\t"
"%" GRN_FMT_LLD "\t"
"%" GRN_FMT_LLD "\t"
"%f\t"
"%" GRN_FMT_LLD "\t"
"%" GRN_FMT_LLD "\t"
"%d\n",
job->jobname,
total_elapsed_time,
job_elapsed_time,
qps,
job->min,
job->max,
job->qnum);
} else {
sprintf(tmpbuf,
"{\"job\": \"%s\", "
"\"total_elapsed_time\": %" GRN_FMT_LLD ", "
"\"job_elapsed_time\": %" GRN_FMT_LLD ", "
"\"qps\": %f, "
"\"min\": %" GRN_FMT_LLD ", "
"\"max\": %" GRN_FMT_LLD ", "
"\"queries\": %d}",
job->jobname,
total_elapsed_time,
job_elapsed_time,
qps,
job->min,
job->max,
job->qnum);
if (grntest_jobdone < grntest_jobnum) {
grn_strcat(tmpbuf, BUF_LEN, ",");
}
}
GRN_TEXT_PUTS(ctx, log, tmpbuf);
if (grntest_jobdone == grntest_jobnum) {
if (grntest_outtype == OUT_TSV) {
fprintf(grntest_log_file, "%.*s",
(int)GRN_TEXT_LEN(log), GRN_TEXT_VALUE(log));
} else {
if (grntest_detail_on) {
fseek(grntest_log_file, -2, SEEK_CUR);
fprintf(grntest_log_file, "],\n");
}
fprintf(grntest_log_file, "\"summary\": [");
fprintf(grntest_log_file, "%.*s",
(int)GRN_TEXT_LEN(log), GRN_TEXT_VALUE(log));
fprintf(grntest_log_file, "]");
}
fflush(grntest_log_file);
}
}
grn_obj_close(&grntest_ctx[task_id], &end_time);
CRITICAL_SECTION_LEAVE(grntest_cs);
return 0;
}
typedef struct _grntest_worker {
grn_ctx *ctx;
grn_obj log;
int task_id;
} grntest_worker;
#ifdef WIN32
static unsigned int
__stdcall
worker(void *val)
{
grntest_worker *worker = val;
worker_sub(worker->ctx, &worker->log, worker->task_id);
return 0;
}
#else
static void *
worker(void *val)
{
grntest_worker *worker = val;
worker_sub(worker->ctx, &worker->log, worker->task_id);
return NULL;
}
#endif /* WIN32 */
#ifdef WIN32
static int
thread_main(grn_ctx *ctx, int num)
{
int i;
int ret;
HANDLE pthread[MAX_CON];
grntest_worker *workers[MAX_CON];
for (i = 0; i < num; i++) {
workers[i] = GRN_MALLOC(sizeof(grntest_worker));
workers[i]->ctx = &grntest_ctx[i];
GRN_TEXT_INIT(&workers[i]->log, 0);
workers[i]->task_id = i;
pthread[i] = (HANDLE)_beginthreadex(NULL, 0, worker, (void *)workers[i],
0, NULL);
if (pthread[i]== (HANDLE)0) {
fprintf(stderr, "thread failed:%d\n", i);
error_exit_in_thread(1);
}
}
ret = WaitForMultipleObjects(num, pthread, TRUE, INFINITE);
if (ret == WAIT_TIMEOUT) {
fprintf(stderr, "timeout\n");
error_exit_in_thread(1);
}
for (i = 0; i < num; i++) {
CloseHandle(pthread[i]);
GRN_OBJ_FIN(workers[i]->ctx, &workers[i]->log);
GRN_FREE(workers[i]);
}
return 0;
}
#else
static int
thread_main(grn_ctx *ctx, int num)
{
intptr_t i;
int ret;
pthread_t pthread[MAX_CON];
grntest_worker *workers[MAX_CON];
for (i = 0; i < num; i++) {
workers[i] = GRN_MALLOC(sizeof(grntest_worker));
workers[i]->ctx = &grntest_ctx[i];
GRN_TEXT_INIT(&workers[i]->log, 0);
workers[i]->task_id = i;
ret = pthread_create(&pthread[i], NULL, worker, (void *)workers[i]);
if (ret) {
fprintf(stderr, "Cannot create thread:ret=%d\n", ret);
error_exit_in_thread(1);
}
}
for (i = 0; i < num; i++) {
ret = pthread_join(pthread[i], NULL);
GRN_OBJ_FIN(workers[i]->ctx, &workers[i]->log);
GRN_FREE(workers[i]);
if (ret) {
fprintf(stderr, "Cannot join thread:ret=%d\n", ret);
error_exit_in_thread(1);
}
}
return 0;
}
#endif
static int
error_exit(grn_ctx *ctx, int ret)
{
fflush(stderr);
shutdown_server();
grn_ctx_fin(ctx);
grn_fin();
exit(ret);
}
static int
get_sysinfo(const char *path, char *result, int olen)
{
char tmpbuf[256];
#ifdef WIN32
ULARGE_INTEGER dinfo;
char cpustring[64];
SYSTEM_INFO sinfo;
MEMORYSTATUSEX minfo;
OSVERSIONINFO osinfo;
if (grntest_outtype == OUT_TSV) {
result[0] = '\0';
sprintf(tmpbuf, "script\t%s\n", grntest_scriptname);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, "user\t%s\n", grntest_username);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, "date\t%s\n", grntest_date);
grn_strcat(result, olen, tmpbuf);
} else {
grn_strcpy(result, olen, "{");
sprintf(tmpbuf, "\"script\": \"%s.scr\",\n", grntest_scriptname);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, " \"user\": \"%s\",\n", grntest_username);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, " \"date\": \"%s\",\n", grntest_date);
grn_strcat(result, olen, tmpbuf);
}
memset(cpustring, 0, 64);
#ifndef __GNUC__
{
int cinfo[4];
__cpuid(cinfo, 0x80000002);
memcpy(cpustring, cinfo, 16);
__cpuid(cinfo, 0x80000003);
memcpy(cpustring+16, cinfo, 16);
__cpuid(cinfo, 0x80000004);
memcpy(cpustring+32, cinfo, 16);
}
#endif
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%s\n", cpustring);
} else {
sprintf(tmpbuf, " \"CPU\": \"%s\",\n", cpustring);
}
grn_strcat(result, olen, tmpbuf);
if (sizeof(int *) == 8) {
grntest_osinfo = OS_WINDOWS64;
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "64BIT\n");
} else {
sprintf(tmpbuf, " \"BIT\": 64,\n");
}
} else {
grntest_osinfo = OS_WINDOWS32;
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "32BIT\n");
} else {
sprintf(tmpbuf, " \"BIT\": 32,\n");
}
}
grn_strcat(result, olen, tmpbuf);
GetSystemInfo(&sinfo);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "CORE\t%lu\n", sinfo.dwNumberOfProcessors);
} else {
sprintf(tmpbuf, " \"CORE\": %lu,\n", sinfo.dwNumberOfProcessors);
}
grn_strcat(result, olen, tmpbuf);
minfo.dwLength = sizeof(MEMORYSTATUSEX);
GlobalMemoryStatusEx(&minfo);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "RAM\t%I64dMByte\n", minfo.ullTotalPhys/(1024*1024));
} else {
sprintf(tmpbuf, " \"RAM\": \"%I64dMByte\",\n", minfo.ullTotalPhys/(1024*1024));
}
grn_strcat(result, olen, tmpbuf);
GetDiskFreeSpaceEx(NULL, NULL, &dinfo, NULL);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "HDD\t%I64dKBytes\n", dinfo.QuadPart/1024 );
} else {
sprintf(tmpbuf, " \"HDD\": \"%I64dKBytes\",\n", dinfo.QuadPart/1024 );
}
grn_strcat(result, olen, tmpbuf);
osinfo.dwOSVersionInfoSize = sizeof(OSVERSIONINFO); GetVersionEx(&osinfo);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "Windows %ld.%ld\n",
osinfo.dwMajorVersion, osinfo.dwMinorVersion);
} else {
sprintf(tmpbuf, " \"OS\": \"Windows %lu.%lu\",\n", osinfo.dwMajorVersion,
osinfo.dwMinorVersion);
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%s\n", grntest_serverhost);
} else {
sprintf(tmpbuf, " \"HOST\": \"%s\",\n", grntest_serverhost);
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%d\n", grntest_serverport);
} else {
sprintf(tmpbuf, " \"PORT\": \"%d\",\n", grntest_serverport);
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%s\"\n", grn_get_version());
} else {
sprintf(tmpbuf, " \"VERSION\": \"%s\"\n", grn_get_version());
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype != OUT_TSV) {
grn_strcat(result, olen, "}");
}
#else /* linux only */
FILE *fp;
int ret;
int cpunum = 0;
int minfo = 0;
int unevictable = 0;
int mlocked = 0;
#define CPU_STRING_SIZE 256
char cpu_string[CPU_STRING_SIZE];
struct utsname ubuf;
struct statvfs vfsbuf;
if (grntest_outtype == OUT_TSV) {
result[0] = '\0';
sprintf(tmpbuf, "sctipt\t%s\n", grntest_scriptname);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, "user\t%s\n", grntest_username);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, "date\t%s\n", grntest_date);
grn_strcat(result, olen, tmpbuf);
} else {
grn_strcpy(result, olen, "{");
sprintf(tmpbuf, "\"script\": \"%s.scr\",\n", grntest_scriptname);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, " \"user\": \"%s\",\n", grntest_username);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, " \"date\": \"%s\",\n", grntest_date);
grn_strcat(result, olen, tmpbuf);
}
fp = fopen("/proc/cpuinfo", "r");
if (!fp) {
fprintf(stderr, "Cannot open cpuinfo\n");
exit(1);
}
while (fgets(tmpbuf, 256, fp) != NULL) {
tmpbuf[strlen(tmpbuf)-1] = '\0';
if (!strncmp(tmpbuf, "model name\t: ", 13)) {
grn_strcpy(cpu_string, CPU_STRING_SIZE, &tmpbuf[13]);
}
}
fclose(fp);
#undef CPU_STRING_SIZE
cpunum = sysconf(_SC_NPROCESSORS_CONF);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%s\n", cpu_string);
} else {
sprintf(tmpbuf, " \"CPU\": \"%s\",\n", cpu_string);
}
grn_strcat(result, olen, tmpbuf);
if (sizeof(int *) == 8) {
grntest_osinfo = OS_LINUX64;
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "64BIT\n");
} else {
sprintf(tmpbuf, " \"BIT\": 64,\n");
}
} else {
grntest_osinfo = OS_LINUX32;
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "32BIT\n");
} else {
sprintf(tmpbuf, " \"BIT\": 32,\n");
}
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "CORE\t%d\n", cpunum);
} else {
sprintf(tmpbuf, " \"CORE\": %d,\n", cpunum);
}
grn_strcat(result, olen, tmpbuf);
fp = fopen("/proc/meminfo", "r");
if (!fp) {
fprintf(stderr, "Cannot open meminfo\n");
exit(1);
}
while (fgets(tmpbuf, 256, fp) != NULL) {
tmpbuf[strlen(tmpbuf)-1] = '\0';
if (!strncmp(tmpbuf, "MemTotal:", 9)) {
minfo = grntest_atoi(&tmpbuf[10], &tmpbuf[10] + 40, NULL);
}
if (!strncmp(tmpbuf, "Unevictable:", 12)) {
unevictable = grntest_atoi(&tmpbuf[13], &tmpbuf[13] + 40, NULL);
}
if (!strncmp(tmpbuf, "Mlocked:", 8)) {
mlocked = grntest_atoi(&tmpbuf[9], &tmpbuf[9] + 40, NULL);
}
}
fclose(fp);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%dMBytes\n", minfo/1024);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, "%dMBytes_Unevictable\n", unevictable/1024);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, "%dMBytes_Mlocked\n", mlocked/1024);
grn_strcat(result, olen, tmpbuf);
} else {
sprintf(tmpbuf, " \"RAM\": \"%dMBytes\",\n", minfo/1024);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, " \"Unevictable\": \"%dMBytes\",\n", unevictable/1024);
grn_strcat(result, olen, tmpbuf);
sprintf(tmpbuf, " \"Mlocked\": \"%dMBytes\",\n", mlocked/1024);
grn_strcat(result, olen, tmpbuf);
}
ret = statvfs(path, &vfsbuf);
if (ret) {
fprintf(stderr, "Cannot access %s\n", path);
exit(1);
}
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%" GRN_FMT_INT64U "KBytes\n", vfsbuf.f_blocks * 4);
} else {
sprintf(tmpbuf,
" \"HDD\": \"%" GRN_FMT_INT64U "KBytes\",\n",
vfsbuf.f_blocks * 4);
}
grn_strcat(result, olen, tmpbuf);
uname(&ubuf);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%s %s\n", ubuf.sysname, ubuf.release);
} else {
sprintf(tmpbuf, " \"OS\": \"%s %s\",\n", ubuf.sysname, ubuf.release);
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%s\n", grntest_serverhost);
} else {
sprintf(tmpbuf, " \"HOST\": \"%s\",\n", grntest_serverhost);
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%d\n", grntest_serverport);
} else {
sprintf(tmpbuf, " \"PORT\": \"%d\",\n", grntest_serverport);
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype == OUT_TSV) {
sprintf(tmpbuf, "%s\n", grn_get_version());
} else {
sprintf(tmpbuf, " \"VERSION\": \"%s\"\n", grn_get_version());
}
grn_strcat(result, olen, tmpbuf);
if (grntest_outtype != OUT_TSV) {
grn_strcat(result, olen, "},");
}
#endif /* WIN32 */
if (strlen(result) >= olen) {
fprintf(stderr, "buffer overrun in get_sysinfo!\n");
exit(1);
}
return 0;
}
static int
start_server(const char *dbpath, int r)
{
int ret;
char optbuf[BUF_LEN];
#ifdef WIN32
char tmpbuf[BUF_LEN];
STARTUPINFO si;
if (strlen(dbpath) > BUF_LEN - 100) {
fprintf(stderr, "too long dbpath!\n");
exit(1);
}
grn_strcpy(tmpbuf, BUF_LEN, groonga_path);
grn_strcat(tmpbuf, BUF_LEN, " -s --protocol ");
grn_strcat(tmpbuf, BUF_LEN, groonga_protocol);
grn_strcat(tmpbuf, BUF_LEN, " -p ");
sprintf(optbuf, "%d ", grntest_serverport);
grn_strcat(tmpbuf, BUF_LEN, optbuf);
grn_strcat(tmpbuf, BUF_LEN, dbpath);
memset(&si, 0, sizeof(STARTUPINFO));
si.cb=sizeof(STARTUPINFO);
ret = CreateProcess(NULL, tmpbuf, NULL, NULL, FALSE,
0, NULL, NULL, &si, &grntest_pi);
if (ret == 0) {
fprintf(stderr, "Cannot start groonga server: <%s>: error=%lu\n",
groonga_path, GetLastError());
exit(1);
}
#else
pid_t pid;
pid = fork();
if (pid < 0) {
fprintf(stderr, "Cannot start groonga server:Cannot fork\n");
exit(1);
}
sprintf(optbuf, "%d", grntest_serverport);
if (pid == 0) {
ret = execlp(groonga_path, groonga_path,
"-s",
"--protocol", groonga_protocol,
"-p", optbuf,
dbpath, (char*)NULL);
if (ret == -1) {
fprintf(stderr, "Cannot start groonga server: <%s>: errno=%d\n",
groonga_path, errno);
exit(1);
}
}
else {
grntest_server_id = pid;
}
#endif /* WIN32 */
return 0;
}
static int
parse_line(grn_ctx *ctx, char *buf, int start, int end, int num)
{
int i, j, error_flag = 0, out_or_test = 0;
char tmpbuf[BUF_LEN];
grntest_job[num].concurrency = 1;
grntest_job[num].ntimes = 1;
grntest_job[num].done = 0;
grntest_job[num].qnum = 0;
grntest_job[num].max = 0LL;
grntest_job[num].min = 9223372036854775807LL;
grntest_job[num].outputlog = NULL;
grntest_job[num].inputlog = NULL;
strncpy(grntest_job[num].jobname, &buf[start], end - start);
grntest_job[num].jobname[end - start] = '\0';
i = start;
while (i < end) {
if (grn_isspace(&buf[i], GRN_ENC_UTF8) == 1) {
i++;
continue;
}
if (!strncmp(&buf[i], "do_local", 8)) {
grntest_job[num].jobtype = J_DO_LOCAL;
i = i + 8;
break;
}
if (!strncmp(&buf[i], "do_gqtp", 7)) {
grntest_job[num].jobtype = J_DO_GQTP;
i = i + 7;
break;
}
if (!strncmp(&buf[i], "do_http", 7)) {
grntest_job[num].jobtype = J_DO_HTTP;
i = i + 7;
break;
}
if (!strncmp(&buf[i], "rep_local", 9)) {
grntest_job[num].jobtype = J_REP_LOCAL;
i = i + 9;
break;
}
if (!strncmp(&buf[i], "rep_gqtp", 8)) {
grntest_job[num].jobtype = J_REP_GQTP;
i = i + 8;
break;
}
if (!strncmp(&buf[i], "rep_http", 8)) {
grntest_job[num].jobtype = J_REP_HTTP;
i = i + 8;
break;
}
if (!strncmp(&buf[i], "out_local", 9)) {
grntest_job[num].jobtype = J_OUT_LOCAL;
i = i + 9;
out_or_test = 1;
break;
}
if (!strncmp(&buf[i], "out_gqtp", 8)) {
grntest_job[num].jobtype = J_OUT_GQTP;
i = i + 8;
out_or_test = 1;
break;
}
if (!strncmp(&buf[i], "out_http", 8)) {
grntest_job[num].jobtype = J_OUT_HTTP;
i = i + 8;
out_or_test = 1;
break;
}
if (!strncmp(&buf[i], "test_local", 10)) {
grntest_job[num].jobtype = J_TEST_LOCAL;
i = i + 10;
out_or_test = 1;
break;
}
if (!strncmp(&buf[i], "test_gqtp", 9)) {
grntest_job[num].jobtype = J_TEST_GQTP;
i = i + 9;
out_or_test = 1;
break;
}
if (!strncmp(&buf[i], "test_http", 9)) {
grntest_job[num].jobtype = J_TEST_HTTP;
i = i + 9;
out_or_test = 1;
break;
}
error_flag = 1;
i++;
}
if (error_flag) {
return 3;
}
if (i == end) {
return 1;
}
if (grn_isspace(&buf[i], GRN_ENC_UTF8) != 1) {
return 4;
}
i++;
while (grn_isspace(&buf[i], GRN_ENC_UTF8) == 1) {
i++;
continue;
}
j = 0;
while (i < end) {
if (grn_isspace(&buf[i], GRN_ENC_UTF8) == 1) {
break;
}
grntest_job[num].commandfile[j] = buf[i];
i++;
j++;
if (j > 255) {
return 5;
}
}
grntest_job[num].commandfile[j] = '\0';
while (grn_isspace(&buf[i], GRN_ENC_UTF8) == 1) {
i++;
}
if (i == end) {
if (out_or_test) {
fprintf(stderr, "log(test)_local(gqtp|http) needs log(test)_filename\n");
return 11;
}
return 0;
}
j = 0;
while (i < end) {
if (grn_isspace(&buf[i], GRN_ENC_UTF8) == 1) {
break;
}
tmpbuf[j] = buf[i];
i++;
j++;
if (j >= BUF_LEN) {
return 6;
}
}
tmpbuf[j] ='\0';
if (out_or_test) {
if (out_p(grntest_job[num].jobtype)) {
grntest_job[num].outputlog = fopen(tmpbuf, "wb");
if (grntest_job[num].outputlog == NULL) {
fprintf(stderr, "Cannot open %s\n", tmpbuf);
return 13;
}
} else {
char outlog[BUF_LEN];
grntest_job[num].inputlog = grn_file_reader_open(ctx, tmpbuf);
if (grntest_job[num].inputlog == NULL) {
fprintf(stderr, "Cannot open %s\n", tmpbuf);
return 14;
}
sprintf(outlog, "%s.diff", tmpbuf);
grntest_job[num].outputlog = fopen(outlog, "wb");
if (grntest_job[num].outputlog == NULL) {
fprintf(stderr, "Cannot open %s\n", outlog);
return 15;
}
}
grn_strcpy(grntest_job[num].logfile, BUF_LEN, tmpbuf);
return 0;
} else {
grntest_job[num].concurrency = grntest_atoi(tmpbuf, tmpbuf + j, NULL);
if (grntest_job[num].concurrency == 0) {
return 7;
}
}
while (grn_isspace(&buf[i], GRN_ENC_UTF8) == 1) {
i++;
}
if (i == end) {
return 0;
}
j = 0;
while (i < end) {
if (grn_isspace(&buf[i], GRN_ENC_UTF8) == 1) {
break;
}
tmpbuf[j] = buf[i];
i++;
j++;
if (j > 16) {
return 8;
}
}
tmpbuf[j] ='\0';
grntest_job[num].ntimes = grntest_atoi(tmpbuf, tmpbuf + j, NULL);
if (grntest_job[num].ntimes == 0) {
return 9;
}
if (i == end) {
return 0;
}
while (i < end) {
if (grn_isspace(&buf[i], GRN_ENC_UTF8) == 1) {
i++;
continue;
}
return 10;
}
return 0;
}
static int
get_jobs(grn_ctx *ctx, char *buf, int line)
{
int i, len, start, end, ret;
int jnum = 0;
len = strlen(buf);
i = 0;
while (i < len) {
if ((buf[i] == '#') || (buf[i] == '\r') || (buf[i] == '\n')) {
buf[i] = '\0';
len = i;
break;
}
i++;
}
i = 0;
start = 0;
while (i < len) {
if (buf[i] == ';') {
end = i;
ret = parse_line(ctx, buf, start, end, jnum);
if (ret) {
if (ret > 1) {
fprintf(stderr, "Syntax error:line=%d:ret=%d:%s\n", line, ret, buf);
error_exit(ctx, 1);
}
} else {
jnum++;
}
start = end + 1;
}
i++;
}
end = len;
ret = parse_line(ctx, buf, start, end, jnum);
if (ret) {
if (ret > 1) {
fprintf(stderr, "Syntax error:line=%d:ret=%d:%s\n", line, ret, buf);
error_exit(ctx, 1);
}
} else {
jnum++;
}
return jnum;
}
static int
make_task_table(grn_ctx *ctx, int jobnum)
{
int i, j;
int tid = 0;
grn_obj *commands = NULL;
for (i = 0; i < jobnum; i++) {
if ((grntest_job[i].concurrency == 1) && (!grntest_onmemory_mode)) {
grntest_task[tid].file = grntest_job[i].commandfile;
grntest_task[tid].commands = NULL;
grntest_task[tid].ntimes = grntest_job[i].ntimes;
grntest_task[tid].jobtype = grntest_job[i].jobtype;
grntest_task[tid].job_id = i;
tid++;
continue;
}
for (j = 0; j < grntest_job[i].concurrency; j++) {
if (j == 0) {
grn_file_reader *reader;
grn_obj line;
GRN_TEXT_INIT(&line, 0);
commands = grn_obj_open(ctx, GRN_PVECTOR, 0, GRN_VOID);
if (!commands) {
fprintf(stderr, "Cannot alloc commands\n");
error_exit(ctx, 1);
}
reader = grn_file_reader_open(ctx, grntest_job[i].commandfile);
if (!reader) {
fprintf(stderr, "Cannot alloc commandfile:%s\n",
grntest_job[i].commandfile);
error_exit(ctx, 1);
}
while (grn_file_reader_read_line(ctx, reader, &line) == GRN_SUCCESS) {
grn_obj *command;
if (GRN_TEXT_VALUE(&line)[GRN_TEXT_LEN(&line) - 1] == '\n') {
grn_bulk_truncate(ctx, &line, GRN_TEXT_LEN(&line) - 1);
}
if (GRN_TEXT_LEN(&line) == 0) {
GRN_BULK_REWIND(&line);
continue;
}
GRN_TEXT_PUTC(ctx, &line, '\0');
if (comment_p(GRN_TEXT_VALUE(&line))) {
GRN_BULK_REWIND(&line);
continue;
}
command = grn_obj_open(ctx, GRN_BULK, 0, GRN_VOID);
if (!command) {
fprintf(stderr, "Cannot alloc command: %s: %s\n",
grntest_job[i].commandfile, GRN_TEXT_VALUE(&line));
GRN_OBJ_FIN(ctx, &line);
error_exit(ctx, 1);
}
GRN_TEXT_SET(ctx, command, GRN_TEXT_VALUE(&line), GRN_TEXT_LEN(&line));
GRN_PTR_PUT(ctx, commands, command);
GRN_BULK_REWIND(&line);
}
grn_file_reader_close(ctx, reader);
GRN_OBJ_FIN(ctx, &line);
}
grntest_task[tid].file = NULL;
grntest_task[tid].commands = commands;
grntest_task[tid].ntimes = grntest_job[i].ntimes;
grntest_task[tid].jobtype = grntest_job[i].jobtype;
grntest_task[tid].job_id = i;
tid++;
}
}
return tid;
}
/*
static int
print_commandlist(int task_id)
{
int i;
for (i = 0; i < GRN_TEXT_LEN(grntest_task[task_id].commands); i++) {
grn_obj *command;
command = GRN_PTR_VALUE_AT(grntest_task[task_id].commands, i);
printf("%s\n", GRN_TEXT_VALUE(command));
}
return 0;
}
*/
/* return num of query */
static int
do_jobs(grn_ctx *ctx, int jobnum, int line)
{
int i, task_num, ret, qnum = 0, thread_num = 0;
for (i = 0; i < jobnum; i++) {
/*
printf("%d:type =%d:file=%s:con=%d:ntimes=%d\n", i, grntest_job[i].jobtype,
grntest_job[i].commandfile, JobTable[i].concurrency, JobTable[i].ntimes);
*/
thread_num = thread_num + grntest_job[i].concurrency;
}
if (thread_num >= MAX_CON) {
fprintf(stderr, "Too many threads requested(MAX=64):line=%d\n", line);
error_exit(ctx, 1);
}
task_num = make_task_table(ctx, jobnum);
if (task_num != thread_num) {
fprintf(stderr, "Logical error\n");
error_exit(ctx, 9);
}
grntest_detail_on = 0;
for (i = 0; i < task_num; i++) {
grn_ctx_init(&grntest_ctx[i], 0);
grntest_owndb[i] = NULL;
if (gqtp_p(grntest_task[i].jobtype)) {
ret = grn_ctx_connect(&grntest_ctx[i], grntest_serverhost, grntest_serverport, 0);
if (ret) {
fprintf(stderr, "Cannot connect groonga server:host=%s:port=%d:ret=%d\n",
grntest_serverhost, grntest_serverport, ret);
error_exit(ctx, 1);
}
} else if (http_p(grntest_task[i].jobtype)) {
grntest_task[i].http_socket = 0;
GRN_TEXT_INIT(&grntest_task[i].http_response, 0);
if (grntest_owndb_mode) {
grntest_owndb[i] = grn_db_open(&grntest_ctx[i], grntest_dbpath);
if (grntest_owndb[i] == NULL) {
fprintf(stderr, "Cannot open db:%s\n", grntest_dbpath);
exit(1);
}
} else {
grntest_owndb[i] = grn_db_create(&grntest_ctx[i], NULL, NULL);
}
} else {
if (grntest_owndb_mode) {
grntest_owndb[i] = grn_db_open(&grntest_ctx[i], grntest_dbpath);
if (grntest_owndb[i] == NULL) {
fprintf(stderr, "Cannot open db:%s\n", grntest_dbpath);
exit(1);
}
}
else {
grn_ctx_use(&grntest_ctx[i], grntest_db);
}
}
if (report_p(grntest_task[i].jobtype)) {
grntest_detail_on++;
}
}
if (grntest_detail_on) {
if (grntest_outtype == OUT_TSV) {
;
}
else {
fprintf(grntest_log_file, "\"detail\": [\n");
}
fflush(grntest_log_file);
}
thread_main(ctx, task_num);
for (i = 0; i < task_num; i++) {
if (grntest_owndb[i]) {
grn_obj_close(&grntest_ctx[i], grntest_owndb[i]);
}
if (http_p(grntest_task[i].jobtype)) {
GRN_OBJ_FIN(&grntest_ctx[i], &grntest_task[i].http_response);
}
grn_ctx_fin(&grntest_ctx[i]);
qnum = qnum + grntest_task[i].qnum;
}
i = 0;
while (i < task_num) {
int job_id;
if (grntest_task[i].commands) {
job_id = grntest_task[i].job_id;
GRN_OBJ_FIN(ctx, grntest_task[i].commands);
while (job_id == grntest_task[i].job_id) {
i++;
}
} else {
i++;
}
}
for (i = 0; i < jobnum; i++) {
if (grntest_job[i].outputlog) {
int ret;
ret = fclose(grntest_job[i].outputlog);
if (ret) {
fprintf(stderr, "Cannot close %s\n", grntest_job[i].logfile);
exit(1);
}
}
if (grntest_job[i].inputlog) {
grn_file_reader_close(ctx, grntest_job[i].inputlog);
}
}
return qnum;
}
/* return num of query */
static int
do_script(grn_ctx *ctx, const char *script_file_path)
{
int n_lines = 0;
int n_jobs;
int n_queries, total_n_queries = 0;
grn_file_reader *script_file;
grn_obj line;
script_file = grn_file_reader_open(ctx, script_file_path);
if (script_file == NULL) {
fprintf(stderr, "Cannot open script file: <%s>\n", script_file_path);
error_exit(ctx, 1);
}
GRN_TEXT_INIT(&line, 0);
while (grn_file_reader_read_line(ctx, script_file, &line) == GRN_SUCCESS) {
if (grntest_sigint) {
break;
}
n_lines++;
grntest_jobdone = 0;
n_jobs = get_jobs(ctx, GRN_TEXT_VALUE(&line), n_lines);
grntest_jobnum = n_jobs;
if (n_jobs > 0) {
GRN_TIME_INIT(&grntest_jobs_start, 0);
GRN_TIME_NOW(ctx, &grntest_jobs_start);
if (grntest_outtype == OUT_TSV) {
fprintf(grntest_log_file, "jobs-start\t%s\n", GRN_TEXT_VALUE(&line));
} else {
fprintf(grntest_log_file, "{\"jobs\": \"%s\",\n", GRN_TEXT_VALUE(&line));
}
n_queries = do_jobs(ctx, n_jobs, n_lines);
if (grntest_outtype == OUT_TSV) {
fprintf(grntest_log_file, "jobs-end\t%s\n", GRN_TEXT_VALUE(&line));
} else {
fprintf(grntest_log_file, "},\n");
}
total_n_queries += n_queries;
grn_obj_close(ctx, &grntest_jobs_start);
}
if (grntest_stop_flag) {
fprintf(stderr, "Error:Quit\n");
break;
}
GRN_BULK_REWIND(&line);
}
grn_obj_unlink(ctx, &line);
grn_file_reader_close(ctx, script_file);
return total_n_queries;
}
static int
start_local(grn_ctx *ctx, const char *dbpath)
{
grntest_db = grn_db_open(ctx, dbpath);
if (!grntest_db) {
grntest_db = grn_db_create(ctx, dbpath, NULL);
}
if (!grntest_db) {
fprintf(stderr, "Cannot open db:%s\n", dbpath);
exit(1);
}
return 0;
}
static int
check_server(grn_ctx *ctx)
{
int ret, retry = 0;
while (1) {
ret = grn_ctx_connect(ctx, grntest_serverhost, grntest_serverport, 0);
if (ret == GRN_CONNECTION_REFUSED) {
grn_sleep(1);
retry++;
if (retry > 5) {
fprintf(stderr, "Cannot connect groonga server:host=%s:port=%d:ret=%d\n",
grntest_serverhost, grntest_serverport, ret);
return 1;
}
continue;
}
if (ret) {
fprintf(stderr, "Cannot connect groonga server:host=%s:port=%d:ret=%d\n",
grntest_serverhost, grntest_serverport, ret);
return 1;
}
break;
}
return 0;
}
#define MODE_LIST 1
#define MODE_GET 2
#define MODE_PUT 3
#define MODE_TIME 4
static int
check_response(char *buf)
{
if (buf[0] == '1') {
return 1;
}
if (buf[0] == '2') {
return 1;
}
if (buf[0] == '3') {
return 1;
}
return 0;
}
static int
read_response(socket_t socket, char *buf)
{
int ret;
ret = recv(socket, buf, BUF_LEN - 1, 0);
if (ret == -1) {
fprintf(stderr, "recv error:3\n");
exit(1);
}
buf[ret] ='\0';
#ifdef DEBUG_FTP
fprintf(stderr, "recv:%s", buf);
#endif
return ret;
}
static int
put_file(socket_t socket, const char *filename)
{
FILE *fp;
int c, ret, size = 0;
char buf[1];
fp = fopen(filename, "rb");
if (!fp) {
fprintf(stderr, "LOCAL:no such file:%s\n", filename);
return 0;
}
while ((c = fgetc(fp)) != EOF) {
buf[0] = c;
ret = send(socket, buf, 1, 0);
if (ret == -1) {
fprintf(stderr, "send error\n");
exit(1);
}
size++;
}
fclose(fp);
return size;
}
static int
ftp_list(socket_t data_socket)
{
int ret;
char buf[BUF_LEN];
while (1) {
ret = recv(data_socket, buf, BUF_LEN - 2, 0);
if (ret == 0) {
fflush(stdout);
return 0;
}
buf[ret] = '\0';
fprintf(stdout, "%s", buf);
}
return 0;
}
static int
get_file(socket_t socket, const char *filename, int size)
{
FILE *fp;
int ret, total;
char buf[FTPBUF];
fp = fopen(filename, "wb");
if (!fp) {
fprintf(stderr, "Cannot open %s\n", filename);
return -1;
}
total = 0;
while (total != size) {
ret = recv(socket, buf, FTPBUF, 0);
if (ret == -1) {
fprintf(stderr, "recv error:2:ret=%d:size=%d:total\n", ret, size);
return -1;
}
if (ret == 0) {
break;
}
fwrite(buf, ret, 1, fp);
total = total + ret;
}
fclose(fp);
return size;
}
static int
get_port(char *buf, char *host, int *port)
{
int ret,d1,d2,d3,d4,d5,d6;
ret = sscanf(buf, "227 Entering Passive Mode (%d,%d,%d,%d,%d,%d)",
&d1, &d2, &d3, &d4, &d5, &d6);
if (ret != 6) {
fprintf(stderr, "Cannot enter passsive mode\n");
return 0;
}
*port = d5 * 256 + d6;
sprintf(host, "%d.%d.%d.%d", d1, d2, d3, d4);
return 1;
}
static char *
get_ftp_date(char *buf)
{
while (*buf !=' ') {
buf++;
if (*buf == '\0') {
return NULL;
}
}
buf++;
return buf;
}
static int
get_size(char *buf)
{
int size;
while (*buf !='(') {
buf++;
if (*buf == '\0') {
return 0;
}
}
buf++;
size = grntest_atoi(buf, buf + strlen(buf), NULL);
return size;
}
int
ftp_sub(const char *user, const char *passwd, const char *host,
const char *filename, int mode,
const char *cd_dirname, char *retval)
{
int size = 0;
int status = 0;
socket_t command_socket, data_socket;
int data_port;
char data_host[BUF_LEN];
char send_mesg[BUF_LEN];
char buf[BUF_LEN];
#ifdef WIN32
char base[BUF_LEN];
char fname[BUF_LEN];
char ext[BUF_LEN];
#else
char *base;
#endif /* WIN32 */
#ifdef WIN32
WSADATA ws;
WSAStartup(MAKEWORD(2,0), &ws);
#endif /* WIN32 */
if ((filename != NULL) && (strlen(filename) >= MAX_PATH_LEN)) {
fprintf(stderr, "too long filename\n");
exit(1);
}
if ((cd_dirname != NULL) && (strlen(cd_dirname) >= MAX_PATH_LEN)) {
fprintf(stderr, "too long dirname\n");
exit(1);
}
command_socket = open_socket(host, 21);
if (command_socket == SOCKETERROR) {
return 0;
}
read_response(command_socket, buf);
if (!check_response(buf)) {
goto exit;
}
/* send username */
sprintf(send_mesg, "USER %s\r\n", user);
write_to_server(command_socket, send_mesg);
read_response(command_socket, buf);
if (!check_response(buf)) {
goto exit;
}
/* send passwd */
sprintf(send_mesg, "PASS %s\r\n", passwd);
write_to_server(command_socket, send_mesg);
read_response(command_socket, buf);
if (!check_response(buf)) {
goto exit;
}
/* send TYPE I */
sprintf(send_mesg, "TYPE I\r\n");
write_to_server(command_socket, send_mesg);
read_response(command_socket, buf);
if (!check_response(buf)) {
goto exit;
}
/* send PASV */
sprintf(send_mesg, "PASV\r\n");
write_to_server(command_socket, send_mesg);
read_response(command_socket, buf);
if (!check_response(buf)) {
goto exit;
}
if (!get_port(buf, data_host, &data_port)) {
goto exit;
}
data_socket = open_socket(data_host, data_port);
if (data_socket == SOCKETERROR) {
goto exit;
}
if (cd_dirname) {
sprintf(send_mesg, "CWD %s\r\n", cd_dirname);
write_to_server(command_socket, send_mesg);
}
read_response(command_socket, buf);
if (!check_response(buf)) {
socketclose(data_socket);
goto exit;
}
#ifdef WIN32
_splitpath(filename, NULL, NULL, fname, ext);
grn_strcpy(base, BUF_LEN, fname);
strcat(base, ext);
#else
grn_strcpy(buf, BUF_LEN, filename);
base = basename(buf);
#endif /* WIN32 */
switch (mode) {
case MODE_LIST:
if (filename) {
sprintf(send_mesg, "LIST %s\r\n", filename);
} else {
sprintf(send_mesg, "LIST \r\n");
}
write_to_server(command_socket, send_mesg);
break;
case MODE_PUT:
sprintf(send_mesg, "STOR %s\r\n", base);
write_to_server(command_socket, send_mesg);
break;
case MODE_GET:
sprintf(send_mesg, "RETR %s\r\n", base);
write_to_server(command_socket, send_mesg);
break;
case MODE_TIME:
sprintf(send_mesg, "MDTM %s\r\n", base);
write_to_server(command_socket, send_mesg);
break;
default:
fprintf(stderr, "invalid mode\n");
socketclose(data_socket);
goto exit;
}
read_response(command_socket, buf);
if (!check_response(buf)) {
socketclose(data_socket);
goto exit;
}
if (!strncmp(buf, "150", 3)) {
size = get_size(buf);
}
if (!strncmp(buf, "213", 3)) {
retval[BUF_LEN-2] = '\0';
grn_strcpy(retval, BUF_LEN - 2, get_ftp_date(buf));
if (retval[BUF_LEN-2] != '\0' ) {
fprintf(stderr, "buffer over run in ftp\n");
exit(1);
}
}
switch (mode) {
case MODE_LIST:
ftp_list(data_socket);
break;
case MODE_GET:
if (get_file(data_socket, filename, size) == -1) {
socketclose(data_socket);
goto exit;
}
fprintf(stderr, "get:%s\n", filename);
break;
case MODE_PUT:
if (put_file(data_socket, filename) == -1) {
socketclose(data_socket);
goto exit;
}
fprintf(stderr, "put:%s\n", filename);
break;
default:
break;
}
socketclose(data_socket);
if ((mode == MODE_GET) || (mode == MODE_PUT)) {
read_response(command_socket, buf);
}
write_to_server(command_socket, "QUIT\n");
status = 1;
exit:
socketclose(command_socket);
#ifdef WIN32
WSACleanup();
#endif
return status;
}
/*
static int
ftp_main(int argc, char **argv)
{
char val[BUF_LEN];
val[0] = '\0';
ftp_sub(FTPUSER, FTPPASSWD, FTPSERVER, argv[2],
grntest_atoi(argv[3], argv[3] + strlen(argv[3]), NULL), argv[4], val);
if (val[0] != '\0') {
printf("val=%s\n", val);
}
return 0;
}
*/
static int
get_username(char *name, int maxlen)
{
char *env=NULL;
grn_strcpy(name, maxlen, "nobody");
#ifdef WIN32
env = getenv("USERNAME");
#else
env = getenv("USER");
#endif /* WIN32 */
if (strlen(env) > maxlen) {
fprintf(stderr, "too long username:%s\n", env);
exit(1);
}
if (env) {
grn_strcpy(name, maxlen, env);
}
return 0;
}
static int
get_date(char *date, time_t *sec)
{
#if defined(WIN32) && !defined(__GNUC__)
struct tm tmbuf;
struct tm *tm = &tmbuf;
localtime_s(tm, sec);
#else /* defined(WIN32) && !defined(__GNUC__) */
# ifdef HAVE_LOCALTIME_R
struct tm result;
struct tm *tm = &result;
localtime_r(sec, tm);
# else /* HAVE_LOCALTIME_R */
struct tm *tm = localtime(sec);
# endif /* HAVE_LOCALTIME_R */
#endif /* defined(WIN32) && !defined(__GNUC__) */
#ifdef WIN32
strftime(date, 128, "%Y-%m-%d %H:%M:%S", tm);
#else
strftime(date, 128, "%F %T", tm);
#endif /* WIN32 */
return 1;
}
static int
get_scriptname(const char *path, char *name, size_t name_len, const char *suffix)
{
int slen = strlen(suffix);
int len = strlen(path);
if (len >= BUF_LEN) {
fprintf(stderr, "too long script name\n");
exit(1);
}
if (slen > len) {
fprintf(stderr, "too long suffux\n");
exit(1);
}
grn_strcpy(name, name_len, path);
if (strncmp(&name[len-slen], suffix, slen)) {
name[0] = '\0';
return 0;
}
name[len-slen] = '\0';
return 1;
}
#ifdef WIN32
static int
get_tm_from_serverdate(char *serverdate, struct tm *tm)
{
int res;
int year, month, day, hour, minute, second;
res = sscanf(serverdate, "%4d%2d%2d%2d%2d%2d",
&year, &month, &day, &hour, &minute, &second);
/*
printf("%d %d %d %d %d %d\n", year, month, day, hour, minute, second);
*/
tm->tm_sec = second;
tm->tm_min = minute;
tm->tm_hour = hour;
tm->tm_mday = day;
tm->tm_mon = month - 1;
tm->tm_year = year - 1900;
tm->tm_isdst = -1;
return 0;
}
#endif /* WIN32 */
static int
sync_sub(grn_ctx *ctx, const char *filename)
{
int ret;
char serverdate[BUF_LEN];
#ifdef WIN32
struct _stat statbuf;
#else
struct stat statbuf;
#endif /* WIN32 */
time_t st, lt;
struct tm stm;
ret = ftp_sub(FTPUSER, FTPPASSWD, FTPSERVER, filename, MODE_TIME, "data",
serverdate);
if (ret == 0) {
fprintf(stderr, "[%s] does not exist in server\n", filename);
return 0;
}
#ifdef WIN32
get_tm_from_serverdate(serverdate, &stm);
#else
strptime(serverdate, "%Y%m%d %H%M%S", &stm);
#endif /* WIN32 */
/* fixme! needs timezone info */
st = mktime(&stm) + 3600 * 9;
lt = st;
#ifdef WIN32
ret = _stat(filename, &statbuf);
#else
ret = stat(filename, &statbuf);
#endif /* WIN32 */
if (!ret) {
lt = statbuf.st_mtime;
if (lt < st) {
fprintf(stderr, "newer [%s] exists in server\n", filename);
fflush(stderr);
ret = ftp_sub(FTPUSER, FTPPASSWD, FTPSERVER, filename, MODE_GET, "data",
NULL);
return ret;
}
} else {
fprintf(stderr, "[%s] does not exist in local\n", filename);
fflush(stderr);
ret = ftp_sub(FTPUSER, FTPPASSWD, FTPSERVER, filename, MODE_GET, "data",
NULL);
return ret;
}
return 0;
}
static int
cache_file(grn_ctx *ctx, char **flist, char *file, int fnum)
{
int i;
for (i = 0; i < fnum; i++) {
if (!strcmp(flist[i], file) ) {
return fnum;
}
}
flist[fnum] = GRN_STRDUP(file);
fnum++;
if (fnum >= BUF_LEN) {
fprintf(stderr, "too many uniq commands file!\n");
exit(1);
}
return fnum;
}
static int
sync_datafile(grn_ctx *ctx, const char *script_file_path)
{
int line = 0;
int fnum = 0;
int i, job_num;
FILE *fp;
char buf[BUF_LEN];
char *filelist[BUF_LEN];
fp = fopen(script_file_path, "r");
if (fp == NULL) {
fprintf(stderr, "Cannot open script file: <%s>\n", script_file_path);
error_exit(ctx, 1);
}
buf[BUF_LEN-2] = '\0';
while (fgets(buf, BUF_LEN, fp) != NULL) {
line++;
if (buf[BUF_LEN-2] != '\0') {
fprintf(stderr, "Too long line in script file:%d\n", line);
error_exit(ctx, 1);
}
job_num = get_jobs(ctx, buf, line);
if (job_num > 0) {
for (i = 0; i < job_num; i++) {
/*
printf("commandfile=[%s]:buf=%s\n", grntest_job[i].commandfile, buf);
*/
fnum = cache_file(ctx, filelist, grntest_job[i].commandfile, fnum);
}
}
}
for (i = 0; i < fnum; i++) {
if (sync_sub(ctx, filelist[i])) {
fprintf(stderr, "updated!:%s\n", filelist[i]);
fflush(stderr);
}
GRN_FREE(filelist[i]);
}
fclose(fp);
return fnum;
}
static int
sync_script(grn_ctx *ctx, const char *filename)
{
int ret, filenum;
ret = sync_sub(ctx, filename);
if (!ret) {
return 0;
}
fprintf(stderr, "updated!:%s\n", filename);
fflush(stderr);
filenum = sync_datafile(ctx, filename);
return 1;
}
static void
usage(void)
{
fprintf(stderr,
"Usage: grntest [options...] [script] [db]\n"
"options:\n"
" --dir: show script files on ftp server\n"
" -i, --host <ip/hostname>: server address to listen (default: %s)\n"
" --localonly: omit server connection\n"
" --log-output-dir: specify output dir (default: current)\n"
" --ftp: connect to ftp server\n"
" --onmemory: load all commands into memory\n"
" --output-type <tsv/json>: specify output-type (default: json)\n"
" --owndb: open dbs for each ctx\n"
" -p, --port <port number>: server port number (default: %d)\n"
" --groonga <groonga_path>: groonga command path (default: %s)\n"
" --protocol <gqtp|http>: groonga server protocol (default: %s)\n"
" --log-path <path>: specify log file path\n"
" --pid-path <path>: specify file path to store PID file\n",
DEFAULT_DEST, DEFAULT_PORT,
groonga_path, groonga_protocol);
exit(1);
}
enum {
mode_default = 0,
mode_list,
mode_usage,
};
#define MODE_MASK 0x007f
#define MODE_FTP 0x0080
#define MODE_LOCALONLY 0x0100
#define MODE_OWNDB 0x0800
#define MODE_ONMEMORY 0x1000
static int
get_token(char *line, char *token, int maxlen, char **next)
{
int i = 0;
*next = NULL;
token[i] = '\0';
while (*line) {
if (grn_isspace(line, GRN_ENC_UTF8) == 1) {
line++;
continue;
}
if (*line == ';') {
token[0] = ';';
token[1] = '\0';
*next = line + 1;
return 1;
}
if (*line == '#') {
token[0] = ';';
token[1] = '\0';
*next = line + 1;
return 1;
}
break;
}
while (*line) {
token[i] = *line;
i++;
if (grn_isspace(line + 1, GRN_ENC_UTF8) == 1) {
token[i] = '\0';
*next = line + 1;
return 1;
}
if (*(line + 1) == ';') {
token[i] = '\0';
*next = line + 1;
return 1;
}
if (*(line + 1) == '#') {
token[i] = '\0';
*next = line + 1;
return 1;
}
if (*(line + 1) == '\0') {
token[i] = '\0';
return 1;
}
line++;
}
return 0;
}
/* SET_PORT and SET_HOST */
static grn_bool
check_script(grn_ctx *ctx, const char *script_file_path)
{
grn_file_reader *script_file;
grn_obj line;
char token[BUF_LEN];
char prev[BUF_LEN];
char *next = NULL;
script_file = grn_file_reader_open(ctx, script_file_path);
if (!script_file) {
fprintf(stderr, "Cannot open script file: <%s>\n", script_file_path);
return GRN_FALSE;
}
GRN_TEXT_INIT(&line, 0);
while (grn_file_reader_read_line(ctx, script_file, &line) == GRN_SUCCESS) {
GRN_TEXT_VALUE(&line)[GRN_TEXT_LEN(&line) - 1] = '\0';
get_token(GRN_TEXT_VALUE(&line), token, BUF_LEN, &next);
grn_strcpy(prev, BUF_LEN, token);
while (next) {
get_token(next, token, BUF_LEN, &next);
if (!strncmp(prev, "SET_PORT", 8)) {
grntest_serverport = grn_atoi(token, token + strlen(token), NULL);
}
if (!strncmp(prev, "SET_HOST", 8)) {
grn_strcpy(grntest_serverhost, BUF_LEN, token);
grntest_remote_mode = 1;
}
grn_strcpy(prev, BUF_LEN, token);
}
}
grn_obj_unlink(ctx, &line);
grn_file_reader_close(ctx, script_file);
return GRN_TRUE;
}
#ifndef WIN32
static void
timeout(int sig)
{
fprintf(stderr, "timeout:groonga server cannot shutdown!!\n");
fprintf(stderr, "Use \"kill -9 %d\"\n", grntest_server_id);
alarm(0);
}
static void
setexit(int sig)
{
grntest_sigint = 1;
}
static int
setsigalarm(int sec)
{
int ret;
struct sigaction sig;
alarm(sec);
sig.sa_handler = timeout;
sig.sa_flags = 0;
sigemptyset(&sig.sa_mask);
ret = sigaction(SIGALRM, &sig, NULL);
if (ret == -1) {
fprintf(stderr, "setsigalarm:errno= %d\n", errno);
}
return ret;
}
static int
setsigint(void)
{
int ret;
struct sigaction sig;
sig.sa_handler = setexit;
sig.sa_flags = 0;
sigemptyset(&sig.sa_mask);
ret = sigaction(SIGINT, &sig, NULL);
if (ret == -1) {
fprintf(stderr, "setsigint:errno= %d\n", errno);
}
return ret;
}
#endif /* WIN32 */
int
main(int argc, char **argv)
{
int qnum, i, mode = 0;
int exit_code = EXIT_SUCCESS;
grn_ctx context;
char sysinfo[BUF_LEN];
char log_path_buffer[BUF_LEN];
const char *log_path = NULL;
const char *pid_path = NULL;
const char *portstr = NULL, *hoststr = NULL, *dbname = NULL, *scrname = NULL, *outdir = NULL, *outtype = NULL;
time_t sec;
static grn_str_getopt_opt opts[] = {
{'i', "host", NULL, 0, GETOPT_OP_NONE},
{'p', "port", NULL, 0, GETOPT_OP_NONE},
{'\0', "log-output-dir", NULL, 0, GETOPT_OP_NONE},
{'\0', "output-type", NULL, 0, GETOPT_OP_NONE},
{'\0', "dir", NULL, mode_list, GETOPT_OP_UPDATE},
{'\0', "ftp", NULL, MODE_FTP, GETOPT_OP_ON},
{'h', "help", NULL, mode_usage, GETOPT_OP_UPDATE},
{'\0', "localonly", NULL, MODE_LOCALONLY, GETOPT_OP_ON},
{'\0', "onmemory", NULL, MODE_ONMEMORY, GETOPT_OP_ON},
{'\0', "owndb", NULL, MODE_OWNDB, GETOPT_OP_ON},
{'\0', "groonga", NULL, 0, GETOPT_OP_NONE},
{'\0', "protocol", NULL, 0, GETOPT_OP_NONE},
{'\0', "log-path", NULL, 0, GETOPT_OP_NONE},
{'\0', "pid-path", NULL, 0, GETOPT_OP_NONE},
{'\0', NULL, NULL, 0, 0}
};
opts[0].arg = &hoststr;
opts[1].arg = &portstr;
opts[2].arg = &outdir;
opts[3].arg = &outtype;
opts[10].arg = &groonga_path;
opts[11].arg = &groonga_protocol;
opts[12].arg = &log_path;
opts[13].arg = &pid_path;
i = grn_str_getopt(argc, argv, opts, &mode);
if (i < 0) {
usage();
}
switch (mode & MODE_MASK) {
case mode_list :
ftp_sub(FTPUSER, FTPPASSWD, FTPSERVER, "*.scr", 1, "data",
NULL);
return 0;
break;
case mode_usage :
usage();
break;
default :
break;
}
if (pid_path) {
FILE *pid_file;
pid_file = fopen(pid_path, "w");
if (pid_file) {
fprintf(pid_file, "%d", getpid());
fclose(pid_file);
} else {
fprintf(stderr,
"failed to open PID file: <%s>: %s\n",
pid_path, strerror(errno));
}
}
if (i < argc) {
scrname = argv[i];
}
if (i < argc - 1) {
dbname = argv[i+1];
}
grntest_dbpath = dbname;
if (mode & MODE_LOCALONLY) {
grntest_localonly_mode = 1;
grntest_remote_mode = 1;
}
if (mode & MODE_OWNDB) {
grntest_localonly_mode = 1;
grntest_remote_mode = 1;
grntest_owndb_mode = 1;
}
if (mode & MODE_ONMEMORY) {
grntest_onmemory_mode= 1;
}
if (mode & MODE_FTP) {
grntest_ftp_mode = GRN_TRUE;
}
if ((scrname == NULL) || (dbname == NULL)) {
usage();
}
grn_strcpy(grntest_serverhost, BUF_LEN, DEFAULT_DEST);
if (hoststr) {
grntest_remote_mode = 1;
grn_strcpy(grntest_serverhost, BUF_LEN, hoststr);
}
grntest_serverport = DEFAULT_PORT;
if (portstr) {
grntest_serverport = grn_atoi(portstr, portstr + strlen(portstr), NULL);
}
if (outtype && !strcmp(outtype, "tsv")) {
grntest_outtype = OUT_TSV;
}
grn_default_logger_set_path(GRN_LOG_PATH);
grn_init();
CRITICAL_SECTION_INIT(grntest_cs);
grn_ctx_init(&context, 0);
grn_ctx_init(&grntest_server_context, 0);
grn_db_create(&grntest_server_context, NULL, NULL);
grn_set_default_encoding(GRN_ENC_UTF8);
if (grntest_ftp_mode) {
sync_script(&context, scrname);
}
if (!check_script(&context, scrname)) {
exit_code = EXIT_FAILURE;
goto exit;
}
start_local(&context, dbname);
if (!grntest_remote_mode) {
start_server(dbname, 0);
}
if (!grntest_localonly_mode) {
if (check_server(&grntest_server_context)) {
goto exit;
}
}
get_scriptname(scrname, grntest_scriptname, BUF_LEN, ".scr");
get_username(grntest_username, 256);
GRN_TIME_INIT(&grntest_starttime, 0);
GRN_TIME_NOW(&context, &grntest_starttime);
sec = (time_t)(GRN_TIME_VALUE(&grntest_starttime)/1000000);
get_date(grntest_date, &sec);
if (!log_path) {
if (outdir) {
sprintf(log_path_buffer,
"%s/%s-%s-%" GRN_FMT_LLD "-%s.log", outdir, grntest_scriptname,
grntest_username,
GRN_TIME_VALUE(&grntest_starttime), grn_get_version());
} else {
sprintf(log_path_buffer,
"%s-%s-%" GRN_FMT_LLD "-%s.log", grntest_scriptname,
grntest_username,
GRN_TIME_VALUE(&grntest_starttime), grn_get_version());
}
log_path = log_path_buffer;
}
grntest_log_file = fopen(log_path, "w+b");
if (!grntest_log_file) {
fprintf(stderr, "Cannot open log file: <%s>\n", log_path);
goto exit;
}
get_sysinfo(dbname, sysinfo, BUF_LEN);
output_sysinfo(sysinfo);
#ifndef WIN32
setsigint();
#endif /* WIN32 */
qnum = do_script(&context, scrname);
output_result_final(&context, qnum);
fclose(grntest_log_file);
if (grntest_ftp_mode) {
ftp_sub(FTPUSER, FTPPASSWD, FTPSERVER, log_path, 3,
"report", NULL);
}
fprintf(stderr, "grntest done. logfile=%s\n", log_path);
exit:
if (pid_path) {
remove(pid_path);
}
shutdown_server();
#ifdef WIN32
if (!grntest_remote_mode) {
int ret;
ret = WaitForSingleObject(grntest_pi.hProcess, 20000);
if (ret == WAIT_TIMEOUT) {
fprintf(stderr, "timeout:groonga server cannot shutdown!!\n");
fprintf(stderr, "Cannot wait\n");
exit(1);
}
}
#else
if (grntest_server_id) {
int ret, pstatus;
setsigalarm(20);
ret = waitpid(grntest_server_id, &pstatus, 0);
if (ret < 0) {
fprintf(stderr, "Cannot wait\n");
exit(1);
}
/*
else {
fprintf(stderr, "pstatus = %d\n", pstatus);
}
*/
alarm(0);
}
#endif /* WIN32 */
CRITICAL_SECTION_FIN(grntest_cs);
grn_obj_close(&context, &grntest_starttime);
grn_obj_close(&context, grntest_db);
grn_ctx_fin(&context);
grn_obj_close(&grntest_server_context, grn_ctx_db(&grntest_server_context));
grn_ctx_fin(&grntest_server_context);
grn_fin();
return exit_code;
}