summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoel Sherrill <joel.sherrill@OARcorp.com>2000-01-13 19:25:15 +0000
committerJoel Sherrill <joel.sherrill@OARcorp.com>2000-01-13 19:25:15 +0000
commit53fb837afc4285486e318bcb614c911bbe9b1348 (patch)
treecdd9b6ff2a66e8f5c746a06d1e639be4c01c6941
parentee4ddd83616637c4fbbafe7cbb6148c08832309b (diff)
downloadrtems-53fb837afc4285486e318bcb614c911bbe9b1348.tar.bz2
POSIX message queues now include complete functionality including
blocking sends when the queue is full. The SuperCore was enhanced to support blocking on send. The existing POSIX API was debugged and numerous test cases were added to psxmsgq01 by Jennifer Averett. SuperCore enhancements and resulting modifications to other APIs were done by Joel. There is one significant point of interpretation for the POSIX API. What happens to threads already blocked on a message queue when the mode of that same message queue is changed from blocking to non-blocking? We decided to unblock all waiting tasks with an EAGAIN error just as if a non-blocking version of the same operation had returned unsatisfied. This case is not discussed in the POSIX standard and other implementations may have chosen differently.
Diffstat (limited to '')
-rw-r--r--c/src/exec/itron/src/cre_mbf.c10
-rw-r--r--c/src/exec/itron/src/snd_mbx.c12
-rw-r--r--c/src/exec/itron/src/trcv_mbf.c2
-rw-r--r--c/src/exec/itron/src/tsnd_mbf.c14
-rw-r--r--c/src/exec/posix/include/rtems/posix/mqueue.h2
-rw-r--r--c/src/exec/rtems/src/msgqcreate.c14
-rw-r--r--c/src/exec/rtems/src/msgqreceive.c6
-rw-r--r--c/src/exec/rtems/src/msgqsubmit.c41
-rw-r--r--c/src/exec/score/include/rtems/score/coremsg.h44
-rw-r--r--c/src/exec/score/inline/rtems/score/coremsg.inl24
-rw-r--r--c/src/exec/score/macros/rtems/score/coremsg.inl10
-rw-r--r--c/src/exec/score/src/coremsgbroadcast.c19
-rw-r--r--c/src/exec/score/src/coremsgsubmit.c146
-rw-r--r--c/src/tests/psxtests/psxmsgq01/init.c1321
-rw-r--r--cpukit/itron/src/cre_mbf.c10
-rw-r--r--cpukit/itron/src/snd_mbx.c12
-rw-r--r--cpukit/itron/src/trcv_mbf.c2
-rw-r--r--cpukit/itron/src/tsnd_mbf.c14
-rw-r--r--cpukit/posix/include/rtems/posix/mqueue.h2
-rw-r--r--cpukit/rtems/src/msgqcreate.c14
-rw-r--r--cpukit/rtems/src/msgqreceive.c6
-rw-r--r--cpukit/rtems/src/msgqsubmit.c41
-rw-r--r--cpukit/score/include/rtems/score/coremsg.h44
-rw-r--r--cpukit/score/inline/rtems/score/coremsg.inl24
-rw-r--r--cpukit/score/macros/rtems/score/coremsg.inl10
-rw-r--r--cpukit/score/src/coremsgbroadcast.c19
-rw-r--r--cpukit/score/src/coremsgsubmit.c146
-rw-r--r--testsuites/psxtests/psxmsgq01/init.c1321
28 files changed, 2556 insertions, 774 deletions
diff --git a/c/src/exec/itron/src/cre_mbf.c b/c/src/exec/itron/src/cre_mbf.c
index ca139d53ae..0db738435e 100644
--- a/c/src/exec/itron/src/cre_mbf.c
+++ b/c/src/exec/itron/src/cre_mbf.c
@@ -25,7 +25,7 @@ ER cre_mbf(
T_CMBF *pk_cmbf
)
{
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
ITRON_Message_buffer_Control *the_message_buffer;
/*
@@ -57,16 +57,14 @@ ER cre_mbf(
}
if ( pk_cmbf->mbfatr & TA_TPRI )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
_CORE_message_queue_Initialize(
&the_message_buffer->message_queue,
OBJECTS_ITRON_MESSAGE_BUFFERS,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
pk_cmbf->bufsz / pk_cmbf->maxmsz,
pk_cmbf->maxmsz,
NULL /* Multiprocessing not supported */
diff --git a/c/src/exec/itron/src/snd_mbx.c b/c/src/exec/itron/src/snd_mbx.c
index f674583e7b..e02714465e 100644
--- a/c/src/exec/itron/src/snd_mbx.c
+++ b/c/src/exec/itron/src/snd_mbx.c
@@ -27,7 +27,6 @@ ER snd_msg(
{
register ITRON_Mailbox_Control *the_mailbox;
Objects_Locations location;
- CORE_message_queue_Status status = E_OK;
unsigned32 message_priority;
void *message_contents;
@@ -47,17 +46,22 @@ ER snd_msg(
message_priority = CORE_MESSAGE_QUEUE_SEND_REQUEST;
message_contents = pk_msg;
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_mailbox->message_queue,
&message_contents,
sizeof(T_MSG *),
the_mailbox->Object.id,
NULL, /* multiprocessing not supported */
- message_priority
+ message_priority,
+ FALSE, /* do not allow sender to block */
+ 0 /* no timeout */
);
break;
}
_ITRON_return_errorno(
- _ITRON_Mailbox_Translate_core_message_queue_return_code(status) );
+ _ITRON_Mailbox_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ )
+ );
}
diff --git a/c/src/exec/itron/src/trcv_mbf.c b/c/src/exec/itron/src/trcv_mbf.c
index a63f2a6748..0b2b89e612 100644
--- a/c/src/exec/itron/src/trcv_mbf.c
+++ b/c/src/exec/itron/src/trcv_mbf.c
@@ -32,7 +32,6 @@ ER trcv_mbf(
CORE_message_queue_Status status;
boolean wait;
Watchdog_Interval interval;
- CORE_message_queue_Submit_types core_priority;
interval = 0;
if (tmout == TMO_POL) {
@@ -62,7 +61,6 @@ ER trcv_mbf(
msg,
p_msgsz,
wait,
- &core_priority,
interval
);
_Thread_Enable_dispatch();
diff --git a/c/src/exec/itron/src/tsnd_mbf.c b/c/src/exec/itron/src/tsnd_mbf.c
index bc609dd298..0ed3b4f90e 100644
--- a/c/src/exec/itron/src/tsnd_mbf.c
+++ b/c/src/exec/itron/src/tsnd_mbf.c
@@ -33,7 +33,6 @@ ER tsnd_mbf(
Objects_Locations location;
Watchdog_Interval interval;
boolean wait;
- CORE_message_queue_Status status;
if (msgsz <= 0 || !msg)
return E_PAR;
@@ -50,8 +49,6 @@ ER tsnd_mbf(
if ( wait && _ITRON_Is_in_non_task_state() )
return E_CTX;
- assert( wait == FALSE );
-
the_message_buffer = _ITRON_Message_buffer_Get(mbfid, &location);
switch (location) {
case OBJECTS_REMOTE:
@@ -60,17 +57,20 @@ ER tsnd_mbf(
case OBJECTS_LOCAL:
/* XXX Submit needs to take into account blocking */
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_message_buffer->message_queue,
msg,
msgsz,
the_message_buffer->Object.id,
NULL,
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ interval /* timeout interval */
);
_Thread_Enable_dispatch();
- return
- _ITRON_Message_buffer_Translate_core_message_buffer_return_code(status);
+ return _ITRON_Message_buffer_Translate_core_message_buffer_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
/*
diff --git a/c/src/exec/posix/include/rtems/posix/mqueue.h b/c/src/exec/posix/include/rtems/posix/mqueue.h
index 1c1201fef1..e3f7a2f073 100644
--- a/c/src/exec/posix/include/rtems/posix/mqueue.h
+++ b/c/src/exec/posix/include/rtems/posix/mqueue.h
@@ -32,10 +32,8 @@ extern "C" {
typedef struct {
Objects_Control Object;
int process_shared;
- int flags;
boolean named;
boolean linked;
- boolean blocking;
int oflag;
unsigned32 open_count;
CORE_message_queue_Control Message_queue;
diff --git a/c/src/exec/rtems/src/msgqcreate.c b/c/src/exec/rtems/src/msgqcreate.c
index 395cbf3a02..24bc35993d 100644
--- a/c/src/exec/rtems/src/msgqcreate.c
+++ b/c/src/exec/rtems/src/msgqcreate.c
@@ -59,7 +59,7 @@ rtems_status_code rtems_message_queue_create(
)
{
register Message_queue_Control *the_message_queue;
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
void *handler;
#if defined(RTEMS_MULTIPROCESSING)
boolean is_global;
@@ -74,10 +74,10 @@ rtems_status_code rtems_message_queue_create(
return RTEMS_MP_NOT_CONFIGURED;
#endif
- if (count == 0)
+ if ( count == 0 )
return RTEMS_INVALID_NUMBER;
- if (max_message_size == 0)
+ if ( max_message_size == 0 )
return RTEMS_INVALID_SIZE;
#if defined(RTEMS_MULTIPROCESSING)
@@ -115,11 +115,9 @@ rtems_status_code rtems_message_queue_create(
the_message_queue->attribute_set = attribute_set;
if (_Attributes_Is_priority( attribute_set ) )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
handler = NULL;
#if defined(RTEMS_MULTIPROCESSING)
@@ -129,7 +127,7 @@ rtems_status_code rtems_message_queue_create(
if ( ! _CORE_message_queue_Initialize(
&the_message_queue->message_queue,
OBJECTS_RTEMS_MESSAGE_QUEUES,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
count,
max_message_size,
handler ) ) {
diff --git a/c/src/exec/rtems/src/msgqreceive.c b/c/src/exec/rtems/src/msgqreceive.c
index 1338216c6b..77fcadc313 100644
--- a/c/src/exec/rtems/src/msgqreceive.c
+++ b/c/src/exec/rtems/src/msgqreceive.c
@@ -92,12 +92,12 @@ rtems_status_code rtems_message_queue_receive(
buffer,
size,
wait,
- &core_priority,
timeout
);
_Thread_Enable_dispatch();
- return( _Message_queue_Translate_core_message_queue_return_code(
- _Thread_Executing->Wait.return_code ) );
+ return _Message_queue_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
diff --git a/c/src/exec/rtems/src/msgqsubmit.c b/c/src/exec/rtems/src/msgqsubmit.c
index 5a03f6409a..16f1c50266 100644
--- a/c/src/exec/rtems/src/msgqsubmit.c
+++ b/c/src/exec/rtems/src/msgqsubmit.c
@@ -61,7 +61,6 @@ rtems_status_code _Message_queue_Submit(
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
- CORE_message_queue_Status core_status;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location )
@@ -98,39 +97,43 @@ rtems_status_code _Message_queue_Submit(
case OBJECTS_LOCAL:
switch ( submit_type ) {
case MESSAGE_QUEUE_SEND_REQUEST:
- core_status = _CORE_message_queue_Send(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Send(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
case MESSAGE_QUEUE_URGENT_REQUEST:
- core_status = _CORE_message_queue_Urgent(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Urgent(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
default:
- core_status = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
return RTEMS_INTERNAL_ERROR; /* should never get here */
}
_Thread_Enable_dispatch();
return _Message_queue_Translate_core_message_queue_return_code(
- core_status );
+ _Thread_Executing->Wait.return_code
+ );
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
diff --git a/c/src/exec/score/include/rtems/score/coremsg.h b/c/src/exec/score/include/rtems/score/coremsg.h
index 6ffedee0af..818b1e8c18 100644
--- a/c/src/exec/score/include/rtems/score/coremsg.h
+++ b/c/src/exec/score/include/rtems/score/coremsg.h
@@ -166,13 +166,12 @@ void _CORE_message_queue_Close(
);
/*
- *
* _CORE_message_queue_Flush
*
* DESCRIPTION:
*
- * This function flushes the message_queue's task wait queue. The number
- * messages flushed from the queue is returned.
+ * This function flushes the message_queue's pending message queue. The
+ * number of messages flushed from the queue is returned.
*
*/
@@ -194,7 +193,20 @@ unsigned32 _CORE_message_queue_Flush_support(
);
/*
+ * _CORE_message_queue_Flush_waiting_threads
+ *
+ * DESCRIPTION:
*
+ * This function flushes the threads which are blocked on this
+ * message_queue's pending message queue. They are unblocked whether
+ * blocked sending or receiving.
+ */
+
+void _CORE_message_queue_Flush_waiting_threads(
+ CORE_message_queue_Control *the_message_queue
+);
+
+/*
* _CORE_message_queue_Broadcast
*
* DESCRIPTION:
@@ -214,7 +226,6 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
);
/*
- *
* _CORE_message_queue_Submit
*
* DESCRIPTION:
@@ -228,17 +239,18 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
*
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
);
/*
- *
* _CORE_message_queue_Seize
*
* DESCRIPTION:
@@ -248,6 +260,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* inactive message pool. The thread will be blocked if wait is TRUE,
* otherwise an error will be given to the thread if no messages are available.
*
+ * NOTE: Returns message priority via return are in TCB.
*/
void _CORE_message_queue_Seize(
@@ -256,10 +269,25 @@ void _CORE_message_queue_Seize(
void *buffer,
unsigned32 *size,
boolean wait,
- CORE_message_queue_Submit_types *priority,
Watchdog_Interval timeout
);
+/*
+ * _CORE_message_queue_Insert_message
+ *
+ * DESCRIPTION:
+ *
+ * This kernel routine inserts the specified message into the
+ * message queue. It is assumed that the message has been filled
+ * in before this routine is called.
+ */
+
+void _CORE_message_queue_Insert_message(
+ CORE_message_queue_Control *the_message_queue,
+ CORE_message_queue_Buffer_control *the_message,
+ CORE_message_queue_Submit_types submit_type
+);
+
#ifndef __RTEMS_APPLICATION__
#include <rtems/score/coremsg.inl>
#endif
diff --git a/c/src/exec/score/inline/rtems/score/coremsg.inl b/c/src/exec/score/inline/rtems/score/coremsg.inl
index af16fbd4aa..7356ce9537 100644
--- a/c/src/exec/score/inline/rtems/score/coremsg.inl
+++ b/c/src/exec/score/inline/rtems/score/coremsg.inl
@@ -27,15 +27,17 @@
* This routine sends a message to the end of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Send(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -45,7 +47,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
@@ -58,15 +62,17 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
* This routine sends a message to the front of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Urgent(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -76,7 +82,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_URGENT_REQUEST
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
diff --git a/c/src/exec/score/macros/rtems/score/coremsg.inl b/c/src/exec/score/macros/rtems/score/coremsg.inl
index bc45a51ac3..0717b3eea3 100644
--- a/c/src/exec/score/macros/rtems/score/coremsg.inl
+++ b/c/src/exec/score/macros/rtems/score/coremsg.inl
@@ -23,9 +23,10 @@
*/
#define _CORE_message_queue_Send( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_SEND_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_SEND_REQUEST, (_wait), (_timeout)
/*PAGE
*
@@ -34,9 +35,10 @@ _id, _api_message_queue_mp_support ) \
*/
#define _CORE_message_queue_Urgent( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_URGENT_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST, (_wait), (_timeout)
/*PAGE
*
diff --git a/c/src/exec/score/src/coremsgbroadcast.c b/c/src/exec/score/src/coremsgbroadcast.c
index 2e6f649545..18e148ab1c 100644
--- a/c/src/exec/score/src/coremsgbroadcast.c
+++ b/c/src/exec/score/src/coremsgbroadcast.c
@@ -64,6 +64,25 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
Thread_Wait_information *waitp;
unsigned32 constrained_size;
+ /*
+ * If there are pending messages, then there can't be threads
+ * waiting for us to send them a message.
+ *
+ * NOTE: This check is critical because threads can block on
+ * send and receive and this ensures that we are broadcasting
+ * the message to threads waiting to receive -- not to send.
+ */
+
+ if ( the_message_queue->number_of_pending_messages != 0 ) {
+ *count = 0;
+ return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ }
+
+ /*
+ * There must be no pending messages if there is a thread waiting to
+ * receive a message.
+ */
+
number_broadcasted = 0;
while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
waitp = &the_thread->Wait;
diff --git a/c/src/exec/score/src/coremsgsubmit.c b/c/src/exec/score/src/coremsgsubmit.c
index 4e15ab5bc0..6829351c18 100644
--- a/c/src/exec/score/src/coremsgsubmit.c
+++ b/c/src/exec/score/src/coremsgsubmit.c
@@ -53,102 +53,110 @@
* error code - if unsuccessful
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
+ ISR_Level level;
CORE_message_queue_Buffer_control *the_message;
Thread_Control *the_thread;
+ Thread_Control *executing;
- if ( size > the_message_queue->maximum_message_size )
- return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+
+ if ( size > the_message_queue->maximum_message_size ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ return;
+ }
/*
- * Is there a thread currently waiting on this message queue?
+ * Is there a thread currently waiting on this message queue?
*/
- the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
- if ( the_thread ) {
- _CORE_message_queue_Copy_buffer(
- buffer,
- the_thread->Wait.return_argument,
- size
- );
- *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
- the_thread->Wait.count = submit_type;
+ if ( the_message_queue->number_of_pending_messages == 0 ) {
+ the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
+ if ( the_thread ) {
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_thread->Wait.return_argument,
+ size
+ );
+ *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
+ the_thread->Wait.count = submit_type;
#if defined(RTEMS_MULTIPROCESSING)
- if ( !_Objects_Is_local_id( the_thread->Object.id ) )
- (*api_message_queue_mp_support) ( the_thread, id );
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+ (*api_message_queue_mp_support) ( the_thread, id );
#endif
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ return;
+ }
}
/*
- * No one waiting on this one currently.
- * Allocate a message buffer and store it away
+ * No one waiting on the message queue at this time, so attempt to
+ * queue the message up for a future receive.
*/
- if ( the_message_queue->number_of_pending_messages ==
+ if ( the_message_queue->number_of_pending_messages <
the_message_queue->maximum_pending_messages ) {
- return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
- }
- the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue);
- if ( the_message == 0 )
- return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
-
- _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
- the_message->Contents.size = size;
- the_message->priority = submit_type;
-
- the_message_queue->number_of_pending_messages += 1;
-
- switch ( submit_type ) {
- case CORE_MESSAGE_QUEUE_SEND_REQUEST:
- _CORE_message_queue_Append( the_message_queue, the_message );
- break;
- case CORE_MESSAGE_QUEUE_URGENT_REQUEST:
- _CORE_message_queue_Prepend( the_message_queue, the_message );
- break;
- default:
- /* XXX interrupt critical section needs to be addressed */
- {
- CORE_message_queue_Buffer_control *this_message;
- Chain_Node *the_node;
-
- the_message->priority = submit_type;
- for ( the_node = the_message_queue->Pending_messages.first ;
- !_Chain_Is_tail( &the_message_queue->Pending_messages, the_node ) ;
- the_node = the_node->next ) {
-
- this_message = (CORE_message_queue_Buffer_control *) the_node;
-
- if ( this_message->priority >= the_message->priority )
- continue;
-
- _Chain_Insert( the_node, &the_message->Node );
- break;
- }
- }
- break;
+ the_message =
+ _CORE_message_queue_Allocate_message_buffer( the_message_queue );
+
+ /*
+ * NOTE: If the system is consistent, this error should never occur.
+ */
+ if ( !the_message ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
+ return;
+ }
+
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_message->Contents.buffer,
+ size
+ );
+ the_message->Contents.size = size;
+ the_message->priority = submit_type;
+
+ _CORE_message_queue_Insert_message(
+ the_message_queue,
+ the_message,
+ submit_type
+ );
+ return;
}
/*
- * According to POSIX, does this happen before or after the message
- * is actually enqueued. It is logical to think afterwards, because
- * the message is actually in the queue at this point.
+ * No message buffers were available so we may need to return an
+ * overflow error or block the sender until the message is placed
+ * on the queue.
*/
- if ( the_message_queue->number_of_pending_messages == 1 &&
- the_message_queue->notify_handler )
- (*the_message_queue->notify_handler)( the_message_queue->notify_argument );
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ if ( !wait ) {
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
+ return;
+ }
+
+ executing = _Thread_Executing;
+
+ _ISR_Disable( level );
+ _Thread_queue_Enter_critical_section( &the_message_queue->Wait_queue );
+ executing->Wait.queue = &the_message_queue->Wait_queue;
+ executing->Wait.id = id;
+ executing->Wait.return_argument = (void *)buffer;
+ executing->Wait.return_argument_1 = (void *)size;
+ executing->Wait.count = submit_type;
+ _ISR_Enable( level );
+
+ _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
}
diff --git a/c/src/tests/psxtests/psxmsgq01/init.c b/c/src/tests/psxtests/psxmsgq01/init.c
index d40ab18a2f..534f6c6dcb 100644
--- a/c/src/tests/psxtests/psxmsgq01/init.c
+++ b/c/src/tests/psxtests/psxmsgq01/init.c
@@ -15,13 +15,95 @@
#include <fcntl.h>
#include <time.h>
#include <tmacros.h>
+#include <signal.h> /* signal facilities */
+
+typedef struct {
+ char msg[ 50 ];
+ int size;
+ unsigned int priority;
+}Test_Message_t;
+Test_Message_t Predefined_Msgs[MAXMSG+1];
+Test_Message_t Predefined_Msgs[MAXMSG+1] = {
+ { "12345678", 9, MQ_PRIO_MAX-1 }, /* Max Length Message med */
+ { "", 1, 1 }, /* NULL Message low */
+ { "Last", 5, MQ_PRIO_MAX }, /* Queue Full Message hi */
+ { "No Message", 0, MQ_PRIO_MAX-1 }, /* 0 length Message med */
+ { "1", 2, 0 }, /* Cause Overflow Behavior */
+};
+int Priority_Order[MAXMSG+1] = { 2, 0, 3, 1, MAXMSG };
+
+
+typedef struct {
+ mqd_t mq;
+ Test_Queue_Types index;
+ char *name;
+ int oflag;
+ int maxmsg;
+ int msgsize;
+ int count;
+} Test_queue_type;
+
+Test_queue_type Test_q[ NUMBER_OF_TEST_QUEUES ] =
+{
+ { 0, 0, "Qread", ( O_CREAT | O_RDONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 1, "Qwrite", ( O_CREAT | O_WRONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 2, "Qnoblock", ( O_CREAT | O_RDWR | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 3, "Qblock", ( O_CREAT | O_RDWR ) , MAXMSG, MSGSIZE, 0 },
+ { 0, 4, "Qdefault", ( O_CREAT | O_RDWR ) , 10, 16, 0 },
+ { 0, 5, "mq6", ( O_CREAT | O_WRONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+};
+
+#define RW_NAME Test_q[ RW_QUEUE ].name
+#define DEFAULT_NAME Test_q[ DEFAULT_RW ].name
+#define RD_NAME Test_q[ RD_QUEUE ].name
+#define WR_NAME Test_q[ WR_QUEUE ].name
+#define BLOCKING_NAME Test_q[ BLOCKING ].name
+#define CLOSED_NAME Test_q[ CLOSED ].name
+
+#define RW_ATTR Test_q[ RW_QUEUE ].oflag
+#define DEFAULT_ATTR Test_q[ DEFAULT_RW ].oflag
+#define RD_ATTR Test_q[ RD_QUEUE ].oflag
+#define WR_ATTR Test_q[ WR_QUEUE ].oflag
+#define BLOCK_ATTR Test_q[ BLOCKING ].oflag
+#define CLOSED_ATTR Test_q[ CLOSED ].oflag
-char Queue_Name[PATH_MAX + 2];
-char *Get_Queue_Name(
- int i
+/*
+ * Outputs a header at each test section.
+ */
+void Start_Test(
+ char *description
)
{
- sprintf(Queue_Name,"mq%d",i+1);
+ printf( "_______________%s\n", description );
+}
+
+
+void Validate_attributes(
+ mqd_t mq,
+ int oflag,
+ int msg_count
+)
+{
+ int status;
+ struct mq_attr attr;
+
+ status = mq_getattr( mq, &attr );
+ fatal_posix_service_status( status, 0, "mq_getattr valid return status");
+
+ if ( mq != Test_q[ DEFAULT_RW ].mq ){
+ fatal_int_service_status((int)attr.mq_maxmsg, MAXMSG, "maxmsg attribute" );
+ fatal_int_service_status((int)attr.mq_msgsize,MSGSIZE,"msgsize attribute");
+ }
+
+ fatal_int_service_status((int)attr.mq_curmsgs, msg_count, "count attribute" );
+ fatal_int_service_status((int)attr.mq_flags, oflag, "flag attribute" );
+}
+
+char Queue_Name[PATH_MAX + 2];
+#define Get_Queue_Name( i ) Test_q[i].name
+
+char *Build_Queue_Name( int i ) {
+ sprintf(Queue_Name,"mq%d", i+1 );
return Queue_Name;
}
@@ -35,354 +117,1010 @@ char *Get_Too_Long_Name()
return Queue_Name;
}
-typedef enum {
- DEFAULT_SIZE_TYPE,
- TEST_SIZE_TYPE,
- MAX_SIZE,
- TYPES_OF_TEST_SIZES
-} TEST_MQ_SIZE_TYPES;
+void open_test_queues()
+{
+ struct mq_attr attr;
+ int status;
+ Test_queue_type *tq;
+ int que;
+
+ attr.mq_maxmsg = MAXMSG;
+ attr.mq_msgsize = MSGSIZE;
+
+ puts( "Init: Open Test Queues" );
+
+ for( que = 0; que < NUMBER_OF_TEST_QUEUES; que++ ) {
+
+ tq = &Test_q[ que ];
+ if ( que == DEFAULT_RW)
+ Test_q[que].mq = mq_open( tq->name, tq->oflag, 0x777, NULL );
+ else
+ Test_q[que].mq = mq_open( tq->name, tq->oflag, 0x777, &attr );
+
+ assert( Test_q[que].mq != (-1) );
+ }
+
+ status = mq_close( Test_q[CLOSED].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+ status = mq_unlink( CLOSED_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+}
/*
* Opens CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES then leaves size queues
* opened but closes the rest.
*/
-void validate_mq_open_error_codes(
- mqd_t *mqs, /* Must be large enough for Maximum to be opened. */
- int size
-)
+void validate_mq_open_error_codes()
{
int i;
mqd_t n_mq2;
struct mq_attr attr;
int status;
+ mqd_t open_mq[CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES];
+
+ attr.mq_maxmsg = MAXMSG;
+ attr.mq_msgsize = MSGSIZE;
- assert( size < (CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES-1) );
+ Start_Test( "mq_open errors" );
/*
- * Validate mq_open errors that can occur when no queues are open.
- * EINVAL
- * ENOENT
- * EINTR
+ * XXX EINVAL - inappropriate name was given for the message queue
*/
/*
- * XXX EINVAL - inappropriate name was given for the message queue
+ * EINVAL - Create with negative maxmsg.
*/
attr.mq_maxmsg = -1;
- puts( "mq_open - Create with maxmsg (-1) (EINVAL)" );
- n_mq2 = mq_open("mq2", O_CREAT | O_RDONLY, 0x777, &attr);
- fatal_directive_status(
+ puts( "Init: mq_open - Create with maxmsg (-1) (EINVAL)" );
+ n_mq2 = mq_open( "mq2", O_CREAT | O_RDONLY, 0x777, &attr);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EINVAL, "mq_open errno EINVAL");
+ fatal_posix_service_status( errno, EINVAL, "mq_open errno EINVAL");
+ attr.mq_maxmsg = MAXMSG;
+
+ /*
+ * EINVAL - Create withnegative msgsize.
+ */
attr.mq_msgsize = -1;
- puts( "mq_open - Create with msgsize (-1) (EINVAL)" );
- n_mq2 = mq_open("mq2", O_CREAT | O_RDONLY, 0x777, &attr);
- fatal_directive_status(
+ puts( "Init: mq_open - Create with msgsize (-1) (EINVAL)" );
+ n_mq2 = mq_open( "mq2", O_CREAT | O_RDONLY, 0x777, &attr);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EINVAL, "mq_open errno EINVAL");
+ fatal_posix_service_status( errno, EINVAL, "mq_open errno EINVAL");
+ attr.mq_msgsize = MSGSIZE;
+
+ /*
+ * ENOENT - Open a non-created file.
+ */
- puts( "mq_open - Open new mq without create flag (ENOENT)" );
- n_mq2 = mq_open("mq3", O_EXCL | O_RDONLY, 0x777, NULL);
- fatal_directive_status(
+ puts( "Init: mq_open - Open new mq without create flag (ENOENT)" );
+ n_mq2 = mq_open( "mq3", O_EXCL | O_RDONLY, 0x777, NULL);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENOENT, "mq_open errno ENOENT");
+ fatal_posix_service_status( errno, ENOENT, "mq_open errno ENOENT");
+
/*
* XXX EINTR - call was interrupted by a signal
*/
/*
- * XXX ENAMETOOLONG - Not checked in either sem_open or mq_open is
- * this an error?
+ * ENAMETOOLONG - Give a name greater than PATH_MAX.
*/
- puts( "mq_open - Open with too long of a name (ENAMETOOLONG)" );
+ puts( "Init: mq_open - Open with too long of a name (ENAMETOOLONG)" );
n_mq2 = mq_open( Get_Too_Long_Name(), O_CREAT | O_RDONLY, 0x777, NULL );
- fatal_directive_status(
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENAMETOOLONG, "mq_open errno ENAMETOOLONG");
+ fatal_posix_service_status( errno, ENAMETOOLONG, "mq_open errno ENAMETOOLONG");
/*
+ * XXX - ENAMETOOLONG - Give a name greater than NAME_MAX
+ * Per implementation not possible.
+ */
+
+ /*
* Open maximum number of message queues
*/
- puts( "mq_open - SUCCESSFUL" );
+ puts( "Init: mq_open - SUCCESSFUL" );
for (i = 0; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
- mqs[i] = mq_open( Get_Queue_Name(i), O_CREAT | O_RDWR, 0x777, NULL );
- assert( mqs[i] != (-1) );
+ open_mq[i] = mq_open(
+ Build_Queue_Name(i), O_CREAT | O_RDWR | O_NONBLOCK, 0x777, NULL );
+ assert( open_mq[i] != (-1) );
/*XXX - Isn't there a more general check */
}
/*
- * Validate open errors that must occur after message queues are open.
- * EACCES
- * EEXIST
- * EMFILE
- * ENFILE
+ * XXX EACCES - permission to create is denied.
*/
/*
- * XXX EACCES - permission to create is denied.
+ * XXX EACCES - queue exists permissions specified by o_flag are denied.
*/
/*
- * XXX EACCES - queue exists permissions specified by o_flag are denied.
- puts( "mq_open - open mq as write (EACCES)" );
- n_mq2 = mq_open("mq1", O_CREAT | O_WRONLY, 0x777, NULL);
- fatal_directive_status(
- (int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EACCES, "mq_open errno EACCES");
+ * EEXIST - Create an existing queue.
*/
- puts( "mq_open - Create an Existing mq (EEXIST)" );
- n_mq2 = mq_open("mq1", O_CREAT | O_EXCL | O_RDONLY, 0x777, NULL);
- fatal_directive_status(
+ puts( "Init: mq_open - Create an Existing mq (EEXIST)" );
+ n_mq2 = mq_open(
+ Build_Queue_Name(0), O_CREAT | O_EXCL | O_RDONLY, 0x777, NULL);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EEXIST, "mq_open errno EEXIST");
+ fatal_posix_service_status( errno, EEXIST, "mq_open errno EEXIST");
+ /*
+ * XXX EMFILE - Too many message queues in use by the process
+ */
/*
- * XXX EMFILE - Too many message queues open
+ * ENFILE - Too many message queues open in the system
*/
- puts( "mq_open - system is out of resources (ENFILE)" );
- n_mq2 = mq_open( Get_Queue_Name(i), O_CREAT | O_RDONLY, 0x777, NULL );
- fatal_directive_status(
+ puts( "Init: mq_open - system is out of resources (ENFILE)" );
+ n_mq2 = mq_open( Build_Queue_Name(i), O_CREAT | O_RDONLY, 0x777, NULL );
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENFILE, "mq_open errno ENFILE");
+ fatal_posix_service_status( errno, ENFILE, "mq_open errno ENFILE");
/*
- * Unlink and Close .
+ * Unlink and Close all queues.
*/
- puts( "mq_close and mq_unlink (mq3...mqn) - SUCCESSFUL" );
- for (i = size; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
+ puts( "Init: mq_close and mq_unlink (mq3...mqn) - SUCCESSFUL" );
+ for (i = 0; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
- status = mq_close( mqs[i] );
- fatal_directive_status( status, 0, "mq_close message queue");
+ status = mq_close( open_mq[i]);
+ fatal_posix_service_status( status, 0, "mq_close message queue");
- status = mq_unlink( Get_Queue_Name(i) );
- fatal_directive_status( status, 0, "mq_unlink message queue");
+ status = mq_unlink( Build_Queue_Name(i) );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
}
}
-void validate_mq_unlink_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_unlink_error_codes()
{
int status;
+ Start_Test( "mq_unlink errors" );
+
/*
* XXX - EACCES Permission Denied
*/
/*
- * XXX ENAMETOOLONG - Not checked in either sem_unlink or mq_unlink is
- * this an error?
+ * ENAMETOOLONG - Give a name greater than PATH_MAX.
*/
- puts( "mq_unlink - mq_unlink with too long of a name (ENAMETOOLONG)" );
+ puts( "Init: mq_unlink - mq_unlink with too long of a name (ENAMETOOLONG)" );
status = mq_unlink( Get_Too_Long_Name() );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENAMETOOLONG, "mq_unlink errno ENAMETOOLONG");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, ENAMETOOLONG, "mq_unlink errno ENAMETOOLONG");
- puts( "mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink(Get_Queue_Name(size));
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_unlink errno ENOENT");
+ /*
+ * XXX - ENAMETOOLONG - Give a name greater than NAME_MAX
+ * Per implementation not possible.
+ */
/*
- * XXX - These errors are not in the POSIX manual but may occur.
+ * ENOENT - Unlink an unopened queue
*/
- puts( "mq_unlink (NULL) - EINVAL" );
+ puts( "Init: mq_unlink - A Queue not opened (ENOENT)" );
+ status = mq_unlink( CLOSED_NAME );
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, ENOENT, "mq_unlink errno ENOENT");
+
+ /*
+ * XXX - The following were not listed in the POSIX document as
+ * possible errors. Under other commands the EINVAL is
+ * given for these conditions.
+ */
+
+ /*
+ * EINVAL - Unlink a queue with no name
+ */
+
+ puts( "Init: mq_unlink (NULL) - EINVAL" );
status = mq_unlink( NULL );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, EINVAL, "mq_unlink errno value");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_unlink errno value");
- puts( "mq_unlink (\"\") - EINVAL" );
+ /*
+ * EINVAL - Unlink a queue with a null name
+ */
+
+ puts( "Init: mq_unlink (\"\") - EINVAL" );
status = mq_unlink( "" );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, EINVAL, "mq_unlink errno value");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_unlink errno value");
}
-void validate_mq_close_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_close_error_codes()
{
int status;
- puts( "mq_close - UNSUCCESSFUL (EBADF)" );
- status = mq_close(mqs[size]);
- fatal_directive_status( status, -1, "mq_close error return status");
- fatal_directive_status( errno, EBADF, "mq_close errno EBADF");
+ Start_Test( "mq_close errors" );
+
+ /*
+ * EBADF - Close a queue that is not open.
+ */
+
+ puts( "Init: mq_close - unopened queue (EBADF)" );
+ status = mq_close( Test_q[CLOSED].mq );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_close errno EBADF");
}
+
+void validate_mq_getattr_error_codes()
+{
+ struct mq_attr attr;
+ int status;
+
+ Start_Test( "mq_getattr errors" );
+
+ /*
+ * EBADF - Get the attributes from a closed queue.
+ */
+
+ puts( "Init: mq_getattr - unopened queue (EBADF)" );
+ status = mq_getattr( Test_q[CLOSED].mq, &attr );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_close errno EBADF");
+
+ /*
+ * XXX - The following are not listed in the POSIX manual but
+ * may occur.
+ */
+
+ /*
+ * EINVAL - NULL attributes
+ */
+
+ puts( "Init: mq_getattr - NULL attributes (EINVAL)" );
+ status = mq_getattr( Test_q[RW_QUEUE].mq, NULL );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_close errno EINVAL");
+
+}
+
+
+void Send_msg_to_que(
+ int que,
+ int msg
+)
+{
+ Test_Message_t *ptr = &Predefined_Msgs[msg];
+ int status;
+
+ status = mq_send( Test_q[que].mq, ptr->msg, ptr->size , ptr->priority );
+ fatal_posix_service_status( status, 0, "mq_send valid return status");
+ Test_q[que].count++;
+}
+
+void Show_send_msg_to_que(
+ char *task_name,
+ int que,
+ int msg
+)
+{
+ Test_Message_t *ptr = &Predefined_Msgs[msg];
+ printf( "%s mq_send - to %s msg: %s priority %d\n",
+ task_name, Test_q[que].name, ptr->msg, ptr->priority);
+ Send_msg_to_que( que, msg );
+}
+
+void verify_queues_full(
+ char *task_name
+)
+{
+ int que;
+
+ /*
+ * Validate that the queues are full.
+ */
+
+ printf( "%s Verify Queues are full\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ )
+ Validate_attributes( Test_q[que].mq, Test_q[que].oflag, Test_q[que].count );
+
+}
+void verify_queues_empty(
+ char *task_name
+)
+{
+ int que;
+
+ printf( "%s Verify Queues are empty\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ )
+ Validate_attributes( Test_q[que].mq, Test_q[que].oflag, 0 );
+}
+
+int fill_message_queues(
+ char *task_name
+)
+{
+ int msg;
+ int status;
+ int que;
+
+
+ verify_queues_empty( task_name );
+
+ /*
+ * Fill Queue with predefined messages.
+ */
+
+ printf( "%s Fill Queues with messages\n", task_name );
+ for(msg=0; msg<MAXMSG; msg++){
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ Send_msg_to_que( que, msg );
+ }
+ }
+
+ verify_queues_full( "Init:" );
+ return msg;
+}
+
+
+void Read_msg_from_que(
+ int que,
+ int msg
+)
+{
+ unsigned int priority;
+ Test_Message_t *ptr;
+ int status;
+ char message[100];
+ char err_msg[100];
+
+ ptr = &Predefined_Msgs[msg];
+ status = mq_receive(Test_q[ que ].mq, message, 100, &priority );
+ Test_q[que].count--;
+
+ sprintf( err_msg, "%s msg %s size failure", Test_q[ que ].name, ptr->msg );
+ fatal_int_service_status( status, ptr->size, err_msg );
+
+ assert( !strcmp( message, ptr->msg ) );
+ strcpy( message, "No Message" );
+
+ sprintf( err_msg,"%s msg %s size failure", Test_q[ que ].name, ptr->msg );
+ fatal_int_service_status(priority, ptr->priority, err_msg );
+}
+
+int empty_message_queues(
+ char *task_name
+)
+{
+ int que;
+ int i;
+
+ printf( "%s Empty all Queues\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ for(i=0; Test_q[que].count != 0; i++ )
+ Read_msg_from_que( que, Priority_Order[i] );
+
+ Validate_attributes( Test_q[ que].mq, Test_q[ que ].oflag, 0 );
+ }
+ return 0;
+}
+
/*
* Returns the number of messages queued after the test on the
* first queue.
*/
-int validate_mq_send_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+int validate_mq_send_error_codes( )
{
int status;
int i;
- mqd_t n_mq1;
- struct mq_attr attr;
+ char *str;
- attr.mq_maxmsg = 3;
- attr.mq_msgsize = 8;
+ Start_Test( "mq_send errors" );
/*
- * XXX - EBADF Not a valid message descriptor.
- * Write to a invalid message descriptor
- * XXX - Write to a read only queue
+ * EBADF - Write to a closed queue.
*/
- puts( "mq_send - Closed message queue (EBADF)" );
- status = mq_send( mqs[size], "", 1, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EBADF, "mq_send errno EBADF");
+ puts( "Init: mq_send - Closed message queue (EBADF)" );
+ status = mq_send( Test_q[CLOSED].mq, "", 1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
- puts( "mq_open - Open a read only queue" );
- n_mq1 = mq_open("read_only", O_CREAT | O_RDONLY, 0x777, &attr);
- assert( n_mq1 != (-1) );
- /*XXX - Isn't there a more general check */
+ /*
+ * EBADF - Write to a read only queue.
+ */
+
+ puts( "Init: mq_send - Read only message queue (EBADF)" );
+ status = mq_send( Test_q[ RD_QUEUE ].mq, "", 1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
- puts( "mq_send - Read only message queue (EBADF)" );
- status = mq_send( n_mq1, "", 1, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EBADF, "mq_send errno EBADF");
+ /*
+ * XXX - EINTR Signal interrupted the call.
+ *
+ puts( "Init: mq_send - UNSUCCESSFUL (EINTR)" );
+ status = mq_send( Test_q, "", 0xffff, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, E, "mq_send errno E");
+ */
+
+ /*
+ * EINVAL priority is out of range.
+ */
+
+ puts( "Init: mq_send - Priority out of range (EINVAL)" );
+ status = mq_send( Test_q[ RW_QUEUE ].mq, "", 1, MQ_PRIO_MAX + 1 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_send errno EINVAL");
+
+ /*
+ * EMSGSIZE - Message size larger than msg_len
+ * Validates that msgsize is stored correctly.
+ */
+
+ puts( "Init: mq_send - Message longer than msg_len (EMSGSIZE)" );
+ status = mq_send( Test_q[ RW_QUEUE ].mq, "", MSGSIZE+1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EMSGSIZE, "mq_send errno EMSGSIZE");
+
+ i = fill_message_queues( "Init:" );
+
+ /*
+ * ENOSYS - send not supported
+ puts( "Init: mq_send - Blocking Queue overflow (ENOSYS)" );
+ status = mq_send( n_mq1, Predefined_Msgs[i], 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
status = mq_close( n_mq1 );
- fatal_directive_status( status, 0, "mq_close message queue");
+ fatal_posix_service_status( status, 0, "mq_close message queue");
status = mq_unlink( "read_only" );
- fatal_directive_status( status, 0, "mq_unlink message queue");
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+ */
/*
- * XXX - EINTR
- * Signal interrupted the call.
+ * EAGAIN - O_NONBLOCK and message queue is full.
+ */
+
+ puts( "Init: mq_send - on a FULL non-blocking queue with (EAGAIN)" );
+ str = Predefined_Msgs[i].msg;
+ status = mq_send(Test_q[RW_QUEUE].mq, str, 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_send errno EAGAIN");
- puts( "mq_send - UNSUCCESSFUL (EINTR)" );
- status = mq_send( mqs, "", 0xffff, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, E, "mq_send errno E");
+ return i-1;
+}
+
+void validate_mq_receive_error_codes( )
+{
+ int status;
+ char message[100];
+ unsigned int priority;
+ int i;
+
+ Start_Test( "mq_receive errors" );
+
+ /*
+ * EBADF - Not A Valid Message Queue
*/
+ puts( "Init: mq_receive - Unopened message queue (EBADF)" );
+ status = mq_receive( Test_q[CLOSED].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
+
/*
- * XXX - EINVAL priority is out of range.
+ * EBADF - Queue not opened to read
*/
- puts( "mq_send - Priority out of range (EINVAL)" );
- status = mq_send( mqs[0], "", 1, MQ_PRIO_MAX + 1 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EINVAL, "mq_send errno EINVAL");
+ puts( "Init: mq_receive - Write only queue (EBADF)" );
+ status = mq_receive( Test_q[WR_QUEUE].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
/*
- * XXX - EMSGSIZE - Message size larger than msg_len
+ * EMSGSIZE - Size is less than the message size attribute
*/
- puts( "mq_send - Message longer than msg_len (EMSGSIZE)" );
- status = mq_send( mqs[0], "", 0xffff, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EMSGSIZE, "mq_send errno EMSGSIZE");
+ puts( "Init: mq_receive - Size is less than the message (EMSGSIZE)" );
+ status = mq_receive(
+ Test_q[RW_QUEUE].mq, message, Predefined_Msgs[0].size-1, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EMSGSIZE, "mq_receive errno EMSGSIZE");
+
/*
- * ENOSYS - send is supported should never happen.
+ * EAGAIN - O_NONBLOCK and Queue is empty
*/
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
+ puts( "Init: mq_receive - Queue is empty (EAGAIN)" );
+ status = mq_receive( Test_q[RW_QUEUE].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_receive errno EAGAIN");
/*
- * XXX - EAGAIN
- * O_NONBLOCK and message queue is full.
- * This is validated in the read/write test.
+ * XXX - EINTR - Interrupted by a signal
*/
- i=0;
- do {
- status = mq_send( mqs[0], "", 1, 0 );
- i++;
- } while (status == 0);
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EAGAIN, "mq_send errno EAGAIN");
+ /*
+ * XXX - EBADMSG - a data corruption problem.
+ */
- return i-1;
+ /*
+ * XXX - ENOSYS - mq_receive not supported
+ */
+}
+
+void verify_open_functionality()
+{
+ mqd_t n_mq;
+
+ Start_Test( "mq_open functionality" );
+
+ /*
+ * Validate a second open returns the same message queue.
+ */
+
+ puts( "Init: mq_open - Open an existing mq ( same id )" );
+ n_mq = mq_open( RD_NAME, 0 );
+ fatal_posix_service_status(
+ (int) n_mq, (int ) Test_q[RD_QUEUE].mq, "mq_open error return status" );
+}
+
+void verify_unlink_functionality()
+{
+ mqd_t n_mq;
+ int status;
+
+ Start_Test( "mq_unlink functionality" );
+
+ /*
+ * Unlink the message queue, then verify an open of the same name produces a
+ * different message queue.
+ */
+
+ puts( "Init: Unlink and Open without closing SUCCESSFUL" );
+ status = mq_unlink( DEFAULT_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink locked message queue");
+
+ n_mq = mq_open( DEFAULT_NAME, DEFAULT_ATTR, 0x777, NULL );
+ assert( n_mq != (-1) );
+ assert( n_mq != Test_q[ DEFAULT_RW ].mq );
+
+
+ status = mq_unlink( DEFAULT_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink locked message queue");
+ status = mq_close( Test_q[ DEFAULT_RW ].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+
+ Test_q[ DEFAULT_RW ].mq = n_mq;
}
-void validate_mq_receive_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
+void verify_close_functionality()
+{
+ int i;
+ int status;
+ Start_Test( "Unlink and Close All Files" );
+ for (i=0; i<DEFAULT_RW; i++) {
+
+ status = mq_unlink( Get_Queue_Name(i) );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+
+ status = mq_close( Test_q[i].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+ }
+}
+
+
+void verify_timed_send_queue(
+ int que,
+ int is_blocking
+)
+{
+ int i;
+ struct timespec timeout;
+ struct timeval tv1, tv2, tv3;
+ struct timezone tz1, tz2;
+ int len;
+ int status;
+ char *msg;
+
+ timeout.tv_sec = 1;
+ timeout.tv_nsec = 0;
+
+ printf( "Init: mq_timedsend - on queue %s ", Test_q[que].name);
+ len = Predefined_Msgs[MAXMSG].size;
+ msg = Predefined_Msgs[MAXMSG].msg;
+ gettimeofday( &tv1, &tz1 );
+ status = mq_timedsend( Test_q[que].mq, msg, len , 0, &timeout );
+ gettimeofday( &tv2, &tz2 );
+ tv3.tv_sec = tv2.tv_sec - tv1.tv_sec;
+ tv3.tv_usec = tv2.tv_usec - tv1.tv_usec;
+
+ if ( is_blocking ) { /* Don't verify the non-blocking queue */
+ fatal_int_service_status( status, -1, "mq_timedsend status");
+ fatal_posix_service_status( errno, ETIMEDOUT, "errno ETIMEDOUT");
+ }
+
+ printf("Init: %d sec %d us\n", tv3.tv_sec, tv3.tv_usec );
+
+ if ( is_blocking ) /* non-blocking queue */
+ assert( tv3.tv_sec == 1 );
+ else
+ assert( tv3.tv_sec == 0 );
+
+ if ( que == DEFAULT_RW )
+ Test_q[que].count++;
+}
+
+void verify_timed_send()
+{
+ int que;
+
+ Start_Test( "mq_timedsend" );
+
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ if ( que == BLOCKING )
+ verify_timed_send_queue( que, 1 );
+ else
+ verify_timed_send_queue( que, 0 );
+ }
+}
+
+void verify_timed_receive_queue(
+ char *task_name,
+ int que,
+ int is_blocking
)
{
+ char message[ 100 ];
+ unsigned int priority;
+ struct timespec tm;
+ struct timeval tv1, tv2, tv3;
+ struct timezone tz1, tz2;
+ int status;
+
+ tm.tv_sec = 1;
+ tm.tv_nsec = 0;
+
+ printf( "Init: %s mq_timedreceive - on queue %s ", task_name, Test_q[que].name);
+
+ gettimeofday( &tv1, &tz1 );
+ status = mq_timedreceive( Test_q[ que ].mq, message, 100, &priority, &tm );
+ gettimeofday( &tv2, &tz2 );
+ tv3.tv_sec = tv2.tv_sec - tv1.tv_sec;
+ tv3.tv_usec = tv2.tv_usec - tv1.tv_usec;
+
+ fatal_int_service_status( status, -1, "mq_timedreceive status");
+ if ( is_blocking )
+ fatal_posix_service_status( errno, ETIMEDOUT, "errno ETIMEDOUT");
+ printf( "Init: %d sec %d us\n", tv3.tv_sec, tv3.tv_usec );
+
+ if ( is_blocking )
+ assert( tv3.tv_sec == 1 );
+ else
+ assert( tv3.tv_sec == 0 );
+}
+
+
+
+void verify_timed_receive()
+{
+ int que;
+
+ Start_Test( "mq_timedreceive" );
+
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ if (( que == BLOCKING ) || ( que == DEFAULT_RW ))
+ verify_timed_receive_queue( "Init:", que, 1 );
+ else
+ verify_timed_receive_queue( "Init:", que, 0 );
+ }
+}
+
+#if (0)
+void verify_set_attr()
+{
+ struct mq_attr save_attr[ NUMBER_OF_TEST_QUEUES ];
+ struct mq_attr attr;
+ int i;
+ int status;
+
+ attr.mq_maxmsg = 0;
+ attr.mq_msgsize = 0;
+
+ Start_Test( "mq_setattr" );
+
+ puts( "Init: set_attr all queues to blocking" );
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &attr, &save_attr[i] );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+
+ Validate_attributes( Test_q[i].mq, attr.mq_flags, 0 );
+ }
+
+ for( i = RW_QUEUE; i < CLOSED; i++ ) {
+ verify_timed_receive_queue( "Init:", i, 1 );
+ }
+
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &save_attr[i], NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+
+ Validate_attributes( Test_q[i].mq, Test_q[i].oflag, 0 );
+ }
+}
+#endif
+
+void wait_for_signal(
+ sigset_t *waitset,
+ int sec,
+ int expect_signal
+)
+{
+ siginfo_t siginfo;
+ int status;
+ struct timespec timeout;
+ int signo;
+
+ siginfo.si_code = -1;
+ siginfo.si_signo = -1;
+ siginfo.si_value.sival_int = -1;
+
+ timeout.tv_sec = sec;
+ timeout.tv_nsec = 0;
+
+ status = sigemptyset( waitset );
+ assert( !status );
+
+ status = sigaddset( waitset, SIGUSR1 );
+ assert( !status );
+
+ printf( "waiting on any signal for %d seconds.\n", sec );
+ signo = sigtimedwait( waitset, &siginfo, &timeout );
+ if (expect_signal) {
+ fatal_int_service_status( signo, SIGUSR1, "got SISUSR1" );
+ } else {
+ fatal_int_service_status( signo, -1, "error return status");
+ fatal_posix_service_status( errno, EAGAIN, "errno EAGAIN");
+ }
+}
+
+void verify_notify()
+{
+ struct sigevent event;
int status;
+ timer_t timer_id;
+ sigset_t set;
+ Test_Message_t *ptr;
+
+ Start_Test( "mq_notify" );
+
+ /* timer create */
+ event.sigev_notify = SIGEV_SIGNAL;
+ event.sigev_signo = SIGUSR1;
+ if (timer_create (CLOCK_REALTIME, &event, &timer_id) == -1)
+ fatal_posix_service_status( errno, 0, "errno ETIMEDOUT");
+
+ /* block the timer signal */
+ sigemptyset( &set );
+ sigaddset( &set, SIGUSR1 );
+ pthread_sigmask( SIG_BLOCK, &set, NULL );
/*
- * EAGAIN -
+ * EBADF - Not A Valid Message Queue
*/
+ puts( "Init: mq_notify - Unopened message queue (EBADF)" );
+ status = mq_notify( Test_q[CLOSED].mq, NULL );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
+
/*
- * EBADF -
+ * Create ...
*/
/*
- * EMSGSIZE -
+ * XXX setup notification
+ */
+
+ printf( "_____mq_notify - notify when %s gets a message\n",RW_NAME);
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+ wait_for_signal( &set, 3, 0 );
+
+ /*
+ * Send and verify signal occurs and registration is removed.
+ */
+
+ puts( "Init: Verify Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 1 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+ puts( "Init: Verify No Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 0 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+
+ /*
+ * EBUSY - Already Registered
+ */
+
+ printf( "____mq_notify - notify when %s gets a message\n",RD_NAME);
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+ wait_for_signal( &set, 3, 0 );
+
+ puts( "Init: mq_notify - (EBUSY)" );
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, -1, "mq_notify error return status");
+ fatal_posix_service_status( errno, EBUSY, "mq_notify errno EBUSY");
+
+ /*
+ * Verify NULL removes registration.
+ */
+
+ puts( "Init: mq_notify - Remove notification with null" );
+ status = mq_notify( Test_q[RW_QUEUE].mq, NULL );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+
+ puts( "Init: Verify No Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 0 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+}
+
+void verify_with_threads()
+{
+ int status;
+ pthread_t id;
+ Test_Message_t *ptr;
+ unsigned int priority;
+ char message[100];
+
+
+ /*
+ * Create a task then block until the task sends the message.
+ * Task tests set attributes so one queue will have a thread
+ * blocked while attributes are changed.
*/
+ Start_Test( "multi-thread Task 4 Receive Test" );
+ status = pthread_create( &id, NULL, Task_4, NULL );
+ assert( !status );
+ puts( "Init: mq_receive - Empty queue changes to non-blocking (EAGAIN)" );
+ status = mq_receive( Test_q[BLOCKING].mq, message, 100, &priority );
+ fatal_int_service_status( status, -1, "mq_receive error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_receive errno EAGAIN");
+ print_current_time( "Init: ", "" );
+
/*
- * EINTR -
+ * Create a task then block until the task sends the message.
+ * Task tests set attributes so one queue will have a thread
+ * blocked while attributes are changed.
*/
+ Start_Test( "multi-thread Task 1 Test" );
+ status = pthread_create( &id, NULL, Task_1, NULL );
+ assert( !status );
+ Read_msg_from_que( BLOCKING, 0 ); /* Block until init writes */
+ print_current_time( "Init: ", "" );
+
/*
- * EBADMSG - a data corruption problem.
- * XXX - Can not cause.
+ * Create a task then block until the task reads a message.
*/
+ Start_Test( "multi-thread Task 4 Send Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_4, NULL );
+ assert( !status );
+ puts( "Init: mq_send - Full queue changes to non-blocking (EAGAIN)" );
+ status = mq_send(Test_q[BLOCKING].mq, message, 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_send errno EAGAIN");
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
+
/*
- puts( "mq_ - UNSUCCESSFUL ()" );
- status = mq_( );
- fatal_directive_status( status, -1, "mq_ error return status");
- fatal_directive_status( errno, E, "mq_c errno E");
+ * Create a task then block until the task reads a message.
+ */
+
+ Start_Test( "multi-thread Task 2 Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_2, NULL );
+ assert( !status );
+ Show_send_msg_to_que( "Init:", BLOCKING, Priority_Order[0] );
+ print_current_time( "Init: ", "" );
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
- */
/*
- * ENOSYS -
+ * Create a task then block until it deletes and closes all queues.
+ * EBADF - Queue unlinked and closed while blocked
*/
+ Start_Test( "multi-thread Task 3 Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_3, NULL );
+ assert( !status );
+ puts( "Init: mq_send - Block while thread deletes queue (EBADF)" );
+ ptr = &Predefined_Msgs[0];
+ status = mq_send( Test_q[BLOCKING].mq, ptr->msg, ptr->size , ptr->priority );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
+
}
-void non_blocking_mq_read_write(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_setattr()
{
+ struct mq_attr attr;
+ struct mq_attr save_attr[ NUMBER_OF_TEST_QUEUES ];
+ int status;
+ int i;
+
/*
- int status;
- char *messages[] = {
- "Msg 1",
- "Test 2",
- "12345678901234567890"
- };
+ * EBADF - Get the attributes from a closed queue.
+ */
- status = mq_send( mqs[0], messages[0], strlen( messages[0] ), 0 );
- fatal_directive_status( status, 0, "mq_send error return status" );
-
- puts( "mq_send - UNSUCCESSFUL ()" );
- do {
- status = mq_send( );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, E, "mq_send errno E");
+ puts( "Task1:mq_setattr - unopened queue (EBADF)" );
+ status = mq_setattr( Test_q[CLOSED].mq, &attr, NULL );
+ fatal_posix_service_status( status, -1, "mq_setattr error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_setattr errno EBADF");
+
+ /*
+ * XXX - The following are not listed in the POSIX manual but
+ * may occur.
+ */
+
+ /*
+ * EINVAL - NULL attributes
+ */
+
+ puts( "Task1:mq_setattr - NULL attributes (EINVAL)" );
+ status = mq_setattr( Test_q[RW_QUEUE].mq, NULL, NULL );
+ fatal_posix_service_status( status, -1, "mq_setattr error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_setattr errno EINVAL");
+
+ /*
+ * Verify change queues to blocking, by verifying all queues block
+ * for a timed receive.
+ */
+
+ puts( "Init: set_attr all queues to blocking" );
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &attr, &save_attr[i] );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[i].mq, attr.mq_flags, 0 );
+ }
+ for( i = RW_QUEUE; i < CLOSED; i++ ) {
+ verify_timed_receive_queue( "Init:", i, 1 );
+ }
+
+ /*
+ * Restore restore all queues to their old attribute.
+ */
+
+ for(i=0; i<CLOSED; i++) {
+ status = mq_setattr( Test_q[i].mq, &save_attr[i], NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[i].mq, Test_q[i].oflag, 0 );
}
- */
}
void *POSIX_Init(
@@ -390,95 +1128,184 @@ void *POSIX_Init(
)
{
int status;
- mqd_t mqs[CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES];
- mqd_t n_mq1;
mqd_t n_mq2;
- char *messages[] = {
- "Msg 1",
- "Test 2",
- "12345678901234567890"
- };
puts( "\n\n*** POSIX MESSAGE QUEUE TEST ***" );
- validate_mq_open_error_codes( mqs, 2 );
- validate_mq_unlink_error_codes( mqs, 2 );
- validate_mq_close_error_codes( mqs, 2 );
+ validate_mq_open_error_codes( );
+ open_test_queues();
+ validate_mq_unlink_error_codes();
+ validate_mq_close_error_codes();
+ verify_unlink_functionality();
+ validate_mq_setattr( );
+ validate_mq_send_error_codes();
+ validate_mq_getattr_error_codes();
+ verify_timed_send();
+ validate_mq_receive_error_codes();
+ verify_timed_receive();
+ verify_open_functionality();
+ verify_notify();
+ verify_with_threads();
+
+ puts( "*** END OF POSIX MESSAGE QUEUE TEST ***" );
+ exit( 0 );
+
+ return NULL; /* just so the compiler thinks we returned something */
+}
- validate_mq_send_error_codes( mqs, 2 );
- validate_mq_receive_error_codes( mqs, 2 );
+void *Task_1 (
+ void *argument
+)
+{
+ int status;
+ int count = 0;
+ sigset_t set;
- /*
- * Validate a second open returns the same message queue.
- */
+ /* Block Waiting for a message */
- puts( "mq_open - Open an existing mq ( same id )" );
- n_mq1 = mq_open("mq1", 0 );
- fatal_directive_status(
- (int) n_mq1, (int ) mqs[0], "mq_open error return status" );
-
- /*
- * Unlink the message queue, then verify an open of the same name produces a
- * different message queue.
- */
+ print_current_time( "Task_1: ", "" );
- puts( "mq_unlink - mq1 SUCCESSFUL" );
- status = mq_unlink( "mq1" );
- fatal_directive_status( status, 0, "mq_unlink locked message queue");
+ Show_send_msg_to_que( "Task_1:", BLOCKING, 0 );
- puts( "mq_open - Reopen mq1 SUCCESSFUL with a different id" );
- n_mq2 = mq_open( "mq1", O_CREAT | O_EXCL, 00777, NULL);
- assert( n_mq2 != (-1) );
- assert( n_mq2 != n_mq1 );
+ puts( "Task_1: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ assert( 0 );
+ return NULL; /* just so the compiler thinks we returned something */
+}
+
+void *Task_2(
+ void *argument
+)
+{
+ int status;
+
+
+ print_current_time( "Task_2: ", "" );
+
+
+ /* Block waiting to send a message */
+
+ verify_queues_full( "Task_2:" );
+ Read_msg_from_que( BLOCKING, Priority_Order[0] ); /* Cause context switch */
+
+ puts( "Task_2: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+}
+
+void *Task_3 (
+ void *argument
+)
+{
+
+ print_current_time( "Task_3: ", "" );
/*
- * Validate it "mq1" can be closed and unlinked.
+ * close and unlink all queues.
*/
- puts( "mq_unlink - mq1 SUCCESSFUL" );
- status = mq_unlink( "mq1" );
- fatal_directive_status( status, 0, "mq_unlink locked message queue");
+ verify_close_functionality( "Task_3: " );
+ puts( "Task_3: pthread_exit" );
+ pthread_exit( NULL );
- puts( "mq_close mq1 - SUCCESSFUL" );
- status = mq_close( n_mq2 );
- fatal_directive_status( status, 0, "mq_close message queue");
- status = mq_close( n_mq1 );
- fatal_directive_status( status, 0, "mq_close message queue");
- status = mq_close( mqs[0] );
- fatal_directive_status( status, 0, "mq_close message queue");
+ /* switch to Init */
- puts( "mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink("mq1");
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_close errno EINVAL");
+ return NULL; /* just so the compiler thinks we returned something */
- /*
- * XXX - Cant' create location OBJECTS_ERROR or OBJECTS_REMOTE.
- * mq_close and mq_unlink.
- * XXX - Don't think we need this save until yellow line tested.
- puts( "Init: mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink("mq3");
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_unlink errno ENOENT");
- assert( (status == -1) && (errno == ENOENT) );
- */
+}
+void *Task_4 (
+ void *argument
+)
+{
+ struct mq_attr attr;
+ int status;
+ int count;
+
+ print_current_time( "Task_4: ", "" );
/*
- * Validate we can wait on a message queue opened with mq_open.
+ * Set the count to the number of messages in the queue.
*/
-#if (0) /* XXX FIX ME */
- puts( "Init: mq_wait on mq1" );
- status = mq_receive(n_mq1);
- fatal_directive_status( status, 0, "mq_wait opened message queue");
-#endif
+ status = mq_getattr( Test_q[BLOCKING].mq, &attr );
+ fatal_posix_service_status( status, 0, "mq_getattr valid return status");
+ count = attr.mq_curmsgs;
- puts( "*** END OF POSIX MESSAGE QUEUE TEST ***" );
- exit( 0 );
+ puts("Task_4: Set queue to non-blocking");
+ attr.mq_flags = Test_q[BLOCKING].oflag | O_NONBLOCK;
+ status = mq_setattr( Test_q[BLOCKING].mq, &attr, NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[BLOCKING].mq, attr.mq_flags, count );
+
+ puts("Task_4: Return queue to blocking");
+ attr.mq_flags = Test_q[BLOCKING].oflag;
+ status = mq_setattr( Test_q[BLOCKING].mq, &attr, NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[BLOCKING].mq, attr.mq_flags, count );
+
+ puts( "Task_4: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+
+}
+
+void *Task_5 (
+ void *argument
+)
+{
+
+ print_current_time( "Task_5: ", "" );
+
+ puts( "Task_5: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+
+}
+
+void *Task_ (
+ void *argument
+)
+{
+
+ print_current_time( "Task_: ", "" );
+
+ puts( "Task_: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
return NULL; /* just so the compiler thinks we returned something */
+
}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/cpukit/itron/src/cre_mbf.c b/cpukit/itron/src/cre_mbf.c
index ca139d53ae..0db738435e 100644
--- a/cpukit/itron/src/cre_mbf.c
+++ b/cpukit/itron/src/cre_mbf.c
@@ -25,7 +25,7 @@ ER cre_mbf(
T_CMBF *pk_cmbf
)
{
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
ITRON_Message_buffer_Control *the_message_buffer;
/*
@@ -57,16 +57,14 @@ ER cre_mbf(
}
if ( pk_cmbf->mbfatr & TA_TPRI )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
_CORE_message_queue_Initialize(
&the_message_buffer->message_queue,
OBJECTS_ITRON_MESSAGE_BUFFERS,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
pk_cmbf->bufsz / pk_cmbf->maxmsz,
pk_cmbf->maxmsz,
NULL /* Multiprocessing not supported */
diff --git a/cpukit/itron/src/snd_mbx.c b/cpukit/itron/src/snd_mbx.c
index f674583e7b..e02714465e 100644
--- a/cpukit/itron/src/snd_mbx.c
+++ b/cpukit/itron/src/snd_mbx.c
@@ -27,7 +27,6 @@ ER snd_msg(
{
register ITRON_Mailbox_Control *the_mailbox;
Objects_Locations location;
- CORE_message_queue_Status status = E_OK;
unsigned32 message_priority;
void *message_contents;
@@ -47,17 +46,22 @@ ER snd_msg(
message_priority = CORE_MESSAGE_QUEUE_SEND_REQUEST;
message_contents = pk_msg;
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_mailbox->message_queue,
&message_contents,
sizeof(T_MSG *),
the_mailbox->Object.id,
NULL, /* multiprocessing not supported */
- message_priority
+ message_priority,
+ FALSE, /* do not allow sender to block */
+ 0 /* no timeout */
);
break;
}
_ITRON_return_errorno(
- _ITRON_Mailbox_Translate_core_message_queue_return_code(status) );
+ _ITRON_Mailbox_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ )
+ );
}
diff --git a/cpukit/itron/src/trcv_mbf.c b/cpukit/itron/src/trcv_mbf.c
index a63f2a6748..0b2b89e612 100644
--- a/cpukit/itron/src/trcv_mbf.c
+++ b/cpukit/itron/src/trcv_mbf.c
@@ -32,7 +32,6 @@ ER trcv_mbf(
CORE_message_queue_Status status;
boolean wait;
Watchdog_Interval interval;
- CORE_message_queue_Submit_types core_priority;
interval = 0;
if (tmout == TMO_POL) {
@@ -62,7 +61,6 @@ ER trcv_mbf(
msg,
p_msgsz,
wait,
- &core_priority,
interval
);
_Thread_Enable_dispatch();
diff --git a/cpukit/itron/src/tsnd_mbf.c b/cpukit/itron/src/tsnd_mbf.c
index bc609dd298..0ed3b4f90e 100644
--- a/cpukit/itron/src/tsnd_mbf.c
+++ b/cpukit/itron/src/tsnd_mbf.c
@@ -33,7 +33,6 @@ ER tsnd_mbf(
Objects_Locations location;
Watchdog_Interval interval;
boolean wait;
- CORE_message_queue_Status status;
if (msgsz <= 0 || !msg)
return E_PAR;
@@ -50,8 +49,6 @@ ER tsnd_mbf(
if ( wait && _ITRON_Is_in_non_task_state() )
return E_CTX;
- assert( wait == FALSE );
-
the_message_buffer = _ITRON_Message_buffer_Get(mbfid, &location);
switch (location) {
case OBJECTS_REMOTE:
@@ -60,17 +57,20 @@ ER tsnd_mbf(
case OBJECTS_LOCAL:
/* XXX Submit needs to take into account blocking */
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_message_buffer->message_queue,
msg,
msgsz,
the_message_buffer->Object.id,
NULL,
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ interval /* timeout interval */
);
_Thread_Enable_dispatch();
- return
- _ITRON_Message_buffer_Translate_core_message_buffer_return_code(status);
+ return _ITRON_Message_buffer_Translate_core_message_buffer_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
/*
diff --git a/cpukit/posix/include/rtems/posix/mqueue.h b/cpukit/posix/include/rtems/posix/mqueue.h
index 1c1201fef1..e3f7a2f073 100644
--- a/cpukit/posix/include/rtems/posix/mqueue.h
+++ b/cpukit/posix/include/rtems/posix/mqueue.h
@@ -32,10 +32,8 @@ extern "C" {
typedef struct {
Objects_Control Object;
int process_shared;
- int flags;
boolean named;
boolean linked;
- boolean blocking;
int oflag;
unsigned32 open_count;
CORE_message_queue_Control Message_queue;
diff --git a/cpukit/rtems/src/msgqcreate.c b/cpukit/rtems/src/msgqcreate.c
index 395cbf3a02..24bc35993d 100644
--- a/cpukit/rtems/src/msgqcreate.c
+++ b/cpukit/rtems/src/msgqcreate.c
@@ -59,7 +59,7 @@ rtems_status_code rtems_message_queue_create(
)
{
register Message_queue_Control *the_message_queue;
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
void *handler;
#if defined(RTEMS_MULTIPROCESSING)
boolean is_global;
@@ -74,10 +74,10 @@ rtems_status_code rtems_message_queue_create(
return RTEMS_MP_NOT_CONFIGURED;
#endif
- if (count == 0)
+ if ( count == 0 )
return RTEMS_INVALID_NUMBER;
- if (max_message_size == 0)
+ if ( max_message_size == 0 )
return RTEMS_INVALID_SIZE;
#if defined(RTEMS_MULTIPROCESSING)
@@ -115,11 +115,9 @@ rtems_status_code rtems_message_queue_create(
the_message_queue->attribute_set = attribute_set;
if (_Attributes_Is_priority( attribute_set ) )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
handler = NULL;
#if defined(RTEMS_MULTIPROCESSING)
@@ -129,7 +127,7 @@ rtems_status_code rtems_message_queue_create(
if ( ! _CORE_message_queue_Initialize(
&the_message_queue->message_queue,
OBJECTS_RTEMS_MESSAGE_QUEUES,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
count,
max_message_size,
handler ) ) {
diff --git a/cpukit/rtems/src/msgqreceive.c b/cpukit/rtems/src/msgqreceive.c
index 1338216c6b..77fcadc313 100644
--- a/cpukit/rtems/src/msgqreceive.c
+++ b/cpukit/rtems/src/msgqreceive.c
@@ -92,12 +92,12 @@ rtems_status_code rtems_message_queue_receive(
buffer,
size,
wait,
- &core_priority,
timeout
);
_Thread_Enable_dispatch();
- return( _Message_queue_Translate_core_message_queue_return_code(
- _Thread_Executing->Wait.return_code ) );
+ return _Message_queue_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
diff --git a/cpukit/rtems/src/msgqsubmit.c b/cpukit/rtems/src/msgqsubmit.c
index 5a03f6409a..16f1c50266 100644
--- a/cpukit/rtems/src/msgqsubmit.c
+++ b/cpukit/rtems/src/msgqsubmit.c
@@ -61,7 +61,6 @@ rtems_status_code _Message_queue_Submit(
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
- CORE_message_queue_Status core_status;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location )
@@ -98,39 +97,43 @@ rtems_status_code _Message_queue_Submit(
case OBJECTS_LOCAL:
switch ( submit_type ) {
case MESSAGE_QUEUE_SEND_REQUEST:
- core_status = _CORE_message_queue_Send(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Send(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
case MESSAGE_QUEUE_URGENT_REQUEST:
- core_status = _CORE_message_queue_Urgent(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Urgent(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
default:
- core_status = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
return RTEMS_INTERNAL_ERROR; /* should never get here */
}
_Thread_Enable_dispatch();
return _Message_queue_Translate_core_message_queue_return_code(
- core_status );
+ _Thread_Executing->Wait.return_code
+ );
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
diff --git a/cpukit/score/include/rtems/score/coremsg.h b/cpukit/score/include/rtems/score/coremsg.h
index 6ffedee0af..818b1e8c18 100644
--- a/cpukit/score/include/rtems/score/coremsg.h
+++ b/cpukit/score/include/rtems/score/coremsg.h
@@ -166,13 +166,12 @@ void _CORE_message_queue_Close(
);
/*
- *
* _CORE_message_queue_Flush
*
* DESCRIPTION:
*
- * This function flushes the message_queue's task wait queue. The number
- * messages flushed from the queue is returned.
+ * This function flushes the message_queue's pending message queue. The
+ * number of messages flushed from the queue is returned.
*
*/
@@ -194,7 +193,20 @@ unsigned32 _CORE_message_queue_Flush_support(
);
/*
+ * _CORE_message_queue_Flush_waiting_threads
+ *
+ * DESCRIPTION:
*
+ * This function flushes the threads which are blocked on this
+ * message_queue's pending message queue. They are unblocked whether
+ * blocked sending or receiving.
+ */
+
+void _CORE_message_queue_Flush_waiting_threads(
+ CORE_message_queue_Control *the_message_queue
+);
+
+/*
* _CORE_message_queue_Broadcast
*
* DESCRIPTION:
@@ -214,7 +226,6 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
);
/*
- *
* _CORE_message_queue_Submit
*
* DESCRIPTION:
@@ -228,17 +239,18 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
*
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
);
/*
- *
* _CORE_message_queue_Seize
*
* DESCRIPTION:
@@ -248,6 +260,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* inactive message pool. The thread will be blocked if wait is TRUE,
* otherwise an error will be given to the thread if no messages are available.
*
+ * NOTE: Returns message priority via return are in TCB.
*/
void _CORE_message_queue_Seize(
@@ -256,10 +269,25 @@ void _CORE_message_queue_Seize(
void *buffer,
unsigned32 *size,
boolean wait,
- CORE_message_queue_Submit_types *priority,
Watchdog_Interval timeout
);
+/*
+ * _CORE_message_queue_Insert_message
+ *
+ * DESCRIPTION:
+ *
+ * This kernel routine inserts the specified message into the
+ * message queue. It is assumed that the message has been filled
+ * in before this routine is called.
+ */
+
+void _CORE_message_queue_Insert_message(
+ CORE_message_queue_Control *the_message_queue,
+ CORE_message_queue_Buffer_control *the_message,
+ CORE_message_queue_Submit_types submit_type
+);
+
#ifndef __RTEMS_APPLICATION__
#include <rtems/score/coremsg.inl>
#endif
diff --git a/cpukit/score/inline/rtems/score/coremsg.inl b/cpukit/score/inline/rtems/score/coremsg.inl
index af16fbd4aa..7356ce9537 100644
--- a/cpukit/score/inline/rtems/score/coremsg.inl
+++ b/cpukit/score/inline/rtems/score/coremsg.inl
@@ -27,15 +27,17 @@
* This routine sends a message to the end of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Send(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -45,7 +47,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
@@ -58,15 +62,17 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
* This routine sends a message to the front of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Urgent(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -76,7 +82,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_URGENT_REQUEST
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
diff --git a/cpukit/score/macros/rtems/score/coremsg.inl b/cpukit/score/macros/rtems/score/coremsg.inl
index bc45a51ac3..0717b3eea3 100644
--- a/cpukit/score/macros/rtems/score/coremsg.inl
+++ b/cpukit/score/macros/rtems/score/coremsg.inl
@@ -23,9 +23,10 @@
*/
#define _CORE_message_queue_Send( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_SEND_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_SEND_REQUEST, (_wait), (_timeout)
/*PAGE
*
@@ -34,9 +35,10 @@ _id, _api_message_queue_mp_support ) \
*/
#define _CORE_message_queue_Urgent( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_URGENT_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST, (_wait), (_timeout)
/*PAGE
*
diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c
index 2e6f649545..18e148ab1c 100644
--- a/cpukit/score/src/coremsgbroadcast.c
+++ b/cpukit/score/src/coremsgbroadcast.c
@@ -64,6 +64,25 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
Thread_Wait_information *waitp;
unsigned32 constrained_size;
+ /*
+ * If there are pending messages, then there can't be threads
+ * waiting for us to send them a message.
+ *
+ * NOTE: This check is critical because threads can block on
+ * send and receive and this ensures that we are broadcasting
+ * the message to threads waiting to receive -- not to send.
+ */
+
+ if ( the_message_queue->number_of_pending_messages != 0 ) {
+ *count = 0;
+ return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ }
+
+ /*
+ * There must be no pending messages if there is a thread waiting to
+ * receive a message.
+ */
+
number_broadcasted = 0;
while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
waitp = &the_thread->Wait;
diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c
index 4e15ab5bc0..6829351c18 100644
--- a/cpukit/score/src/coremsgsubmit.c
+++ b/cpukit/score/src/coremsgsubmit.c
@@ -53,102 +53,110 @@
* error code - if unsuccessful
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
+ ISR_Level level;
CORE_message_queue_Buffer_control *the_message;
Thread_Control *the_thread;
+ Thread_Control *executing;
- if ( size > the_message_queue->maximum_message_size )
- return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+
+ if ( size > the_message_queue->maximum_message_size ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ return;
+ }
/*
- * Is there a thread currently waiting on this message queue?
+ * Is there a thread currently waiting on this message queue?
*/
- the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
- if ( the_thread ) {
- _CORE_message_queue_Copy_buffer(
- buffer,
- the_thread->Wait.return_argument,
- size
- );
- *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
- the_thread->Wait.count = submit_type;
+ if ( the_message_queue->number_of_pending_messages == 0 ) {
+ the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
+ if ( the_thread ) {
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_thread->Wait.return_argument,
+ size
+ );
+ *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
+ the_thread->Wait.count = submit_type;
#if defined(RTEMS_MULTIPROCESSING)
- if ( !_Objects_Is_local_id( the_thread->Object.id ) )
- (*api_message_queue_mp_support) ( the_thread, id );
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+ (*api_message_queue_mp_support) ( the_thread, id );
#endif
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ return;
+ }
}
/*
- * No one waiting on this one currently.
- * Allocate a message buffer and store it away
+ * No one waiting on the message queue at this time, so attempt to
+ * queue the message up for a future receive.
*/
- if ( the_message_queue->number_of_pending_messages ==
+ if ( the_message_queue->number_of_pending_messages <
the_message_queue->maximum_pending_messages ) {
- return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
- }
- the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue);
- if ( the_message == 0 )
- return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
-
- _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
- the_message->Contents.size = size;
- the_message->priority = submit_type;
-
- the_message_queue->number_of_pending_messages += 1;
-
- switch ( submit_type ) {
- case CORE_MESSAGE_QUEUE_SEND_REQUEST:
- _CORE_message_queue_Append( the_message_queue, the_message );
- break;
- case CORE_MESSAGE_QUEUE_URGENT_REQUEST:
- _CORE_message_queue_Prepend( the_message_queue, the_message );
- break;
- default:
- /* XXX interrupt critical section needs to be addressed */
- {
- CORE_message_queue_Buffer_control *this_message;
- Chain_Node *the_node;
-
- the_message->priority = submit_type;
- for ( the_node = the_message_queue->Pending_messages.first ;
- !_Chain_Is_tail( &the_message_queue->Pending_messages, the_node ) ;
- the_node = the_node->next ) {
-
- this_message = (CORE_message_queue_Buffer_control *) the_node;
-
- if ( this_message->priority >= the_message->priority )
- continue;
-
- _Chain_Insert( the_node, &the_message->Node );
- break;
- }
- }
- break;
+ the_message =
+ _CORE_message_queue_Allocate_message_buffer( the_message_queue );
+
+ /*
+ * NOTE: If the system is consistent, this error should never occur.
+ */
+ if ( !the_message ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
+ return;
+ }
+
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_message->Contents.buffer,
+ size
+ );
+ the_message->Contents.size = size;
+ the_message->priority = submit_type;
+
+ _CORE_message_queue_Insert_message(
+ the_message_queue,
+ the_message,
+ submit_type
+ );
+ return;
}
/*
- * According to POSIX, does this happen before or after the message
- * is actually enqueued. It is logical to think afterwards, because
- * the message is actually in the queue at this point.
+ * No message buffers were available so we may need to return an
+ * overflow error or block the sender until the message is placed
+ * on the queue.
*/
- if ( the_message_queue->number_of_pending_messages == 1 &&
- the_message_queue->notify_handler )
- (*the_message_queue->notify_handler)( the_message_queue->notify_argument );
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ if ( !wait ) {
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
+ return;
+ }
+
+ executing = _Thread_Executing;
+
+ _ISR_Disable( level );
+ _Thread_queue_Enter_critical_section( &the_message_queue->Wait_queue );
+ executing->Wait.queue = &the_message_queue->Wait_queue;
+ executing->Wait.id = id;
+ executing->Wait.return_argument = (void *)buffer;
+ executing->Wait.return_argument_1 = (void *)size;
+ executing->Wait.count = submit_type;
+ _ISR_Enable( level );
+
+ _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
}
diff --git a/testsuites/psxtests/psxmsgq01/init.c b/testsuites/psxtests/psxmsgq01/init.c
index d40ab18a2f..534f6c6dcb 100644
--- a/testsuites/psxtests/psxmsgq01/init.c
+++ b/testsuites/psxtests/psxmsgq01/init.c
@@ -15,13 +15,95 @@
#include <fcntl.h>
#include <time.h>
#include <tmacros.h>
+#include <signal.h> /* signal facilities */
+
+typedef struct {
+ char msg[ 50 ];
+ int size;
+ unsigned int priority;
+}Test_Message_t;
+Test_Message_t Predefined_Msgs[MAXMSG+1];
+Test_Message_t Predefined_Msgs[MAXMSG+1] = {
+ { "12345678", 9, MQ_PRIO_MAX-1 }, /* Max Length Message med */
+ { "", 1, 1 }, /* NULL Message low */
+ { "Last", 5, MQ_PRIO_MAX }, /* Queue Full Message hi */
+ { "No Message", 0, MQ_PRIO_MAX-1 }, /* 0 length Message med */
+ { "1", 2, 0 }, /* Cause Overflow Behavior */
+};
+int Priority_Order[MAXMSG+1] = { 2, 0, 3, 1, MAXMSG };
+
+
+typedef struct {
+ mqd_t mq;
+ Test_Queue_Types index;
+ char *name;
+ int oflag;
+ int maxmsg;
+ int msgsize;
+ int count;
+} Test_queue_type;
+
+Test_queue_type Test_q[ NUMBER_OF_TEST_QUEUES ] =
+{
+ { 0, 0, "Qread", ( O_CREAT | O_RDONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 1, "Qwrite", ( O_CREAT | O_WRONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 2, "Qnoblock", ( O_CREAT | O_RDWR | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 3, "Qblock", ( O_CREAT | O_RDWR ) , MAXMSG, MSGSIZE, 0 },
+ { 0, 4, "Qdefault", ( O_CREAT | O_RDWR ) , 10, 16, 0 },
+ { 0, 5, "mq6", ( O_CREAT | O_WRONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+};
+
+#define RW_NAME Test_q[ RW_QUEUE ].name
+#define DEFAULT_NAME Test_q[ DEFAULT_RW ].name
+#define RD_NAME Test_q[ RD_QUEUE ].name
+#define WR_NAME Test_q[ WR_QUEUE ].name
+#define BLOCKING_NAME Test_q[ BLOCKING ].name
+#define CLOSED_NAME Test_q[ CLOSED ].name
+
+#define RW_ATTR Test_q[ RW_QUEUE ].oflag
+#define DEFAULT_ATTR Test_q[ DEFAULT_RW ].oflag
+#define RD_ATTR Test_q[ RD_QUEUE ].oflag
+#define WR_ATTR Test_q[ WR_QUEUE ].oflag
+#define BLOCK_ATTR Test_q[ BLOCKING ].oflag
+#define CLOSED_ATTR Test_q[ CLOSED ].oflag
-char Queue_Name[PATH_MAX + 2];
-char *Get_Queue_Name(
- int i
+/*
+ * Outputs a header at each test section.
+ */
+void Start_Test(
+ char *description
)
{
- sprintf(Queue_Name,"mq%d",i+1);
+ printf( "_______________%s\n", description );
+}
+
+
+void Validate_attributes(
+ mqd_t mq,
+ int oflag,
+ int msg_count
+)
+{
+ int status;
+ struct mq_attr attr;
+
+ status = mq_getattr( mq, &attr );
+ fatal_posix_service_status( status, 0, "mq_getattr valid return status");
+
+ if ( mq != Test_q[ DEFAULT_RW ].mq ){
+ fatal_int_service_status((int)attr.mq_maxmsg, MAXMSG, "maxmsg attribute" );
+ fatal_int_service_status((int)attr.mq_msgsize,MSGSIZE,"msgsize attribute");
+ }
+
+ fatal_int_service_status((int)attr.mq_curmsgs, msg_count, "count attribute" );
+ fatal_int_service_status((int)attr.mq_flags, oflag, "flag attribute" );
+}
+
+char Queue_Name[PATH_MAX + 2];
+#define Get_Queue_Name( i ) Test_q[i].name
+
+char *Build_Queue_Name( int i ) {
+ sprintf(Queue_Name,"mq%d", i+1 );
return Queue_Name;
}
@@ -35,354 +117,1010 @@ char *Get_Too_Long_Name()
return Queue_Name;
}
-typedef enum {
- DEFAULT_SIZE_TYPE,
- TEST_SIZE_TYPE,
- MAX_SIZE,
- TYPES_OF_TEST_SIZES
-} TEST_MQ_SIZE_TYPES;
+void open_test_queues()
+{
+ struct mq_attr attr;
+ int status;
+ Test_queue_type *tq;
+ int que;
+
+ attr.mq_maxmsg = MAXMSG;
+ attr.mq_msgsize = MSGSIZE;
+
+ puts( "Init: Open Test Queues" );
+
+ for( que = 0; que < NUMBER_OF_TEST_QUEUES; que++ ) {
+
+ tq = &Test_q[ que ];
+ if ( que == DEFAULT_RW)
+ Test_q[que].mq = mq_open( tq->name, tq->oflag, 0x777, NULL );
+ else
+ Test_q[que].mq = mq_open( tq->name, tq->oflag, 0x777, &attr );
+
+ assert( Test_q[que].mq != (-1) );
+ }
+
+ status = mq_close( Test_q[CLOSED].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+ status = mq_unlink( CLOSED_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+}
/*
* Opens CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES then leaves size queues
* opened but closes the rest.
*/
-void validate_mq_open_error_codes(
- mqd_t *mqs, /* Must be large enough for Maximum to be opened. */
- int size
-)
+void validate_mq_open_error_codes()
{
int i;
mqd_t n_mq2;
struct mq_attr attr;
int status;
+ mqd_t open_mq[CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES];
+
+ attr.mq_maxmsg = MAXMSG;
+ attr.mq_msgsize = MSGSIZE;
- assert( size < (CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES-1) );
+ Start_Test( "mq_open errors" );
/*
- * Validate mq_open errors that can occur when no queues are open.
- * EINVAL
- * ENOENT
- * EINTR
+ * XXX EINVAL - inappropriate name was given for the message queue
*/
/*
- * XXX EINVAL - inappropriate name was given for the message queue
+ * EINVAL - Create with negative maxmsg.
*/
attr.mq_maxmsg = -1;
- puts( "mq_open - Create with maxmsg (-1) (EINVAL)" );
- n_mq2 = mq_open("mq2", O_CREAT | O_RDONLY, 0x777, &attr);
- fatal_directive_status(
+ puts( "Init: mq_open - Create with maxmsg (-1) (EINVAL)" );
+ n_mq2 = mq_open( "mq2", O_CREAT | O_RDONLY, 0x777, &attr);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EINVAL, "mq_open errno EINVAL");
+ fatal_posix_service_status( errno, EINVAL, "mq_open errno EINVAL");
+ attr.mq_maxmsg = MAXMSG;
+
+ /*
+ * EINVAL - Create withnegative msgsize.
+ */
attr.mq_msgsize = -1;
- puts( "mq_open - Create with msgsize (-1) (EINVAL)" );
- n_mq2 = mq_open("mq2", O_CREAT | O_RDONLY, 0x777, &attr);
- fatal_directive_status(
+ puts( "Init: mq_open - Create with msgsize (-1) (EINVAL)" );
+ n_mq2 = mq_open( "mq2", O_CREAT | O_RDONLY, 0x777, &attr);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EINVAL, "mq_open errno EINVAL");
+ fatal_posix_service_status( errno, EINVAL, "mq_open errno EINVAL");
+ attr.mq_msgsize = MSGSIZE;
+
+ /*
+ * ENOENT - Open a non-created file.
+ */
- puts( "mq_open - Open new mq without create flag (ENOENT)" );
- n_mq2 = mq_open("mq3", O_EXCL | O_RDONLY, 0x777, NULL);
- fatal_directive_status(
+ puts( "Init: mq_open - Open new mq without create flag (ENOENT)" );
+ n_mq2 = mq_open( "mq3", O_EXCL | O_RDONLY, 0x777, NULL);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENOENT, "mq_open errno ENOENT");
+ fatal_posix_service_status( errno, ENOENT, "mq_open errno ENOENT");
+
/*
* XXX EINTR - call was interrupted by a signal
*/
/*
- * XXX ENAMETOOLONG - Not checked in either sem_open or mq_open is
- * this an error?
+ * ENAMETOOLONG - Give a name greater than PATH_MAX.
*/
- puts( "mq_open - Open with too long of a name (ENAMETOOLONG)" );
+ puts( "Init: mq_open - Open with too long of a name (ENAMETOOLONG)" );
n_mq2 = mq_open( Get_Too_Long_Name(), O_CREAT | O_RDONLY, 0x777, NULL );
- fatal_directive_status(
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENAMETOOLONG, "mq_open errno ENAMETOOLONG");
+ fatal_posix_service_status( errno, ENAMETOOLONG, "mq_open errno ENAMETOOLONG");
/*
+ * XXX - ENAMETOOLONG - Give a name greater than NAME_MAX
+ * Per implementation not possible.
+ */
+
+ /*
* Open maximum number of message queues
*/
- puts( "mq_open - SUCCESSFUL" );
+ puts( "Init: mq_open - SUCCESSFUL" );
for (i = 0; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
- mqs[i] = mq_open( Get_Queue_Name(i), O_CREAT | O_RDWR, 0x777, NULL );
- assert( mqs[i] != (-1) );
+ open_mq[i] = mq_open(
+ Build_Queue_Name(i), O_CREAT | O_RDWR | O_NONBLOCK, 0x777, NULL );
+ assert( open_mq[i] != (-1) );
/*XXX - Isn't there a more general check */
}
/*
- * Validate open errors that must occur after message queues are open.
- * EACCES
- * EEXIST
- * EMFILE
- * ENFILE
+ * XXX EACCES - permission to create is denied.
*/
/*
- * XXX EACCES - permission to create is denied.
+ * XXX EACCES - queue exists permissions specified by o_flag are denied.
*/
/*
- * XXX EACCES - queue exists permissions specified by o_flag are denied.
- puts( "mq_open - open mq as write (EACCES)" );
- n_mq2 = mq_open("mq1", O_CREAT | O_WRONLY, 0x777, NULL);
- fatal_directive_status(
- (int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EACCES, "mq_open errno EACCES");
+ * EEXIST - Create an existing queue.
*/
- puts( "mq_open - Create an Existing mq (EEXIST)" );
- n_mq2 = mq_open("mq1", O_CREAT | O_EXCL | O_RDONLY, 0x777, NULL);
- fatal_directive_status(
+ puts( "Init: mq_open - Create an Existing mq (EEXIST)" );
+ n_mq2 = mq_open(
+ Build_Queue_Name(0), O_CREAT | O_EXCL | O_RDONLY, 0x777, NULL);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EEXIST, "mq_open errno EEXIST");
+ fatal_posix_service_status( errno, EEXIST, "mq_open errno EEXIST");
+ /*
+ * XXX EMFILE - Too many message queues in use by the process
+ */
/*
- * XXX EMFILE - Too many message queues open
+ * ENFILE - Too many message queues open in the system
*/
- puts( "mq_open - system is out of resources (ENFILE)" );
- n_mq2 = mq_open( Get_Queue_Name(i), O_CREAT | O_RDONLY, 0x777, NULL );
- fatal_directive_status(
+ puts( "Init: mq_open - system is out of resources (ENFILE)" );
+ n_mq2 = mq_open( Build_Queue_Name(i), O_CREAT | O_RDONLY, 0x777, NULL );
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENFILE, "mq_open errno ENFILE");
+ fatal_posix_service_status( errno, ENFILE, "mq_open errno ENFILE");
/*
- * Unlink and Close .
+ * Unlink and Close all queues.
*/
- puts( "mq_close and mq_unlink (mq3...mqn) - SUCCESSFUL" );
- for (i = size; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
+ puts( "Init: mq_close and mq_unlink (mq3...mqn) - SUCCESSFUL" );
+ for (i = 0; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
- status = mq_close( mqs[i] );
- fatal_directive_status( status, 0, "mq_close message queue");
+ status = mq_close( open_mq[i]);
+ fatal_posix_service_status( status, 0, "mq_close message queue");
- status = mq_unlink( Get_Queue_Name(i) );
- fatal_directive_status( status, 0, "mq_unlink message queue");
+ status = mq_unlink( Build_Queue_Name(i) );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
}
}
-void validate_mq_unlink_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_unlink_error_codes()
{
int status;
+ Start_Test( "mq_unlink errors" );
+
/*
* XXX - EACCES Permission Denied
*/
/*
- * XXX ENAMETOOLONG - Not checked in either sem_unlink or mq_unlink is
- * this an error?
+ * ENAMETOOLONG - Give a name greater than PATH_MAX.
*/
- puts( "mq_unlink - mq_unlink with too long of a name (ENAMETOOLONG)" );
+ puts( "Init: mq_unlink - mq_unlink with too long of a name (ENAMETOOLONG)" );
status = mq_unlink( Get_Too_Long_Name() );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENAMETOOLONG, "mq_unlink errno ENAMETOOLONG");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, ENAMETOOLONG, "mq_unlink errno ENAMETOOLONG");
- puts( "mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink(Get_Queue_Name(size));
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_unlink errno ENOENT");
+ /*
+ * XXX - ENAMETOOLONG - Give a name greater than NAME_MAX
+ * Per implementation not possible.
+ */
/*
- * XXX - These errors are not in the POSIX manual but may occur.
+ * ENOENT - Unlink an unopened queue
*/
- puts( "mq_unlink (NULL) - EINVAL" );
+ puts( "Init: mq_unlink - A Queue not opened (ENOENT)" );
+ status = mq_unlink( CLOSED_NAME );
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, ENOENT, "mq_unlink errno ENOENT");
+
+ /*
+ * XXX - The following were not listed in the POSIX document as
+ * possible errors. Under other commands the EINVAL is
+ * given for these conditions.
+ */
+
+ /*
+ * EINVAL - Unlink a queue with no name
+ */
+
+ puts( "Init: mq_unlink (NULL) - EINVAL" );
status = mq_unlink( NULL );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, EINVAL, "mq_unlink errno value");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_unlink errno value");
- puts( "mq_unlink (\"\") - EINVAL" );
+ /*
+ * EINVAL - Unlink a queue with a null name
+ */
+
+ puts( "Init: mq_unlink (\"\") - EINVAL" );
status = mq_unlink( "" );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, EINVAL, "mq_unlink errno value");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_unlink errno value");
}
-void validate_mq_close_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_close_error_codes()
{
int status;
- puts( "mq_close - UNSUCCESSFUL (EBADF)" );
- status = mq_close(mqs[size]);
- fatal_directive_status( status, -1, "mq_close error return status");
- fatal_directive_status( errno, EBADF, "mq_close errno EBADF");
+ Start_Test( "mq_close errors" );
+
+ /*
+ * EBADF - Close a queue that is not open.
+ */
+
+ puts( "Init: mq_close - unopened queue (EBADF)" );
+ status = mq_close( Test_q[CLOSED].mq );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_close errno EBADF");
}
+
+void validate_mq_getattr_error_codes()
+{
+ struct mq_attr attr;
+ int status;
+
+ Start_Test( "mq_getattr errors" );
+
+ /*
+ * EBADF - Get the attributes from a closed queue.
+ */
+
+ puts( "Init: mq_getattr - unopened queue (EBADF)" );
+ status = mq_getattr( Test_q[CLOSED].mq, &attr );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_close errno EBADF");
+
+ /*
+ * XXX - The following are not listed in the POSIX manual but
+ * may occur.
+ */
+
+ /*
+ * EINVAL - NULL attributes
+ */
+
+ puts( "Init: mq_getattr - NULL attributes (EINVAL)" );
+ status = mq_getattr( Test_q[RW_QUEUE].mq, NULL );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_close errno EINVAL");
+
+}
+
+
+void Send_msg_to_que(
+ int que,
+ int msg
+)
+{
+ Test_Message_t *ptr = &Predefined_Msgs[msg];
+ int status;
+
+ status = mq_send( Test_q[que].mq, ptr->msg, ptr->size , ptr->priority );
+ fatal_posix_service_status( status, 0, "mq_send valid return status");
+ Test_q[que].count++;
+}
+
+void Show_send_msg_to_que(
+ char *task_name,
+ int que,
+ int msg
+)
+{
+ Test_Message_t *ptr = &Predefined_Msgs[msg];
+ printf( "%s mq_send - to %s msg: %s priority %d\n",
+ task_name, Test_q[que].name, ptr->msg, ptr->priority);
+ Send_msg_to_que( que, msg );
+}
+
+void verify_queues_full(
+ char *task_name
+)
+{
+ int que;
+
+ /*
+ * Validate that the queues are full.
+ */
+
+ printf( "%s Verify Queues are full\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ )
+ Validate_attributes( Test_q[que].mq, Test_q[que].oflag, Test_q[que].count );
+
+}
+void verify_queues_empty(
+ char *task_name
+)
+{
+ int que;
+
+ printf( "%s Verify Queues are empty\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ )
+ Validate_attributes( Test_q[que].mq, Test_q[que].oflag, 0 );
+}
+
+int fill_message_queues(
+ char *task_name
+)
+{
+ int msg;
+ int status;
+ int que;
+
+
+ verify_queues_empty( task_name );
+
+ /*
+ * Fill Queue with predefined messages.
+ */
+
+ printf( "%s Fill Queues with messages\n", task_name );
+ for(msg=0; msg<MAXMSG; msg++){
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ Send_msg_to_que( que, msg );
+ }
+ }
+
+ verify_queues_full( "Init:" );
+ return msg;
+}
+
+
+void Read_msg_from_que(
+ int que,
+ int msg
+)
+{
+ unsigned int priority;
+ Test_Message_t *ptr;
+ int status;
+ char message[100];
+ char err_msg[100];
+
+ ptr = &Predefined_Msgs[msg];
+ status = mq_receive(Test_q[ que ].mq, message, 100, &priority );
+ Test_q[que].count--;
+
+ sprintf( err_msg, "%s msg %s size failure", Test_q[ que ].name, ptr->msg );
+ fatal_int_service_status( status, ptr->size, err_msg );
+
+ assert( !strcmp( message, ptr->msg ) );
+ strcpy( message, "No Message" );
+
+ sprintf( err_msg,"%s msg %s size failure", Test_q[ que ].name, ptr->msg );
+ fatal_int_service_status(priority, ptr->priority, err_msg );
+}
+
+int empty_message_queues(
+ char *task_name
+)
+{
+ int que;
+ int i;
+
+ printf( "%s Empty all Queues\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ for(i=0; Test_q[que].count != 0; i++ )
+ Read_msg_from_que( que, Priority_Order[i] );
+
+ Validate_attributes( Test_q[ que].mq, Test_q[ que ].oflag, 0 );
+ }
+ return 0;
+}
+
/*
* Returns the number of messages queued after the test on the
* first queue.
*/
-int validate_mq_send_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+int validate_mq_send_error_codes( )
{
int status;
int i;
- mqd_t n_mq1;
- struct mq_attr attr;
+ char *str;
- attr.mq_maxmsg = 3;
- attr.mq_msgsize = 8;
+ Start_Test( "mq_send errors" );
/*
- * XXX - EBADF Not a valid message descriptor.
- * Write to a invalid message descriptor
- * XXX - Write to a read only queue
+ * EBADF - Write to a closed queue.
*/
- puts( "mq_send - Closed message queue (EBADF)" );
- status = mq_send( mqs[size], "", 1, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EBADF, "mq_send errno EBADF");
+ puts( "Init: mq_send - Closed message queue (EBADF)" );
+ status = mq_send( Test_q[CLOSED].mq, "", 1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
- puts( "mq_open - Open a read only queue" );
- n_mq1 = mq_open("read_only", O_CREAT | O_RDONLY, 0x777, &attr);
- assert( n_mq1 != (-1) );
- /*XXX - Isn't there a more general check */
+ /*
+ * EBADF - Write to a read only queue.
+ */
+
+ puts( "Init: mq_send - Read only message queue (EBADF)" );
+ status = mq_send( Test_q[ RD_QUEUE ].mq, "", 1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
- puts( "mq_send - Read only message queue (EBADF)" );
- status = mq_send( n_mq1, "", 1, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EBADF, "mq_send errno EBADF");
+ /*
+ * XXX - EINTR Signal interrupted the call.
+ *
+ puts( "Init: mq_send - UNSUCCESSFUL (EINTR)" );
+ status = mq_send( Test_q, "", 0xffff, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, E, "mq_send errno E");
+ */
+
+ /*
+ * EINVAL priority is out of range.
+ */
+
+ puts( "Init: mq_send - Priority out of range (EINVAL)" );
+ status = mq_send( Test_q[ RW_QUEUE ].mq, "", 1, MQ_PRIO_MAX + 1 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_send errno EINVAL");
+
+ /*
+ * EMSGSIZE - Message size larger than msg_len
+ * Validates that msgsize is stored correctly.
+ */
+
+ puts( "Init: mq_send - Message longer than msg_len (EMSGSIZE)" );
+ status = mq_send( Test_q[ RW_QUEUE ].mq, "", MSGSIZE+1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EMSGSIZE, "mq_send errno EMSGSIZE");
+
+ i = fill_message_queues( "Init:" );
+
+ /*
+ * ENOSYS - send not supported
+ puts( "Init: mq_send - Blocking Queue overflow (ENOSYS)" );
+ status = mq_send( n_mq1, Predefined_Msgs[i], 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
status = mq_close( n_mq1 );
- fatal_directive_status( status, 0, "mq_close message queue");
+ fatal_posix_service_status( status, 0, "mq_close message queue");
status = mq_unlink( "read_only" );
- fatal_directive_status( status, 0, "mq_unlink message queue");
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+ */
/*
- * XXX - EINTR
- * Signal interrupted the call.
+ * EAGAIN - O_NONBLOCK and message queue is full.
+ */
+
+ puts( "Init: mq_send - on a FULL non-blocking queue with (EAGAIN)" );
+ str = Predefined_Msgs[i].msg;
+ status = mq_send(Test_q[RW_QUEUE].mq, str, 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_send errno EAGAIN");
- puts( "mq_send - UNSUCCESSFUL (EINTR)" );
- status = mq_send( mqs, "", 0xffff, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, E, "mq_send errno E");
+ return i-1;
+}
+
+void validate_mq_receive_error_codes( )
+{
+ int status;
+ char message[100];
+ unsigned int priority;
+ int i;
+
+ Start_Test( "mq_receive errors" );
+
+ /*
+ * EBADF - Not A Valid Message Queue
*/
+ puts( "Init: mq_receive - Unopened message queue (EBADF)" );
+ status = mq_receive( Test_q[CLOSED].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
+
/*
- * XXX - EINVAL priority is out of range.
+ * EBADF - Queue not opened to read
*/
- puts( "mq_send - Priority out of range (EINVAL)" );
- status = mq_send( mqs[0], "", 1, MQ_PRIO_MAX + 1 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EINVAL, "mq_send errno EINVAL");
+ puts( "Init: mq_receive - Write only queue (EBADF)" );
+ status = mq_receive( Test_q[WR_QUEUE].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
/*
- * XXX - EMSGSIZE - Message size larger than msg_len
+ * EMSGSIZE - Size is less than the message size attribute
*/
- puts( "mq_send - Message longer than msg_len (EMSGSIZE)" );
- status = mq_send( mqs[0], "", 0xffff, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EMSGSIZE, "mq_send errno EMSGSIZE");
+ puts( "Init: mq_receive - Size is less than the message (EMSGSIZE)" );
+ status = mq_receive(
+ Test_q[RW_QUEUE].mq, message, Predefined_Msgs[0].size-1, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EMSGSIZE, "mq_receive errno EMSGSIZE");
+
/*
- * ENOSYS - send is supported should never happen.
+ * EAGAIN - O_NONBLOCK and Queue is empty
*/
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
+ puts( "Init: mq_receive - Queue is empty (EAGAIN)" );
+ status = mq_receive( Test_q[RW_QUEUE].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_receive errno EAGAIN");
/*
- * XXX - EAGAIN
- * O_NONBLOCK and message queue is full.
- * This is validated in the read/write test.
+ * XXX - EINTR - Interrupted by a signal
*/
- i=0;
- do {
- status = mq_send( mqs[0], "", 1, 0 );
- i++;
- } while (status == 0);
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EAGAIN, "mq_send errno EAGAIN");
+ /*
+ * XXX - EBADMSG - a data corruption problem.
+ */
- return i-1;
+ /*
+ * XXX - ENOSYS - mq_receive not supported
+ */
+}
+
+void verify_open_functionality()
+{
+ mqd_t n_mq;
+
+ Start_Test( "mq_open functionality" );
+
+ /*
+ * Validate a second open returns the same message queue.
+ */
+
+ puts( "Init: mq_open - Open an existing mq ( same id )" );
+ n_mq = mq_open( RD_NAME, 0 );
+ fatal_posix_service_status(
+ (int) n_mq, (int ) Test_q[RD_QUEUE].mq, "mq_open error return status" );
+}
+
+void verify_unlink_functionality()
+{
+ mqd_t n_mq;
+ int status;
+
+ Start_Test( "mq_unlink functionality" );
+
+ /*
+ * Unlink the message queue, then verify an open of the same name produces a
+ * different message queue.
+ */
+
+ puts( "Init: Unlink and Open without closing SUCCESSFUL" );
+ status = mq_unlink( DEFAULT_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink locked message queue");
+
+ n_mq = mq_open( DEFAULT_NAME, DEFAULT_ATTR, 0x777, NULL );
+ assert( n_mq != (-1) );
+ assert( n_mq != Test_q[ DEFAULT_RW ].mq );
+
+
+ status = mq_unlink( DEFAULT_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink locked message queue");
+ status = mq_close( Test_q[ DEFAULT_RW ].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+
+ Test_q[ DEFAULT_RW ].mq = n_mq;
}
-void validate_mq_receive_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
+void verify_close_functionality()
+{
+ int i;
+ int status;
+ Start_Test( "Unlink and Close All Files" );
+ for (i=0; i<DEFAULT_RW; i++) {
+
+ status = mq_unlink( Get_Queue_Name(i) );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+
+ status = mq_close( Test_q[i].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+ }
+}
+
+
+void verify_timed_send_queue(
+ int que,
+ int is_blocking
+)
+{
+ int i;
+ struct timespec timeout;
+ struct timeval tv1, tv2, tv3;
+ struct timezone tz1, tz2;
+ int len;
+ int status;
+ char *msg;
+
+ timeout.tv_sec = 1;
+ timeout.tv_nsec = 0;
+
+ printf( "Init: mq_timedsend - on queue %s ", Test_q[que].name);
+ len = Predefined_Msgs[MAXMSG].size;
+ msg = Predefined_Msgs[MAXMSG].msg;
+ gettimeofday( &tv1, &tz1 );
+ status = mq_timedsend( Test_q[que].mq, msg, len , 0, &timeout );
+ gettimeofday( &tv2, &tz2 );
+ tv3.tv_sec = tv2.tv_sec - tv1.tv_sec;
+ tv3.tv_usec = tv2.tv_usec - tv1.tv_usec;
+
+ if ( is_blocking ) { /* Don't verify the non-blocking queue */
+ fatal_int_service_status( status, -1, "mq_timedsend status");
+ fatal_posix_service_status( errno, ETIMEDOUT, "errno ETIMEDOUT");
+ }
+
+ printf("Init: %d sec %d us\n", tv3.tv_sec, tv3.tv_usec );
+
+ if ( is_blocking ) /* non-blocking queue */
+ assert( tv3.tv_sec == 1 );
+ else
+ assert( tv3.tv_sec == 0 );
+
+ if ( que == DEFAULT_RW )
+ Test_q[que].count++;
+}
+
+void verify_timed_send()
+{
+ int que;
+
+ Start_Test( "mq_timedsend" );
+
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ if ( que == BLOCKING )
+ verify_timed_send_queue( que, 1 );
+ else
+ verify_timed_send_queue( que, 0 );
+ }
+}
+
+void verify_timed_receive_queue(
+ char *task_name,
+ int que,
+ int is_blocking
)
{
+ char message[ 100 ];
+ unsigned int priority;
+ struct timespec tm;
+ struct timeval tv1, tv2, tv3;
+ struct timezone tz1, tz2;
+ int status;
+
+ tm.tv_sec = 1;
+ tm.tv_nsec = 0;
+
+ printf( "Init: %s mq_timedreceive - on queue %s ", task_name, Test_q[que].name);
+
+ gettimeofday( &tv1, &tz1 );
+ status = mq_timedreceive( Test_q[ que ].mq, message, 100, &priority, &tm );
+ gettimeofday( &tv2, &tz2 );
+ tv3.tv_sec = tv2.tv_sec - tv1.tv_sec;
+ tv3.tv_usec = tv2.tv_usec - tv1.tv_usec;
+
+ fatal_int_service_status( status, -1, "mq_timedreceive status");
+ if ( is_blocking )
+ fatal_posix_service_status( errno, ETIMEDOUT, "errno ETIMEDOUT");
+ printf( "Init: %d sec %d us\n", tv3.tv_sec, tv3.tv_usec );
+
+ if ( is_blocking )
+ assert( tv3.tv_sec == 1 );
+ else
+ assert( tv3.tv_sec == 0 );
+}
+
+
+
+void verify_timed_receive()
+{
+ int que;
+
+ Start_Test( "mq_timedreceive" );
+
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ if (( que == BLOCKING ) || ( que == DEFAULT_RW ))
+ verify_timed_receive_queue( "Init:", que, 1 );
+ else
+ verify_timed_receive_queue( "Init:", que, 0 );
+ }
+}
+
+#if (0)
+void verify_set_attr()
+{
+ struct mq_attr save_attr[ NUMBER_OF_TEST_QUEUES ];
+ struct mq_attr attr;
+ int i;
+ int status;
+
+ attr.mq_maxmsg = 0;
+ attr.mq_msgsize = 0;
+
+ Start_Test( "mq_setattr" );
+
+ puts( "Init: set_attr all queues to blocking" );
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &attr, &save_attr[i] );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+
+ Validate_attributes( Test_q[i].mq, attr.mq_flags, 0 );
+ }
+
+ for( i = RW_QUEUE; i < CLOSED; i++ ) {
+ verify_timed_receive_queue( "Init:", i, 1 );
+ }
+
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &save_attr[i], NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+
+ Validate_attributes( Test_q[i].mq, Test_q[i].oflag, 0 );
+ }
+}
+#endif
+
+void wait_for_signal(
+ sigset_t *waitset,
+ int sec,
+ int expect_signal
+)
+{
+ siginfo_t siginfo;
+ int status;
+ struct timespec timeout;
+ int signo;
+
+ siginfo.si_code = -1;
+ siginfo.si_signo = -1;
+ siginfo.si_value.sival_int = -1;
+
+ timeout.tv_sec = sec;
+ timeout.tv_nsec = 0;
+
+ status = sigemptyset( waitset );
+ assert( !status );
+
+ status = sigaddset( waitset, SIGUSR1 );
+ assert( !status );
+
+ printf( "waiting on any signal for %d seconds.\n", sec );
+ signo = sigtimedwait( waitset, &siginfo, &timeout );
+ if (expect_signal) {
+ fatal_int_service_status( signo, SIGUSR1, "got SISUSR1" );
+ } else {
+ fatal_int_service_status( signo, -1, "error return status");
+ fatal_posix_service_status( errno, EAGAIN, "errno EAGAIN");
+ }
+}
+
+void verify_notify()
+{
+ struct sigevent event;
int status;
+ timer_t timer_id;
+ sigset_t set;
+ Test_Message_t *ptr;
+
+ Start_Test( "mq_notify" );
+
+ /* timer create */
+ event.sigev_notify = SIGEV_SIGNAL;
+ event.sigev_signo = SIGUSR1;
+ if (timer_create (CLOCK_REALTIME, &event, &timer_id) == -1)
+ fatal_posix_service_status( errno, 0, "errno ETIMEDOUT");
+
+ /* block the timer signal */
+ sigemptyset( &set );
+ sigaddset( &set, SIGUSR1 );
+ pthread_sigmask( SIG_BLOCK, &set, NULL );
/*
- * EAGAIN -
+ * EBADF - Not A Valid Message Queue
*/
+ puts( "Init: mq_notify - Unopened message queue (EBADF)" );
+ status = mq_notify( Test_q[CLOSED].mq, NULL );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
+
/*
- * EBADF -
+ * Create ...
*/
/*
- * EMSGSIZE -
+ * XXX setup notification
+ */
+
+ printf( "_____mq_notify - notify when %s gets a message\n",RW_NAME);
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+ wait_for_signal( &set, 3, 0 );
+
+ /*
+ * Send and verify signal occurs and registration is removed.
+ */
+
+ puts( "Init: Verify Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 1 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+ puts( "Init: Verify No Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 0 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+
+ /*
+ * EBUSY - Already Registered
+ */
+
+ printf( "____mq_notify - notify when %s gets a message\n",RD_NAME);
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+ wait_for_signal( &set, 3, 0 );
+
+ puts( "Init: mq_notify - (EBUSY)" );
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, -1, "mq_notify error return status");
+ fatal_posix_service_status( errno, EBUSY, "mq_notify errno EBUSY");
+
+ /*
+ * Verify NULL removes registration.
+ */
+
+ puts( "Init: mq_notify - Remove notification with null" );
+ status = mq_notify( Test_q[RW_QUEUE].mq, NULL );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+
+ puts( "Init: Verify No Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 0 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+}
+
+void verify_with_threads()
+{
+ int status;
+ pthread_t id;
+ Test_Message_t *ptr;
+ unsigned int priority;
+ char message[100];
+
+
+ /*
+ * Create a task then block until the task sends the message.
+ * Task tests set attributes so one queue will have a thread
+ * blocked while attributes are changed.
*/
+ Start_Test( "multi-thread Task 4 Receive Test" );
+ status = pthread_create( &id, NULL, Task_4, NULL );
+ assert( !status );
+ puts( "Init: mq_receive - Empty queue changes to non-blocking (EAGAIN)" );
+ status = mq_receive( Test_q[BLOCKING].mq, message, 100, &priority );
+ fatal_int_service_status( status, -1, "mq_receive error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_receive errno EAGAIN");
+ print_current_time( "Init: ", "" );
+
/*
- * EINTR -
+ * Create a task then block until the task sends the message.
+ * Task tests set attributes so one queue will have a thread
+ * blocked while attributes are changed.
*/
+ Start_Test( "multi-thread Task 1 Test" );
+ status = pthread_create( &id, NULL, Task_1, NULL );
+ assert( !status );
+ Read_msg_from_que( BLOCKING, 0 ); /* Block until init writes */
+ print_current_time( "Init: ", "" );
+
/*
- * EBADMSG - a data corruption problem.
- * XXX - Can not cause.
+ * Create a task then block until the task reads a message.
*/
+ Start_Test( "multi-thread Task 4 Send Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_4, NULL );
+ assert( !status );
+ puts( "Init: mq_send - Full queue changes to non-blocking (EAGAIN)" );
+ status = mq_send(Test_q[BLOCKING].mq, message, 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_send errno EAGAIN");
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
+
/*
- puts( "mq_ - UNSUCCESSFUL ()" );
- status = mq_( );
- fatal_directive_status( status, -1, "mq_ error return status");
- fatal_directive_status( errno, E, "mq_c errno E");
+ * Create a task then block until the task reads a message.
+ */
+
+ Start_Test( "multi-thread Task 2 Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_2, NULL );
+ assert( !status );
+ Show_send_msg_to_que( "Init:", BLOCKING, Priority_Order[0] );
+ print_current_time( "Init: ", "" );
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
- */
/*
- * ENOSYS -
+ * Create a task then block until it deletes and closes all queues.
+ * EBADF - Queue unlinked and closed while blocked
*/
+ Start_Test( "multi-thread Task 3 Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_3, NULL );
+ assert( !status );
+ puts( "Init: mq_send - Block while thread deletes queue (EBADF)" );
+ ptr = &Predefined_Msgs[0];
+ status = mq_send( Test_q[BLOCKING].mq, ptr->msg, ptr->size , ptr->priority );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
+
}
-void non_blocking_mq_read_write(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_setattr()
{
+ struct mq_attr attr;
+ struct mq_attr save_attr[ NUMBER_OF_TEST_QUEUES ];
+ int status;
+ int i;
+
/*
- int status;
- char *messages[] = {
- "Msg 1",
- "Test 2",
- "12345678901234567890"
- };
+ * EBADF - Get the attributes from a closed queue.
+ */
- status = mq_send( mqs[0], messages[0], strlen( messages[0] ), 0 );
- fatal_directive_status( status, 0, "mq_send error return status" );
-
- puts( "mq_send - UNSUCCESSFUL ()" );
- do {
- status = mq_send( );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, E, "mq_send errno E");
+ puts( "Task1:mq_setattr - unopened queue (EBADF)" );
+ status = mq_setattr( Test_q[CLOSED].mq, &attr, NULL );
+ fatal_posix_service_status( status, -1, "mq_setattr error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_setattr errno EBADF");
+
+ /*
+ * XXX - The following are not listed in the POSIX manual but
+ * may occur.
+ */
+
+ /*
+ * EINVAL - NULL attributes
+ */
+
+ puts( "Task1:mq_setattr - NULL attributes (EINVAL)" );
+ status = mq_setattr( Test_q[RW_QUEUE].mq, NULL, NULL );
+ fatal_posix_service_status( status, -1, "mq_setattr error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_setattr errno EINVAL");
+
+ /*
+ * Verify change queues to blocking, by verifying all queues block
+ * for a timed receive.
+ */
+
+ puts( "Init: set_attr all queues to blocking" );
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &attr, &save_attr[i] );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[i].mq, attr.mq_flags, 0 );
+ }
+ for( i = RW_QUEUE; i < CLOSED; i++ ) {
+ verify_timed_receive_queue( "Init:", i, 1 );
+ }
+
+ /*
+ * Restore restore all queues to their old attribute.
+ */
+
+ for(i=0; i<CLOSED; i++) {
+ status = mq_setattr( Test_q[i].mq, &save_attr[i], NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[i].mq, Test_q[i].oflag, 0 );
}
- */
}
void *POSIX_Init(
@@ -390,95 +1128,184 @@ void *POSIX_Init(
)
{
int status;
- mqd_t mqs[CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES];
- mqd_t n_mq1;
mqd_t n_mq2;
- char *messages[] = {
- "Msg 1",
- "Test 2",
- "12345678901234567890"
- };
puts( "\n\n*** POSIX MESSAGE QUEUE TEST ***" );
- validate_mq_open_error_codes( mqs, 2 );
- validate_mq_unlink_error_codes( mqs, 2 );
- validate_mq_close_error_codes( mqs, 2 );
+ validate_mq_open_error_codes( );
+ open_test_queues();
+ validate_mq_unlink_error_codes();
+ validate_mq_close_error_codes();
+ verify_unlink_functionality();
+ validate_mq_setattr( );
+ validate_mq_send_error_codes();
+ validate_mq_getattr_error_codes();
+ verify_timed_send();
+ validate_mq_receive_error_codes();
+ verify_timed_receive();
+ verify_open_functionality();
+ verify_notify();
+ verify_with_threads();
+
+ puts( "*** END OF POSIX MESSAGE QUEUE TEST ***" );
+ exit( 0 );
+
+ return NULL; /* just so the compiler thinks we returned something */
+}
- validate_mq_send_error_codes( mqs, 2 );
- validate_mq_receive_error_codes( mqs, 2 );
+void *Task_1 (
+ void *argument
+)
+{
+ int status;
+ int count = 0;
+ sigset_t set;
- /*
- * Validate a second open returns the same message queue.
- */
+ /* Block Waiting for a message */
- puts( "mq_open - Open an existing mq ( same id )" );
- n_mq1 = mq_open("mq1", 0 );
- fatal_directive_status(
- (int) n_mq1, (int ) mqs[0], "mq_open error return status" );
-
- /*
- * Unlink the message queue, then verify an open of the same name produces a
- * different message queue.
- */
+ print_current_time( "Task_1: ", "" );
- puts( "mq_unlink - mq1 SUCCESSFUL" );
- status = mq_unlink( "mq1" );
- fatal_directive_status( status, 0, "mq_unlink locked message queue");
+ Show_send_msg_to_que( "Task_1:", BLOCKING, 0 );
- puts( "mq_open - Reopen mq1 SUCCESSFUL with a different id" );
- n_mq2 = mq_open( "mq1", O_CREAT | O_EXCL, 00777, NULL);
- assert( n_mq2 != (-1) );
- assert( n_mq2 != n_mq1 );
+ puts( "Task_1: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ assert( 0 );
+ return NULL; /* just so the compiler thinks we returned something */
+}
+
+void *Task_2(
+ void *argument
+)
+{
+ int status;
+
+
+ print_current_time( "Task_2: ", "" );
+
+
+ /* Block waiting to send a message */
+
+ verify_queues_full( "Task_2:" );
+ Read_msg_from_que( BLOCKING, Priority_Order[0] ); /* Cause context switch */
+
+ puts( "Task_2: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+}
+
+void *Task_3 (
+ void *argument
+)
+{
+
+ print_current_time( "Task_3: ", "" );
/*
- * Validate it "mq1" can be closed and unlinked.
+ * close and unlink all queues.
*/
- puts( "mq_unlink - mq1 SUCCESSFUL" );
- status = mq_unlink( "mq1" );
- fatal_directive_status( status, 0, "mq_unlink locked message queue");
+ verify_close_functionality( "Task_3: " );
+ puts( "Task_3: pthread_exit" );
+ pthread_exit( NULL );
- puts( "mq_close mq1 - SUCCESSFUL" );
- status = mq_close( n_mq2 );
- fatal_directive_status( status, 0, "mq_close message queue");
- status = mq_close( n_mq1 );
- fatal_directive_status( status, 0, "mq_close message queue");
- status = mq_close( mqs[0] );
- fatal_directive_status( status, 0, "mq_close message queue");
+ /* switch to Init */
- puts( "mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink("mq1");
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_close errno EINVAL");
+ return NULL; /* just so the compiler thinks we returned something */
- /*
- * XXX - Cant' create location OBJECTS_ERROR or OBJECTS_REMOTE.
- * mq_close and mq_unlink.
- * XXX - Don't think we need this save until yellow line tested.
- puts( "Init: mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink("mq3");
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_unlink errno ENOENT");
- assert( (status == -1) && (errno == ENOENT) );
- */
+}
+void *Task_4 (
+ void *argument
+)
+{
+ struct mq_attr attr;
+ int status;
+ int count;
+
+ print_current_time( "Task_4: ", "" );
/*
- * Validate we can wait on a message queue opened with mq_open.
+ * Set the count to the number of messages in the queue.
*/
-#if (0) /* XXX FIX ME */
- puts( "Init: mq_wait on mq1" );
- status = mq_receive(n_mq1);
- fatal_directive_status( status, 0, "mq_wait opened message queue");
-#endif
+ status = mq_getattr( Test_q[BLOCKING].mq, &attr );
+ fatal_posix_service_status( status, 0, "mq_getattr valid return status");
+ count = attr.mq_curmsgs;
- puts( "*** END OF POSIX MESSAGE QUEUE TEST ***" );
- exit( 0 );
+ puts("Task_4: Set queue to non-blocking");
+ attr.mq_flags = Test_q[BLOCKING].oflag | O_NONBLOCK;
+ status = mq_setattr( Test_q[BLOCKING].mq, &attr, NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[BLOCKING].mq, attr.mq_flags, count );
+
+ puts("Task_4: Return queue to blocking");
+ attr.mq_flags = Test_q[BLOCKING].oflag;
+ status = mq_setattr( Test_q[BLOCKING].mq, &attr, NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[BLOCKING].mq, attr.mq_flags, count );
+
+ puts( "Task_4: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+
+}
+
+void *Task_5 (
+ void *argument
+)
+{
+
+ print_current_time( "Task_5: ", "" );
+
+ puts( "Task_5: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+
+}
+
+void *Task_ (
+ void *argument
+)
+{
+
+ print_current_time( "Task_: ", "" );
+
+ puts( "Task_: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
return NULL; /* just so the compiler thinks we returned something */
+
}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+