#include "thread.h"
#if INTERFACE
#include <stdint.h>
#include <stddef.h>
#include <stdnoreturn.h>
/** Key identifying one thread local storage slot, as returned by tls_get_key */
typedef int tls_key;
/** Number of thread local storage slots held in each thread_t */
#define TLS_MAX 32
/**
* \brief Representation of a kernel thread.
*
* Holds the per-thread storage, architecture context, CPU accounting,
* scheduling state and join state of one thread. Threads are linked into
* queues through the next and prev members.
*/
struct thread_t {
/* Runtime data */
/** Thread local storage pointers, indexed by TLS key */
void * tls[TLS_MAX];
/** Architecture dependent context */
arch_context_t context;
/** Process associated with this thread */
process_t * process;
/* Thread CPU usage */
/** Index of the accounting slot in accts currently in use */
int acct;
/** Ring of recent run slices: start time (tstart) and length (tlen) of each */
struct {
timerspec_t tstart;
timerspec_t tlen;
} accts[64];
/** NOTE(review): not referenced in the visible part of this file - purpose to be confirmed */
timerspec_t period;
/** Run time summed over the most recent accounting window, as computed by thread_update_acct */
timerspec_t usage;
/** Uptime at which the thread is due to be pre-empted, as set by thread_schedule */
timerspec_t preempt;
/** Current thread state */
tstate state;
/** Thread priority */
tpriority priority;
/** Set if the thread has been interrupted */
int interrupted;
/** If set, indicates which interrupt_monitor_t we're waiting for */
interrupt_monitor_t * waitingfor;
/** If set, indicates which interrupt_monitor_t we're sleeping on */
interrupt_monitor_t * sleepingon;
/** Thread name */
char * name;
/** Thread lock; thread_exit broadcasts on it and thread_join waits on it */
monitor_t lock[1];
/** Return value for thread_join */
void * retval;
/** Thread queue next thread */
thread_t *next;
/** Thread queue prev thread */
thread_t *prev;
};
/** Thread state */
enum tstate {
/** Thread is new (this is the zero value of the enumeration) */
THREAD_NEW,
/** Thread is runnable; set when a thread is put on a run queue by thread_preempt, thread_yield or thread_resume */
THREAD_RUNNABLE,
/** Thread is running */
THREAD_RUNNING,
/** Thread is sleeping */
THREAD_SLEEPING,
/** Thread is terminated; set by thread_exit and awaited by thread_join */
THREAD_TERMINATED
};
/**
* Thread priority
*
* The values index the scheduler run queues. thread_schedule scans the
* queues starting from 0, so a numerically lower value is served first.
*/
enum tpriority {
/** Thread interrupt priority */
THREAD_INTERRUPT = 0,
/** Thread normal priority */
THREAD_NORMAL,
/** Thread idle priority */
THREAD_IDLE,
/** Number of thread priorities (not itself a valid priority) */
THREAD_PRIORITIES
};
/*
* Compiler barrier: an empty asm statement with a "memory" clobber, which
* stops the compiler from reordering or caching memory accesses across it.
* Only defined here if no barrier macro has been provided already.
*/
#ifndef barrier
#define barrier() asm volatile("": : :"memory")
#endif
/**
* Thread entry point, as taken by thread_spawn. The value it returns is
* passed on to thread_exit.
*/
typedef void * (*thread_func_t)(void * arg);
/**
* Thread pool: a request queue plus the worker thread that drains it.
*/
struct thread_pool_t {
/** Queue of pending requests, filled by thread_pool_submit */
queue_t * queue;
/** Worker thread running thread_pool_processor, spawned by thread_pool_create */
thread_t * thread;
};
#endif
/**
* Pre-emption request flag. thread_resume sets it to 1 when the resumed
* thread has a numerically lower priority value than the running thread.
* NOTE(review): the consumer of this flag is not visible in this file.
*/
int preempt;
/**
* Next TLS key to hand out from tls_get_key. Starts at 1; tls_set and
* tls_get reject key 0.
*/
static tls_key tls_next = 1;
/**
 * Allocate a fresh thread local storage key.
 *
 * The key is taken from the shared tls_next counter with
 * arch_atomic_postinc(). No check against TLS_MAX is made here; an out of
 * range key is only rejected later by tls_set() / tls_get().
 * \return The newly allocated key
 */
