Codebase list tcmu / lintian-fixes/main tcmur_aio.c
lintian-fixes/main

Tree @lintian-fixes/main (Download .tar.gz)

tcmur_aio.c @lintian-fixes/mainraw · history · blame

/*
 * Copyright (c) 2017 Red Hat, Inc.
 *
 * This file is licensed to you under your choice of the GNU Lesser
 * General Public License, version 2.1 or any later version (LGPLv2.1 or
 * later), or the Apache License 2.0.
 */

#define _GNU_SOURCE
#include <errno.h>
#include <assert.h>
#include <stdint.h>
#include <pthread.h>

#include "ccan/list/list.h"

#include "libtcmu.h"
#include "libtcmu_priv.h"
#include "tcmur_device.h"
#include "tcmur_aio.h"
#include "tcmu_runner_priv.h"
#include "tcmu-runner.h"

/*
 * One queued request for the per-device I/O worker threads.
 *
 * Allocated by aio_queue() and freed by the worker thread once both
 * callbacks have been run.
 */
struct tcmu_work {
	/* device handed to work_fn */
	struct tcmu_device *dev;
	/* opaque caller argument, passed to both work_fn and done_fn */
	void *data;
	/* callback a worker thread invokes to execute the request */
	tcmu_work_fn_t work_fn;
	/* callback invoked afterwards with work_fn's return value */
	tcmu_done_fn_t done_fn;
	/* linkage on the work queue's io_queue list */
	struct list_node entry;
};

/*
 * Cancellation cleanup handler: release the mutex passed in @arg.
 *
 * @arg: pointer to the pthread mutex to unlock.
 */
static void _cleanup_mutex_lock(void *arg)
{
	pthread_mutex_t *held_lock = arg;

	pthread_mutex_unlock(held_lock);
}

/*
 * Account for a newly started AIO request on @rdev.
 *
 * Increments the tracked-operation counter while holding track_lock.
 */
void track_aio_request_start(struct tcmur_device *rdev)
{
	pthread_mutex_t *lock = &rdev->track_queue.track_lock;

	/* The handler releases the lock if the thread is cancelled. */
	pthread_cleanup_push(_cleanup_mutex_lock, (void *)lock);
	pthread_mutex_lock(lock);

	rdev->track_queue.tracked_aio_ops++;

	pthread_mutex_unlock(lock);
	pthread_cleanup_pop(0);
}

/*
 * Account for a completed AIO request on @rdev.
 *
 * @rdev:    device whose tracking state is updated.
 * @wake_up: optional out-parameter. When non-NULL, one more pending
 *           wakeup is recorded and *wake_up is set to 1 if this is the
 *           only pending wakeup, 0 otherwise.
 *
 * If the tracked-operation count drops to zero and a waiter has
 * registered a condition variable, the registration is cleared and the
 * waiter is signalled.
 */
void track_aio_request_finish(struct tcmur_device *rdev, int *wake_up)
{
	struct tcmu_track_aio *track = &rdev->track_queue;

	pthread_cleanup_push(_cleanup_mutex_lock, (void *)&track->track_lock);
	pthread_mutex_lock(&track->track_lock);

	assert(track->tracked_aio_ops > 0);
	track->tracked_aio_ops--;

	if (wake_up) {
		track->pending_wakeups++;
		if (track->pending_wakeups == 1)
			*wake_up = 1;
		else
			*wake_up = 0;
	}

	if (track->tracked_aio_ops == 0 && track->is_empty_cond) {
		pthread_cond_t *waiter = track->is_empty_cond;

		/* Unregister before signalling; the cond lives on the waiter's stack. */
		track->is_empty_cond = NULL;
		pthread_cond_signal(waiter);
	}

	pthread_mutex_unlock(&track->track_lock);
	pthread_cleanup_pop(0);
}

/*
 * Conclude a wakeup pass for @rdev.
 *
 * @rdev:    device whose tracking state is updated.
 * @wake_up: out-parameter (must not be NULL). Set to 1 when more than one
 *           wakeup was pending, in which case the pending count is
 *           collapsed to 1; otherwise the count is reset to 0 and
 *           *wake_up is set to 0.
 */
