#include "config.h"  // IWYU pragma: keep

#include "iothread.h"

#include <limits.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <queue>
#include <thread>

#include "common.h"
#include "fds.h"
#include "flog.h"
#include "global_safety.h"
#include "wutil.h"

// We just define a thread limit of 1024.
// On all systems I've seen the limit is higher,
// but on some (like Linux with glibc) the setting for _POSIX_THREAD_THREADS_MAX is 64,
// which is too low, even though the system can handle more than 64 threads.
#define IO_MAX_THREADS 1024

// iothread has a thread pool. Sometimes there's no work to do, but extant threads wait around for a
// while (on a condition variable) in case new work comes soon. However, condition variables are not
// properly instrumented with Thread Sanitizer, so it fails to recognize when our mutex is locked.
// See https://github.com/google/sanitizers/issues/1259
// When using TSan, disable the wait-around feature.
#ifdef FISH_TSAN_WORKAROUNDS
#define IO_WAIT_FOR_WORK_DURATION_MS 0
#else
#define IO_WAIT_FOR_WORK_DURATION_MS 500
#endif

using void_function_t = std::function<void()>;

struct work_request_t {
    void_function_t handler;
    void_function_t completion;

    work_request_t(void_function_t &&f, void_function_t &&comp)
        : handler(std::move(f)), completion(std::move(comp)) {}

    // Move-only
    work_request_t &operator=(const work_request_t &) = delete;
    work_request_t &operator=(work_request_t &&) = default;
    work_request_t(const work_request_t &) = delete;
    work_request_t(work_request_t &&) = default;
};

struct main_thread_request_t {
    // The function to execute.
    void_function_t func;

    // Set by the main thread when the work is done.
    std::promise<void> done{};

    explicit main_thread_request_t(void_function_t &&f) : func(std::move(f)) {}

    // No moving OR copying
    // main_thread_requests are always stack allocated, and we deal in pointers to them
    void operator=(const main_thread_request_t &) = delete;
    void operator=(main_thread_request_t &&) = delete;
    main_thread_request_t(const main_thread_request_t &) = delete;
    main_thread_request_t(main_thread_request_t &&) = delete;
};
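
// Illustrative sketch of the request/promise handshake (not compiled): the requesting
// thread stack-allocates the request, publishes a pointer to it, and then blocks on the
// future, which guarantees the request outlives its use. This is exactly what
// iothread_perform_on_main() below does.
#if 0
main_thread_request_t req([] { /* runs on the main thread */ });
s_main_thread_queue.acquire()->requests.push_back(&req);  // publish the pointer
get_notify_signaller().post();                            // wake the main thread
req.done.get_future().wait();                             // block until func has run
#endif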

struct thread_pool_t {
    struct data_t {
        /// The queue of outstanding, unclaimed requests.
        std::queue<work_request_t> request_queue{};

        /// The number of threads that exist in the pool.
        size_t total_threads{0};

        /// The number of threads which are waiting for more work.
        size_t waiting_threads{0};

        /// A flag indicating we should not process new requests.
        bool drain{false};
    };

    /// Data which needs to be atomically accessed.
    owning_lock<data_t> req_data{};

    /// The condition variable used to wake up waiting threads.
    /// Note this is tied to data's lock.
    std::condition_variable queue_cond{};

    /// The minimum and maximum number of threads.
    /// Here "minimum" means threads that are kept waiting in the pool.
    /// Note that the pool is initially empty and threads may decide to exit based on a time wait.
    const size_t soft_min_threads;
    const size_t max_threads;

    /// Construct with a soft minimum and maximum thread count.
    thread_pool_t(size_t soft_min_threads, size_t max_threads)
        : soft_min_threads(soft_min_threads), max_threads(max_threads) {}

    /// Enqueue a new work item onto the thread pool.
    /// The function \p func will execute in one of the pool's threads.
    /// \p completion will run on the main thread, if it is not empty.
    /// If \p cant_wait is set, exceed the thread limit if necessary, because an extant
    /// thread may itself be blocked waiting on the new work.
    int perform(void_function_t &&func, void_function_t &&completion, bool cant_wait);

   private:
    /// The worker loop for this thread.
    void *run();

    /// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
    /// reducing the active thread count.
    /// This runs in the background thread.
    maybe_t<work_request_t> dequeue_work_or_commit_to_exit();

    /// Trampoline function matching the signature required by pthread_create().
    static void *run_trampoline(void *vpool);

    /// Attempt to spawn a new pthread.
    bool spawn() const;

    /// No copying or moving.
    thread_pool_t(const thread_pool_t &) = delete;
    thread_pool_t(thread_pool_t &&) = delete;
    void operator=(const thread_pool_t &) = delete;
    void operator=(thread_pool_t &&) = delete;
};

/// The thread pool for "iothreads" which are used to lift I/O off of the main thread.
/// These are used for completions, etc.
/// Leaked to avoid shutdown dtor registration (including tsan).
static thread_pool_t &s_io_thread_pool = *(new thread_pool_t(1, IO_MAX_THREADS));
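
// A minimal usage sketch (illustrative only; real callers go through the iothread_perform
// wrappers below). Passing cant_wait=true allows exceeding the thread limit, for callers
// that would otherwise deadlock waiting on the queued work.
#if 0
s_io_thread_pool.perform([] { /* background work */ },
                         [] { /* completion; later run by iothread_service_main() */ },
                         /*cant_wait=*/false);
#endif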

/// A queue of "things to do on the main thread."
struct main_thread_queue_t {
    // Functions to invoke as the completion callback from iothread_perform.
    std::vector<void_function_t> completions;

    // iothread_perform_on_main requests.
    // Note this contains pointers to structs that are stack-allocated on the requesting thread.
    std::vector<main_thread_request_t *> requests;

    /// Transfer ownership of ourselves to a new queue and return it.
    /// 'this' is left empty.
    main_thread_queue_t take() {
        main_thread_queue_t result;
        std::swap(result.completions, this->completions);
        std::swap(result.requests, this->requests);
        return result;
    }

    // Moving is allowed, but not copying.
    main_thread_queue_t() = default;
    main_thread_queue_t(main_thread_queue_t &&) = default;
    main_thread_queue_t &operator=(main_thread_queue_t &&) = default;
    main_thread_queue_t(const main_thread_queue_t &) = delete;
    void operator=(const main_thread_queue_t &) = delete;
};
static owning_lock<main_thread_queue_t> s_main_thread_queue;

/// \return the signaller for completions and main thread requests.
static fd_event_signaller_t &get_notify_signaller() {
    // Leaked to avoid shutdown dtors.
    static fd_event_signaller_t *s_signaller = new fd_event_signaller_t();
    return *s_signaller;
}

/// Dequeue a work item (perhaps waiting on the condition variable), or commit to exiting by
/// reducing the active thread count.
maybe_t<work_request_t> thread_pool_t::dequeue_work_or_commit_to_exit() {
    auto data = this->req_data.acquire();
    // If the queue is empty, check to see if we should wait.
    // We should wait if our exiting would drop us below the soft min.
    if (data->request_queue.empty() && data->total_threads == this->soft_min_threads &&
        IO_WAIT_FOR_WORK_DURATION_MS > 0) {
        data->waiting_threads += 1;
        this->queue_cond.wait_for(data.get_lock(),
                                  std::chrono::milliseconds(IO_WAIT_FOR_WORK_DURATION_MS));
        data->waiting_threads -= 1;
    }

    // Now that we've perhaps waited, see if there's something on the queue.
    maybe_t<work_request_t> result{};
    if (!data->request_queue.empty()) {
        result = std::move(data->request_queue.front());
        data->request_queue.pop();
    }
    // If we are returning none, then ensure we balance the thread count increment from when we were
    // created. This has to be done here in this awkward place because we've already committed to
    // exiting - we will never pick up more work. So we need to ensure we decrement the thread count
    // while holding the lock as we are effectively exited.
    if (!result) {
        data->total_threads -= 1;
    }
    return result;
}

static void enqueue_thread_result(void_function_t req) {
    s_main_thread_queue.acquire()->completions.push_back(std::move(req));
    get_notify_signaller().post();
}

static void *this_thread() { return (void *)(intptr_t)pthread_self(); }

void *thread_pool_t::run() {
    while (auto req = dequeue_work_or_commit_to_exit()) {
        FLOGF(iothread, L"pthread %p got work", this_thread());

        // Perform the work
        req->handler();

        // If there's a completion handler, we have to enqueue it on the result queue.
        // Note we're using std::function's weirdo operator== here
        if (req->completion != nullptr) {
            // Enqueue the result, and tell the main thread about it.
            enqueue_thread_result(std::move(req->completion));
        }
    }
    FLOGF(iothread, L"pthread %p exiting", this_thread());
    return nullptr;
}

void *thread_pool_t::run_trampoline(void *pool) {
    assert(pool && "No thread pool given");
    return static_cast<thread_pool_t *>(pool)->run();
}

/// Spawn another thread. No lock is held when this is called.
bool thread_pool_t::spawn() const {
    return make_detached_pthread(&run_trampoline, const_cast<thread_pool_t *>(this));
}

int thread_pool_t::perform(void_function_t &&func, void_function_t &&completion, bool cant_wait) {
    assert(func && "Missing function");
    // Note we permit an empty completion.
    work_request_t req(std::move(func), std::move(completion));
    int local_thread_count = -1;
    auto &pool = *this;
    bool spawn_new_thread = false;
    bool wakeup_thread = false;
    {
        // Lock around a local region.
        auto data = pool.req_data.acquire();
        data->request_queue.push(std::move(req));
        FLOGF(iothread, L"enqueuing work item (count is %lu)", data->request_queue.size());
        if (data->drain) {
            // Do nothing here.
        } else if (data->waiting_threads >= data->request_queue.size()) {
            // There's enough waiting threads, wake one up.
            wakeup_thread = true;
        } else if (cant_wait || data->total_threads < pool.max_threads) {
            // No threads are waiting but we can or must spawn a new thread.
            data->total_threads += 1;
            spawn_new_thread = true;
        }
        local_thread_count = data->total_threads;
    }

    // Kick off the thread if we decided to do so.
    if (wakeup_thread) {
        FLOGF(iothread, L"notifying a thread", this_thread());
        pool.queue_cond.notify_one();
    }
    if (spawn_new_thread) {
        // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
        // unlikely that they are all on the verge of exiting, so one is likely to be ready to
        // handle extant requests. So we can ignore failure with some confidence.
        if (this->spawn()) {
            FLOGF(iothread, L"pthread spawned");
        } else {
            // We failed to spawn a thread; decrement the thread count.
            pool.req_data.acquire()->total_threads -= 1;
        }
    }
    return local_thread_count;
}

void iothread_perform_impl(void_function_t &&func, void_function_t &&completion, bool cant_wait) {
    ASSERT_IS_MAIN_THREAD();
    ASSERT_IS_NOT_FORKED_CHILD();
    s_io_thread_pool.perform(std::move(func), std::move(completion), cant_wait);
}
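
// Usage sketch, assuming the iothread_perform() convenience wrapper declared in iothread.h;
// scan_history_file and refresh_pager are hypothetical names.
#if 0
iothread_perform([] { scan_history_file(); },  // runs on a pool thread
                 [] { refresh_pager(); });     // runs on the main thread, once
                                               // iothread_service_main() is called
#endif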

int iothread_port() { return get_notify_signaller().read_fd(); }

static bool iothread_wait_for_main_requests(long timeout_usec) {
    const long usec_per_sec = 1000000;
    struct timeval tv;
    tv.tv_sec = timeout_usec / usec_per_sec;
    tv.tv_usec = timeout_usec % usec_per_sec;
    const int fd = iothread_port();
    fd_set fds;
    FD_ZERO(&fds);
    FD_SET(fd, &fds);
    int ret = select(fd + 1, &fds, nullptr, nullptr, &tv);
    return ret > 0;
}

void iothread_service_main_with_timeout(long timeout_usec) {
    if (iothread_wait_for_main_requests(timeout_usec)) {
        iothread_service_main();
    }
}
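
// Sketch of how a main loop can integrate with the signaller (illustrative): multiplex on
// iothread_port() alongside other fds and service the queue whenever it becomes readable.
#if 0
const int notify_fd = iothread_port();
fd_set fds;
FD_ZERO(&fds);
FD_SET(notify_fd, &fds);
if (select(notify_fd + 1, &fds, nullptr, nullptr, nullptr) > 0 && FD_ISSET(notify_fd, &fds)) {
    iothread_service_main();  // run pending completions and main-thread requests
}
#endif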

/// At the moment, this function is only used in the test suite and in a
/// drain-all-threads-before-fork compatibility mode that no architecture requires, so it's OK that
/// it's terrible.
int iothread_drain_all() {
    ASSERT_IS_MAIN_THREAD();
    ASSERT_IS_NOT_FORKED_CHILD();

    int thread_count;
    auto &pool = s_io_thread_pool;
    // Set the drain flag.
    {
        auto data = pool.req_data.acquire();
        assert(!data->drain && "Should not be draining already");
        data->drain = true;
        thread_count = data->total_threads;
    }

    // Wake everyone up.
    pool.queue_cond.notify_all();

    double now = timef();

    // Nasty polling via select().
    while (pool.req_data.acquire()->total_threads > 0) {
        iothread_service_main_with_timeout(1000);
    }

    // Clear the drain flag.
    // Even though we released the lock, nobody should have added a new thread while the drain flag
    // is set.
    {
        auto data = pool.req_data.acquire();
        assert(data->total_threads == 0 && "Should be no threads");
        assert(data->drain && "Should be draining");
        data->drain = false;
    }

    double after = timef();
    FLOGF(iothread, "Drained %d thread(s) in %.02f msec", thread_count, 1000 * (after - now));
    return thread_count;
}

// Service the main thread queue, by invoking any functions enqueued for the main thread.
void iothread_service_main() {
    ASSERT_IS_MAIN_THREAD();
    // Note the order here is important: we must consume events before handling requests, as posting
    // uses the opposite order.
    (void)get_notify_signaller().try_consume();

    // Move the queue to a local variable.
    // Note the s_main_thread_queue lock is not held after this.
    main_thread_queue_t queue = s_main_thread_queue.acquire()->take();

    // Perform each completion in order.
    for (const void_function_t &func : queue.completions) {
        // Ensure we don't invoke empty functions; calling an empty std::function throws
        // std::bad_function_call.
        if (func) func();
    }

    // Perform each main thread request. Note we are NOT responsible for deleting these. They are
    // stack allocated in their respective threads!
    for (main_thread_request_t *req : queue.requests) {
        req->func();
        req->done.set_value();
    }
}

void iothread_perform_on_main(void_function_t &&func) {
    if (is_main_thread()) {
        func();
        return;
    }

    // Make a new request. Note we are synchronous, so this can be stack allocated!
    main_thread_request_t req(std::move(func));

    // Append it. Ensure we don't hold the lock after.
    s_main_thread_queue.acquire()->requests.push_back(&req);

    // Tell the signaller and then wait until our future is set.
    get_notify_signaller().post();
    req.done.get_future().wait();
}
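
// Usage sketch (current_term_title is a hypothetical helper): a background thread may
// synchronously read state owned by the main thread.
#if 0
wcstring title;
iothread_perform_on_main([&] { title = current_term_title(); });
// `title` is now safely populated, since the call blocks until the lambda has run.
#endif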

bool make_detached_pthread(void *(*func)(void *), void *param) {
    // The spawned thread inherits our signal mask. Temporarily block signals, spawn the thread, and
    // then restore it. But we must not block SIGBUS, SIGFPE, SIGILL, or SIGSEGV; that's undefined
    // (#7837). Conservatively don't try to mask SIGKILL or SIGSTOP either; that's ignored on Linux
    // but maybe has an effect elsewhere.
    sigset_t new_set, saved_set;
    sigfillset(&new_set);
    sigdelset(&new_set, SIGILL);   // bad jump
    sigdelset(&new_set, SIGFPE);   // divide by zero
    sigdelset(&new_set, SIGBUS);   // unaligned memory access
    sigdelset(&new_set, SIGSEGV);  // bad memory access
    sigdelset(&new_set, SIGSTOP);  // unblockable
    sigdelset(&new_set, SIGKILL);  // unblockable
    DIE_ON_FAILURE(pthread_sigmask(SIG_BLOCK, &new_set, &saved_set));

    // Spawn a thread. If this fails, it means there's already a bunch of threads; it is very
    // unlikely that they are all on the verge of exiting, so one is likely to be ready to handle
    // extant requests. So we can ignore failure with some confidence.
    pthread_t thread = 0;
    int err = pthread_create(&thread, nullptr, func, param);
    if (err == 0) {
        // Success, return the thread.
        FLOGF(iothread, "pthread %p spawned", (void *)(intptr_t)thread);
        DIE_ON_FAILURE(pthread_detach(thread));
    } else {
        perror("pthread_create");
    }
    // Restore our sigmask.
    DIE_ON_FAILURE(pthread_sigmask(SIG_SETMASK, &saved_set, nullptr));
    return err == 0;
}

using void_func_t = std::function<void(void)>;

static void *func_invoker(void *param) {
    // Acquire a thread id for this thread.
    (void)thread_id();
    auto vf = static_cast<void_func_t *>(param);
    (*vf)();
    delete vf;
    return nullptr;
}

bool make_detached_pthread(void_func_t &&func) {
    // Copy the function into a heap allocation.
    auto vf = new void_func_t(std::move(func));
    if (make_detached_pthread(func_invoker, vf)) {
        return true;
    }
    // Thread spawning failed, clean up our heap allocation.
    delete vf;
    return false;
}
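
// Usage sketch: fire-and-forget a callable on its own detached thread. On failure the
// heap-allocated copy of the callable has already been cleaned up, per above.
#if 0
if (!make_detached_pthread([] { FLOGF(iothread, L"hello from %p", this_thread()); })) {
    FLOGF(iothread, L"failed to spawn a detached thread");
}
#endif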

static uint64_t next_thread_id() {
    // Note 0 is an invalid thread id.
    // Note fetch_add is an atomic read-modify-write which returns the value *before* the
    // modification.
    static std::atomic<uint64_t> s_last_thread_id{};
    uint64_t res = 1 + s_last_thread_id.fetch_add(1, std::memory_order_relaxed);
    return res;
}

uint64_t thread_id() {
    static FISH_THREAD_LOCAL uint64_t tl_tid = next_thread_id();
    return tl_tid;
}

// Debounce implementation note: we would like to enqueue at most one request, except if a thread
// hangs (e.g. on fs access) then we do not want to block indefinitely; such threads are called
// "abandoned". This is implemented via a monotone uint64 counter, called a token.
// Every time we spawn a thread, increment the token. When a thread finds no more work, it compares
// its token to the active token; if they differ then this thread was abandoned.
struct debounce_t::impl_t {
    // Synchronized data from debounce_t.
    struct data_t {
        // The next enqueued request (at most one), or none if nothing is pending.
        maybe_t<work_request_t> next_req{};

        // The token of the current non-abandoned thread, or 0 if no thread is running.
        uint64_t active_token{0};

        // The next token to use when spawning a thread.
        uint64_t next_token{1};

        // The start time of the most recently spawned thread or executed request (if any).
        std::chrono::time_point<std::chrono::steady_clock> start_time{};
    };
    owning_lock<data_t> data{};

    /// Run an iteration in the background, with the given thread token.
    /// \return true if we handled a request, false if there were none.
    bool run_next(uint64_t token);
};
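
// Worked example of the token scheme: request A spawns a thread with token 1
// (active_token = 1, next_token = 2), and that thread hangs inside A's handler. A later
// request notices that timeout_msec_ has been exceeded, clears active_token, and spawns a
// fresh thread with token 2. When the stuck thread eventually returns and finds the queue
// empty, its token (1) no longer matches active_token, so it exits without clearing the
// active state: it has been abandoned.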

bool debounce_t::impl_t::run_next(uint64_t token) {
    assert(token > 0 && "Invalid token");
    // Note we are on a background thread.
    maybe_t<work_request_t> req;
    {
        auto d = data.acquire();
        if (d->next_req) {
            // The value was dequeued, we are going to execute it.
            req = d->next_req.acquire();
            d->start_time = std::chrono::steady_clock::now();
        } else {
            // There is no request. If we are active, mark ourselves as no longer running.
            if (token == d->active_token) {
                d->active_token = 0;
            }
            return false;
        }
    }

    assert(req && req->handler && "Request should have value");
    req->handler();
    if (req->completion) {
        enqueue_thread_result(std::move(req->completion));
    }
    return true;
}

uint64_t debounce_t::perform_impl(std::function<void()> handler, std::function<void()> completion) {
    uint64_t active_token{0};
    bool spawn{false};
    // Local lock.
    {
        auto d = impl_->data.acquire();
        d->next_req = work_request_t{std::move(handler), std::move(completion)};
        // If we have a timeout, and our running thread has exceeded it, abandon that thread.
        if (d->active_token && timeout_msec_ > 0 &&
            std::chrono::steady_clock::now() - d->start_time >
                std::chrono::milliseconds(timeout_msec_)) {
            // Abandon this thread by marking nothing as active.
            d->active_token = 0;
        }
        if (!d->active_token) {
            // We need to spawn a new thread.
            // Mark the current time so that a new request won't immediately abandon us.
            spawn = true;
            d->active_token = d->next_token++;
            d->start_time = std::chrono::steady_clock::now();
        }
        active_token = d->active_token;
        assert(active_token && "Something should be active");
    }
    if (spawn) {
        // Equip our background thread with a reference to impl, to keep it alive.
        auto impl = impl_;
        iothread_perform([=] {
            while (impl->run_next(active_token))
                ;  // pass
        });
    }
    return active_token;
}

debounce_t::debounce_t(long timeout_msec)
    : timeout_msec_(timeout_msec), impl_(std::make_shared<impl_t>()) {}
debounce_t::~debounce_t() = default;
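
// Usage sketch (hypothetical names): coalesce bursts of requests so that at most one is
// pending; only the newest pending handler survives, and the completion runs on the main
// thread via iothread_service_main(). Real callers typically go through the perform()
// wrappers in iothread.h.
#if 0
static debounce_t debouncer(/*timeout_msec=*/500);
debouncer.perform_impl([] { recompute_autosuggestion(); },  // newest pending request wins
                       [] { repaint_prompt(); });           // main-thread completion
#endif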