Codebase list gman / upstream/0.0.8 task.c
upstream/0.0.8

Tree @upstream/0.0.8 (Download .tar.gz)

task.c @upstream/0.0.8raw · history · blame

/************************ task.c ***************************/
/*********************** 1999.6.21 *************************/

#include "task.h"
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

struct _task_group {
	pthread_t thread;
	pthread_attr_t attr;
	pthread_mutex_t lock;
	int state;
	List * tasks;
};

struct _task {
	TaskGroup * task_group;
	pthread_mutex_t lock;
	int state;
	float priority;
	TaskRunFunc task_run_func;
	gpointer user_data;
	List * signals[2]; 
	//signal quenes, 1 for synchronous signals, 0 for unsynchronous signals
};

static void * task_group_running(TaskGroup *);
static void task_group_set_active(TaskGroup *);

TaskGroup * task_group_new()
{
	int retcode;
	TaskGroup * task_group = (TaskGroup*)g_malloc(sizeof(TaskGroup));
	task_group->tasks = new List;
	task_group->state = 0;
	pthread_attr_init(&(task_group->attr));
	pthread_attr_setdetachstate(&(task_group->attr), 1);
	pthread_mutex_init(&task_group->lock,NULL);
	//retcode = pthread_create(&task_group->thread, NULL, (void*(*)(void*))my_thread_running, (void *) task_group);
	//if (retcode != 0) g_error("my_thread_new:thread creation failed %d\n", retcode);
	return task_group;
}

Task * task_new(TaskGroup * task_group, float priority, TaskRunFunc task_func, gpointer data)
{
	int i,j,k;
	Task * task;
	g_return_val_if_fail((priority>0.0 && priority <1.0)&&(task_group != NULL)&&(task_func != NULL),NULL);

	task = (Task *)g_malloc(sizeof(Task));
	pthread_mutex_init(&task->lock,NULL);
	task->priority = priority;
	task->state = 0;
	task->task_group = task_group;
	task->task_run_func = task_func;
	task->user_data = data;
	task->signals[0] = new List;
	task->signals[1] = new List;

	pthread_mutex_lock(&task_group->lock);
	j = task_group->tasks->get_size();
	for(i = 0;i<j && priority>((Task*)task_group->tasks->get_item(i))->priority;i++);
	task_group->tasks->insert_item(i,(void*)task);
	pthread_mutex_unlock(&task_group->lock);
	
	return task;
}

void task_signal_send(Task * task, int signal)
{
	g_return_if_fail(task != NULL);
	pthread_mutex_lock(&task->lock);
	task->signals[signal&1]->add_item((void*)signal);
	pthread_mutex_unlock(&task->lock);
	task_group_set_active(task->task_group);
}

void task_set_active(Task * task)
{
	task_signal_send(task,TASK_START);
}

void task_set_stop(Task * task)
{
	task_signal_send(task,TASK_STOP);
}

//let the task itself to decide wether to stop or not
//of the task don't want to be wake up any more, it will return 0,
//otherwise 1.

static void task_group_set_active(TaskGroup * task_group)
{
	int retcode;
	pthread_mutex_lock(&task_group->lock);
	if(!task_group->state) {
		task_group->state  |= 1;
		retcode = pthread_create(&task_group->thread, &task_group->attr, (void*(*)(void*))task_group_running, (void *) task_group);
		if (retcode != 0) g_error("thread creation failed %d\n", retcode);
	}
	pthread_mutex_unlock(&task_group->lock);
}

static void * task_group_running(TaskGroup * task_group)
{
	int i,j;
	int have_task;
	int state,k;
	Task * task;
	
	do {
		have_task = 0;
		task = NULL;
		int flag;
		pthread_mutex_lock(&task_group->lock);
		j = task_group->tasks->get_size();
		for(i = 0; i<j && !have_task ;i++) {
			if((task = (Task*)task_group->tasks->get_item(i))->signals[0]->get_size()) {
				flag = ((int)task->signals[0]->get_item(0) & ~1) | (task->state & 1);
				task->signals[0]->delete_item(0);
				have_task++;
			}
			else if (task->state & TASK_RUNNING) {
				flag = task->state & TASK_RUNNING;
				have_task++;
			}
			else if (task->signals[1]->get_size()) {
				flag = ((int)task->signals[1]->get_item(1) & ~1);
				task->signals[1]->delete_item(0);
				have_task++;
			}
		}
		pthread_mutex_unlock(&task_group->lock);
		if(have_task) {
			//			printf("init state = %x\n",task->state);
			state = (*task->task_run_func)(flag,task->user_data);
			if(state) 
				task->state |= TASK_RUNNING;
			else
			task->state &= ~TASK_RUNNING;
			//			printf("return state = %x\n",task->state);
		}
	} while (have_task);
	pthread_mutex_lock(&task_group->lock);
	task_group->state &= ~TASK_RUNNING;
	pthread_mutex_unlock(&task_group->lock);
	return 0;
}