22 | 22 |
#if defined(WIN32) && !defined(UNDER_CE)
|
23 | 23 |
# include <io.h>
|
24 | 24 |
# define write _write
|
25 | |
# define read _read
|
26 | 25 |
#endif
|
27 | 26 |
|
28 | 27 |
// unnamed semaphores are not implemented in this POSIX compatible UNIX system
|
|
71 | 70 |
int active_threads_cnt;
|
72 | 71 |
int pool;
|
73 | 72 |
char extra_thread;
|
|
73 |
int threads_limit;
|
|
74 |
char is_strict_limit;
|
74 | 75 |
char notify_on_begin;
|
75 | 76 |
int extra_threads_cnt;
|
76 | 77 |
int busy_threads;
|
|
84 | 85 |
char *host;
|
85 | 86 |
char *service;
|
86 | 87 |
struct addrinfo *hints;
|
87 | |
char extra;
|
88 | 88 |
char queued;
|
89 | 89 |
DNS_result *res;
|
90 | 90 |
} DNS_thread_arg;
|
|
102 | 102 |
queue *DNS_instances = NULL;
|
103 | 103 |
|
104 | 104 |
void DNS_on_thread_finish(Net_DNS_Native *self) {
|
105 | |
pthread_mutex_lock(&self->mutex);
|
106 | 105 |
if (--self->active_threads_cnt == 0) {
|
107 | 106 |
pthread_cond_signal(&self->cv);
|
108 | 107 |
}
|
109 | |
pthread_mutex_unlock(&self->mutex);
|
110 | 108 |
}
|
111 | 109 |
|
112 | 110 |
void *DNS_getaddrinfo(void *v_arg) {
|
|
121 | 119 |
if (self->notify_on_begin)
|
122 | 120 |
write(arg->res->fd1, "1", 1);
|
123 | 121 |
arg->res->gai_error = getaddrinfo(arg->host, arg->service, arg->hints, &arg->res->hostinfo);
|
|
122 |
#ifdef EAI_SYSTEM
|
124 | 123 |
if (arg->res->gai_error == EAI_SYSTEM)
|
125 | 124 |
arg->res->sys_error = errno;
|
|
125 |
#endif
|
126 | 126 |
|
127 | 127 |
pthread_mutex_lock(&self->mutex);
|
128 | 128 |
arg->res->arg = arg;
|
129 | |
if (arg->extra) self->extra_threads_cnt--;
|
|
129 |
if (!queued) self->extra_threads_cnt--;
|
130 | 130 |
write(arg->res->fd1, "2", 1);
|
|
131 |
if (!queued) DNS_on_thread_finish(self);
|
131 | 132 |
pthread_mutex_unlock(&self->mutex);
|
132 | 133 |
|
133 | |
if (!queued) DNS_on_thread_finish(self);
|
134 | 134 |
return NULL;
|
135 | 135 |
}
|
136 | 136 |
|
|
144 | 144 |
pthread_mutex_lock(&self->mutex);
|
145 | 145 |
void *arg = queue_shift(self->in_queue);
|
146 | 146 |
if (arg != NULL) self->busy_threads++;
|
|
147 |
else DNS_on_thread_finish(self);
|
147 | 148 |
pthread_mutex_unlock(&self->mutex);
|
148 | 149 |
|
149 | 150 |
if (arg == NULL) {
|
150 | 151 |
// this was request to quit thread
|
151 | |
break;
|
|
152 |
return NULL;
|
152 | 153 |
}
|
153 | 154 |
|
154 | 155 |
DNS_getaddrinfo(arg);
|
|
158 | 159 |
pthread_mutex_unlock(&self->mutex);
|
159 | 160 |
}
|
160 | 161 |
|
161 | |
DNS_on_thread_finish(self);
|
162 | 162 |
return NULL;
|
163 | 163 |
}
|
164 | 164 |
|
|
172 | 172 |
while (sem_wait(&self->semaphore) == 0) {
|
173 | 173 |
pthread_mutex_lock(&self->mutex);
|
174 | 174 |
void *arg = queue_shift(self->in_queue);
|
|
175 |
if (arg == NULL) DNS_on_thread_finish(self);
|
175 | 176 |
pthread_mutex_unlock(&self->mutex);
|
176 | 177 |
|
177 | 178 |
if (arg == NULL) {
|
178 | |
break;
|
|
179 |
return NULL;
|
179 | 180 |
}
|
180 | 181 |
|
181 | 182 |
DNS_getaddrinfo(arg);
|
|
184 | 185 |
if (!queue_size(self->in_queue) || (self->pool && self->busy_threads < self->pool)) {
|
185 | 186 |
// extra worker may stop if queue is empty or there is free worker from the pool
|
186 | 187 |
stop = 1;
|
|
188 |
DNS_on_thread_finish(self);
|
187 | 189 |
}
|
188 | 190 |
pthread_mutex_unlock(&self->mutex);
|
189 | 191 |
|
190 | 192 |
if (stop)
|
191 | |
break;
|
192 | |
}
|
193 | |
|
194 | |
DNS_on_thread_finish(self);
|
|
193 |
return NULL;
|
|
194 |
}
|
|
195 |
|
195 | 196 |
return NULL;
|
196 | 197 |
}
|
197 | 198 |
|
|
346 | 347 |
|
347 | 348 |
int i, rc;
|
348 | 349 |
self->pool = 0;
|
|
350 |
self->threads_limit = 0;
|
|
351 |
self->is_strict_limit = 0;
|
349 | 352 |
self->notify_on_begin = 0;
|
350 | 353 |
self->extra_thread = 0;
|
351 | 354 |
self->active_threads_cnt = 0;
|
|
368 | 371 |
else if (strEQ(opt, "extra_thread")) {
|
369 | 372 |
self->extra_thread = SvIV(ST(i+1));
|
370 | 373 |
}
|
|
374 |
else if (strEQ(opt, "threads_limit") || strEQ(opt, "threads_strict_limit")) {
|
|
375 |
self->threads_limit = SvIV(ST(i+1));
|
|
376 |
self->is_strict_limit = strEQ(opt, "threads_strict_limit");
|
|
377 |
}
|
371 | 378 |
else if (strEQ(opt, "notify_on_begin")) {
|
372 | 379 |
self->notify_on_begin = SvIV(ST(i+1));
|
373 | 380 |
}
|
|
416 | 423 |
#endif
|
417 | 424 |
}
|
418 | 425 |
|
419 | |
if (self->pool) {
|
|
426 |
if (self->pool || self->threads_limit) {
|
420 | 427 |
if (sem_init(&self->semaphore, 0, 0) != 0) {
|
421 | 428 |
warn("sem_init(): %s", strerror(errno));
|
422 | 429 |
goto FAIL;
|
423 | 430 |
}
|
424 | 431 |
sem_ok = 1;
|
425 | 432 |
|
426 | |
pthread_t tid;
|
427 | |
int j = 0;
|
428 | |
for (i=0; i<self->pool; i++) {
|
429 | |
rc = pthread_create(&tid, &self->thread_attrs, DNS_pool_worker, (void*)self);
|
430 | |
if (rc == 0) {
|
431 | |
self->active_threads_cnt++;
|
432 | |
j++;
|
|
433 |
if (self->pool) {
|
|
434 |
pthread_t tid;
|
|
435 |
int j = 0;
|
|
436 |
for (i=0; i<self->pool; i++) {
|
|
437 |
rc = pthread_create(&tid, &self->thread_attrs, DNS_pool_worker, (void*)self);
|
|
438 |
if (rc == 0) {
|
|
439 |
self->active_threads_cnt++;
|
|
440 |
j++;
|
|
441 |
}
|
|
442 |
else {
|
|
443 |
warn("Can't create thread #%d: %s", i+1, strerror(rc));
|
|
444 |
}
|
433 | 445 |
}
|
434 | |
else {
|
435 | |
warn("Can't create thread #%d: %s", i+1, strerror(rc));
|
|
446 |
|
|
447 |
if (j == 0) {
|
|
448 |
goto FAIL;
|
436 | 449 |
}
|
437 | |
}
|
438 | |
|
439 | |
if (j == 0) {
|
440 | |
goto FAIL;
|
441 | |
}
|
442 | |
|
443 | |
self->pool = j;
|
|
450 |
|
|
451 |
self->pool = j;
|
|
452 |
}
|
|
453 |
|
444 | 454 |
self->in_queue = queue_new();
|
445 | 455 |
}
|
446 | 456 |
|
|
476 | 486 |
#endif
|
477 | 487 |
if (socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, fd) != 0)
|
478 | 488 |
croak("socketpair(): %s", strerror(errno));
|
479 | |
|
|
489 |
#ifdef FD_CLOEXEC
|
480 | 490 |
fcntl(fd[0], F_SETFD, FD_CLOEXEC);
|
481 | 491 |
fcntl(fd[1], F_SETFD, FD_CLOEXEC);
|
482 | |
|
|
492 |
#endif
|
483 | 493 |
char *service = SvOK(sv_service) ? SvPV_nolen(sv_service) : "";
|
484 | 494 |
struct addrinfo *hints = NULL;
|
485 | 495 |
|
|
537 | 547 |
arg->host = strlen(host) ? savepv(host) : NULL;
|
538 | 548 |
arg->service = strlen(service) ? savepv(service) : NULL;
|
539 | 549 |
arg->hints = hints;
|
540 | |
arg->extra = 0;
|
541 | 550 |
arg->queued = 0;
|
542 | 551 |
arg->res = res;
|
543 | 552 |
|
544 | 553 |
pthread_mutex_lock(&self->mutex);
|
545 | 554 |
DNS_free_timedout(self, 0);
|
546 | 555 |
bstree_put(self->fd_map, fd[0], res);
|
|
556 |
char allow_extra_worker = 1;
|
|
557 |
if (self->threads_limit && self->active_threads_cnt == (self->is_strict_limit ? self->threads_limit : self->threads_limit + queue_size(self->tout_queue))) {
|
|
558 |
allow_extra_worker = 0;
|
|
559 |
}
|
|
560 |
|
547 | 561 |
if (self->pool) {
|
548 | |
if (self->busy_threads == self->pool && (self->extra_thread || queue_size(self->tout_queue) > self->extra_threads_cnt)) {
|
549 | |
arg->extra = 1;
|
|
562 |
if (allow_extra_worker && self->busy_threads == self->pool && (self->extra_thread || queue_size(self->tout_queue) > self->extra_threads_cnt)) {
|
550 | 563 |
self->extra_threads_cnt++;
|
551 | 564 |
}
|
552 | 565 |
else {
|
553 | 566 |
arg->queued = 1;
|
554 | |
queue_push(self->in_queue, arg);
|
555 | |
sem_post(&self->semaphore);
|
556 | |
}
|
557 | |
}
|
558 | |
pthread_mutex_unlock(&self->mutex);
|
559 | |
|
560 | |
if (!self->pool || arg->extra) {
|
|
567 |
allow_extra_worker = 0;
|
|
568 |
}
|
|
569 |
}
|
|
570 |
|
|
571 |
if (arg->queued || self->threads_limit) {
|
|
572 |
arg->queued = 1;
|
|
573 |
queue_push(self->in_queue, arg);
|
|
574 |
sem_post(&self->semaphore);
|
|
575 |
}
|
|
576 |
pthread_mutex_unlock(&self->mutex);
|
|
577 |
|
|
578 |
if (allow_extra_worker) {
|
561 | 579 |
pthread_t tid;
|
562 | 580 |
|
563 | 581 |
pthread_mutex_lock(&self->mutex);
|
564 | |
int rc = pthread_create(&tid, &self->thread_attrs, DNS_getaddrinfo, (void *)arg);
|
565 | |
if (rc == 0) {
|
566 | |
++self->active_threads_cnt;
|
567 | |
pthread_mutex_unlock(&self->mutex);
|
568 | |
}
|
569 | |
else {
|
|
582 |
++self->active_threads_cnt;
|
|
583 |
pthread_mutex_unlock(&self->mutex);
|
|
584 |
|
|
585 |
int rc = self->threads_limit ?
|
|
586 |
pthread_create(&tid, &self->thread_attrs, DNS_extra_worker, (void *)self) :
|
|
587 |
pthread_create(&tid, &self->thread_attrs, DNS_getaddrinfo, (void *)arg);
|
|
588 |
|
|
589 |
if (rc != 0) {
|
|
590 |
pthread_mutex_lock(&self->mutex);
|
|
591 |
self->active_threads_cnt--;
|
570 | 592 |
pthread_mutex_unlock(&self->mutex);
|
571 | 593 |
if (arg->host) Safefree(arg->host);
|
572 | 594 |
if (arg->service) Safefree(arg->service);
|
|
606 | 628 |
SV *err = newSV(0);
|
607 | 629 |
sv_setiv(err, (IV)res->gai_error);
|
608 | 630 |
sv_setpv(err, res->gai_error ? gai_strerror(res->gai_error) : "");
|
609 | |
if (res->gai_error == EAI_SYSTEM)
|
|
631 |
if (res->sys_error)
|
610 | 632 |
sv_catpvf(err, " (%s)", strerror(res->sys_error));
|
611 | 633 |
SvIOK_on(err);
|
612 | 634 |
XPUSHs(sv_2mortal(err));
|