void track_aio_wakeup_finish(struct tcmur_device *rdev, int *wake_up)
{
	struct tcmu_track_aio *track = &rdev->track_queue;

	pthread_cleanup_push(_cleanup_mutex_lock, (void *)&track->track_lock);
	pthread_mutex_lock(&track->track_lock);

	if (track->pending_wakeups <= 1) {
		/* Exactly one wakeup must have been outstanding. */
		assert(track->pending_wakeups > 0);
		track->pending_wakeups = 0;
		*wake_up = 0;
	} else {
		/* More arrived while we were busy: keep one and ask for another pass. */
		track->pending_wakeups = 1;
		*wake_up = 1;
	}

	pthread_mutex_unlock(&track->track_lock);
	pthread_cleanup_pop(0);
}

/*
 * Cancellation cleanup handler for aio_wait_for_empty_queue().
 *
 * @arg: the struct tcmu_track_aio being waited on.
 *
 * Destroys and unregisters the waiter's condition variable, if one is
 * still registered, then releases track_lock.
 *
 * is_empty_cond may be NULL when this runs: the handler is pushed before
 * the waiter registers its condition variable, and
 * track_aio_request_finish() clears the pointer when it signals. Passing
 * NULL to pthread_cond_destroy() is invalid, so that case is skipped.
 */
static void cleanup_empty_queue_wait(void *arg)
{
	struct tcmu_track_aio *aio_track = arg;

	if (aio_track->is_empty_cond) {
		pthread_cond_destroy(aio_track->is_empty_cond);
		aio_track->is_empty_cond = NULL;
	}
	pthread_mutex_unlock(&aio_track->track_lock);
}

/*
 * Block until no AIO requests are being tracked for @rdev.
 *
 * @rdev: device whose tracked-operation count is waited on.
 *
 * Returns 0 once the tracked count is zero, or the error number reported
 * by pthread_cond_init() / pthread_cond_wait().
 *
 * A condition variable on this thread's stack is published through
 * is_empty_cond; track_aio_request_finish() signals it and clears the
 * pointer when the count reaches zero.
 */
int aio_wait_for_empty_queue(struct tcmur_device *rdev)
{
	struct tcmu_track_aio *aio_track = &rdev->track_queue;
	pthread_cond_t cond;
	int ret;

	ret = pthread_cond_init(&cond, NULL);
	if (ret)
		return ret;

	pthread_cleanup_push(cleanup_empty_queue_wait, aio_track);
	pthread_mutex_lock(&aio_track->track_lock);

	if (aio_track->tracked_aio_ops)
		tcmu_dev_dbg(rdev->dev, "waiting for %d commands\n",
			     rdev->track_queue.tracked_aio_ops);

	/*
	 * pthread_cond_wait() may return spuriously, so re-check the
	 * predicate after every wakeup. The signaller clears is_empty_cond,
	 * so it has to be registered again on each pass.
	 */
	while (aio_track->tracked_aio_ops) {
		aio_track->is_empty_cond = &cond;
		ret = pthread_cond_wait(&cond, &aio_track->track_lock);
		aio_track->is_empty_cond = NULL;
		if (ret)
			break;
	}

	pthread_mutex_unlock(&aio_track->track_lock);
	pthread_cleanup_pop(0);

	pthread_cond_destroy(&cond);
	return ret;
}

/*
 * Cleanup handler that frees a work item.
 *
 * @arg: heap-allocated work item to free.
 */
static void _cleanup_io_work(void *arg)
{
	void *work_item = arg;

	free(work_item);
}

/*
 * Thread entry point for a per-device I/O worker.
 *
 * @arg: the struct tcmu_device this worker serves.
 *
 * Loops forever: waits for an item on the device's work queue, removes
 * it, runs its work_fn followed by its done_fn, then frees the item.
 * The loop has no exit condition of its own; the trailing return is
 * never reached.
 */
static void *io_work_queue(void *arg)
{
	struct tcmu_device *dev = arg;
	struct tcmur_device *rdev = tcmu_dev_get_private(dev);
	struct tcmu_io_queue *wq = &rdev->work_queue;

	tcmu_set_thread_name("aio", dev);

	for (;;) {
		struct tcmu_work *item;
		void *payload;
		int status;

		/* Drop io_lock if cancelled while blocked in the wait below. */
		pthread_cleanup_push(_cleanup_mutex_lock, &wq->io_lock);
		pthread_mutex_lock(&wq->io_lock);

		while (list_empty(&wq->io_queue))
			pthread_cond_wait(&wq->io_cond, &wq->io_lock);

		item = list_first_entry(&wq->io_queue, struct tcmu_work,
					entry);
		list_del(&item->entry);

		pthread_mutex_unlock(&wq->io_lock);
		pthread_cleanup_pop(0);

		/* Run the request outside the lock; free the item afterwards. */
		payload = item->data;
		pthread_cleanup_push(_cleanup_io_work, item);

		status = item->work_fn(item->dev, payload);
		item->done_fn(dev, payload, status);

		pthread_cleanup_pop(1); /* executes _cleanup_io_work(item) */
	}

	return NULL;
}

