diff options
Diffstat (limited to 'cpukit/libfs/src/pipe/fifo.c')
-rw-r--r-- | cpukit/libfs/src/pipe/fifo.c | 585 |
1 files changed, 585 insertions, 0 deletions
diff --git a/cpukit/libfs/src/pipe/fifo.c b/cpukit/libfs/src/pipe/fifo.c new file mode 100644 index 0000000000..9579954611 --- /dev/null +++ b/cpukit/libfs/src/pipe/fifo.c @@ -0,0 +1,585 @@ +/* + * fifo.c: POSIX FIFO/pipe for RTEMS + * + * Author: Wei Shen <cquark@gmail.com> + * + * The license and distribution terms for this file may be + * found in the file LICENSE in this distribution or at + * http://www.rtems.com/license/LICENSE. + * + * $Id$ + */ + + +#if HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef RTEMS_POSIX_API +#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__ +#endif + +#include <errno.h> +#include <stdlib.h> + +#include <rtems.h> +#include <rtems/libio_.h> + +#include "pipe.h" + + +#define MIN(a, b) ((a) < (b)? (a): (b)) + +#define LIBIO_ACCMODE(_iop) ((_iop)->flags & LIBIO_FLAGS_READ_WRITE) +#define LIBIO_NODELAY(_iop) ((_iop)->flags & LIBIO_FLAGS_NO_DELAY) + +static rtems_id pipe_semaphore = RTEMS_ID_NONE; + + +#define PIPE_EMPTY(_pipe) (_pipe->Length == 0) +#define PIPE_FULL(_pipe) (_pipe->Length == _pipe->Size) +#define PIPE_SPACE(_pipe) (_pipe->Size - _pipe->Length) +#define PIPE_WSTART(_pipe) ((_pipe->Start + _pipe->Length) % _pipe->Size) + +#define PIPE_LOCK(_pipe) \ + ( rtems_semaphore_obtain(_pipe->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT) \ + == RTEMS_SUCCESSFUL ) + +#define PIPE_UNLOCK(_pipe) rtems_semaphore_release(_pipe->Semaphore) + +#define PIPE_READWAIT(_pipe) \ + ( rtems_barrier_wait(_pipe->readBarrier, RTEMS_NO_TIMEOUT) \ + == RTEMS_SUCCESSFUL) + +#define PIPE_WRITEWAIT(_pipe) \ + ( rtems_barrier_wait(_pipe->writeBarrier, RTEMS_NO_TIMEOUT) \ + == RTEMS_SUCCESSFUL) + +#define PIPE_WAKEUPREADERS(_pipe) \ + do {uint32_t n; rtems_barrier_release(_pipe->readBarrier, &n); } while(0) + +#define PIPE_WAKEUPWRITERS(_pipe) \ + do {uint32_t n; rtems_barrier_release(_pipe->writeBarrier, &n); } while(0) + + +#ifdef RTEMS_POSIX_API +#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__ + +#include <rtems/rtems/barrier.h> +#include <rtems/score/thread.h> + +/* Set barriers to be interruptible by signals. */ +static void pipe_interruptible(pipe_control_t *pipe) +{ + Objects_Locations location; + + _Barrier_Get(pipe->readBarrier, &location)->Barrier.Wait_queue.state + |= STATES_INTERRUPTIBLE_BY_SIGNAL; + _Thread_Enable_dispatch(); + _Barrier_Get(pipe->writeBarrier, &location)->Barrier.Wait_queue.state + |= STATES_INTERRUPTIBLE_BY_SIGNAL; + _Thread_Enable_dispatch(); +} +#endif + +/* + * Alloc pipe control structure, buffer, and resources. + * Called with pipe_semaphore held. + */ +static int pipe_alloc( + pipe_control_t **pipep +) +{ + static char c = 'a'; + pipe_control_t *pipe; + int err = -ENOMEM; + + pipe = malloc(sizeof(pipe_control_t)); + if (pipe == NULL) + return err; + memset(pipe, 0, sizeof(pipe_control_t)); + + pipe->Size = PIPE_BUF; + pipe->Buffer = malloc(pipe->Size); + if (! pipe->Buffer) + goto err_buf; + + err = -ENOMEM; + + if (rtems_barrier_create( + rtems_build_name ('P', 'I', 'r', c), + RTEMS_BARRIER_MANUAL_RELEASE, 0, + &pipe->readBarrier) != RTEMS_SUCCESSFUL) + goto err_rbar; + if (rtems_barrier_create( + rtems_build_name ('P', 'I', 'w', c), + RTEMS_BARRIER_MANUAL_RELEASE, 0, + &pipe->writeBarrier) != RTEMS_SUCCESSFUL) + goto err_wbar; + if (rtems_semaphore_create( + rtems_build_name ('P', 'I', 's', c), 1, + RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO, + RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL) + goto err_sem; + +#ifdef RTEMS_POSIX_API + pipe_interruptible(pipe); +#endif + + *pipep = pipe; + if (c ++ == 'z') + c = 'a'; + return 0; + +err_sem: + rtems_barrier_delete(pipe->writeBarrier); +err_wbar: + rtems_barrier_delete(pipe->readBarrier); +err_rbar: + free(pipe->Buffer); +err_buf: + free(pipe); + return err; +} + +/* Called with pipe_semaphore held. */ +static inline void pipe_free( + pipe_control_t *pipe +) +{ + rtems_barrier_delete(pipe->readBarrier); + rtems_barrier_delete(pipe->writeBarrier); + rtems_semaphore_delete(pipe->Semaphore); + free(pipe->Buffer); + free(pipe); +} + +static rtems_status_code pipe_lock(void) +{ + rtems_status_code sc = RTEMS_SUCCESSFUL; + + if (pipe_semaphore == RTEMS_ID_NONE) { + rtems_libio_lock(); + + if (pipe_semaphore == RTEMS_ID_NONE) { + sc = rtems_semaphore_create( + rtems_build_name('P', 'I', 'P', 'E'), + 1, + RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY, + RTEMS_NO_PRIORITY, + &pipe_semaphore + ); + } + + rtems_libio_unlock(); + } + + if (sc == RTEMS_SUCCESSFUL) { + sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT); + } + + if (sc == RTEMS_SUCCESSFUL) { + return 0; + } else { + return -ENOMEM; + } +} + +static void pipe_unlock(void) +{ + rtems_status_code sc = RTEMS_SUCCESSFUL; + + sc = rtems_semaphore_release(pipe_semaphore); + #ifdef RTEMS_DEBUG + if (sc != RTEMS_SUCCESSFUL) { + rtems_fatal_error_occurred(0xdeadbeef); + } + #endif +} + +/* + * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a + * pipe control structure and set *pipep to its address. + * pipe is locked, when pipe_new returns with no error. + */ +static int pipe_new( + pipe_control_t **pipep +) +{ + pipe_control_t *pipe; + int err = 0; + + err = pipe_lock(); + if (err) + return err; + + pipe = *pipep; + if (pipe == NULL) { + err = pipe_alloc(&pipe); + if (err) + goto out; + } + + if (! PIPE_LOCK(pipe)) + err = -EINTR; + + if (*pipep == NULL) { + if (err) + pipe_free(pipe); + else + *pipep = pipe; + } + +out: + pipe_unlock(); + return err; +} + +/* + * Interface to file system close. + * + * *pipep points to pipe control structure. When the last user releases pipe, + * it will be set to NULL. + */ +void pipe_release( + pipe_control_t **pipep, + rtems_libio_t *iop +) +{ + pipe_control_t *pipe = *pipep; + uint32_t mode; + + #if defined(RTEMS_DEBUG) + /* WARN pipe not freed and pipep not set to NULL! */ + if (pipe_lock()) + rtems_fatal_error_occurred(0xdeadbeef); + + /* WARN pipe not released! */ + if (!PIPE_LOCK(pipe)) + rtems_fatal_error_occurred(0xdeadbeef); + #endif + + mode = LIBIO_ACCMODE(iop); + if (mode & LIBIO_FLAGS_READ) + pipe->Readers --; + if (mode & LIBIO_FLAGS_WRITE) + pipe->Writers --; + + PIPE_UNLOCK(pipe); + + if (pipe->Readers == 0 && pipe->Writers == 0) { +#if 0 + /* To delete an anonymous pipe file when all users closed it */ + if (pipe->Anonymous) + delfile = TRUE; +#endif + pipe_free(pipe); + *pipep = NULL; + } + else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE) + /* Notify waiting Writers that all their partners left */ + PIPE_WAKEUPWRITERS(pipe); + else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ) + PIPE_WAKEUPREADERS(pipe); + + pipe_unlock(); + +#if 0 + if (! delfile) + return; + if (iop->pathinfo.ops->unlink_h == NULL) + return; + + /* This is safe for IMFS, but how about other FSes? */ + iop->flags &= ~LIBIO_FLAGS_OPEN; + if(iop->pathinfo.ops->unlink_h(&iop->pathinfo)) + return; +#endif + +} + +/* + * Interface to file system open. + * + * *pipep points to pipe control structure. If called with *pipep = NULL, + * fifo_open will try allocating and initializing a control structure. If the + * call succeeds, *pipep will be set to address of new control structure. + */ +int fifo_open( + pipe_control_t **pipep, + rtems_libio_t *iop +) +{ + pipe_control_t *pipe; + unsigned int prevCounter; + int err; + + err = pipe_new(pipep); + if (err) + return err; + pipe = *pipep; + + switch (LIBIO_ACCMODE(iop)) { + case LIBIO_FLAGS_READ: + pipe->readerCounter ++; + if (pipe->Readers ++ == 0) + PIPE_WAKEUPWRITERS(pipe); + + if (pipe->Writers == 0) { + /* Not an error */ + if (LIBIO_NODELAY(iop)) + break; + + prevCounter = pipe->writerCounter; + err = -EINTR; + /* Wait until a writer opens the pipe */ + do { + PIPE_UNLOCK(pipe); + if (! PIPE_READWAIT(pipe)) + goto out_error; + if (! PIPE_LOCK(pipe)) + goto out_error; + } while (prevCounter == pipe->writerCounter); + } + break; + + case LIBIO_FLAGS_WRITE: + pipe->writerCounter ++; + + if (pipe->Writers ++ == 0) + PIPE_WAKEUPREADERS(pipe); + + if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) { + PIPE_UNLOCK(pipe); + err = -ENXIO; + goto out_error; + } + + if (pipe->Readers == 0) { + prevCounter = pipe->readerCounter; + err = -EINTR; + do { + PIPE_UNLOCK(pipe); + if (! PIPE_WRITEWAIT(pipe)) + goto out_error; + if (! PIPE_LOCK(pipe)) + goto out_error; + } while (prevCounter == pipe->readerCounter); + } + break; + + case LIBIO_FLAGS_READ_WRITE: + pipe->readerCounter ++; + if (pipe->Readers ++ == 0) + PIPE_WAKEUPWRITERS(pipe); + pipe->writerCounter ++; + if (pipe->Writers ++ == 0) + PIPE_WAKEUPREADERS(pipe); + break; + } + + PIPE_UNLOCK(pipe); + return 0; + +out_error: + pipe_release(pipep, iop); + return err; +} + +/* + * Interface to file system read. + */ +ssize_t pipe_read( + pipe_control_t *pipe, + void *buffer, + size_t count, + rtems_libio_t *iop +) +{ + int chunk, chunk1, read = 0, ret = 0; + + if (! PIPE_LOCK(pipe)) + return -EINTR; + + while (read < count) { + while (PIPE_EMPTY(pipe)) { + /* Not an error */ + if (pipe->Writers == 0) + goto out_locked; + + if (LIBIO_NODELAY(iop)) { + ret = -EAGAIN; + goto out_locked; + } + + /* Wait until pipe is no more empty or no writer exists */ + pipe->waitingReaders ++; + PIPE_UNLOCK(pipe); + if (! PIPE_READWAIT(pipe)) + ret = -EINTR; + if (! PIPE_LOCK(pipe)) { + /* WARN waitingReaders not restored! */ + ret = -EINTR; + goto out_nolock; + } + pipe->waitingReaders --; + if (ret != 0) + goto out_locked; + } + + /* Read chunk bytes */ + chunk = MIN(count - read, pipe->Length); + chunk1 = pipe->Size - pipe->Start; + if (chunk > chunk1) { + memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1); + memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1); + } + else + memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk); + + pipe->Start += chunk; + pipe->Start %= pipe->Size; + pipe->Length -= chunk; + /* For buffering optimization */ + if (PIPE_EMPTY(pipe)) + pipe->Start = 0; + + if (pipe->waitingWriters > 0) + PIPE_WAKEUPWRITERS(pipe); + read += chunk; + } + +out_locked: + PIPE_UNLOCK(pipe); + +out_nolock: + if (read > 0) + return read; + return ret; +} + +/* + * Interface to file system write. + */ +ssize_t pipe_write( + pipe_control_t *pipe, + const void *buffer, + size_t count, + rtems_libio_t *iop +) +{ + int chunk, chunk1, written = 0, ret = 0; + + /* Write nothing */ + if (count == 0) + return 0; + + if (! PIPE_LOCK(pipe)) + return -EINTR; + + if (pipe->Readers == 0) { + ret = -EPIPE; + goto out_locked; + } + + /* Write of PIPE_BUF bytes or less shall not be interleaved */ + chunk = count <= pipe->Size ? count : 1; + + while (written < count) { + while (PIPE_SPACE(pipe) < chunk) { + if (LIBIO_NODELAY(iop)) { + ret = -EAGAIN; + goto out_locked; + } + + /* Wait until there is chunk bytes space or no reader exists */ + pipe->waitingWriters ++; + PIPE_UNLOCK(pipe); + if (! PIPE_WRITEWAIT(pipe)) + ret = -EINTR; + if (! PIPE_LOCK(pipe)) { + /* WARN waitingWriters not restored! */ + ret = -EINTR; + goto out_nolock; + } + pipe->waitingWriters --; + if (ret != 0) + goto out_locked; + + if (pipe->Readers == 0) { + ret = -EPIPE; + goto out_locked; + } + } + + chunk = MIN(count - written, PIPE_SPACE(pipe)); + chunk1 = pipe->Size - PIPE_WSTART(pipe); + if (chunk > chunk1) { + memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1); + memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1); + } + else + memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk); + + pipe->Length += chunk; + if (pipe->waitingReaders > 0) + PIPE_WAKEUPREADERS(pipe); + written += chunk; + /* Write of more than PIPE_BUF bytes can be interleaved */ + chunk = 1; + } + +out_locked: + PIPE_UNLOCK(pipe); + +out_nolock: +#ifdef RTEMS_POSIX_API + /* Signal SIGPIPE */ + if (ret == -EPIPE) + kill(getpid(), SIGPIPE); +#endif + + if (written > 0) + return written; + return ret; +} + +/* + * Interface to file system ioctl. + */ +int pipe_ioctl( + pipe_control_t *pipe, + uint32_t cmd, + void *buffer, + rtems_libio_t *iop +) +{ + if (cmd == FIONREAD) { + if (buffer == NULL) + return -EFAULT; + + if (! PIPE_LOCK(pipe)) + return -EINTR; + + /* Return length of pipe */ + *(unsigned int *)buffer = pipe->Length; + PIPE_UNLOCK(pipe); + return 0; + } + + return -EINVAL; +} + +/* + * Interface to file system lseek. + */ +int pipe_lseek( + pipe_control_t *pipe, + off_t offset, + int whence, + rtems_libio_t *iop +) +{ + /* Seek on pipe is not supported */ + return -ESPIPE; +} |