From d8a7e180245bc00dbf55fa3e40135a4a94117ab9 Mon Sep 17 00:00:00 2001 From: Ralf Corsepius Date: Mon, 16 Aug 2010 05:46:09 +0000 Subject: 2010-08-16 Alin Rus * posix/src/aio_misc.c: New. * posix/src/aio_error.c, posix/src/aio_read.c, * posix/src/aio_return.c, posix/src/aio_write.c: New implementation. --- cpukit/posix/src/aio_misc.c | 521 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 521 insertions(+) create mode 100644 cpukit/posix/src/aio_misc.c (limited to 'cpukit/posix/src/aio_misc.c') diff --git a/cpukit/posix/src/aio_misc.c b/cpukit/posix/src/aio_misc.c new file mode 100644 index 0000000000..6158d83acd --- /dev/null +++ b/cpukit/posix/src/aio_misc.c @@ -0,0 +1,521 @@ +/* + * Copyright 2010, Alin Rus + * + * The license and distribution terms for this file may be + * found in the file LICENSE in this distribution or at + * http://www.rtems.com/license/LICENSE. + * + * $Id$ + */ + +#include +#include +#include +#include + + + +rtems_aio_queue aio_request_queue; + +static void *rtems_aio_handle (void *arg); + +/* + * rtems_aio_init + * + * Initialize the request queue for aio + * + * Input parameters: + * NONE + * + * Output parameters: + * 0 - if initialization succeded + */ + +int +rtems_aio_init (void) +{ + int result = 0; + + result = pthread_attr_init (&aio_request_queue.attr); + if (result != 0) + return result; + + result = + pthread_attr_setdetachstate (&aio_request_queue.attr, + PTHREAD_CREATE_DETACHED); + if (result != 0) + pthread_attr_destroy (&aio_request_queue.attr); + + + result = pthread_mutex_init (&aio_request_queue.mutex, NULL); + if (result != 0) + pthread_attr_destroy (&aio_request_queue.attr); + + + result = pthread_cond_init (&aio_request_queue.new_req, NULL); + if (result != 0) { + pthread_mutex_destroy (&aio_request_queue.mutex); + pthread_attr_destroy (&aio_request_queue.attr); + } + + rtems_chain_initialize_empty (&aio_request_queue.work_req); + rtems_chain_initialize_empty (&aio_request_queue.idle_req); + + aio_request_queue.active_threads = 0; + aio_request_queue.idle_threads = 0; + aio_request_queue.size = 0; + aio_request_queue.initialized = AIO_QUEUE_INITIALIZED; + + return result; + +} + +/* + * rtems_aio_search_fd + * + * Search and create chain of requests for given FD + * + * Input parameters: + * chain - chain of FD chains + * fildes - file descriptor to search + * create - if 1 search and create + * - if 0 just search + * + * Output parameters: + * r_chain - NULL if create == 0 and there is + * no chain for given fildes + * - pointer to chain is there exists + * a chain for given fildes + * - pointer to newly create chain if + * create == 1 + * + */ + +rtems_aio_request_chain * +rtems_aio_search_fd (rtems_chain_control * chain, int fildes, int create) +{ + rtems_aio_request_chain *r_chain; + rtems_chain_node *node; + + node = chain->first; + r_chain = (rtems_aio_request_chain *) node; + + while (r_chain->fildes < fildes && !rtems_chain_is_tail (chain, node)) { + node = node->next; + r_chain = (rtems_aio_request_chain *) node; + } + + if (r_chain->fildes == fildes) + r_chain->new_fd = 0; + else { + if (create == 0) + r_chain = NULL; + else { + r_chain = malloc (sizeof (rtems_aio_request_chain)); + rtems_chain_initialize_empty (&r_chain->perfd); + + if (rtems_chain_is_empty (chain)) + rtems_chain_prepend (chain, &r_chain->next_fd); + else + rtems_chain_insert (node->previous, &r_chain->next_fd); + + r_chain->new_fd = 1; + } + } + return r_chain; +} + +/* + * rtems_aio_insert_prio + * + * Add request to given FD chain. The chain is ordered + * by priority + * + * Input parameters: + * chain - chain of requests for a given FD + * req - request (see aio_misc.h) + * + * Output parameters: + * NONE + */ + +void +rtems_aio_insert_prio (rtems_chain_control * chain, rtems_aio_request * req) +{ + rtems_chain_node *node; + + AIO_printf ("FD exists \n"); + node = chain->first; + + if (rtems_chain_is_empty (chain)) { + AIO_printf ("First in chain \n"); + rtems_chain_prepend (chain, &req->next_prio); + } else { + AIO_printf ("Add by priority \n"); + int prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio; + + while (req->aiocbp->aio_reqprio > prio && + !rtems_chain_is_tail (chain, node)) { + node = node->next; + prio = ((rtems_aio_request *) node)->aiocbp->aio_reqprio; + } + + rtems_chain_insert (node->previous, &req->next_prio); + + } +} + +/* + * rtems_aio_remove_fd + * + * Removes all the requests in a fd chain + * + * Input parameters: + * r_chain - pointer to the fd chain request + * + * Output parameters: + * NONE + */ + +void rtems_aio_remove_fd (rtems_aio_request_chain *r_chain) +{ + rtems_chain_control *chain; + rtems_chain_node *node; + + chain = &r_chain->perfd; + node = chain->first; + + while (!rtems_chain_is_tail (chain, node)) + { + rtems_chain_extract (node); + rtems_aio_request *req = (rtems_aio_request *) node; + req->aiocbp->error_code = ECANCELED; + req->aiocbp->return_value = -1; + free (req); + } +} + +/* + * rtems_aio_remove_req + * + * Removes request from given chain + * + * Input parameters: + * chain - pointer to fd chain which may contain + * the request + * aiocbp - pointer to request that needs to be + * canceled + * + * Output parameters: + * AIO_NOTCANCELED - if request was not canceled + * AIO_CANCELED - if request was canceled + */ + +int rtems_aio_remove_req (rtems_chain_control *chain, struct aiocb *aiocbp) +{ + rtems_chain_node *node = chain->first; + rtems_aio_request *current; + + current = (rtems_aio_request *) node; + + while (!rtems_chain_is_tail (chain, node) && current->aiocbp != aiocbp) { + node = node->next; + current = (rtems_aio_request *) node; + } + + if (rtems_chain_is_tail (chain, node)) + return AIO_NOTCANCELED; + else + { + rtems_chain_extract (node); + current->aiocbp->error_code = ECANCELED; + current->aiocbp->return_value = -1; + free (current); + } + + return AIO_CANCELED; +} + +/* + * rtems_aio_enqueue + * + * Enqueue requests, and creates threads to process them + * + * Input parameters: + * req - see aio_misc.h + * + * Output parameters: + * 0 - if request was added to queue + * errno - otherwise + */ + +int +rtems_aio_enqueue (rtems_aio_request * req) +{ + + rtems_aio_request_chain *r_chain; + rtems_chain_control *chain; + pthread_t thid; + int result, policy; + struct sched_param param; + + /* The queue should be initialized */ + AIO_assert (aio_request_queue.initialized != AIO_QUEUE_INITIALIZED); + + result = pthread_mutex_lock (&aio_request_queue.mutex); + if (result != 0) { + free (req); + return result; + } + + /* used to check if we can create more threads */ + int chain_type; + + /* _POSIX_PRIORITIZED_IO and _POSIX_PRIORITY_SCHEDULING are defined, + we can use aio_reqprio to lower the priority of the request */ + pthread_getschedparam (pthread_self(), &policy, ¶m); + + req->caller_thread = pthread_self (); + req->priority = param.sched_priority - req->aiocbp->aio_reqprio; + req->policy = policy; + req->aiocbp->error_code = EINPROGRESS; + req->aiocbp->return_value = 0; + + if (AIO_MAX_THREADS > + (aio_request_queue.active_threads + aio_request_queue.idle_threads)) { + chain_type = 0; + chain = &aio_request_queue.work_req; + } else { + chain = &aio_request_queue.idle_req; + chain_type = 1; + } + + /* we still have empty places on the active_threads chain */ + if (chain_type == 0) { + r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1); + + if (r_chain->new_fd == 1) { + rtems_chain_prepend (&r_chain->perfd, &req->next_prio); + r_chain->new_fd = 0; + pthread_mutex_init (&r_chain->mutex, NULL); + pthread_cond_init (&r_chain->cond, NULL); + + /* if there are idle threads and this is a new fd chain + there's no need to create another thread */ + if (aio_request_queue.idle_threads > 0) { + result = pthread_cond_signal (&aio_request_queue.new_req); + if (result != 0) { + pthread_mutex_unlock (&aio_request_queue.mutex); + return result; + } + } else { + + /* if this is a new fd chain and no threads are idle create + new thread */ + AIO_printf ("New thread"); + result = pthread_create (&thid, &aio_request_queue.attr, + rtems_aio_handle, (void *) r_chain); + if (result != 0) { + pthread_mutex_unlock (&aio_request_queue.mutex); + return result; + } + ++aio_request_queue.active_threads; + } + } else { + /* put request in the fd chain it belongs to */ + pthread_mutex_lock (&r_chain->mutex); + rtems_aio_insert_prio (&r_chain->perfd, req); + pthread_cond_signal (&r_chain->cond); + pthread_mutex_unlock (&r_chain->mutex); + } + } else { + /* the maximum number of threads has been already created + even though some of them might be idle. + The request belongs to one of the active fd chain */ + r_chain = rtems_aio_search_fd (&aio_request_queue.work_req, + req->aiocbp->aio_fildes, 0); + if (r_chain != NULL) + { + pthread_mutex_lock (&r_chain->mutex); + rtems_aio_insert_prio (&r_chain->perfd, req); + pthread_cond_signal (&r_chain->cond); + pthread_mutex_unlock (&r_chain->mutex); + + } else { + + /* or to the idle chain */ + r_chain = rtems_aio_search_fd (chain, req->aiocbp->aio_fildes, 1); + + if (r_chain->new_fd == 1) { + /* If this is a new fd chain we signal the idle threads that + might be waiting for requests */ + rtems_chain_prepend (&r_chain->perfd, &req->next_prio); + pthread_cond_signal (&aio_request_queue.new_req); + } else + /* just insert the request in the existing fd chain */ + rtems_aio_insert_prio (&r_chain->perfd, req); + } + } + + pthread_mutex_unlock (&aio_request_queue.mutex); + return 0; +} + +/* + * rtems_aio_handle + * + * Thread processing requests + * + * Input parameters: + * arg - the chain for the fd to be worked on + * + * Output parameters: + * NULL - if error + */ + +static void * +rtems_aio_handle (void *arg) +{ + + rtems_aio_request_chain *r_chain = arg; + rtems_aio_request *req; + rtems_chain_control *chain; + rtems_chain_node *node; + int result, policy; + struct sched_param param; + + AIO_printf ("Thread started\n"); + + while (1) { + + /* acquire the mutex of the current fd chain. + we don't need to lock the queue mutex since we can + add requests to idle fd chains or even active ones + if the working request has been extracted from the + chain */ + result = pthread_mutex_lock (&r_chain->mutex); + if (result != 0) + return NULL; + + chain = &r_chain->perfd; + + /* If the locked chain is not empty, take the first + request extract it, unlock the chain and process + the request, in this way the user can supply more + requests to this fd chain */ + if (!rtems_chain_is_empty (chain)) { + + node = chain->first; + req = (rtems_aio_request *) node; + + /* See _POSIX_PRIORITIZE_IO and _POSIX_PRIORITY_SCHEDULING + discussion in rtems_aio_enqueue () */ + pthread_getschedparam (pthread_self(), &policy, ¶m); + param.sched_priority = req->priority; + pthread_setschedparam (pthread_self(), req->policy, ¶m); + + rtems_chain_extract (node); + + pthread_mutex_unlock (&r_chain->mutex); + + switch (req->aiocbp->aio_lio_opcode) { + case LIO_READ: + result = pread (req->aiocbp->aio_fildes, + (void *) req->aiocbp->aio_buf, + req->aiocbp->aio_nbytes, req->aiocbp->aio_offset); + break; + + case LIO_WRITE: + result = pwrite (req->aiocbp->aio_fildes, + (void *) req->aiocbp->aio_buf, + req->aiocbp->aio_nbytes, req->aiocbp->aio_offset); + break; + + default: + result = -1; + } + if (result == -1) { + req->aiocbp->return_value = -1; + req->aiocbp->error_code = errno; + } else { + req->aiocbp->return_value = result; + req->aiocbp->error_code = 0; + } + + // notification needed for lio + + } else { + /* If the fd chain is empty we unlock the fd chain + and we lock the queue chain, this will ensure that + we have at most one request comming to our fd chain + when we check. + + If there was no request added sleep for 3 seconds and + wait for a signal on chain, this will unlock the queue. + The fd chain is already unlocked */ + + struct timespec timeout; + + pthread_mutex_unlock (&r_chain->mutex); + pthread_mutex_lock (&aio_request_queue.mutex); + if (rtems_chain_is_empty (chain)) + { + clock_gettime (CLOCK_REALTIME, &timeout); + timeout.tv_sec += 3; + timeout.tv_nsec = 0; + result = pthread_cond_timedwait (&r_chain->cond, + &aio_request_queue.mutex, &timeout); + + /* If no requests were added to the chain we delete the fd chain from + the queue and start working with idle fd chains */ + if (result == ETIMEDOUT) { + rtems_chain_extract (&r_chain->next_fd); + pthread_mutex_destroy (&r_chain->mutex); + pthread_cond_destroy (&r_chain->cond); + free (r_chain); + + /* If the idle chain is empty sleep for 3 seconds and wait for a + signal. The thread now becomes idle. */ + if (rtems_chain_is_empty (&aio_request_queue.idle_req)) { + ++aio_request_queue.idle_threads; + clock_gettime (CLOCK_REALTIME, &timeout); + timeout.tv_sec += 3; + timeout.tv_nsec = 0; + result = pthread_cond_timedwait (&aio_request_queue.new_req, + &aio_request_queue.mutex, + &timeout); + + /* If no new fd chain was added in the idle requests + then this thread is finished */ + if (result == ETIMEDOUT) { + pthread_mutex_unlock (&aio_request_queue.mutex); + return NULL; + } + + /* Otherwise move this chain to the working chain and + start the loop all over again */ + --aio_request_queue.idle_threads; + node = aio_request_queue.idle_req.first; + rtems_chain_extract (node); + r_chain = rtems_aio_search_fd (&aio_request_queue.work_req, + ((rtems_aio_request_chain *)node)->fildes, + 1); + r_chain->new_fd = 0; + pthread_mutex_init (&r_chain->mutex, NULL); + pthread_cond_init (&r_chain->cond, NULL); + + r_chain->perfd = ((rtems_aio_request_chain *)node)->perfd; + } + else + /* If there was a request added in the initial fd chain then release + the mutex and process it */ + pthread_mutex_unlock (&aio_request_queue.mutex); + } + } + } + } + + + AIO_printf ("Thread finished\n"); + return NULL; +} -- cgit v1.2.3