/*
 * Queue a request for execution by the device's worker threads.
 *
 * @dev:     device the request belongs to.
 * @data:    opaque argument passed to both callbacks.
 * @work_fn: callback that executes the request.
 * @done_fn: callback invoked with work_fn's return value.
 *
 * Returns TCMU_STS_ASYNC_HANDLED once the item is queued, or
 * TCMU_STS_NO_RESOURCE if the work item cannot be allocated.
 */
static int aio_queue(struct tcmu_device *dev, void *data, tcmu_work_fn_t work_fn,
		     tcmu_done_fn_t done_fn)
{
	struct tcmur_device *rdev = tcmu_dev_get_private(dev);
	struct tcmu_io_queue *wq = &rdev->work_queue;
	struct tcmu_work *item = malloc(sizeof(*item));

	if (item == NULL)
		return TCMU_STS_NO_RESOURCE;

	item->dev = dev;
	item->data = data;
	item->work_fn = work_fn;
	item->done_fn = done_fn;
	list_node_init(&item->entry);

	/* cleanup push/pop not _really_ required here atm */
	pthread_cleanup_push(_cleanup_mutex_lock, &wq->io_lock);
	pthread_mutex_lock(&wq->io_lock);

	list_add_tail(&wq->io_queue, &item->entry);
	pthread_cond_signal(&wq->io_cond); // TODO: conditional

	pthread_mutex_unlock(&wq->io_lock);
	pthread_cleanup_pop(0);

	return TCMU_STS_ASYNC_HANDLED;
}

/*
 * Schedule a request on @dev.
 *
 * @dev:     device the request belongs to.
 * @data:    opaque argument passed to the callbacks.
 * @work_fn: callback that executes the request.
 * @done_fn: completion callback; only used on the queued path.
 *
 * When the handler is configured with worker threads the request is
 * queued and aio_queue()'s result is returned. Otherwise work_fn is
 * called directly in the caller's context; a zero return from it is
 * reported as TCMU_STS_ASYNC_HANDLED and any other value is passed
 * through unchanged.
 */
int aio_request_schedule(struct tcmu_device *dev, void *data,
			 tcmu_work_fn_t work_fn, tcmu_done_fn_t done_fn)
{
	struct tcmur_handler *rhandler = tcmu_get_runner_handler(dev);
	int status;

	if (rhandler->nr_threads)
		return aio_queue(dev, data, work_fn, done_fn);

	status = work_fn(dev, data);
	if (status)
		return status;

	return TCMU_STS_ASYNC_HANDLED;
}

/*
 * Initialise the AIO tracking state of @rdev.
 *
 * Resets both counters, clears any registered empty-queue waiter and
 * initialises track_lock.
 *
 * Returns 0 on success or the negated pthread_mutex_init() error number.
 */
int setup_aio_tracking(struct tcmur_device *rdev)
{
	int ret;
	struct tcmu_track_aio *aio_track = &rdev->track_queue;

	aio_track->pending_wakeups = 0;
	aio_track->tracked_aio_ops = 0;
	/*
	 * track_aio_request_finish() reads this pointer, so start it out
	 * explicitly unset like the counters above instead of relying on
	 * the caller having zeroed the structure.
	 */
	aio_track->is_empty_cond = NULL;

	ret = pthread_mutex_init(&aio_track->track_lock, NULL);
	if (ret != 0) {
		return -ret;
	}

	return 0;
}

/*
 * Tear down the AIO tracking state of @rdev.
 *
 * Asserts that no operations are still tracked, then destroys
 * track_lock, logging an error if that fails.
 */
void cleanup_aio_tracking(struct tcmur_device *rdev)
{
	struct tcmu_track_aio *track = &rdev->track_queue;

	assert(track->tracked_aio_ops == 0);

	if (pthread_mutex_destroy(&track->track_lock) != 0)
		tcmu_err("failed to destroy track lock\n");
}

/*
 * Cancel every worker thread that was started for @dev.
 *
 * Does nothing when the thread array was never allocated. Slots holding
 * a zero thread id are skipped.
 */
