summaryrefslogtreecommitdiffstats
path: root/cpukit/libfs/src/pipe/fifo.c
diff options
context:
space:
mode:
Diffstat (limited to 'cpukit/libfs/src/pipe/fifo.c')
-rw-r--r--cpukit/libfs/src/pipe/fifo.c585
1 files changed, 585 insertions, 0 deletions
diff --git a/cpukit/libfs/src/pipe/fifo.c b/cpukit/libfs/src/pipe/fifo.c
new file mode 100644
index 0000000000..9579954611
--- /dev/null
+++ b/cpukit/libfs/src/pipe/fifo.c
@@ -0,0 +1,585 @@
+/*
+ * fifo.c: POSIX FIFO/pipe for RTEMS
+ *
+ * Author: Wei Shen <cquark@gmail.com>
+ *
+ * The license and distribution terms for this file may be
+ * found in the file LICENSE in this distribution or at
+ * http://www.rtems.com/license/LICENSE.
+ *
+ * $Id$
+ */
+
+
+#if HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#ifdef RTEMS_POSIX_API
+#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
+#endif
+
+#include <errno.h>
+#include <stdlib.h>
+
+#include <rtems.h>
+#include <rtems/libio_.h>
+
+#include "pipe.h"
+
+
+#define MIN(a, b) ((a) < (b)? (a): (b))
+
+#define LIBIO_ACCMODE(_iop) ((_iop)->flags & LIBIO_FLAGS_READ_WRITE)
+#define LIBIO_NODELAY(_iop) ((_iop)->flags & LIBIO_FLAGS_NO_DELAY)
+
+static rtems_id pipe_semaphore = RTEMS_ID_NONE;
+
+
+#define PIPE_EMPTY(_pipe) (_pipe->Length == 0)
+#define PIPE_FULL(_pipe) (_pipe->Length == _pipe->Size)
+#define PIPE_SPACE(_pipe) (_pipe->Size - _pipe->Length)
+#define PIPE_WSTART(_pipe) ((_pipe->Start + _pipe->Length) % _pipe->Size)
+
+#define PIPE_LOCK(_pipe) \
+ ( rtems_semaphore_obtain(_pipe->Semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT) \
+ == RTEMS_SUCCESSFUL )
+
+#define PIPE_UNLOCK(_pipe) rtems_semaphore_release(_pipe->Semaphore)
+
+#define PIPE_READWAIT(_pipe) \
+ ( rtems_barrier_wait(_pipe->readBarrier, RTEMS_NO_TIMEOUT) \
+ == RTEMS_SUCCESSFUL)
+
+#define PIPE_WRITEWAIT(_pipe) \
+ ( rtems_barrier_wait(_pipe->writeBarrier, RTEMS_NO_TIMEOUT) \
+ == RTEMS_SUCCESSFUL)
+
+#define PIPE_WAKEUPREADERS(_pipe) \
+ do {uint32_t n; rtems_barrier_release(_pipe->readBarrier, &n); } while(0)
+
+#define PIPE_WAKEUPWRITERS(_pipe) \
+ do {uint32_t n; rtems_barrier_release(_pipe->writeBarrier, &n); } while(0)
+
+
+#ifdef RTEMS_POSIX_API
+#define __RTEMS_VIOLATE_KERNEL_VISIBILITY__
+
+#include <rtems/rtems/barrier.h>
+#include <rtems/score/thread.h>
+
+/* Set barriers to be interruptible by signals. */
+static void pipe_interruptible(pipe_control_t *pipe)
+{
+ Objects_Locations location;
+
+ _Barrier_Get(pipe->readBarrier, &location)->Barrier.Wait_queue.state
+ |= STATES_INTERRUPTIBLE_BY_SIGNAL;
+ _Thread_Enable_dispatch();
+ _Barrier_Get(pipe->writeBarrier, &location)->Barrier.Wait_queue.state
+ |= STATES_INTERRUPTIBLE_BY_SIGNAL;
+ _Thread_Enable_dispatch();
+}
+#endif
+
+/*
+ * Alloc pipe control structure, buffer, and resources.
+ * Called with pipe_semaphore held.
+ */
+static int pipe_alloc(
+ pipe_control_t **pipep
+)
+{
+ static char c = 'a';
+ pipe_control_t *pipe;
+ int err = -ENOMEM;
+
+ pipe = malloc(sizeof(pipe_control_t));
+ if (pipe == NULL)
+ return err;
+ memset(pipe, 0, sizeof(pipe_control_t));
+
+ pipe->Size = PIPE_BUF;
+ pipe->Buffer = malloc(pipe->Size);
+ if (! pipe->Buffer)
+ goto err_buf;
+
+ err = -ENOMEM;
+
+ if (rtems_barrier_create(
+ rtems_build_name ('P', 'I', 'r', c),
+ RTEMS_BARRIER_MANUAL_RELEASE, 0,
+ &pipe->readBarrier) != RTEMS_SUCCESSFUL)
+ goto err_rbar;
+ if (rtems_barrier_create(
+ rtems_build_name ('P', 'I', 'w', c),
+ RTEMS_BARRIER_MANUAL_RELEASE, 0,
+ &pipe->writeBarrier) != RTEMS_SUCCESSFUL)
+ goto err_wbar;
+ if (rtems_semaphore_create(
+ rtems_build_name ('P', 'I', 's', c), 1,
+ RTEMS_BINARY_SEMAPHORE | RTEMS_FIFO,
+ RTEMS_NO_PRIORITY, &pipe->Semaphore) != RTEMS_SUCCESSFUL)
+ goto err_sem;
+
+#ifdef RTEMS_POSIX_API
+ pipe_interruptible(pipe);
+#endif
+
+ *pipep = pipe;
+ if (c ++ == 'z')
+ c = 'a';
+ return 0;
+
+err_sem:
+ rtems_barrier_delete(pipe->writeBarrier);
+err_wbar:
+ rtems_barrier_delete(pipe->readBarrier);
+err_rbar:
+ free(pipe->Buffer);
+err_buf:
+ free(pipe);
+ return err;
+}
+
+/* Called with pipe_semaphore held. */
+static inline void pipe_free(
+ pipe_control_t *pipe
+)
+{
+ rtems_barrier_delete(pipe->readBarrier);
+ rtems_barrier_delete(pipe->writeBarrier);
+ rtems_semaphore_delete(pipe->Semaphore);
+ free(pipe->Buffer);
+ free(pipe);
+}
+
+static rtems_status_code pipe_lock(void)
+{
+ rtems_status_code sc = RTEMS_SUCCESSFUL;
+
+ if (pipe_semaphore == RTEMS_ID_NONE) {
+ rtems_libio_lock();
+
+ if (pipe_semaphore == RTEMS_ID_NONE) {
+ sc = rtems_semaphore_create(
+ rtems_build_name('P', 'I', 'P', 'E'),
+ 1,
+ RTEMS_BINARY_SEMAPHORE | RTEMS_INHERIT_PRIORITY | RTEMS_PRIORITY,
+ RTEMS_NO_PRIORITY,
+ &pipe_semaphore
+ );
+ }
+
+ rtems_libio_unlock();
+ }
+
+ if (sc == RTEMS_SUCCESSFUL) {
+ sc = rtems_semaphore_obtain(pipe_semaphore, RTEMS_WAIT, RTEMS_NO_TIMEOUT);
+ }
+
+ if (sc == RTEMS_SUCCESSFUL) {
+ return 0;
+ } else {
+ return -ENOMEM;
+ }
+}
+
+static void pipe_unlock(void)
+{
+ rtems_status_code sc = RTEMS_SUCCESSFUL;
+
+ sc = rtems_semaphore_release(pipe_semaphore);
+ #ifdef RTEMS_DEBUG
+ if (sc != RTEMS_SUCCESSFUL) {
+ rtems_fatal_error_occurred(0xdeadbeef);
+ }
+ #endif
+}
+
+/*
+ * If called with *pipep = NULL, pipe_new will call pipe_alloc to allocate a
+ * pipe control structure and set *pipep to its address.
+ * pipe is locked, when pipe_new returns with no error.
+ */
+static int pipe_new(
+ pipe_control_t **pipep
+)
+{
+ pipe_control_t *pipe;
+ int err = 0;
+
+ err = pipe_lock();
+ if (err)
+ return err;
+
+ pipe = *pipep;
+ if (pipe == NULL) {
+ err = pipe_alloc(&pipe);
+ if (err)
+ goto out;
+ }
+
+ if (! PIPE_LOCK(pipe))
+ err = -EINTR;
+
+ if (*pipep == NULL) {
+ if (err)
+ pipe_free(pipe);
+ else
+ *pipep = pipe;
+ }
+
+out:
+ pipe_unlock();
+ return err;
+}
+
+/*
+ * Interface to file system close.
+ *
+ * *pipep points to pipe control structure. When the last user releases pipe,
+ * it will be set to NULL.
+ */
+void pipe_release(
+ pipe_control_t **pipep,
+ rtems_libio_t *iop
+)
+{
+ pipe_control_t *pipe = *pipep;
+ uint32_t mode;
+
+ #if defined(RTEMS_DEBUG)
+ /* WARN pipe not freed and pipep not set to NULL! */
+ if (pipe_lock())
+ rtems_fatal_error_occurred(0xdeadbeef);
+
+ /* WARN pipe not released! */
+ if (!PIPE_LOCK(pipe))
+ rtems_fatal_error_occurred(0xdeadbeef);
+ #endif
+
+ mode = LIBIO_ACCMODE(iop);
+ if (mode & LIBIO_FLAGS_READ)
+ pipe->Readers --;
+ if (mode & LIBIO_FLAGS_WRITE)
+ pipe->Writers --;
+
+ PIPE_UNLOCK(pipe);
+
+ if (pipe->Readers == 0 && pipe->Writers == 0) {
+#if 0
+ /* To delete an anonymous pipe file when all users closed it */
+ if (pipe->Anonymous)
+ delfile = TRUE;
+#endif
+ pipe_free(pipe);
+ *pipep = NULL;
+ }
+ else if (pipe->Readers == 0 && mode != LIBIO_FLAGS_WRITE)
+ /* Notify waiting Writers that all their partners left */
+ PIPE_WAKEUPWRITERS(pipe);
+ else if (pipe->Writers == 0 && mode != LIBIO_FLAGS_READ)
+ PIPE_WAKEUPREADERS(pipe);
+
+ pipe_unlock();
+
+#if 0
+ if (! delfile)
+ return;
+ if (iop->pathinfo.ops->unlink_h == NULL)
+ return;
+
+ /* This is safe for IMFS, but how about other FSes? */
+ iop->flags &= ~LIBIO_FLAGS_OPEN;
+ if(iop->pathinfo.ops->unlink_h(&iop->pathinfo))
+ return;
+#endif
+
+}
+
+/*
+ * Interface to file system open.
+ *
+ * *pipep points to pipe control structure. If called with *pipep = NULL,
+ * fifo_open will try allocating and initializing a control structure. If the
+ * call succeeds, *pipep will be set to address of new control structure.
+ */
+int fifo_open(
+ pipe_control_t **pipep,
+ rtems_libio_t *iop
+)
+{
+ pipe_control_t *pipe;
+ unsigned int prevCounter;
+ int err;
+
+ err = pipe_new(pipep);
+ if (err)
+ return err;
+ pipe = *pipep;
+
+ switch (LIBIO_ACCMODE(iop)) {
+ case LIBIO_FLAGS_READ:
+ pipe->readerCounter ++;
+ if (pipe->Readers ++ == 0)
+ PIPE_WAKEUPWRITERS(pipe);
+
+ if (pipe->Writers == 0) {
+ /* Not an error */
+ if (LIBIO_NODELAY(iop))
+ break;
+
+ prevCounter = pipe->writerCounter;
+ err = -EINTR;
+ /* Wait until a writer opens the pipe */
+ do {
+ PIPE_UNLOCK(pipe);
+ if (! PIPE_READWAIT(pipe))
+ goto out_error;
+ if (! PIPE_LOCK(pipe))
+ goto out_error;
+ } while (prevCounter == pipe->writerCounter);
+ }
+ break;
+
+ case LIBIO_FLAGS_WRITE:
+ pipe->writerCounter ++;
+
+ if (pipe->Writers ++ == 0)
+ PIPE_WAKEUPREADERS(pipe);
+
+ if (pipe->Readers == 0 && LIBIO_NODELAY(iop)) {
+ PIPE_UNLOCK(pipe);
+ err = -ENXIO;
+ goto out_error;
+ }
+
+ if (pipe->Readers == 0) {
+ prevCounter = pipe->readerCounter;
+ err = -EINTR;
+ do {
+ PIPE_UNLOCK(pipe);
+ if (! PIPE_WRITEWAIT(pipe))
+ goto out_error;
+ if (! PIPE_LOCK(pipe))
+ goto out_error;
+ } while (prevCounter == pipe->readerCounter);
+ }
+ break;
+
+ case LIBIO_FLAGS_READ_WRITE:
+ pipe->readerCounter ++;
+ if (pipe->Readers ++ == 0)
+ PIPE_WAKEUPWRITERS(pipe);
+ pipe->writerCounter ++;
+ if (pipe->Writers ++ == 0)
+ PIPE_WAKEUPREADERS(pipe);
+ break;
+ }
+
+ PIPE_UNLOCK(pipe);
+ return 0;
+
+out_error:
+ pipe_release(pipep, iop);
+ return err;
+}
+
+/*
+ * Interface to file system read.
+ */
+ssize_t pipe_read(
+ pipe_control_t *pipe,
+ void *buffer,
+ size_t count,
+ rtems_libio_t *iop
+)
+{
+ int chunk, chunk1, read = 0, ret = 0;
+
+ if (! PIPE_LOCK(pipe))
+ return -EINTR;
+
+ while (read < count) {
+ while (PIPE_EMPTY(pipe)) {
+ /* Not an error */
+ if (pipe->Writers == 0)
+ goto out_locked;
+
+ if (LIBIO_NODELAY(iop)) {
+ ret = -EAGAIN;
+ goto out_locked;
+ }
+
+ /* Wait until pipe is no more empty or no writer exists */
+ pipe->waitingReaders ++;
+ PIPE_UNLOCK(pipe);
+ if (! PIPE_READWAIT(pipe))
+ ret = -EINTR;
+ if (! PIPE_LOCK(pipe)) {
+ /* WARN waitingReaders not restored! */
+ ret = -EINTR;
+ goto out_nolock;
+ }
+ pipe->waitingReaders --;
+ if (ret != 0)
+ goto out_locked;
+ }
+
+ /* Read chunk bytes */
+ chunk = MIN(count - read, pipe->Length);
+ chunk1 = pipe->Size - pipe->Start;
+ if (chunk > chunk1) {
+ memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk1);
+ memcpy(buffer + read + chunk1, pipe->Buffer, chunk - chunk1);
+ }
+ else
+ memcpy(buffer + read, pipe->Buffer + pipe->Start, chunk);
+
+ pipe->Start += chunk;
+ pipe->Start %= pipe->Size;
+ pipe->Length -= chunk;
+ /* For buffering optimization */
+ if (PIPE_EMPTY(pipe))
+ pipe->Start = 0;
+
+ if (pipe->waitingWriters > 0)
+ PIPE_WAKEUPWRITERS(pipe);
+ read += chunk;
+ }
+
+out_locked:
+ PIPE_UNLOCK(pipe);
+
+out_nolock:
+ if (read > 0)
+ return read;
+ return ret;
+}
+
+/*
+ * Interface to file system write.
+ */
+ssize_t pipe_write(
+ pipe_control_t *pipe,
+ const void *buffer,
+ size_t count,
+ rtems_libio_t *iop
+)
+{
+ int chunk, chunk1, written = 0, ret = 0;
+
+ /* Write nothing */
+ if (count == 0)
+ return 0;
+
+ if (! PIPE_LOCK(pipe))
+ return -EINTR;
+
+ if (pipe->Readers == 0) {
+ ret = -EPIPE;
+ goto out_locked;
+ }
+
+ /* Write of PIPE_BUF bytes or less shall not be interleaved */
+ chunk = count <= pipe->Size ? count : 1;
+
+ while (written < count) {
+ while (PIPE_SPACE(pipe) < chunk) {
+ if (LIBIO_NODELAY(iop)) {
+ ret = -EAGAIN;
+ goto out_locked;
+ }
+
+ /* Wait until there is chunk bytes space or no reader exists */
+ pipe->waitingWriters ++;
+ PIPE_UNLOCK(pipe);
+ if (! PIPE_WRITEWAIT(pipe))
+ ret = -EINTR;
+ if (! PIPE_LOCK(pipe)) {
+ /* WARN waitingWriters not restored! */
+ ret = -EINTR;
+ goto out_nolock;
+ }
+ pipe->waitingWriters --;
+ if (ret != 0)
+ goto out_locked;
+
+ if (pipe->Readers == 0) {
+ ret = -EPIPE;
+ goto out_locked;
+ }
+ }
+
+ chunk = MIN(count - written, PIPE_SPACE(pipe));
+ chunk1 = pipe->Size - PIPE_WSTART(pipe);
+ if (chunk > chunk1) {
+ memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk1);
+ memcpy(pipe->Buffer, buffer + written + chunk1, chunk - chunk1);
+ }
+ else
+ memcpy(pipe->Buffer + PIPE_WSTART(pipe), buffer + written, chunk);
+
+ pipe->Length += chunk;
+ if (pipe->waitingReaders > 0)
+ PIPE_WAKEUPREADERS(pipe);
+ written += chunk;
+ /* Write of more than PIPE_BUF bytes can be interleaved */
+ chunk = 1;
+ }
+
+out_locked:
+ PIPE_UNLOCK(pipe);
+
+out_nolock:
+#ifdef RTEMS_POSIX_API
+ /* Signal SIGPIPE */
+ if (ret == -EPIPE)
+ kill(getpid(), SIGPIPE);
+#endif
+
+ if (written > 0)
+ return written;
+ return ret;
+}
+
+/*
+ * Interface to file system ioctl.
+ */
+int pipe_ioctl(
+ pipe_control_t *pipe,
+ uint32_t cmd,
+ void *buffer,
+ rtems_libio_t *iop
+)
+{
+ if (cmd == FIONREAD) {
+ if (buffer == NULL)
+ return -EFAULT;
+
+ if (! PIPE_LOCK(pipe))
+ return -EINTR;
+
+ /* Return length of pipe */
+ *(unsigned int *)buffer = pipe->Length;
+ PIPE_UNLOCK(pipe);
+ return 0;
+ }
+
+ return -EINVAL;
+}
+
+/*
+ * Interface to file system lseek.
+ */
+int pipe_lseek(
+ pipe_control_t *pipe,
+ off_t offset,
+ int whence,
+ rtems_libio_t *iop
+)
+{
+ /* Seek on pipe is not supported */
+ return -ESPIPE;
+}