int tls_get_key()
{
	const int key = arch_atomic_postinc(&tls_next);

	return key;
}
/**
 * Store a pointer in the current thread's local storage slot.
 *
 * The current thread must be available and the key must lie in the range
 * 1 .. TLS_MAX-1, otherwise the corresponding check fails.
 *
 * \arg key TLS key retrieved using \ref tls_get_key
 * \arg p Pointer value
 */
void tls_set(int key, void * p)
{
	thread_t * self = arch_get_thread();

	check_not_null(self, "Unable to get thread");
	check_int_bounds(key, 1, TLS_MAX-1, "TLS key out of bounds");

	void ** slot = &self->tls[key];
	*slot = p;
}
/**
 * Fetch the pointer held in the current thread's local storage slot.
 *
 * The current thread must be available and the key must lie in the range
 * 1 .. TLS_MAX-1, otherwise the corresponding check fails.
 *
 * \arg key TLS key retrieved using \ref tls_get_key
 * \return Pointer value
 */
void * tls_get(int key)
{
	thread_t * self = arch_get_thread();

	check_not_null(self, "Unable to get thread");
	check_int_bounds(key, 1, TLS_MAX-1, "TLS key out of bounds");

	void * value = self->tls[key];
	return value;
}
/* GC callbacks for thread_t, defined further down the file */
static void thread_mark(void * p);
static void thread_finalize(void * p);
/** Slab type from which every thread_t is allocated, wired to the GC callbacks above */
static slab_type_t threads[1] = {SLAB_TYPE(sizeof(thread_t), thread_mark, thread_finalize)};
/** Spin lock taken by scheduler_lock / released by scheduler_unlock */
static spin_t queuelock;
/**
* Lock scheduler
*
* Takes the queuelock spin lock. Within this file the lock is held while the
* run queues are modified and on entry to thread_schedule.
*/
void scheduler_lock()
{
spin_lock(&queuelock);
}
/**
* Unlock scheduler
*
* Releases the queuelock spin lock taken by scheduler_lock.
*/
static void scheduler_unlock()
{
spin_unlock(&queuelock);
}
/**
 * Insert a thread at the head of a queue and record its new state.
 * \arg queue Current queue
 * \arg thread Thread to queue, or 0 for the current thread
 * \arg state Thread state
 * \return New queue head
 */
thread_t * thread_prequeue(thread_t * queue, thread_t * thread, tstate state)
{
	thread_t * item = thread ? thread : arch_get_thread();

	item->state = state;
	LIST_PREPEND(queue, item);
	return queue;
}
/**
 * Insert a thread at the tail of a queue and record its new state.
 * \arg queue Current queue
 * \arg thread Thread to queue, or 0 for the current thread
 * \arg state Thread state
 * \return New queue head
 */
thread_t * thread_queue(thread_t * queue, thread_t * thread, tstate state)
{
	thread_t * item = thread ? thread : arch_get_thread();

	item->state = state;
	LIST_APPEND(queue, item);
	return queue;
}
/* Simple RR scheduler */
/**
* Run queue heads, one per priority level. The functions in this file take
* the scheduler lock before modifying them.
*/
static GCROOT thread_t * queue[THREAD_PRIORITIES];
/**
 * Pre-empt the current thread.
 *
 * The current thread is placed on the head of the run queue for its
 * priority, in the runnable state, and the scheduler is invoked. The
 * scheduler lock is taken here and is still held on entry to
 * thread_schedule().
 * \return Value returned by thread_schedule(): 1 if a new thread was scheduled
 */