void cleanup_io_work_queue_threads(struct tcmu_device *dev)
{
	struct tcmur_handler *r_handler = tcmu_get_runner_handler(dev);
	struct tcmur_device *rdev = tcmu_dev_get_private(dev);
	struct tcmu_io_queue *wq = &rdev->work_queue;
	int count = r_handler->nr_threads;
	int idx;

	if (wq->io_wq_threads == NULL)
		return;

	for (idx = 0; idx < count; idx++) {
		if (!wq->io_wq_threads[idx])
			continue;

		tcmu_thread_cancel(wq->io_wq_threads[idx]);
	}
}

/*
 * Create the I/O work queue and its worker threads for @dev.
 *
 * The number of threads comes from the runner handler's nr_threads; when
 * that is zero nothing is set up and 0 is returned.
 *
 * Returns 0 on success or a negated errno-style value on failure. On
 * failure everything created here is torn down again and
 * io_wq->io_wq_threads is left NULL.
 */
int setup_io_work_queue(struct tcmu_device *dev)
{
	struct tcmur_handler *r_handler = tcmu_get_runner_handler(dev);
	struct tcmur_device *rdev = tcmu_dev_get_private(dev);
	struct tcmu_io_queue *io_wq = &rdev->work_queue;
	int ret, i, nr_threads = r_handler->nr_threads;

	if (!nr_threads)
		return 0;

	list_head_init(&io_wq->io_queue);

	ret = pthread_mutex_init(&io_wq->io_lock, NULL);
	if (ret != 0) {
		goto out;
	}
	ret = pthread_cond_init(&io_wq->io_cond, NULL);
	if (ret != 0) {
		goto cleanup_lock;
	}

	/* TODO: Allow user to override device defaults */
	/* calloc() zero-fills, so slots of threads never started stay 0. */
	io_wq->io_wq_threads = calloc(nr_threads, sizeof(pthread_t));
	if (!io_wq->io_wq_threads) {
		ret = ENOMEM;
		goto cleanup_cond;
	}

	for (i = 0; i < nr_threads; i++) {
		ret = pthread_create(&io_wq->io_wq_threads[i], NULL,
				      io_work_queue, dev);
		if (ret != 0) {
			goto cleanup_threads;
		}
	}

	return 0;

cleanup_threads:
	cleanup_io_work_queue_threads(dev);
	free(io_wq->io_wq_threads);
	/*
	 * cleanup_io_work_queue() treats a non-NULL pointer as "queue is
	 * initialised". Clear it so a later cleanup call does not free the
	 * array again or destroy the lock and cond a second time.
	 */
	io_wq->io_wq_threads = NULL;
cleanup_cond:
	pthread_cond_destroy(&io_wq->io_cond);
cleanup_lock:
	pthread_mutex_destroy(&io_wq->io_lock);
out:
	return -ret;
}

/*
 * Tear down the I/O work queue of @dev.
 *
 * @dev:    device whose work queue is destroyed.
 * @cancel: when true, the worker threads are cancelled first via
 *          cleanup_io_work_queue_threads().
 *
 * Does nothing if the thread array was never allocated. Failures to
 * destroy the lock or condition variable are logged and otherwise
 * ignored. The thread-array pointer is reset to NULL afterwards, so a
 * repeated call is a no-op.
 */
void cleanup_io_work_queue(struct tcmu_device *dev, bool cancel)
{
	struct tcmur_device *rdev = tcmu_dev_get_private(dev);
	struct tcmu_io_queue *io_wq = &rdev->work_queue;
	int ret;

	if (!io_wq->io_wq_threads) {
		return;
	}

	if (cancel) {
		cleanup_io_work_queue_threads(dev);
	}

	/*
	 * Note that there's no need to drain ->io_queue at this point
	 * as it _should_ be empty (target layer would call this path
	 * when no commands are running - thanks Mike).
	 *
	 * Out of tree handlers which do not use the aio code are not
	 * supported in this path.
	 */

	ret = pthread_mutex_destroy(&io_wq->io_lock);
	if (ret != 0) {
		tcmu_err("failed to destroy io workqueue lock\n");
	}

	ret = pthread_cond_destroy(&io_wq->io_cond);
	if (ret != 0) {
		tcmu_err("failed to destroy io workqueue cond\n");
	}

	free(io_wq->io_wq_threads);
	/* Clear the pointer so a second call cannot double-free the array. */
	io_wq->io_wq_threads = NULL;
}