int thread_preempt()
{
	thread_t * self = arch_get_thread();
	tpriority level = self->priority;

	scheduler_lock();
	queue[level] = thread_prequeue(queue[level], self, THREAD_RUNNABLE);

	return thread_schedule();
}
/**
 * Yield the processor.
 *
 * The current thread is placed on the tail of the run queue for its
 * priority, in the runnable state, and the scheduler is invoked. The
 * scheduler lock is taken here and is still held on entry to
 * thread_schedule().
 * \return Value returned by thread_schedule(): 1 if a new thread was scheduled
 */
int thread_yield()
{
	thread_t * self = arch_get_thread();
	tpriority level = self->priority;

	scheduler_lock();
	queue[level] = thread_queue(queue[level], self, THREAD_RUNNABLE);

	return thread_schedule();
}
/**
* Mark given thread as interrupted
*
* Only sets the interrupted flag; the flag is read by thread_isinterrupted
* and cleared by thread_interrupted.
* \arg thread Thread to mark. Must not be 0 (there is no current-thread default here).
*/
void thread_interrupt(thread_t * thread)
{
thread->interrupted = 1;
}
/**
 * Report whether a thread has its interrupted flag set, without clearing it.
 * \arg thread Thread to check for interruption. If 0, check current thread
 * \return true (not 0) if interrupted
 */
int thread_isinterrupted(thread_t * thread)
{
	thread_t * target = thread ? thread : arch_get_thread();

	return target->interrupted;
}
/**
 * Test and clear the interrupted flag of the current thread.
 * \return 1 if the thread had been interrupted (the flag is then reset), 0 otherwise
 */
int thread_interrupted()
{
	thread_t * self = arch_get_thread();

	if (!thread_isinterrupted(self)) {
		return 0;
	}

	self->interrupted = 0;
	return 1;
}
/**
 * Make the given thread runnable again.
 *
 * The thread is appended to the run queue for its priority under the
 * scheduler lock. If the running thread has a numerically higher priority
 * value than the resumed one, the global preempt flag is raised.
 * \arg thread Thread to resume
 */
void thread_resume(thread_t * thread)
{
	tpriority level = thread->priority;

	scheduler_lock();
	queue[level] = thread_queue(queue[level], thread, THREAD_RUNNABLE);
	scheduler_unlock();

	/* Request pre-emption if the resumed thread outranks the running one */
	if (level < arch_get_thread()->priority) {
		preempt = 1;
	}
}
/**
* Schedule the next thread
*
* The callers in this file take the scheduler lock before calling; the lock
* is released here before every return.
*
* Run queues are scanned from index 0 upwards, so a queued thread with a
* numerically lower tpriority value is picked first. If every queue is empty
* the function calls arch_idle() with the lock released, re-takes the lock
* and scans again.
* \return 1 if a new thread was scheduled, 0 if the current thread was selected again
*/
int thread_schedule()
{
while(1) {
int i;
for(i=0; i<THREAD_PRIORITIES; i++) {
if (queue[i]) {
thread_t * current = arch_get_thread();
/* Take the thread at the head of this priority's queue */
thread_t * next = queue[i];
LIST_DELETE(queue[i], next);
scheduler_unlock();
if (arch_get_thread() != next) {
/* Thread is changing, do accounting and switch to next */
/* Close the outgoing thread's accounting slot and advance, wrapping at the end of accts */
current->accts[current->acct].tlen = timer_uptime(1) - current->accts[current->acct].tstart;
current->acct++;
if (sizeof(current->accts)/sizeof(current->accts[0]) == current->acct) {
current->acct = 0;
}
arch_thread_switch(next);
/* Presumably reached once this thread is switched back in (depends on arch_thread_switch): open a new accounting slot */
current->accts[current->acct].tstart = timer_uptime(1);
/* By default, preempt after 100ms */
current->preempt = current->accts[current->acct].tstart + 100000;
return 1;
} else {
/* Restore thread state to running */
current->state = THREAD_RUNNING;
return 0;
}
}
}
// kernel_printk("Empty run queue!\n");
/* Nothing runnable: idle with the scheduler unlocked, then re-lock and scan again */
scheduler_unlock();
arch_idle();
scheduler_lock();
}
}
/**
 * Look up the name of a thread.
 *
 * A thread without a name is given the name "Anonymous" as a side effect
 * of this call.
 * \arg thread Thread, or current thread if 0
 * \return Thread name; never 0
 */
char * thread_get_name(thread_t * thread)
{
	thread_t * target = thread ? thread : arch_get_thread();

	if (!target->name) {
		target->name = "Anonymous";
	}

	return target->name;
}
/**
 * Assign a name to a thread.
 *
 * The pointer is stored as given; the string is not copied.
 * \arg thread Thread, or current thread if 0
 * \arg name New thread name
 */
void thread_set_name(thread_t * thread, char * name)
{
	thread_t * target = thread ? thread : arch_get_thread();

	target->name = name;
}
/** Spin lock guarding allthreads */
static spin_t allthreadslock = ATOMIC_FLAG_INIT;
/** Map of every tracked thread (thread pointer used as both key and value); created on first use by thread_track */
static GCROOT map_t * allthreads = 0;
/**
 * Add a thread to, or remove it from, the map of all threads.
 *
 * Runs under allthreadslock and creates the map on first use. Panics if a
 * thread is added twice or removed without having been added.
 * \arg thread Thread to track or stop tracking
 * \arg add Non-zero to add the thread, 0 to remove it
 */
static void thread_track(thread_t * thread, int add)
{
	SPIN_AUTOLOCK(&allthreadslock) {
		if (!allthreads) {
			/* All threads */
			allthreads = tree_new(0, TREE_TREAP);
		}

		if (!add) {
			if (0 == map_removepp(allthreads, thread)) {
				kernel_panic("Removing non-existent thread!");
			}
		} else if (map_putpp(allthreads, thread, thread)) {
			kernel_panic("Adding existing thread!");
		}
	}
}
/**
 * Start a new thread running the given function.
 *
 * Forks the current thread. The creator gets the new thread pointer back;
 * the new thread calls f(arg) and hands the result to thread_exit(), so it
 * never returns from this function.
 * \arg f Function into which the new thread will run
 * \arg arg Argument to new thread
 * \return thread pointer to the new thread
 */
thread_t * thread_spawn(thread_func_t f, void * arg)
{
	thread_t * child = thread_fork();

	if (0 == child) {
		/* Child thread */
		thread_exit(f(arg));
	}

	return child;
}
/**
* Fork the current thread, into a new thread
*
* Fork the current thread, returning the new thread pointer to the
* creator thread, and 0 to the created thread.
*
* The created thread is a copy of the creator thread, with the same call chain.
*
* Automatic variables that contain pointers to locations in the creator stack are
* transformed to point to the equivalent locations in the created stack.
*
* The new thread inherits the creator's priority and process, and is named
* "Child of <creator pointer>". In the creator, the new thread is added to
* the set of all threads and made runnable before this function returns.
* \return thread pointer to the new thread in the creator, or 0 in the new thread
*/
thread_t * thread_fork()
{
thread_t * this = arch_get_thread();
/* Zero-initialised thread from the thread slab (presumably - depends on slab_calloc) */
thread_t * thread = slab_calloc(threads);
thread->priority = this->priority;
thread->process = this->process;
char buf[32];
snprintf( buf, sizeof(buf), "Child of %p", this);
thread_set_name(thread, strndup(buf, sizeof(buf)));
/* arch_thread_fork returns 0 in the created thread, non-zero in the creator */
if (0 == arch_thread_fork(thread)) {
return 0;
}
/* Creator only: register the new thread and put it on a run queue */
thread_track(thread, 1);
thread_resume(thread);
return thread;
}
/**
* Exit the current thread.
*
* Exit the current thread. This function never returns.
*
* The return value and the terminated state are recorded first, then any
* joiners are woken, the thread is detached from its process and from the
* set of all threads, and finally the scheduler is entered. The thread is
* not put back on a run queue.
* \arg retval Return value returned to the caller of \ref thread_join.
*/
noreturn void thread_exit(void * retval)
{
thread_t * this = arch_get_thread();
this->retval = retval;
this->state = THREAD_TERMINATED;
/* Signify to any threads waiting on this thread */
MONITOR_AUTOLOCK(this->lock) {
monitor_broadcast(this->lock);
}
/* Remove this thread from the set of process threads */
if (this->process) {
MONITOR_AUTOLOCK(&this->process->lock) {
map_removepp(this->process->threads, this);
}
}
this->process = 0;
/* Remove this thread from the set of all threads */
thread_track(this, 0);
/* Schedule the next thread */
/* thread_schedule is entered with the scheduler lock held, as in thread_preempt/thread_yield */
scheduler_lock();
thread_schedule();
/* Only reached if thread_schedule returns to this terminated thread */
kernel_panic("thread_exit: Should never get here\n");
}
/**
 * Wait for a thread to finish and collect its return value.
 *
 * If the thread has not yet terminated, the caller waits on the thread's
 * monitor until thread_exit() broadcasts on it. The state is re-checked
 * after every wake-up.
 *
 * \arg thread Thread to get return code from.
 * \return Return code as passed to \ref thread_exit
 */
void * thread_join(thread_t * thread)
{
	MONITOR_AUTOLOCK(thread->lock) {
		while (THREAD_TERMINATED != thread->state) {
			monitor_wait(thread->lock);
		}
	}

	void * result = thread->retval;
	return result;
}
/**
 * Change the priority of a thread.
 *
 * The priority must lie between THREAD_INTERRUPT and THREAD_IDLE inclusive.
 * Only the stored priority is updated; this function does not move a thread
 * between run queues.
 * \arg thread Thread to set priority of, or current thread if 0.
 * \arg priority New thread priority.
 */
void thread_set_priority(thread_t * thread, tpriority priority)
{
	check_int_bounds(priority, THREAD_INTERRUPT, THREAD_IDLE, "Thread priority out of bounds");

	thread_t * target = thread ? thread : arch_get_thread();
	target->priority = priority;
}
/** Array of registered garbage collection root pointers, grown by thread_gc_root */
static GCROOT void ** roots;
/** Non-zero to have thread_gc time each collection and print the totals */
#define GCPROFILE 1
#if GCPROFILE
/* NOTE(review): thread_gc declares a function-local static with the same name, so this file-scope accumulator appears to be unused. */
static timerspec_t gctime = 0;
#endif
/**
 * Run garbage collection.
 *
 * Runs one slab collection cycle (begin, collect, end). When GCPROFILE is
 * enabled, the time spent is added to a running total and printed together
 * with the share of total uptime that has been spent collecting.
 */
void thread_gc()
{
	// thread_cleanlocks();
#if GCPROFILE
	/* Total time spent collecting, accumulated across calls */
	static timerspec_t gctime = 0;
	const timerspec_t start = timer_uptime(1);
#endif
	slab_gc_begin();
	slab_gc();
	slab_gc_end();
#if GCPROFILE
	const timerspec_t now = timer_uptime(1);
	gctime += (now - start);
	/*
	 * Percentage of uptime spent in GC. Scale by 100 (not 1000) to match the
	 * "%%" in the message, and avoid dividing by a zero uptime.
	 */
	const int percent = (now > 0) ? (int)(100 * gctime / now) : 0;
	kernel_printk("GC time %d (%d%%)\n", (int)gctime, percent);
#endif
}
/**
 * Register an additional garbage collection root.
 *
 * The roots array is grown by one element on every call and the pointer is
 * stored in the new last slot.
 * \arg p Pointer to the new root.
 */
void thread_gc_root(void * p)
{
	static int rootcount = 0;

	void ** grown = realloc(roots, (rootcount + 1) * sizeof(*roots));

	roots = grown;
	roots[rootcount] = p;
	rootcount += 1;
}
static void thread_mark(void * p)
{
thread_t * thread = (thread_t *)p;
slab_gc_mark(thread->name);
if (thread->state != THREAD_TERMINATED) {
/* Mark live state only */
arch_thread_mark(thread);
slab_gc_mark_block(thread->tls, sizeof(thread->tls));
slab_gc_mark(thread->process);
} else {
/* Mark dead state only */
slab_gc_mark(thread->retval);
}
}
static void thread_finalize(void * p)
{
thread_t * thread = (thread_t *)p;
arena_thread_free();
arch_thread_finalize(thread);
}
/**
* Generate a function call stack backtrace
*
* Thin wrapper around arch_thread_backtrace. The callers in this file pass
* a NULL buffer and walk the returned array up to a 0 entry, so the
* architecture layer presumably supplies and terminates the array in that
* case - confirm against arch_thread_backtrace.
* \arg buffer Pointer to array of pointers.
* \arg levels Number of pointers in the \ref buffer array.
* \return buffer
*/
void ** thread_backtrace(void ** buffer, int levels)
{
return arch_thread_backtrace(buffer, levels);
}
void thread_init()
{
INIT_ONCE();
/* Craft a new bootstrap thread to replace the static defined thread */
arch_thread_init(slab_calloc(threads));
thread_track(arch_get_thread(), 1);
}
static void thread_test2();

/**
 * Test helper: print a backtrace, then escalate, re-read and unlock the
 * given rwlock. Called by thread_test with the lock held for reading.
 * \arg rw Read/write lock shared with thread_test2
 */
static void thread_test1(rwlock_t * rw)
{
	void ** frame = thread_backtrace(NULL, 15);

	kernel_printk("thread_test1\n");
	for (; *frame; frame++) {
		kernel_printk("\t%p\n", *frame);
	}

	rwlock_escalate(rw);
	rwlock_read(rw);
	rwlock_unlock(rw);
}
/**
 * Test helper: print a backtrace, then release the given rwlock, take it
 * for writing and release it again. Called by thread_test with the lock
 * held for reading.
 * \arg rw Read/write lock shared with thread_test1
 */
static void thread_test2(rwlock_t * rw)
{
	void ** frame = thread_backtrace(NULL, 15);

	kernel_printk("thread_test2\n");
	for (; *frame; frame++) {
		kernel_printk("\t%p\n", *frame);
	}

	rwlock_unlock(rw);
	rwlock_write(rw);
	rwlock_unlock(rw);
}
/**
 * Map walk callback: recompute and print the recent CPU usage of one thread.
 *
 * For the running thread the current accounting slot is closed at the given
 * uptime and a new slot is opened at that same instant, so that the slice
 * closed later by thread_schedule() is measured from here and not from a
 * stale start time.
 *
 * Usage is the part of each recorded slice that falls inside the most
 * recent accounting window, summed over all slots.
 *
 * \arg p Pointer to the current uptime (timerspec_t)
 * \arg key The thread_t to update
 * \arg data Unused
 */
static void thread_update_acct(const void * const p, void * key, void * data)
{
	/*
	 * Accounting window length in timer_uptime() units; presumably
	 * microseconds (one second), since thread_schedule treats 100000 as 100ms.
	 */
	enum { ACCT_WINDOW = 1000000 };

	const timerspec_t now = *(const timerspec_t *)p;
	thread_t * thread = key;
	const int slots = (int)countof(thread->accts);

	(void)data;

	if (thread == arch_get_thread()) {
		/* Close the slice that is currently running */
		thread->accts[thread->acct].tlen = now - thread->accts[thread->acct].tstart;
		thread->acct++;
		if (slots == thread->acct) {
			thread->acct = 0;
		}
		/* Open the next slice at this instant, discarding stale slot contents */
		thread->accts[thread->acct].tstart = now;
		thread->accts[thread->acct].tlen = 0;
	}

	/* Start of the window; clamped so early uptimes cannot wrap below zero */
	const timerspec_t from = (now > ACCT_WINDOW) ? (now - ACCT_WINDOW) : 0;
	timerspec_t sum = 0;

	for (int i = 0; i < slots; i++) {
		const timerspec_t acctstart = thread->accts[i].tstart;
		const timerspec_t acctend = acctstart + thread->accts[i].tlen;

		if (acctstart > from) {
			/* Slice lies entirely inside the window */
			sum += (acctend - acctstart);
		} else if (acctend > from) {
			/* Slice straddles the window start: count the part inside */
			sum += (acctend - from);
		}
	}

	thread->usage = sum;
	int percent = 100 * thread->usage / ACCT_WINDOW;
	kernel_printk("%s: %d\n", thread->name ? thread->name : "Unknown", percent);
}
/**
* One unit of work queued on a thread pool: a function and its argument.
*/
typedef struct thread_pool_request_t thread_pool_request_t;
struct thread_pool_request_t {
/** Function to run on the pool thread */
void (*function)(void * arg);
/** Argument passed to function */
void * arg;
};
/**
 * Thread pool worker loop.
 *
 * Repeatedly takes the next request from the pool's queue and runs it.
 * Anything caught as Throwable from a request is logged and the loop
 * carries on. The loop has no exit.
 * \arg p The thread_pool_t to serve
 */
void * thread_pool_processor(void * p)
{
	thread_pool_t * pool = p;

	for (;;) {
		thread_pool_request_t * job = queue_getp(pool->queue);

		KTRY {
			job->function(job->arg);
		} KCATCH(Throwable) {
			exception_log();
		}
	}
}
/**
 * Create a thread pool.
 *
 * Allocates the pool, gives it a request queue created with queue_new(16),
 * and spawns a worker thread running thread_pool_processor().
 * \return The new pool
 */
thread_pool_t * thread_pool_create()
{
	thread_pool_t * created = calloc(1, sizeof(*created));

	created->queue = queue_new(16);
	created->thread = thread_spawn(thread_pool_processor, created);

	return created;
}
/**
 * Queue a function call for execution on a thread pool.
 *
 * A copy of the request is placed on the pool's queue. When no pool is
 * given, a shared default pool is used, and created on first use.
 * \arg pool Pool to submit to, or 0 for the default pool
 * \arg function Function to run
 * \arg arg Argument passed to function
 */
void thread_pool_submit(thread_pool_t * pool, void (*function)(void * arg), void * arg)
{
	static GCROOT thread_pool_t * defaultpool;
	thread_pool_t * target = pool;

	if (!target) {
		if (!defaultpool) {
			defaultpool = thread_pool_create();
		}
		target = defaultpool;
	}

	thread_pool_request_t request = {.function = function, .arg = arg};
	queue_putp(target->queue, mclone(&request));
}
/**
 * Refresh the CPU usage figures of every tracked thread.
 *
 * Under allthreadslock, samples the uptime once and applies
 * thread_update_acct() to each entry of the all-threads map, if the map
 * exists.
 */
void thread_update_accts()
{
	SPIN_AUTOLOCK(&allthreadslock) {
		timerspec_t now = timer_uptime(1);

		if (0 != allthreads) {
			map_walkpp(allthreads, thread_update_acct, &now);
		}
	}
}
/**
* Thread self-test.
*
* Forks a child and joins it. The child forks a grandchild; both take a
* shared static rwlock for reading, then the child runs thread_test1 and
* joins the grandchild, while the grandchild runs thread_test2. Both forked
* threads finish with thread_exit(0).
*/
void thread_test()
{
thread_t * thread1;
thread1 = thread_fork();
if (thread1) {
/* Creator: wait for the first forked thread to finish */
thread_join(thread1);
thread1 = 0;
} else {
/* First forked thread */
static rwlock_t rw[1] = {0};
thread_t * thread2 = thread_fork();
/* Runs in both the first and the second forked thread */
rwlock_read(rw);
if (thread2) {
thread_test1(rw);
thread_join(thread2);
thread2 = 0;
thread_exit(0);
} else {
/* Second forked thread */
thread_test2(rw);
thread_exit(0);
}
}
}