summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--c/src/exec/itron/src/cre_mbf.c10
-rw-r--r--c/src/exec/itron/src/snd_mbx.c12
-rw-r--r--c/src/exec/itron/src/trcv_mbf.c2
-rw-r--r--c/src/exec/itron/src/tsnd_mbf.c14
-rw-r--r--c/src/exec/posix/include/rtems/posix/mqueue.h2
-rw-r--r--c/src/exec/rtems/src/msgqcreate.c14
-rw-r--r--c/src/exec/rtems/src/msgqreceive.c6
-rw-r--r--c/src/exec/rtems/src/msgqsubmit.c41
-rw-r--r--c/src/exec/score/include/rtems/score/coremsg.h44
-rw-r--r--c/src/exec/score/inline/rtems/score/coremsg.inl24
-rw-r--r--c/src/exec/score/macros/rtems/score/coremsg.inl10
-rw-r--r--c/src/exec/score/src/coremsgbroadcast.c19
-rw-r--r--c/src/exec/score/src/coremsgsubmit.c146
-rw-r--r--c/src/tests/psxtests/psxmsgq01/init.c1321
-rw-r--r--cpukit/itron/src/cre_mbf.c10
-rw-r--r--cpukit/itron/src/snd_mbx.c12
-rw-r--r--cpukit/itron/src/trcv_mbf.c2
-rw-r--r--cpukit/itron/src/tsnd_mbf.c14
-rw-r--r--cpukit/posix/include/rtems/posix/mqueue.h2
-rw-r--r--cpukit/rtems/src/msgqcreate.c14
-rw-r--r--cpukit/rtems/src/msgqreceive.c6
-rw-r--r--cpukit/rtems/src/msgqsubmit.c41
-rw-r--r--cpukit/score/include/rtems/score/coremsg.h44
-rw-r--r--cpukit/score/inline/rtems/score/coremsg.inl24
-rw-r--r--cpukit/score/macros/rtems/score/coremsg.inl10
-rw-r--r--cpukit/score/src/coremsgbroadcast.c19
-rw-r--r--cpukit/score/src/coremsgsubmit.c146
-rw-r--r--testsuites/psxtests/psxmsgq01/init.c1321
28 files changed, 2556 insertions, 774 deletions
diff --git a/c/src/exec/itron/src/cre_mbf.c b/c/src/exec/itron/src/cre_mbf.c
index ca139d53ae..0db738435e 100644
--- a/c/src/exec/itron/src/cre_mbf.c
+++ b/c/src/exec/itron/src/cre_mbf.c
@@ -25,7 +25,7 @@ ER cre_mbf(
T_CMBF *pk_cmbf
)
{
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
ITRON_Message_buffer_Control *the_message_buffer;
/*
@@ -57,16 +57,14 @@ ER cre_mbf(
}
if ( pk_cmbf->mbfatr & TA_TPRI )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
_CORE_message_queue_Initialize(
&the_message_buffer->message_queue,
OBJECTS_ITRON_MESSAGE_BUFFERS,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
pk_cmbf->bufsz / pk_cmbf->maxmsz,
pk_cmbf->maxmsz,
NULL /* Multiprocessing not supported */
diff --git a/c/src/exec/itron/src/snd_mbx.c b/c/src/exec/itron/src/snd_mbx.c
index f674583e7b..e02714465e 100644
--- a/c/src/exec/itron/src/snd_mbx.c
+++ b/c/src/exec/itron/src/snd_mbx.c
@@ -27,7 +27,6 @@ ER snd_msg(
{
register ITRON_Mailbox_Control *the_mailbox;
Objects_Locations location;
- CORE_message_queue_Status status = E_OK;
unsigned32 message_priority;
void *message_contents;
@@ -47,17 +46,22 @@ ER snd_msg(
message_priority = CORE_MESSAGE_QUEUE_SEND_REQUEST;
message_contents = pk_msg;
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_mailbox->message_queue,
&message_contents,
sizeof(T_MSG *),
the_mailbox->Object.id,
NULL, /* multiprocessing not supported */
- message_priority
+ message_priority,
+ FALSE, /* do not allow sender to block */
+ 0 /* no timeout */
);
break;
}
_ITRON_return_errorno(
- _ITRON_Mailbox_Translate_core_message_queue_return_code(status) );
+ _ITRON_Mailbox_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ )
+ );
}
diff --git a/c/src/exec/itron/src/trcv_mbf.c b/c/src/exec/itron/src/trcv_mbf.c
index a63f2a6748..0b2b89e612 100644
--- a/c/src/exec/itron/src/trcv_mbf.c
+++ b/c/src/exec/itron/src/trcv_mbf.c
@@ -32,7 +32,6 @@ ER trcv_mbf(
CORE_message_queue_Status status;
boolean wait;
Watchdog_Interval interval;
- CORE_message_queue_Submit_types core_priority;
interval = 0;
if (tmout == TMO_POL) {
@@ -62,7 +61,6 @@ ER trcv_mbf(
msg,
p_msgsz,
wait,
- &core_priority,
interval
);
_Thread_Enable_dispatch();
diff --git a/c/src/exec/itron/src/tsnd_mbf.c b/c/src/exec/itron/src/tsnd_mbf.c
index bc609dd298..0ed3b4f90e 100644
--- a/c/src/exec/itron/src/tsnd_mbf.c
+++ b/c/src/exec/itron/src/tsnd_mbf.c
@@ -33,7 +33,6 @@ ER tsnd_mbf(
Objects_Locations location;
Watchdog_Interval interval;
boolean wait;
- CORE_message_queue_Status status;
if (msgsz <= 0 || !msg)
return E_PAR;
@@ -50,8 +49,6 @@ ER tsnd_mbf(
if ( wait && _ITRON_Is_in_non_task_state() )
return E_CTX;
- assert( wait == FALSE );
-
the_message_buffer = _ITRON_Message_buffer_Get(mbfid, &location);
switch (location) {
case OBJECTS_REMOTE:
@@ -60,17 +57,20 @@ ER tsnd_mbf(
case OBJECTS_LOCAL:
/* XXX Submit needs to take into account blocking */
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_message_buffer->message_queue,
msg,
msgsz,
the_message_buffer->Object.id,
NULL,
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ interval /* timeout interval */
);
_Thread_Enable_dispatch();
- return
- _ITRON_Message_buffer_Translate_core_message_buffer_return_code(status);
+ return _ITRON_Message_buffer_Translate_core_message_buffer_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
/*
diff --git a/c/src/exec/posix/include/rtems/posix/mqueue.h b/c/src/exec/posix/include/rtems/posix/mqueue.h
index 1c1201fef1..e3f7a2f073 100644
--- a/c/src/exec/posix/include/rtems/posix/mqueue.h
+++ b/c/src/exec/posix/include/rtems/posix/mqueue.h
@@ -32,10 +32,8 @@ extern "C" {
typedef struct {
Objects_Control Object;
int process_shared;
- int flags;
boolean named;
boolean linked;
- boolean blocking;
int oflag;
unsigned32 open_count;
CORE_message_queue_Control Message_queue;
diff --git a/c/src/exec/rtems/src/msgqcreate.c b/c/src/exec/rtems/src/msgqcreate.c
index 395cbf3a02..24bc35993d 100644
--- a/c/src/exec/rtems/src/msgqcreate.c
+++ b/c/src/exec/rtems/src/msgqcreate.c
@@ -59,7 +59,7 @@ rtems_status_code rtems_message_queue_create(
)
{
register Message_queue_Control *the_message_queue;
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
void *handler;
#if defined(RTEMS_MULTIPROCESSING)
boolean is_global;
@@ -74,10 +74,10 @@ rtems_status_code rtems_message_queue_create(
return RTEMS_MP_NOT_CONFIGURED;
#endif
- if (count == 0)
+ if ( count == 0 )
return RTEMS_INVALID_NUMBER;
- if (max_message_size == 0)
+ if ( max_message_size == 0 )
return RTEMS_INVALID_SIZE;
#if defined(RTEMS_MULTIPROCESSING)
@@ -115,11 +115,9 @@ rtems_status_code rtems_message_queue_create(
the_message_queue->attribute_set = attribute_set;
if (_Attributes_Is_priority( attribute_set ) )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
handler = NULL;
#if defined(RTEMS_MULTIPROCESSING)
@@ -129,7 +127,7 @@ rtems_status_code rtems_message_queue_create(
if ( ! _CORE_message_queue_Initialize(
&the_message_queue->message_queue,
OBJECTS_RTEMS_MESSAGE_QUEUES,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
count,
max_message_size,
handler ) ) {
diff --git a/c/src/exec/rtems/src/msgqreceive.c b/c/src/exec/rtems/src/msgqreceive.c
index 1338216c6b..77fcadc313 100644
--- a/c/src/exec/rtems/src/msgqreceive.c
+++ b/c/src/exec/rtems/src/msgqreceive.c
@@ -92,12 +92,12 @@ rtems_status_code rtems_message_queue_receive(
buffer,
size,
wait,
- &core_priority,
timeout
);
_Thread_Enable_dispatch();
- return( _Message_queue_Translate_core_message_queue_return_code(
- _Thread_Executing->Wait.return_code ) );
+ return _Message_queue_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
diff --git a/c/src/exec/rtems/src/msgqsubmit.c b/c/src/exec/rtems/src/msgqsubmit.c
index 5a03f6409a..16f1c50266 100644
--- a/c/src/exec/rtems/src/msgqsubmit.c
+++ b/c/src/exec/rtems/src/msgqsubmit.c
@@ -61,7 +61,6 @@ rtems_status_code _Message_queue_Submit(
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
- CORE_message_queue_Status core_status;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location )
@@ -98,39 +97,43 @@ rtems_status_code _Message_queue_Submit(
case OBJECTS_LOCAL:
switch ( submit_type ) {
case MESSAGE_QUEUE_SEND_REQUEST:
- core_status = _CORE_message_queue_Send(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Send(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
case MESSAGE_QUEUE_URGENT_REQUEST:
- core_status = _CORE_message_queue_Urgent(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Urgent(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
default:
- core_status = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
return RTEMS_INTERNAL_ERROR; /* should never get here */
}
_Thread_Enable_dispatch();
return _Message_queue_Translate_core_message_queue_return_code(
- core_status );
+ _Thread_Executing->Wait.return_code
+ );
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
diff --git a/c/src/exec/score/include/rtems/score/coremsg.h b/c/src/exec/score/include/rtems/score/coremsg.h
index 6ffedee0af..818b1e8c18 100644
--- a/c/src/exec/score/include/rtems/score/coremsg.h
+++ b/c/src/exec/score/include/rtems/score/coremsg.h
@@ -166,13 +166,12 @@ void _CORE_message_queue_Close(
);
/*
- *
* _CORE_message_queue_Flush
*
* DESCRIPTION:
*
- * This function flushes the message_queue's task wait queue. The number
- * messages flushed from the queue is returned.
+ * This function flushes the message_queue's pending message queue. The
+ * number of messages flushed from the queue is returned.
*
*/
@@ -194,7 +193,20 @@ unsigned32 _CORE_message_queue_Flush_support(
);
/*
+ * _CORE_message_queue_Flush_waiting_threads
+ *
+ * DESCRIPTION:
*
+ * This function flushes the threads which are blocked on this
+ * message_queue's pending message queue. They are unblocked whether
+ * blocked sending or receiving.
+ */
+
+void _CORE_message_queue_Flush_waiting_threads(
+ CORE_message_queue_Control *the_message_queue
+);
+
+/*
* _CORE_message_queue_Broadcast
*
* DESCRIPTION:
@@ -214,7 +226,6 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
);
/*
- *
* _CORE_message_queue_Submit
*
* DESCRIPTION:
@@ -228,17 +239,18 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
*
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
);
/*
- *
* _CORE_message_queue_Seize
*
* DESCRIPTION:
@@ -248,6 +260,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* inactive message pool. The thread will be blocked if wait is TRUE,
* otherwise an error will be given to the thread if no messages are available.
*
+ * NOTE: Returns message priority via return are in TCB.
*/
void _CORE_message_queue_Seize(
@@ -256,10 +269,25 @@ void _CORE_message_queue_Seize(
void *buffer,
unsigned32 *size,
boolean wait,
- CORE_message_queue_Submit_types *priority,
Watchdog_Interval timeout
);
+/*
+ * _CORE_message_queue_Insert_message
+ *
+ * DESCRIPTION:
+ *
+ * This kernel routine inserts the specified message into the
+ * message queue. It is assumed that the message has been filled
+ * in before this routine is called.
+ */
+
+void _CORE_message_queue_Insert_message(
+ CORE_message_queue_Control *the_message_queue,
+ CORE_message_queue_Buffer_control *the_message,
+ CORE_message_queue_Submit_types submit_type
+);
+
#ifndef __RTEMS_APPLICATION__
#include <rtems/score/coremsg.inl>
#endif
diff --git a/c/src/exec/score/inline/rtems/score/coremsg.inl b/c/src/exec/score/inline/rtems/score/coremsg.inl
index af16fbd4aa..7356ce9537 100644
--- a/c/src/exec/score/inline/rtems/score/coremsg.inl
+++ b/c/src/exec/score/inline/rtems/score/coremsg.inl
@@ -27,15 +27,17 @@
* This routine sends a message to the end of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Send(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -45,7 +47,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
@@ -58,15 +62,17 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
* This routine sends a message to the front of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Urgent(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -76,7 +82,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_URGENT_REQUEST
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
diff --git a/c/src/exec/score/macros/rtems/score/coremsg.inl b/c/src/exec/score/macros/rtems/score/coremsg.inl
index bc45a51ac3..0717b3eea3 100644
--- a/c/src/exec/score/macros/rtems/score/coremsg.inl
+++ b/c/src/exec/score/macros/rtems/score/coremsg.inl
@@ -23,9 +23,10 @@
*/
#define _CORE_message_queue_Send( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_SEND_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_SEND_REQUEST, (_wait), (_timeout)
/*PAGE
*
@@ -34,9 +35,10 @@ _id, _api_message_queue_mp_support ) \
*/
#define _CORE_message_queue_Urgent( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_URGENT_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST, (_wait), (_timeout)
/*PAGE
*
diff --git a/c/src/exec/score/src/coremsgbroadcast.c b/c/src/exec/score/src/coremsgbroadcast.c
index 2e6f649545..18e148ab1c 100644
--- a/c/src/exec/score/src/coremsgbroadcast.c
+++ b/c/src/exec/score/src/coremsgbroadcast.c
@@ -64,6 +64,25 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
Thread_Wait_information *waitp;
unsigned32 constrained_size;
+ /*
+ * If there are pending messages, then there can't be threads
+ * waiting for us to send them a message.
+ *
+ * NOTE: This check is critical because threads can block on
+ * send and receive and this ensures that we are broadcasting
+ * the message to threads waiting to receive -- not to send.
+ */
+
+ if ( the_message_queue->number_of_pending_messages != 0 ) {
+ *count = 0;
+ return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ }
+
+ /*
+ * There must be no pending messages if there is a thread waiting to
+ * receive a message.
+ */
+
number_broadcasted = 0;
while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
waitp = &the_thread->Wait;
diff --git a/c/src/exec/score/src/coremsgsubmit.c b/c/src/exec/score/src/coremsgsubmit.c
index 4e15ab5bc0..6829351c18 100644
--- a/c/src/exec/score/src/coremsgsubmit.c
+++ b/c/src/exec/score/src/coremsgsubmit.c
@@ -53,102 +53,110 @@
* error code - if unsuccessful
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
+ ISR_Level level;
CORE_message_queue_Buffer_control *the_message;
Thread_Control *the_thread;
+ Thread_Control *executing;
- if ( size > the_message_queue->maximum_message_size )
- return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+
+ if ( size > the_message_queue->maximum_message_size ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ return;
+ }
/*
- * Is there a thread currently waiting on this message queue?
+ * Is there a thread currently waiting on this message queue?
*/
- the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
- if ( the_thread ) {
- _CORE_message_queue_Copy_buffer(
- buffer,
- the_thread->Wait.return_argument,
- size
- );
- *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
- the_thread->Wait.count = submit_type;
+ if ( the_message_queue->number_of_pending_messages == 0 ) {
+ the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
+ if ( the_thread ) {
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_thread->Wait.return_argument,
+ size
+ );
+ *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
+ the_thread->Wait.count = submit_type;
#if defined(RTEMS_MULTIPROCESSING)
- if ( !_Objects_Is_local_id( the_thread->Object.id ) )
- (*api_message_queue_mp_support) ( the_thread, id );
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+ (*api_message_queue_mp_support) ( the_thread, id );
#endif
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ return;
+ }
}
/*
- * No one waiting on this one currently.
- * Allocate a message buffer and store it away
+ * No one waiting on the message queue at this time, so attempt to
+ * queue the message up for a future receive.
*/
- if ( the_message_queue->number_of_pending_messages ==
+ if ( the_message_queue->number_of_pending_messages <
the_message_queue->maximum_pending_messages ) {
- return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
- }
- the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue);
- if ( the_message == 0 )
- return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
-
- _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
- the_message->Contents.size = size;
- the_message->priority = submit_type;
-
- the_message_queue->number_of_pending_messages += 1;
-
- switch ( submit_type ) {
- case CORE_MESSAGE_QUEUE_SEND_REQUEST:
- _CORE_message_queue_Append( the_message_queue, the_message );
- break;
- case CORE_MESSAGE_QUEUE_URGENT_REQUEST:
- _CORE_message_queue_Prepend( the_message_queue, the_message );
- break;
- default:
- /* XXX interrupt critical section needs to be addressed */
- {
- CORE_message_queue_Buffer_control *this_message;
- Chain_Node *the_node;
-
- the_message->priority = submit_type;
- for ( the_node = the_message_queue->Pending_messages.first ;
- !_Chain_Is_tail( &the_message_queue->Pending_messages, the_node ) ;
- the_node = the_node->next ) {
-
- this_message = (CORE_message_queue_Buffer_control *) the_node;
-
- if ( this_message->priority >= the_message->priority )
- continue;
-
- _Chain_Insert( the_node, &the_message->Node );
- break;
- }
- }
- break;
+ the_message =
+ _CORE_message_queue_Allocate_message_buffer( the_message_queue );
+
+ /*
+ * NOTE: If the system is consistent, this error should never occur.
+ */
+ if ( !the_message ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
+ return;
+ }
+
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_message->Contents.buffer,
+ size
+ );
+ the_message->Contents.size = size;
+ the_message->priority = submit_type;
+
+ _CORE_message_queue_Insert_message(
+ the_message_queue,
+ the_message,
+ submit_type
+ );
+ return;
}
/*
- * According to POSIX, does this happen before or after the message
- * is actually enqueued. It is logical to think afterwards, because
- * the message is actually in the queue at this point.
+ * No message buffers were available so we may need to return an
+ * overflow error or block the sender until the message is placed
+ * on the queue.
*/
- if ( the_message_queue->number_of_pending_messages == 1 &&
- the_message_queue->notify_handler )
- (*the_message_queue->notify_handler)( the_message_queue->notify_argument );
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ if ( !wait ) {
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
+ return;
+ }
+
+ executing = _Thread_Executing;
+
+ _ISR_Disable( level );
+ _Thread_queue_Enter_critical_section( &the_message_queue->Wait_queue );
+ executing->Wait.queue = &the_message_queue->Wait_queue;
+ executing->Wait.id = id;
+ executing->Wait.return_argument = (void *)buffer;
+ executing->Wait.return_argument_1 = (void *)size;
+ executing->Wait.count = submit_type;
+ _ISR_Enable( level );
+
+ _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
}
diff --git a/c/src/tests/psxtests/psxmsgq01/init.c b/c/src/tests/psxtests/psxmsgq01/init.c
index d40ab18a2f..534f6c6dcb 100644
--- a/c/src/tests/psxtests/psxmsgq01/init.c
+++ b/c/src/tests/psxtests/psxmsgq01/init.c
@@ -15,13 +15,95 @@
#include <fcntl.h>
#include <time.h>
#include <tmacros.h>
+#include <signal.h> /* signal facilities */
+
+typedef struct {
+ char msg[ 50 ];
+ int size;
+ unsigned int priority;
+}Test_Message_t;
+Test_Message_t Predefined_Msgs[MAXMSG+1];
+Test_Message_t Predefined_Msgs[MAXMSG+1] = {
+ { "12345678", 9, MQ_PRIO_MAX-1 }, /* Max Length Message med */
+ { "", 1, 1 }, /* NULL Message low */
+ { "Last", 5, MQ_PRIO_MAX }, /* Queue Full Message hi */
+ { "No Message", 0, MQ_PRIO_MAX-1 }, /* 0 length Message med */
+ { "1", 2, 0 }, /* Cause Overflow Behavior */
+};
+int Priority_Order[MAXMSG+1] = { 2, 0, 3, 1, MAXMSG };
+
+
+typedef struct {
+ mqd_t mq;
+ Test_Queue_Types index;
+ char *name;
+ int oflag;
+ int maxmsg;
+ int msgsize;
+ int count;
+} Test_queue_type;
+
+Test_queue_type Test_q[ NUMBER_OF_TEST_QUEUES ] =
+{
+ { 0, 0, "Qread", ( O_CREAT | O_RDONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 1, "Qwrite", ( O_CREAT | O_WRONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 2, "Qnoblock", ( O_CREAT | O_RDWR | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 3, "Qblock", ( O_CREAT | O_RDWR ) , MAXMSG, MSGSIZE, 0 },
+ { 0, 4, "Qdefault", ( O_CREAT | O_RDWR ) , 10, 16, 0 },
+ { 0, 5, "mq6", ( O_CREAT | O_WRONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+};
+
+#define RW_NAME Test_q[ RW_QUEUE ].name
+#define DEFAULT_NAME Test_q[ DEFAULT_RW ].name
+#define RD_NAME Test_q[ RD_QUEUE ].name
+#define WR_NAME Test_q[ WR_QUEUE ].name
+#define BLOCKING_NAME Test_q[ BLOCKING ].name
+#define CLOSED_NAME Test_q[ CLOSED ].name
+
+#define RW_ATTR Test_q[ RW_QUEUE ].oflag
+#define DEFAULT_ATTR Test_q[ DEFAULT_RW ].oflag
+#define RD_ATTR Test_q[ RD_QUEUE ].oflag
+#define WR_ATTR Test_q[ WR_QUEUE ].oflag
+#define BLOCK_ATTR Test_q[ BLOCKING ].oflag
+#define CLOSED_ATTR Test_q[ CLOSED ].oflag
-char Queue_Name[PATH_MAX + 2];
-char *Get_Queue_Name(
- int i
+/*
+ * Outputs a header at each test section.
+ */
+void Start_Test(
+ char *description
)
{
- sprintf(Queue_Name,"mq%d",i+1);
+ printf( "_______________%s\n", description );
+}
+
+
+void Validate_attributes(
+ mqd_t mq,
+ int oflag,
+ int msg_count
+)
+{
+ int status;
+ struct mq_attr attr;
+
+ status = mq_getattr( mq, &attr );
+ fatal_posix_service_status( status, 0, "mq_getattr valid return status");
+
+ if ( mq != Test_q[ DEFAULT_RW ].mq ){
+ fatal_int_service_status((int)attr.mq_maxmsg, MAXMSG, "maxmsg attribute" );
+ fatal_int_service_status((int)attr.mq_msgsize,MSGSIZE,"msgsize attribute");
+ }
+
+ fatal_int_service_status((int)attr.mq_curmsgs, msg_count, "count attribute" );
+ fatal_int_service_status((int)attr.mq_flags, oflag, "flag attribute" );
+}
+
+char Queue_Name[PATH_MAX + 2];
+#define Get_Queue_Name( i ) Test_q[i].name
+
+char *Build_Queue_Name( int i ) {
+ sprintf(Queue_Name,"mq%d", i+1 );
return Queue_Name;
}
@@ -35,354 +117,1010 @@ char *Get_Too_Long_Name()
return Queue_Name;
}
-typedef enum {
- DEFAULT_SIZE_TYPE,
- TEST_SIZE_TYPE,
- MAX_SIZE,
- TYPES_OF_TEST_SIZES
-} TEST_MQ_SIZE_TYPES;
+void open_test_queues()
+{
+ struct mq_attr attr;
+ int status;
+ Test_queue_type *tq;
+ int que;
+
+ attr.mq_maxmsg = MAXMSG;
+ attr.mq_msgsize = MSGSIZE;
+
+ puts( "Init: Open Test Queues" );
+
+ for( que = 0; que < NUMBER_OF_TEST_QUEUES; que++ ) {
+
+ tq = &Test_q[ que ];
+ if ( que == DEFAULT_RW)
+ Test_q[que].mq = mq_open( tq->name, tq->oflag, 0x777, NULL );
+ else
+ Test_q[que].mq = mq_open( tq->name, tq->oflag, 0x777, &attr );
+
+ assert( Test_q[que].mq != (-1) );
+ }
+
+ status = mq_close( Test_q[CLOSED].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+ status = mq_unlink( CLOSED_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+}
/*
* Opens CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES then leaves size queues
* opened but closes the rest.
*/
-void validate_mq_open_error_codes(
- mqd_t *mqs, /* Must be large enough for Maximum to be opened. */
- int size
-)
+void validate_mq_open_error_codes()
{
int i;
mqd_t n_mq2;
struct mq_attr attr;
int status;
+ mqd_t open_mq[CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES];
+
+ attr.mq_maxmsg = MAXMSG;
+ attr.mq_msgsize = MSGSIZE;
- assert( size < (CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES-1) );
+ Start_Test( "mq_open errors" );
/*
- * Validate mq_open errors that can occur when no queues are open.
- * EINVAL
- * ENOENT
- * EINTR
+ * XXX EINVAL - inappropriate name was given for the message queue
*/
/*
- * XXX EINVAL - inappropriate name was given for the message queue
+ * EINVAL - Create with negative maxmsg.
*/
attr.mq_maxmsg = -1;
- puts( "mq_open - Create with maxmsg (-1) (EINVAL)" );
- n_mq2 = mq_open("mq2", O_CREAT | O_RDONLY, 0x777, &attr);
- fatal_directive_status(
+ puts( "Init: mq_open - Create with maxmsg (-1) (EINVAL)" );
+ n_mq2 = mq_open( "mq2", O_CREAT | O_RDONLY, 0x777, &attr);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EINVAL, "mq_open errno EINVAL");
+ fatal_posix_service_status( errno, EINVAL, "mq_open errno EINVAL");
+ attr.mq_maxmsg = MAXMSG;
+
+ /*
+ * EINVAL - Create withnegative msgsize.
+ */
attr.mq_msgsize = -1;
- puts( "mq_open - Create with msgsize (-1) (EINVAL)" );
- n_mq2 = mq_open("mq2", O_CREAT | O_RDONLY, 0x777, &attr);
- fatal_directive_status(
+ puts( "Init: mq_open - Create with msgsize (-1) (EINVAL)" );
+ n_mq2 = mq_open( "mq2", O_CREAT | O_RDONLY, 0x777, &attr);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EINVAL, "mq_open errno EINVAL");
+ fatal_posix_service_status( errno, EINVAL, "mq_open errno EINVAL");
+ attr.mq_msgsize = MSGSIZE;
+
+ /*
+ * ENOENT - Open a non-created file.
+ */
- puts( "mq_open - Open new mq without create flag (ENOENT)" );
- n_mq2 = mq_open("mq3", O_EXCL | O_RDONLY, 0x777, NULL);
- fatal_directive_status(
+ puts( "Init: mq_open - Open new mq without create flag (ENOENT)" );
+ n_mq2 = mq_open( "mq3", O_EXCL | O_RDONLY, 0x777, NULL);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENOENT, "mq_open errno ENOENT");
+ fatal_posix_service_status( errno, ENOENT, "mq_open errno ENOENT");
+
/*
* XXX EINTR - call was interrupted by a signal
*/
/*
- * XXX ENAMETOOLONG - Not checked in either sem_open or mq_open is
- * this an error?
+ * ENAMETOOLONG - Give a name greater than PATH_MAX.
*/
- puts( "mq_open - Open with too long of a name (ENAMETOOLONG)" );
+ puts( "Init: mq_open - Open with too long of a name (ENAMETOOLONG)" );
n_mq2 = mq_open( Get_Too_Long_Name(), O_CREAT | O_RDONLY, 0x777, NULL );
- fatal_directive_status(
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENAMETOOLONG, "mq_open errno ENAMETOOLONG");
+ fatal_posix_service_status( errno, ENAMETOOLONG, "mq_open errno ENAMETOOLONG");
/*
+ * XXX - ENAMETOOLONG - Give a name greater than NAME_MAX
+ * Per implementation not possible.
+ */
+
+ /*
* Open maximum number of message queues
*/
- puts( "mq_open - SUCCESSFUL" );
+ puts( "Init: mq_open - SUCCESSFUL" );
for (i = 0; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
- mqs[i] = mq_open( Get_Queue_Name(i), O_CREAT | O_RDWR, 0x777, NULL );
- assert( mqs[i] != (-1) );
+ open_mq[i] = mq_open(
+ Build_Queue_Name(i), O_CREAT | O_RDWR | O_NONBLOCK, 0x777, NULL );
+ assert( open_mq[i] != (-1) );
/*XXX - Isn't there a more general check */
}
/*
- * Validate open errors that must occur after message queues are open.
- * EACCES
- * EEXIST
- * EMFILE
- * ENFILE
+ * XXX EACCES - permission to create is denied.
*/
/*
- * XXX EACCES - permission to create is denied.
+ * XXX EACCES - queue exists permissions specified by o_flag are denied.
*/
/*
- * XXX EACCES - queue exists permissions specified by o_flag are denied.
- puts( "mq_open - open mq as write (EACCES)" );
- n_mq2 = mq_open("mq1", O_CREAT | O_WRONLY, 0x777, NULL);
- fatal_directive_status(
- (int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EACCES, "mq_open errno EACCES");
+ * EEXIST - Create an existing queue.
*/
- puts( "mq_open - Create an Existing mq (EEXIST)" );
- n_mq2 = mq_open("mq1", O_CREAT | O_EXCL | O_RDONLY, 0x777, NULL);
- fatal_directive_status(
+ puts( "Init: mq_open - Create an Existing mq (EEXIST)" );
+ n_mq2 = mq_open(
+ Build_Queue_Name(0), O_CREAT | O_EXCL | O_RDONLY, 0x777, NULL);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EEXIST, "mq_open errno EEXIST");
+ fatal_posix_service_status( errno, EEXIST, "mq_open errno EEXIST");
+ /*
+ * XXX EMFILE - Too many message queues in use by the process
+ */
/*
- * XXX EMFILE - Too many message queues open
+ * ENFILE - Too many message queues open in the system
*/
- puts( "mq_open - system is out of resources (ENFILE)" );
- n_mq2 = mq_open( Get_Queue_Name(i), O_CREAT | O_RDONLY, 0x777, NULL );
- fatal_directive_status(
+ puts( "Init: mq_open - system is out of resources (ENFILE)" );
+ n_mq2 = mq_open( Build_Queue_Name(i), O_CREAT | O_RDONLY, 0x777, NULL );
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENFILE, "mq_open errno ENFILE");
+ fatal_posix_service_status( errno, ENFILE, "mq_open errno ENFILE");
/*
- * Unlink and Close .
+ * Unlink and Close all queues.
*/
- puts( "mq_close and mq_unlink (mq3...mqn) - SUCCESSFUL" );
- for (i = size; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
+ puts( "Init: mq_close and mq_unlink (mq3...mqn) - SUCCESSFUL" );
+ for (i = 0; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
- status = mq_close( mqs[i] );
- fatal_directive_status( status, 0, "mq_close message queue");
+ status = mq_close( open_mq[i]);
+ fatal_posix_service_status( status, 0, "mq_close message queue");
- status = mq_unlink( Get_Queue_Name(i) );
- fatal_directive_status( status, 0, "mq_unlink message queue");
+ status = mq_unlink( Build_Queue_Name(i) );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
}
}
-void validate_mq_unlink_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_unlink_error_codes()
{
int status;
+ Start_Test( "mq_unlink errors" );
+
/*
* XXX - EACCES Permission Denied
*/
/*
- * XXX ENAMETOOLONG - Not checked in either sem_unlink or mq_unlink is
- * this an error?
+ * ENAMETOOLONG - Give a name greater than PATH_MAX.
*/
- puts( "mq_unlink - mq_unlink with too long of a name (ENAMETOOLONG)" );
+ puts( "Init: mq_unlink - mq_unlink with too long of a name (ENAMETOOLONG)" );
status = mq_unlink( Get_Too_Long_Name() );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENAMETOOLONG, "mq_unlink errno ENAMETOOLONG");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, ENAMETOOLONG, "mq_unlink errno ENAMETOOLONG");
- puts( "mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink(Get_Queue_Name(size));
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_unlink errno ENOENT");
+ /*
+ * XXX - ENAMETOOLONG - Give a name greater than NAME_MAX
+ * Per implementation not possible.
+ */
/*
- * XXX - These errors are not in the POSIX manual but may occur.
+ * ENOENT - Unlink an unopened queue
*/
- puts( "mq_unlink (NULL) - EINVAL" );
+ puts( "Init: mq_unlink - A Queue not opened (ENOENT)" );
+ status = mq_unlink( CLOSED_NAME );
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, ENOENT, "mq_unlink errno ENOENT");
+
+ /*
+ * XXX - The following were not listed in the POSIX document as
+ * possible errors. Under other commands the EINVAL is
+ * given for these conditions.
+ */
+
+ /*
+ * EINVAL - Unlink a queue with no name
+ */
+
+ puts( "Init: mq_unlink (NULL) - EINVAL" );
status = mq_unlink( NULL );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, EINVAL, "mq_unlink errno value");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_unlink errno value");
- puts( "mq_unlink (\"\") - EINVAL" );
+ /*
+ * EINVAL - Unlink a queue with a null name
+ */
+
+ puts( "Init: mq_unlink (\"\") - EINVAL" );
status = mq_unlink( "" );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, EINVAL, "mq_unlink errno value");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_unlink errno value");
}
-void validate_mq_close_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_close_error_codes()
{
int status;
- puts( "mq_close - UNSUCCESSFUL (EBADF)" );
- status = mq_close(mqs[size]);
- fatal_directive_status( status, -1, "mq_close error return status");
- fatal_directive_status( errno, EBADF, "mq_close errno EBADF");
+ Start_Test( "mq_close errors" );
+
+ /*
+ * EBADF - Close a queue that is not open.
+ */
+
+ puts( "Init: mq_close - unopened queue (EBADF)" );
+ status = mq_close( Test_q[CLOSED].mq );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_close errno EBADF");
}
+
+void validate_mq_getattr_error_codes()
+{
+ struct mq_attr attr;
+ int status;
+
+ Start_Test( "mq_getattr errors" );
+
+ /*
+ * EBADF - Get the attributes from a closed queue.
+ */
+
+ puts( "Init: mq_getattr - unopened queue (EBADF)" );
+ status = mq_getattr( Test_q[CLOSED].mq, &attr );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_close errno EBADF");
+
+ /*
+ * XXX - The following are not listed in the POSIX manual but
+ * may occur.
+ */
+
+ /*
+ * EINVAL - NULL attributes
+ */
+
+ puts( "Init: mq_getattr - NULL attributes (EINVAL)" );
+ status = mq_getattr( Test_q[RW_QUEUE].mq, NULL );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_close errno EINVAL");
+
+}
+
+
+void Send_msg_to_que(
+ int que,
+ int msg
+)
+{
+ Test_Message_t *ptr = &Predefined_Msgs[msg];
+ int status;
+
+ status = mq_send( Test_q[que].mq, ptr->msg, ptr->size , ptr->priority );
+ fatal_posix_service_status( status, 0, "mq_send valid return status");
+ Test_q[que].count++;
+}
+
+void Show_send_msg_to_que(
+ char *task_name,
+ int que,
+ int msg
+)
+{
+ Test_Message_t *ptr = &Predefined_Msgs[msg];
+ printf( "%s mq_send - to %s msg: %s priority %d\n",
+ task_name, Test_q[que].name, ptr->msg, ptr->priority);
+ Send_msg_to_que( que, msg );
+}
+
+void verify_queues_full(
+ char *task_name
+)
+{
+ int que;
+
+ /*
+ * Validate that the queues are full.
+ */
+
+ printf( "%s Verify Queues are full\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ )
+ Validate_attributes( Test_q[que].mq, Test_q[que].oflag, Test_q[que].count );
+
+}
+void verify_queues_empty(
+ char *task_name
+)
+{
+ int que;
+
+ printf( "%s Verify Queues are empty\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ )
+ Validate_attributes( Test_q[que].mq, Test_q[que].oflag, 0 );
+}
+
+int fill_message_queues(
+ char *task_name
+)
+{
+ int msg;
+ int status;
+ int que;
+
+
+ verify_queues_empty( task_name );
+
+ /*
+ * Fill Queue with predefined messages.
+ */
+
+ printf( "%s Fill Queues with messages\n", task_name );
+ for(msg=0; msg<MAXMSG; msg++){
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ Send_msg_to_que( que, msg );
+ }
+ }
+
+ verify_queues_full( "Init:" );
+ return msg;
+}
+
+
+void Read_msg_from_que(
+ int que,
+ int msg
+)
+{
+ unsigned int priority;
+ Test_Message_t *ptr;
+ int status;
+ char message[100];
+ char err_msg[100];
+
+ ptr = &Predefined_Msgs[msg];
+ status = mq_receive(Test_q[ que ].mq, message, 100, &priority );
+ Test_q[que].count--;
+
+ sprintf( err_msg, "%s msg %s size failure", Test_q[ que ].name, ptr->msg );
+ fatal_int_service_status( status, ptr->size, err_msg );
+
+ assert( !strcmp( message, ptr->msg ) );
+ strcpy( message, "No Message" );
+
+ sprintf( err_msg,"%s msg %s size failure", Test_q[ que ].name, ptr->msg );
+ fatal_int_service_status(priority, ptr->priority, err_msg );
+}
+
+int empty_message_queues(
+ char *task_name
+)
+{
+ int que;
+ int i;
+
+ printf( "%s Empty all Queues\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ for(i=0; Test_q[que].count != 0; i++ )
+ Read_msg_from_que( que, Priority_Order[i] );
+
+ Validate_attributes( Test_q[ que].mq, Test_q[ que ].oflag, 0 );
+ }
+ return 0;
+}
+
/*
* Returns the number of messages queued after the test on the
* first queue.
*/
-int validate_mq_send_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+int validate_mq_send_error_codes( )
{
int status;
int i;
- mqd_t n_mq1;
- struct mq_attr attr;
+ char *str;
- attr.mq_maxmsg = 3;
- attr.mq_msgsize = 8;
+ Start_Test( "mq_send errors" );
/*
- * XXX - EBADF Not a valid message descriptor.
- * Write to a invalid message descriptor
- * XXX - Write to a read only queue
+ * EBADF - Write to a closed queue.
*/
- puts( "mq_send - Closed message queue (EBADF)" );
- status = mq_send( mqs[size], "", 1, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EBADF, "mq_send errno EBADF");
+ puts( "Init: mq_send - Closed message queue (EBADF)" );
+ status = mq_send( Test_q[CLOSED].mq, "", 1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
- puts( "mq_open - Open a read only queue" );
- n_mq1 = mq_open("read_only", O_CREAT | O_RDONLY, 0x777, &attr);
- assert( n_mq1 != (-1) );
- /*XXX - Isn't there a more general check */
+ /*
+ * EBADF - Write to a read only queue.
+ */
+
+ puts( "Init: mq_send - Read only message queue (EBADF)" );
+ status = mq_send( Test_q[ RD_QUEUE ].mq, "", 1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
- puts( "mq_send - Read only message queue (EBADF)" );
- status = mq_send( n_mq1, "", 1, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EBADF, "mq_send errno EBADF");
+ /*
+ * XXX - EINTR Signal interrupted the call.
+ *
+ puts( "Init: mq_send - UNSUCCESSFUL (EINTR)" );
+ status = mq_send( Test_q, "", 0xffff, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, E, "mq_send errno E");
+ */
+
+ /*
+ * EINVAL priority is out of range.
+ */
+
+ puts( "Init: mq_send - Priority out of range (EINVAL)" );
+ status = mq_send( Test_q[ RW_QUEUE ].mq, "", 1, MQ_PRIO_MAX + 1 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_send errno EINVAL");
+
+ /*
+ * EMSGSIZE - Message size larger than msg_len
+ * Validates that msgsize is stored correctly.
+ */
+
+ puts( "Init: mq_send - Message longer than msg_len (EMSGSIZE)" );
+ status = mq_send( Test_q[ RW_QUEUE ].mq, "", MSGSIZE+1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EMSGSIZE, "mq_send errno EMSGSIZE");
+
+ i = fill_message_queues( "Init:" );
+
+ /*
+ * ENOSYS - send not supported
+ puts( "Init: mq_send - Blocking Queue overflow (ENOSYS)" );
+ status = mq_send( n_mq1, Predefined_Msgs[i], 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
status = mq_close( n_mq1 );
- fatal_directive_status( status, 0, "mq_close message queue");
+ fatal_posix_service_status( status, 0, "mq_close message queue");
status = mq_unlink( "read_only" );
- fatal_directive_status( status, 0, "mq_unlink message queue");
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+ */
/*
- * XXX - EINTR
- * Signal interrupted the call.
+ * EAGAIN - O_NONBLOCK and message queue is full.
+ */
+
+ puts( "Init: mq_send - on a FULL non-blocking queue with (EAGAIN)" );
+ str = Predefined_Msgs[i].msg;
+ status = mq_send(Test_q[RW_QUEUE].mq, str, 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_send errno EAGAIN");
- puts( "mq_send - UNSUCCESSFUL (EINTR)" );
- status = mq_send( mqs, "", 0xffff, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, E, "mq_send errno E");
+ return i-1;
+}
+
+void validate_mq_receive_error_codes( )
+{
+ int status;
+ char message[100];
+ unsigned int priority;
+ int i;
+
+ Start_Test( "mq_receive errors" );
+
+ /*
+ * EBADF - Not A Valid Message Queue
*/
+ puts( "Init: mq_receive - Unopened message queue (EBADF)" );
+ status = mq_receive( Test_q[CLOSED].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
+
/*
- * XXX - EINVAL priority is out of range.
+ * EBADF - Queue not opened to read
*/
- puts( "mq_send - Priority out of range (EINVAL)" );
- status = mq_send( mqs[0], "", 1, MQ_PRIO_MAX + 1 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EINVAL, "mq_send errno EINVAL");
+ puts( "Init: mq_receive - Write only queue (EBADF)" );
+ status = mq_receive( Test_q[WR_QUEUE].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
/*
- * XXX - EMSGSIZE - Message size larger than msg_len
+ * EMSGSIZE - Size is less than the message size attribute
*/
- puts( "mq_send - Message longer than msg_len (EMSGSIZE)" );
- status = mq_send( mqs[0], "", 0xffff, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EMSGSIZE, "mq_send errno EMSGSIZE");
+ puts( "Init: mq_receive - Size is less than the message (EMSGSIZE)" );
+ status = mq_receive(
+ Test_q[RW_QUEUE].mq, message, Predefined_Msgs[0].size-1, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EMSGSIZE, "mq_receive errno EMSGSIZE");
+
/*
- * ENOSYS - send is supported should never happen.
+ * EAGAIN - O_NONBLOCK and Queue is empty
*/
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
+ puts( "Init: mq_receive - Queue is empty (EAGAIN)" );
+ status = mq_receive( Test_q[RW_QUEUE].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_receive errno EAGAIN");
/*
- * XXX - EAGAIN
- * O_NONBLOCK and message queue is full.
- * This is validated in the read/write test.
+ * XXX - EINTR - Interrupted by a signal
*/
- i=0;
- do {
- status = mq_send( mqs[0], "", 1, 0 );
- i++;
- } while (status == 0);
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EAGAIN, "mq_send errno EAGAIN");
+ /*
+ * XXX - EBADMSG - a data corruption problem.
+ */
- return i-1;
+ /*
+ * XXX - ENOSYS - mq_receive not supported
+ */
+}
+
+void verify_open_functionality()
+{
+ mqd_t n_mq;
+
+ Start_Test( "mq_open functionality" );
+
+ /*
+ * Validate a second open returns the same message queue.
+ */
+
+ puts( "Init: mq_open - Open an existing mq ( same id )" );
+ n_mq = mq_open( RD_NAME, 0 );
+ fatal_posix_service_status(
+ (int) n_mq, (int ) Test_q[RD_QUEUE].mq, "mq_open error return status" );
+}
+
+void verify_unlink_functionality()
+{
+ mqd_t n_mq;
+ int status;
+
+ Start_Test( "mq_unlink functionality" );
+
+ /*
+ * Unlink the message queue, then verify an open of the same name produces a
+ * different message queue.
+ */
+
+ puts( "Init: Unlink and Open without closing SUCCESSFUL" );
+ status = mq_unlink( DEFAULT_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink locked message queue");
+
+ n_mq = mq_open( DEFAULT_NAME, DEFAULT_ATTR, 0x777, NULL );
+ assert( n_mq != (-1) );
+ assert( n_mq != Test_q[ DEFAULT_RW ].mq );
+
+
+ status = mq_unlink( DEFAULT_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink locked message queue");
+ status = mq_close( Test_q[ DEFAULT_RW ].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+
+ Test_q[ DEFAULT_RW ].mq = n_mq;
}
-void validate_mq_receive_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
+void verify_close_functionality()
+{
+ int i;
+ int status;
+ Start_Test( "Unlink and Close All Files" );
+ for (i=0; i<DEFAULT_RW; i++) {
+
+ status = mq_unlink( Get_Queue_Name(i) );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+
+ status = mq_close( Test_q[i].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+ }
+}
+
+
+void verify_timed_send_queue(
+ int que,
+ int is_blocking
+)
+{
+ int i;
+ struct timespec timeout;
+ struct timeval tv1, tv2, tv3;
+ struct timezone tz1, tz2;
+ int len;
+ int status;
+ char *msg;
+
+ timeout.tv_sec = 1;
+ timeout.tv_nsec = 0;
+
+ printf( "Init: mq_timedsend - on queue %s ", Test_q[que].name);
+ len = Predefined_Msgs[MAXMSG].size;
+ msg = Predefined_Msgs[MAXMSG].msg;
+ gettimeofday( &tv1, &tz1 );
+ status = mq_timedsend( Test_q[que].mq, msg, len , 0, &timeout );
+ gettimeofday( &tv2, &tz2 );
+ tv3.tv_sec = tv2.tv_sec - tv1.tv_sec;
+ tv3.tv_usec = tv2.tv_usec - tv1.tv_usec;
+
+ if ( is_blocking ) { /* Don't verify the non-blocking queue */
+ fatal_int_service_status( status, -1, "mq_timedsend status");
+ fatal_posix_service_status( errno, ETIMEDOUT, "errno ETIMEDOUT");
+ }
+
+ printf("Init: %d sec %d us\n", tv3.tv_sec, tv3.tv_usec );
+
+ if ( is_blocking ) /* non-blocking queue */
+ assert( tv3.tv_sec == 1 );
+ else
+ assert( tv3.tv_sec == 0 );
+
+ if ( que == DEFAULT_RW )
+ Test_q[que].count++;
+}
+
+void verify_timed_send()
+{
+ int que;
+
+ Start_Test( "mq_timedsend" );
+
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ if ( que == BLOCKING )
+ verify_timed_send_queue( que, 1 );
+ else
+ verify_timed_send_queue( que, 0 );
+ }
+}
+
+void verify_timed_receive_queue(
+ char *task_name,
+ int que,
+ int is_blocking
)
{
+ char message[ 100 ];
+ unsigned int priority;
+ struct timespec tm;
+ struct timeval tv1, tv2, tv3;
+ struct timezone tz1, tz2;
+ int status;
+
+ tm.tv_sec = 1;
+ tm.tv_nsec = 0;
+
+ printf( "Init: %s mq_timedreceive - on queue %s ", task_name, Test_q[que].name);
+
+ gettimeofday( &tv1, &tz1 );
+ status = mq_timedreceive( Test_q[ que ].mq, message, 100, &priority, &tm );
+ gettimeofday( &tv2, &tz2 );
+ tv3.tv_sec = tv2.tv_sec - tv1.tv_sec;
+ tv3.tv_usec = tv2.tv_usec - tv1.tv_usec;
+
+ fatal_int_service_status( status, -1, "mq_timedreceive status");
+ if ( is_blocking )
+ fatal_posix_service_status( errno, ETIMEDOUT, "errno ETIMEDOUT");
+ printf( "Init: %d sec %d us\n", tv3.tv_sec, tv3.tv_usec );
+
+ if ( is_blocking )
+ assert( tv3.tv_sec == 1 );
+ else
+ assert( tv3.tv_sec == 0 );
+}
+
+
+
+void verify_timed_receive()
+{
+ int que;
+
+ Start_Test( "mq_timedreceive" );
+
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ if (( que == BLOCKING ) || ( que == DEFAULT_RW ))
+ verify_timed_receive_queue( "Init:", que, 1 );
+ else
+ verify_timed_receive_queue( "Init:", que, 0 );
+ }
+}
+
+#if (0)
+void verify_set_attr()
+{
+ struct mq_attr save_attr[ NUMBER_OF_TEST_QUEUES ];
+ struct mq_attr attr;
+ int i;
+ int status;
+
+ attr.mq_maxmsg = 0;
+ attr.mq_msgsize = 0;
+
+ Start_Test( "mq_setattr" );
+
+ puts( "Init: set_attr all queues to blocking" );
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &attr, &save_attr[i] );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+
+ Validate_attributes( Test_q[i].mq, attr.mq_flags, 0 );
+ }
+
+ for( i = RW_QUEUE; i < CLOSED; i++ ) {
+ verify_timed_receive_queue( "Init:", i, 1 );
+ }
+
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &save_attr[i], NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+
+ Validate_attributes( Test_q[i].mq, Test_q[i].oflag, 0 );
+ }
+}
+#endif
+
+void wait_for_signal(
+ sigset_t *waitset,
+ int sec,
+ int expect_signal
+)
+{
+ siginfo_t siginfo;
+ int status;
+ struct timespec timeout;
+ int signo;
+
+ siginfo.si_code = -1;
+ siginfo.si_signo = -1;
+ siginfo.si_value.sival_int = -1;
+
+ timeout.tv_sec = sec;
+ timeout.tv_nsec = 0;
+
+ status = sigemptyset( waitset );
+ assert( !status );
+
+ status = sigaddset( waitset, SIGUSR1 );
+ assert( !status );
+
+ printf( "waiting on any signal for %d seconds.\n", sec );
+ signo = sigtimedwait( waitset, &siginfo, &timeout );
+ if (expect_signal) {
+ fatal_int_service_status( signo, SIGUSR1, "got SISUSR1" );
+ } else {
+ fatal_int_service_status( signo, -1, "error return status");
+ fatal_posix_service_status( errno, EAGAIN, "errno EAGAIN");
+ }
+}
+
+void verify_notify()
+{
+ struct sigevent event;
int status;
+ timer_t timer_id;
+ sigset_t set;
+ Test_Message_t *ptr;
+
+ Start_Test( "mq_notify" );
+
+ /* timer create */
+ event.sigev_notify = SIGEV_SIGNAL;
+ event.sigev_signo = SIGUSR1;
+ if (timer_create (CLOCK_REALTIME, &event, &timer_id) == -1)
+ fatal_posix_service_status( errno, 0, "errno ETIMEDOUT");
+
+ /* block the timer signal */
+ sigemptyset( &set );
+ sigaddset( &set, SIGUSR1 );
+ pthread_sigmask( SIG_BLOCK, &set, NULL );
/*
- * EAGAIN -
+ * EBADF - Not A Valid Message Queue
*/
+ puts( "Init: mq_notify - Unopened message queue (EBADF)" );
+ status = mq_notify( Test_q[CLOSED].mq, NULL );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
+
/*
- * EBADF -
+ * Create ...
*/
/*
- * EMSGSIZE -
+ * XXX setup notification
+ */
+
+ printf( "_____mq_notify - notify when %s gets a message\n",RW_NAME);
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+ wait_for_signal( &set, 3, 0 );
+
+ /*
+ * Send and verify signal occurs and registration is removed.
+ */
+
+ puts( "Init: Verify Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 1 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+ puts( "Init: Verify No Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 0 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+
+ /*
+ * EBUSY - Already Registered
+ */
+
+ printf( "____mq_notify - notify when %s gets a message\n",RD_NAME);
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+ wait_for_signal( &set, 3, 0 );
+
+ puts( "Init: mq_notify - (EBUSY)" );
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, -1, "mq_notify error return status");
+ fatal_posix_service_status( errno, EBUSY, "mq_notify errno EBUSY");
+
+ /*
+ * Verify NULL removes registration.
+ */
+
+ puts( "Init: mq_notify - Remove notification with null" );
+ status = mq_notify( Test_q[RW_QUEUE].mq, NULL );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+
+ puts( "Init: Verify No Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 0 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+}
+
+void verify_with_threads()
+{
+ int status;
+ pthread_t id;
+ Test_Message_t *ptr;
+ unsigned int priority;
+ char message[100];
+
+
+ /*
+ * Create a task then block until the task sends the message.
+ * Task tests set attributes so one queue will have a thread
+ * blocked while attributes are changed.
*/
+ Start_Test( "multi-thread Task 4 Receive Test" );
+ status = pthread_create( &id, NULL, Task_4, NULL );
+ assert( !status );
+ puts( "Init: mq_receive - Empty queue changes to non-blocking (EAGAIN)" );
+ status = mq_receive( Test_q[BLOCKING].mq, message, 100, &priority );
+ fatal_int_service_status( status, -1, "mq_receive error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_receive errno EAGAIN");
+ print_current_time( "Init: ", "" );
+
/*
- * EINTR -
+ * Create a task then block until the task sends the message.
+ * Task tests set attributes so one queue will have a thread
+ * blocked while attributes are changed.
*/
+ Start_Test( "multi-thread Task 1 Test" );
+ status = pthread_create( &id, NULL, Task_1, NULL );
+ assert( !status );
+ Read_msg_from_que( BLOCKING, 0 ); /* Block until init writes */
+ print_current_time( "Init: ", "" );
+
/*
- * EBADMSG - a data corruption problem.
- * XXX - Can not cause.
+ * Create a task then block until the task reads a message.
*/
+ Start_Test( "multi-thread Task 4 Send Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_4, NULL );
+ assert( !status );
+ puts( "Init: mq_send - Full queue changes to non-blocking (EAGAIN)" );
+ status = mq_send(Test_q[BLOCKING].mq, message, 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_send errno EAGAIN");
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
+
/*
- puts( "mq_ - UNSUCCESSFUL ()" );
- status = mq_( );
- fatal_directive_status( status, -1, "mq_ error return status");
- fatal_directive_status( errno, E, "mq_c errno E");
+ * Create a task then block until the task reads a message.
+ */
+
+ Start_Test( "multi-thread Task 2 Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_2, NULL );
+ assert( !status );
+ Show_send_msg_to_que( "Init:", BLOCKING, Priority_Order[0] );
+ print_current_time( "Init: ", "" );
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
- */
/*
- * ENOSYS -
+ * Create a task then block until it deletes and closes all queues.
+ * EBADF - Queue unlinked and closed while blocked
*/
+ Start_Test( "multi-thread Task 3 Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_3, NULL );
+ assert( !status );
+ puts( "Init: mq_send - Block while thread deletes queue (EBADF)" );
+ ptr = &Predefined_Msgs[0];
+ status = mq_send( Test_q[BLOCKING].mq, ptr->msg, ptr->size , ptr->priority );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
+
}
-void non_blocking_mq_read_write(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_setattr()
{
+ struct mq_attr attr;
+ struct mq_attr save_attr[ NUMBER_OF_TEST_QUEUES ];
+ int status;
+ int i;
+
/*
- int status;
- char *messages[] = {
- "Msg 1",
- "Test 2",
- "12345678901234567890"
- };
+ * EBADF - Get the attributes from a closed queue.
+ */
- status = mq_send( mqs[0], messages[0], strlen( messages[0] ), 0 );
- fatal_directive_status( status, 0, "mq_send error return status" );
-
- puts( "mq_send - UNSUCCESSFUL ()" );
- do {
- status = mq_send( );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, E, "mq_send errno E");
+ puts( "Task1:mq_setattr - unopened queue (EBADF)" );
+ status = mq_setattr( Test_q[CLOSED].mq, &attr, NULL );
+ fatal_posix_service_status( status, -1, "mq_setattr error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_setattr errno EBADF");
+
+ /*
+ * XXX - The following are not listed in the POSIX manual but
+ * may occur.
+ */
+
+ /*
+ * EINVAL - NULL attributes
+ */
+
+ puts( "Task1:mq_setattr - NULL attributes (EINVAL)" );
+ status = mq_setattr( Test_q[RW_QUEUE].mq, NULL, NULL );
+ fatal_posix_service_status( status, -1, "mq_setattr error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_setattr errno EINVAL");
+
+ /*
+ * Verify change queues to blocking, by verifying all queues block
+ * for a timed receive.
+ */
+
+ puts( "Init: set_attr all queues to blocking" );
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &attr, &save_attr[i] );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[i].mq, attr.mq_flags, 0 );
+ }
+ for( i = RW_QUEUE; i < CLOSED; i++ ) {
+ verify_timed_receive_queue( "Init:", i, 1 );
+ }
+
+ /*
+ * Restore restore all queues to their old attribute.
+ */
+
+ for(i=0; i<CLOSED; i++) {
+ status = mq_setattr( Test_q[i].mq, &save_attr[i], NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[i].mq, Test_q[i].oflag, 0 );
}
- */
}
void *POSIX_Init(
@@ -390,95 +1128,184 @@ void *POSIX_Init(
)
{
int status;
- mqd_t mqs[CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES];
- mqd_t n_mq1;
mqd_t n_mq2;
- char *messages[] = {
- "Msg 1",
- "Test 2",
- "12345678901234567890"
- };
puts( "\n\n*** POSIX MESSAGE QUEUE TEST ***" );
- validate_mq_open_error_codes( mqs, 2 );
- validate_mq_unlink_error_codes( mqs, 2 );
- validate_mq_close_error_codes( mqs, 2 );
+ validate_mq_open_error_codes( );
+ open_test_queues();
+ validate_mq_unlink_error_codes();
+ validate_mq_close_error_codes();
+ verify_unlink_functionality();
+ validate_mq_setattr( );
+ validate_mq_send_error_codes();
+ validate_mq_getattr_error_codes();
+ verify_timed_send();
+ validate_mq_receive_error_codes();
+ verify_timed_receive();
+ verify_open_functionality();
+ verify_notify();
+ verify_with_threads();
+
+ puts( "*** END OF POSIX MESSAGE QUEUE TEST ***" );
+ exit( 0 );
+
+ return NULL; /* just so the compiler thinks we returned something */
+}
- validate_mq_send_error_codes( mqs, 2 );
- validate_mq_receive_error_codes( mqs, 2 );
+void *Task_1 (
+ void *argument
+)
+{
+ int status;
+ int count = 0;
+ sigset_t set;
- /*
- * Validate a second open returns the same message queue.
- */
+ /* Block Waiting for a message */
- puts( "mq_open - Open an existing mq ( same id )" );
- n_mq1 = mq_open("mq1", 0 );
- fatal_directive_status(
- (int) n_mq1, (int ) mqs[0], "mq_open error return status" );
-
- /*
- * Unlink the message queue, then verify an open of the same name produces a
- * different message queue.
- */
+ print_current_time( "Task_1: ", "" );
- puts( "mq_unlink - mq1 SUCCESSFUL" );
- status = mq_unlink( "mq1" );
- fatal_directive_status( status, 0, "mq_unlink locked message queue");
+ Show_send_msg_to_que( "Task_1:", BLOCKING, 0 );
- puts( "mq_open - Reopen mq1 SUCCESSFUL with a different id" );
- n_mq2 = mq_open( "mq1", O_CREAT | O_EXCL, 00777, NULL);
- assert( n_mq2 != (-1) );
- assert( n_mq2 != n_mq1 );
+ puts( "Task_1: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ assert( 0 );
+ return NULL; /* just so the compiler thinks we returned something */
+}
+
+void *Task_2(
+ void *argument
+)
+{
+ int status;
+
+
+ print_current_time( "Task_2: ", "" );
+
+
+ /* Block waiting to send a message */
+
+ verify_queues_full( "Task_2:" );
+ Read_msg_from_que( BLOCKING, Priority_Order[0] ); /* Cause context switch */
+
+ puts( "Task_2: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+}
+
+void *Task_3 (
+ void *argument
+)
+{
+
+ print_current_time( "Task_3: ", "" );
/*
- * Validate it "mq1" can be closed and unlinked.
+ * close and unlink all queues.
*/
- puts( "mq_unlink - mq1 SUCCESSFUL" );
- status = mq_unlink( "mq1" );
- fatal_directive_status( status, 0, "mq_unlink locked message queue");
+ verify_close_functionality( "Task_3: " );
+ puts( "Task_3: pthread_exit" );
+ pthread_exit( NULL );
- puts( "mq_close mq1 - SUCCESSFUL" );
- status = mq_close( n_mq2 );
- fatal_directive_status( status, 0, "mq_close message queue");
- status = mq_close( n_mq1 );
- fatal_directive_status( status, 0, "mq_close message queue");
- status = mq_close( mqs[0] );
- fatal_directive_status( status, 0, "mq_close message queue");
+ /* switch to Init */
- puts( "mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink("mq1");
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_close errno EINVAL");
+ return NULL; /* just so the compiler thinks we returned something */
- /*
- * XXX - Cant' create location OBJECTS_ERROR or OBJECTS_REMOTE.
- * mq_close and mq_unlink.
- * XXX - Don't think we need this save until yellow line tested.
- puts( "Init: mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink("mq3");
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_unlink errno ENOENT");
- assert( (status == -1) && (errno == ENOENT) );
- */
+}
+void *Task_4 (
+ void *argument
+)
+{
+ struct mq_attr attr;
+ int status;
+ int count;
+
+ print_current_time( "Task_4: ", "" );
/*
- * Validate we can wait on a message queue opened with mq_open.
+ * Set the count to the number of messages in the queue.
*/
-#if (0) /* XXX FIX ME */
- puts( "Init: mq_wait on mq1" );
- status = mq_receive(n_mq1);
- fatal_directive_status( status, 0, "mq_wait opened message queue");
-#endif
+ status = mq_getattr( Test_q[BLOCKING].mq, &attr );
+ fatal_posix_service_status( status, 0, "mq_getattr valid return status");
+ count = attr.mq_curmsgs;
- puts( "*** END OF POSIX MESSAGE QUEUE TEST ***" );
- exit( 0 );
+ puts("Task_4: Set queue to non-blocking");
+ attr.mq_flags = Test_q[BLOCKING].oflag | O_NONBLOCK;
+ status = mq_setattr( Test_q[BLOCKING].mq, &attr, NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[BLOCKING].mq, attr.mq_flags, count );
+
+ puts("Task_4: Return queue to blocking");
+ attr.mq_flags = Test_q[BLOCKING].oflag;
+ status = mq_setattr( Test_q[BLOCKING].mq, &attr, NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[BLOCKING].mq, attr.mq_flags, count );
+
+ puts( "Task_4: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+
+}
+
+void *Task_5 (
+ void *argument
+)
+{
+
+ print_current_time( "Task_5: ", "" );
+
+ puts( "Task_5: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+
+}
+
+void *Task_ (
+ void *argument
+)
+{
+
+ print_current_time( "Task_: ", "" );
+
+ puts( "Task_: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
return NULL; /* just so the compiler thinks we returned something */
+
}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/cpukit/itron/src/cre_mbf.c b/cpukit/itron/src/cre_mbf.c
index ca139d53ae..0db738435e 100644
--- a/cpukit/itron/src/cre_mbf.c
+++ b/cpukit/itron/src/cre_mbf.c
@@ -25,7 +25,7 @@ ER cre_mbf(
T_CMBF *pk_cmbf
)
{
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
ITRON_Message_buffer_Control *the_message_buffer;
/*
@@ -57,16 +57,14 @@ ER cre_mbf(
}
if ( pk_cmbf->mbfatr & TA_TPRI )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
_CORE_message_queue_Initialize(
&the_message_buffer->message_queue,
OBJECTS_ITRON_MESSAGE_BUFFERS,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
pk_cmbf->bufsz / pk_cmbf->maxmsz,
pk_cmbf->maxmsz,
NULL /* Multiprocessing not supported */
diff --git a/cpukit/itron/src/snd_mbx.c b/cpukit/itron/src/snd_mbx.c
index f674583e7b..e02714465e 100644
--- a/cpukit/itron/src/snd_mbx.c
+++ b/cpukit/itron/src/snd_mbx.c
@@ -27,7 +27,6 @@ ER snd_msg(
{
register ITRON_Mailbox_Control *the_mailbox;
Objects_Locations location;
- CORE_message_queue_Status status = E_OK;
unsigned32 message_priority;
void *message_contents;
@@ -47,17 +46,22 @@ ER snd_msg(
message_priority = CORE_MESSAGE_QUEUE_SEND_REQUEST;
message_contents = pk_msg;
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_mailbox->message_queue,
&message_contents,
sizeof(T_MSG *),
the_mailbox->Object.id,
NULL, /* multiprocessing not supported */
- message_priority
+ message_priority,
+ FALSE, /* do not allow sender to block */
+ 0 /* no timeout */
);
break;
}
_ITRON_return_errorno(
- _ITRON_Mailbox_Translate_core_message_queue_return_code(status) );
+ _ITRON_Mailbox_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ )
+ );
}
diff --git a/cpukit/itron/src/trcv_mbf.c b/cpukit/itron/src/trcv_mbf.c
index a63f2a6748..0b2b89e612 100644
--- a/cpukit/itron/src/trcv_mbf.c
+++ b/cpukit/itron/src/trcv_mbf.c
@@ -32,7 +32,6 @@ ER trcv_mbf(
CORE_message_queue_Status status;
boolean wait;
Watchdog_Interval interval;
- CORE_message_queue_Submit_types core_priority;
interval = 0;
if (tmout == TMO_POL) {
@@ -62,7 +61,6 @@ ER trcv_mbf(
msg,
p_msgsz,
wait,
- &core_priority,
interval
);
_Thread_Enable_dispatch();
diff --git a/cpukit/itron/src/tsnd_mbf.c b/cpukit/itron/src/tsnd_mbf.c
index bc609dd298..0ed3b4f90e 100644
--- a/cpukit/itron/src/tsnd_mbf.c
+++ b/cpukit/itron/src/tsnd_mbf.c
@@ -33,7 +33,6 @@ ER tsnd_mbf(
Objects_Locations location;
Watchdog_Interval interval;
boolean wait;
- CORE_message_queue_Status status;
if (msgsz <= 0 || !msg)
return E_PAR;
@@ -50,8 +49,6 @@ ER tsnd_mbf(
if ( wait && _ITRON_Is_in_non_task_state() )
return E_CTX;
- assert( wait == FALSE );
-
the_message_buffer = _ITRON_Message_buffer_Get(mbfid, &location);
switch (location) {
case OBJECTS_REMOTE:
@@ -60,17 +57,20 @@ ER tsnd_mbf(
case OBJECTS_LOCAL:
/* XXX Submit needs to take into account blocking */
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_message_buffer->message_queue,
msg,
msgsz,
the_message_buffer->Object.id,
NULL,
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ interval /* timeout interval */
);
_Thread_Enable_dispatch();
- return
- _ITRON_Message_buffer_Translate_core_message_buffer_return_code(status);
+ return _ITRON_Message_buffer_Translate_core_message_buffer_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
/*
diff --git a/cpukit/posix/include/rtems/posix/mqueue.h b/cpukit/posix/include/rtems/posix/mqueue.h
index 1c1201fef1..e3f7a2f073 100644
--- a/cpukit/posix/include/rtems/posix/mqueue.h
+++ b/cpukit/posix/include/rtems/posix/mqueue.h
@@ -32,10 +32,8 @@ extern "C" {
typedef struct {
Objects_Control Object;
int process_shared;
- int flags;
boolean named;
boolean linked;
- boolean blocking;
int oflag;
unsigned32 open_count;
CORE_message_queue_Control Message_queue;
diff --git a/cpukit/rtems/src/msgqcreate.c b/cpukit/rtems/src/msgqcreate.c
index 395cbf3a02..24bc35993d 100644
--- a/cpukit/rtems/src/msgqcreate.c
+++ b/cpukit/rtems/src/msgqcreate.c
@@ -59,7 +59,7 @@ rtems_status_code rtems_message_queue_create(
)
{
register Message_queue_Control *the_message_queue;
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
void *handler;
#if defined(RTEMS_MULTIPROCESSING)
boolean is_global;
@@ -74,10 +74,10 @@ rtems_status_code rtems_message_queue_create(
return RTEMS_MP_NOT_CONFIGURED;
#endif
- if (count == 0)
+ if ( count == 0 )
return RTEMS_INVALID_NUMBER;
- if (max_message_size == 0)
+ if ( max_message_size == 0 )
return RTEMS_INVALID_SIZE;
#if defined(RTEMS_MULTIPROCESSING)
@@ -115,11 +115,9 @@ rtems_status_code rtems_message_queue_create(
the_message_queue->attribute_set = attribute_set;
if (_Attributes_Is_priority( attribute_set ) )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
handler = NULL;
#if defined(RTEMS_MULTIPROCESSING)
@@ -129,7 +127,7 @@ rtems_status_code rtems_message_queue_create(
if ( ! _CORE_message_queue_Initialize(
&the_message_queue->message_queue,
OBJECTS_RTEMS_MESSAGE_QUEUES,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
count,
max_message_size,
handler ) ) {
diff --git a/cpukit/rtems/src/msgqreceive.c b/cpukit/rtems/src/msgqreceive.c
index 1338216c6b..77fcadc313 100644
--- a/cpukit/rtems/src/msgqreceive.c
+++ b/cpukit/rtems/src/msgqreceive.c
@@ -92,12 +92,12 @@ rtems_status_code rtems_message_queue_receive(
buffer,
size,
wait,
- &core_priority,
timeout
);
_Thread_Enable_dispatch();
- return( _Message_queue_Translate_core_message_queue_return_code(
- _Thread_Executing->Wait.return_code ) );
+ return _Message_queue_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
diff --git a/cpukit/rtems/src/msgqsubmit.c b/cpukit/rtems/src/msgqsubmit.c
index 5a03f6409a..16f1c50266 100644
--- a/cpukit/rtems/src/msgqsubmit.c
+++ b/cpukit/rtems/src/msgqsubmit.c
@@ -61,7 +61,6 @@ rtems_status_code _Message_queue_Submit(
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
- CORE_message_queue_Status core_status;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location )
@@ -98,39 +97,43 @@ rtems_status_code _Message_queue_Submit(
case OBJECTS_LOCAL:
switch ( submit_type ) {
case MESSAGE_QUEUE_SEND_REQUEST:
- core_status = _CORE_message_queue_Send(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Send(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
case MESSAGE_QUEUE_URGENT_REQUEST:
- core_status = _CORE_message_queue_Urgent(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Urgent(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
default:
- core_status = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
return RTEMS_INTERNAL_ERROR; /* should never get here */
}
_Thread_Enable_dispatch();
return _Message_queue_Translate_core_message_queue_return_code(
- core_status );
+ _Thread_Executing->Wait.return_code
+ );
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
diff --git a/cpukit/score/include/rtems/score/coremsg.h b/cpukit/score/include/rtems/score/coremsg.h
index 6ffedee0af..818b1e8c18 100644
--- a/cpukit/score/include/rtems/score/coremsg.h
+++ b/cpukit/score/include/rtems/score/coremsg.h
@@ -166,13 +166,12 @@ void _CORE_message_queue_Close(
);
/*
- *
* _CORE_message_queue_Flush
*
* DESCRIPTION:
*
- * This function flushes the message_queue's task wait queue. The number
- * messages flushed from the queue is returned.
+ * This function flushes the message_queue's pending message queue. The
+ * number of messages flushed from the queue is returned.
*
*/
@@ -194,7 +193,20 @@ unsigned32 _CORE_message_queue_Flush_support(
);
/*
+ * _CORE_message_queue_Flush_waiting_threads
+ *
+ * DESCRIPTION:
*
+ * This function flushes the threads which are blocked on this
+ * message_queue's pending message queue. They are unblocked whether
+ * blocked sending or receiving.
+ */
+
+void _CORE_message_queue_Flush_waiting_threads(
+ CORE_message_queue_Control *the_message_queue
+);
+
+/*
* _CORE_message_queue_Broadcast
*
* DESCRIPTION:
@@ -214,7 +226,6 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
);
/*
- *
* _CORE_message_queue_Submit
*
* DESCRIPTION:
@@ -228,17 +239,18 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
*
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
);
/*
- *
* _CORE_message_queue_Seize
*
* DESCRIPTION:
@@ -248,6 +260,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* inactive message pool. The thread will be blocked if wait is TRUE,
* otherwise an error will be given to the thread if no messages are available.
*
+ * NOTE: Returns message priority via return are in TCB.
*/
void _CORE_message_queue_Seize(
@@ -256,10 +269,25 @@ void _CORE_message_queue_Seize(
void *buffer,
unsigned32 *size,
boolean wait,
- CORE_message_queue_Submit_types *priority,
Watchdog_Interval timeout
);
+/*
+ * _CORE_message_queue_Insert_message
+ *
+ * DESCRIPTION:
+ *
+ * This kernel routine inserts the specified message into the
+ * message queue. It is assumed that the message has been filled
+ * in before this routine is called.
+ */
+
+void _CORE_message_queue_Insert_message(
+ CORE_message_queue_Control *the_message_queue,
+ CORE_message_queue_Buffer_control *the_message,
+ CORE_message_queue_Submit_types submit_type
+);
+
#ifndef __RTEMS_APPLICATION__
#include <rtems/score/coremsg.inl>
#endif
diff --git a/cpukit/score/inline/rtems/score/coremsg.inl b/cpukit/score/inline/rtems/score/coremsg.inl
index af16fbd4aa..7356ce9537 100644
--- a/cpukit/score/inline/rtems/score/coremsg.inl
+++ b/cpukit/score/inline/rtems/score/coremsg.inl
@@ -27,15 +27,17 @@
* This routine sends a message to the end of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Send(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -45,7 +47,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
@@ -58,15 +62,17 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
* This routine sends a message to the front of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Urgent(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -76,7 +82,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_URGENT_REQUEST
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
diff --git a/cpukit/score/macros/rtems/score/coremsg.inl b/cpukit/score/macros/rtems/score/coremsg.inl
index bc45a51ac3..0717b3eea3 100644
--- a/cpukit/score/macros/rtems/score/coremsg.inl
+++ b/cpukit/score/macros/rtems/score/coremsg.inl
@@ -23,9 +23,10 @@
*/
#define _CORE_message_queue_Send( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_SEND_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_SEND_REQUEST, (_wait), (_timeout)
/*PAGE
*
@@ -34,9 +35,10 @@ _id, _api_message_queue_mp_support ) \
*/
#define _CORE_message_queue_Urgent( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_URGENT_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST, (_wait), (_timeout)
/*PAGE
*
diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c
index 2e6f649545..18e148ab1c 100644
--- a/cpukit/score/src/coremsgbroadcast.c
+++ b/cpukit/score/src/coremsgbroadcast.c
@@ -64,6 +64,25 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
Thread_Wait_information *waitp;
unsigned32 constrained_size;
+ /*
+ * If there are pending messages, then there can't be threads
+ * waiting for us to send them a message.
+ *
+ * NOTE: This check is critical because threads can block on
+ * send and receive and this ensures that we are broadcasting
+ * the message to threads waiting to receive -- not to send.
+ */
+
+ if ( the_message_queue->number_of_pending_messages != 0 ) {
+ *count = 0;
+ return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ }
+
+ /*
+ * There must be no pending messages if there is a thread waiting to
+ * receive a message.
+ */
+
number_broadcasted = 0;
while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
waitp = &the_thread->Wait;
diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c
index 4e15ab5bc0..6829351c18 100644
--- a/cpukit/score/src/coremsgsubmit.c
+++ b/cpukit/score/src/coremsgsubmit.c
@@ -53,102 +53,110 @@
* error code - if unsuccessful
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
+ ISR_Level level;
CORE_message_queue_Buffer_control *the_message;
Thread_Control *the_thread;
+ Thread_Control *executing;
- if ( size > the_message_queue->maximum_message_size )
- return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+
+ if ( size > the_message_queue->maximum_message_size ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ return;
+ }
/*
- * Is there a thread currently waiting on this message queue?
+ * Is there a thread currently waiting on this message queue?
*/
- the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
- if ( the_thread ) {
- _CORE_message_queue_Copy_buffer(
- buffer,
- the_thread->Wait.return_argument,
- size
- );
- *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
- the_thread->Wait.count = submit_type;
+ if ( the_message_queue->number_of_pending_messages == 0 ) {
+ the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
+ if ( the_thread ) {
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_thread->Wait.return_argument,
+ size
+ );
+ *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
+ the_thread->Wait.count = submit_type;
#if defined(RTEMS_MULTIPROCESSING)
- if ( !_Objects_Is_local_id( the_thread->Object.id ) )
- (*api_message_queue_mp_support) ( the_thread, id );
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+ (*api_message_queue_mp_support) ( the_thread, id );
#endif
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ return;
+ }
}
/*
- * No one waiting on this one currently.
- * Allocate a message buffer and store it away
+ * No one waiting on the message queue at this time, so attempt to
+ * queue the message up for a future receive.
*/
- if ( the_message_queue->number_of_pending_messages ==
+ if ( the_message_queue->number_of_pending_messages <
the_message_queue->maximum_pending_messages ) {
- return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
- }
- the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue);
- if ( the_message == 0 )
- return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
-
- _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
- the_message->Contents.size = size;
- the_message->priority = submit_type;
-
- the_message_queue->number_of_pending_messages += 1;
-
- switch ( submit_type ) {
- case CORE_MESSAGE_QUEUE_SEND_REQUEST:
- _CORE_message_queue_Append( the_message_queue, the_message );
- break;
- case CORE_MESSAGE_QUEUE_URGENT_REQUEST:
- _CORE_message_queue_Prepend( the_message_queue, the_message );
- break;
- default:
- /* XXX interrupt critical section needs to be addressed */
- {
- CORE_message_queue_Buffer_control *this_message;
- Chain_Node *the_node;
-
- the_message->priority = submit_type;
- for ( the_node = the_message_queue->Pending_messages.first ;
- !_Chain_Is_tail( &the_message_queue->Pending_messages, the_node ) ;
- the_node = the_node->next ) {
-
- this_message = (CORE_message_queue_Buffer_control *) the_node;
-
- if ( this_message->priority >= the_message->priority )
- continue;
-
- _Chain_Insert( the_node, &the_message->Node );
- break;
- }
- }
- break;
+ the_message =
+ _CORE_message_queue_Allocate_message_buffer( the_message_queue );
+
+ /*
+ * NOTE: If the system is consistent, this error should never occur.
+ */
+ if ( !the_message ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
+ return;
+ }
+
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_message->Contents.buffer,
+ size
+ );
+ the_message->Contents.size = size;
+ the_message->priority = submit_type;
+
+ _CORE_message_queue_Insert_message(
+ the_message_queue,
+ the_message,
+ submit_type
+ );
+ return;
}
/*
- * According to POSIX, does this happen before or after the message
- * is actually enqueued. It is logical to think afterwards, because
- * the message is actually in the queue at this point.
+ * No message buffers were available so we may need to return an
+ * overflow error or block the sender until the message is placed
+ * on the queue.
*/
- if ( the_message_queue->number_of_pending_messages == 1 &&
- the_message_queue->notify_handler )
- (*the_message_queue->notify_handler)( the_message_queue->notify_argument );
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ if ( !wait ) {
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
+ return;
+ }
+
+ executing = _Thread_Executing;
+
+ _ISR_Disable( level );
+ _Thread_queue_Enter_critical_section( &the_message_queue->Wait_queue );
+ executing->Wait.queue = &the_message_queue->Wait_queue;
+ executing->Wait.id = id;
+ executing->Wait.return_argument = (void *)buffer;
+ executing->Wait.return_argument_1 = (void *)size;
+ executing->Wait.count = submit_type;
+ _ISR_Enable( level );
+
+ _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
}
diff --git a/testsuites/psxtests/psxmsgq01/init.c b/testsuites/psxtests/psxmsgq01/init.c
index d40ab18a2f..534f6c6dcb 100644
--- a/testsuites/psxtests/psxmsgq01/init.c
+++ b/testsuites/psxtests/psxmsgq01/init.c
@@ -15,13 +15,95 @@
#include <fcntl.h>
#include <time.h>
#include <tmacros.h>
+#include <signal.h> /* signal facilities */
+
+typedef struct {
+ char msg[ 50 ];
+ int size;
+ unsigned int priority;
+}Test_Message_t;
+Test_Message_t Predefined_Msgs[MAXMSG+1];
+Test_Message_t Predefined_Msgs[MAXMSG+1] = {
+ { "12345678", 9, MQ_PRIO_MAX-1 }, /* Max Length Message med */
+ { "", 1, 1 }, /* NULL Message low */
+ { "Last", 5, MQ_PRIO_MAX }, /* Queue Full Message hi */
+ { "No Message", 0, MQ_PRIO_MAX-1 }, /* 0 length Message med */
+ { "1", 2, 0 }, /* Cause Overflow Behavior */
+};
+int Priority_Order[MAXMSG+1] = { 2, 0, 3, 1, MAXMSG };
+
+
+typedef struct {
+ mqd_t mq;
+ Test_Queue_Types index;
+ char *name;
+ int oflag;
+ int maxmsg;
+ int msgsize;
+ int count;
+} Test_queue_type;
+
+Test_queue_type Test_q[ NUMBER_OF_TEST_QUEUES ] =
+{
+ { 0, 0, "Qread", ( O_CREAT | O_RDONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 1, "Qwrite", ( O_CREAT | O_WRONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 2, "Qnoblock", ( O_CREAT | O_RDWR | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+ { 0, 3, "Qblock", ( O_CREAT | O_RDWR ) , MAXMSG, MSGSIZE, 0 },
+ { 0, 4, "Qdefault", ( O_CREAT | O_RDWR ) , 10, 16, 0 },
+ { 0, 5, "mq6", ( O_CREAT | O_WRONLY | O_NONBLOCK ), MAXMSG, MSGSIZE, 0 },
+};
+
+#define RW_NAME Test_q[ RW_QUEUE ].name
+#define DEFAULT_NAME Test_q[ DEFAULT_RW ].name
+#define RD_NAME Test_q[ RD_QUEUE ].name
+#define WR_NAME Test_q[ WR_QUEUE ].name
+#define BLOCKING_NAME Test_q[ BLOCKING ].name
+#define CLOSED_NAME Test_q[ CLOSED ].name
+
+#define RW_ATTR Test_q[ RW_QUEUE ].oflag
+#define DEFAULT_ATTR Test_q[ DEFAULT_RW ].oflag
+#define RD_ATTR Test_q[ RD_QUEUE ].oflag
+#define WR_ATTR Test_q[ WR_QUEUE ].oflag
+#define BLOCK_ATTR Test_q[ BLOCKING ].oflag
+#define CLOSED_ATTR Test_q[ CLOSED ].oflag
-char Queue_Name[PATH_MAX + 2];
-char *Get_Queue_Name(
- int i
+/*
+ * Outputs a header at each test section.
+ */
+void Start_Test(
+ char *description
)
{
- sprintf(Queue_Name,"mq%d",i+1);
+ printf( "_______________%s\n", description );
+}
+
+
+void Validate_attributes(
+ mqd_t mq,
+ int oflag,
+ int msg_count
+)
+{
+ int status;
+ struct mq_attr attr;
+
+ status = mq_getattr( mq, &attr );
+ fatal_posix_service_status( status, 0, "mq_getattr valid return status");
+
+ if ( mq != Test_q[ DEFAULT_RW ].mq ){
+ fatal_int_service_status((int)attr.mq_maxmsg, MAXMSG, "maxmsg attribute" );
+ fatal_int_service_status((int)attr.mq_msgsize,MSGSIZE,"msgsize attribute");
+ }
+
+ fatal_int_service_status((int)attr.mq_curmsgs, msg_count, "count attribute" );
+ fatal_int_service_status((int)attr.mq_flags, oflag, "flag attribute" );
+}
+
+char Queue_Name[PATH_MAX + 2];
+#define Get_Queue_Name( i ) Test_q[i].name
+
+char *Build_Queue_Name( int i ) {
+ sprintf(Queue_Name,"mq%d", i+1 );
return Queue_Name;
}
@@ -35,354 +117,1010 @@ char *Get_Too_Long_Name()
return Queue_Name;
}
-typedef enum {
- DEFAULT_SIZE_TYPE,
- TEST_SIZE_TYPE,
- MAX_SIZE,
- TYPES_OF_TEST_SIZES
-} TEST_MQ_SIZE_TYPES;
+void open_test_queues()
+{
+ struct mq_attr attr;
+ int status;
+ Test_queue_type *tq;
+ int que;
+
+ attr.mq_maxmsg = MAXMSG;
+ attr.mq_msgsize = MSGSIZE;
+
+ puts( "Init: Open Test Queues" );
+
+ for( que = 0; que < NUMBER_OF_TEST_QUEUES; que++ ) {
+
+ tq = &Test_q[ que ];
+ if ( que == DEFAULT_RW)
+ Test_q[que].mq = mq_open( tq->name, tq->oflag, 0x777, NULL );
+ else
+ Test_q[que].mq = mq_open( tq->name, tq->oflag, 0x777, &attr );
+
+ assert( Test_q[que].mq != (-1) );
+ }
+
+ status = mq_close( Test_q[CLOSED].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+ status = mq_unlink( CLOSED_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+}
/*
* Opens CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES then leaves size queues
* opened but closes the rest.
*/
-void validate_mq_open_error_codes(
- mqd_t *mqs, /* Must be large enough for Maximum to be opened. */
- int size
-)
+void validate_mq_open_error_codes()
{
int i;
mqd_t n_mq2;
struct mq_attr attr;
int status;
+ mqd_t open_mq[CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES];
+
+ attr.mq_maxmsg = MAXMSG;
+ attr.mq_msgsize = MSGSIZE;
- assert( size < (CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES-1) );
+ Start_Test( "mq_open errors" );
/*
- * Validate mq_open errors that can occur when no queues are open.
- * EINVAL
- * ENOENT
- * EINTR
+ * XXX EINVAL - inappropriate name was given for the message queue
*/
/*
- * XXX EINVAL - inappropriate name was given for the message queue
+ * EINVAL - Create with negative maxmsg.
*/
attr.mq_maxmsg = -1;
- puts( "mq_open - Create with maxmsg (-1) (EINVAL)" );
- n_mq2 = mq_open("mq2", O_CREAT | O_RDONLY, 0x777, &attr);
- fatal_directive_status(
+ puts( "Init: mq_open - Create with maxmsg (-1) (EINVAL)" );
+ n_mq2 = mq_open( "mq2", O_CREAT | O_RDONLY, 0x777, &attr);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EINVAL, "mq_open errno EINVAL");
+ fatal_posix_service_status( errno, EINVAL, "mq_open errno EINVAL");
+ attr.mq_maxmsg = MAXMSG;
+
+ /*
+ * EINVAL - Create withnegative msgsize.
+ */
attr.mq_msgsize = -1;
- puts( "mq_open - Create with msgsize (-1) (EINVAL)" );
- n_mq2 = mq_open("mq2", O_CREAT | O_RDONLY, 0x777, &attr);
- fatal_directive_status(
+ puts( "Init: mq_open - Create with msgsize (-1) (EINVAL)" );
+ n_mq2 = mq_open( "mq2", O_CREAT | O_RDONLY, 0x777, &attr);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EINVAL, "mq_open errno EINVAL");
+ fatal_posix_service_status( errno, EINVAL, "mq_open errno EINVAL");
+ attr.mq_msgsize = MSGSIZE;
+
+ /*
+ * ENOENT - Open a non-created file.
+ */
- puts( "mq_open - Open new mq without create flag (ENOENT)" );
- n_mq2 = mq_open("mq3", O_EXCL | O_RDONLY, 0x777, NULL);
- fatal_directive_status(
+ puts( "Init: mq_open - Open new mq without create flag (ENOENT)" );
+ n_mq2 = mq_open( "mq3", O_EXCL | O_RDONLY, 0x777, NULL);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENOENT, "mq_open errno ENOENT");
+ fatal_posix_service_status( errno, ENOENT, "mq_open errno ENOENT");
+
/*
* XXX EINTR - call was interrupted by a signal
*/
/*
- * XXX ENAMETOOLONG - Not checked in either sem_open or mq_open is
- * this an error?
+ * ENAMETOOLONG - Give a name greater than PATH_MAX.
*/
- puts( "mq_open - Open with too long of a name (ENAMETOOLONG)" );
+ puts( "Init: mq_open - Open with too long of a name (ENAMETOOLONG)" );
n_mq2 = mq_open( Get_Too_Long_Name(), O_CREAT | O_RDONLY, 0x777, NULL );
- fatal_directive_status(
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENAMETOOLONG, "mq_open errno ENAMETOOLONG");
+ fatal_posix_service_status( errno, ENAMETOOLONG, "mq_open errno ENAMETOOLONG");
/*
+ * XXX - ENAMETOOLONG - Give a name greater than NAME_MAX
+ * Per implementation not possible.
+ */
+
+ /*
* Open maximum number of message queues
*/
- puts( "mq_open - SUCCESSFUL" );
+ puts( "Init: mq_open - SUCCESSFUL" );
for (i = 0; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
- mqs[i] = mq_open( Get_Queue_Name(i), O_CREAT | O_RDWR, 0x777, NULL );
- assert( mqs[i] != (-1) );
+ open_mq[i] = mq_open(
+ Build_Queue_Name(i), O_CREAT | O_RDWR | O_NONBLOCK, 0x777, NULL );
+ assert( open_mq[i] != (-1) );
/*XXX - Isn't there a more general check */
}
/*
- * Validate open errors that must occur after message queues are open.
- * EACCES
- * EEXIST
- * EMFILE
- * ENFILE
+ * XXX EACCES - permission to create is denied.
*/
/*
- * XXX EACCES - permission to create is denied.
+ * XXX EACCES - queue exists permissions specified by o_flag are denied.
*/
/*
- * XXX EACCES - queue exists permissions specified by o_flag are denied.
- puts( "mq_open - open mq as write (EACCES)" );
- n_mq2 = mq_open("mq1", O_CREAT | O_WRONLY, 0x777, NULL);
- fatal_directive_status(
- (int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EACCES, "mq_open errno EACCES");
+ * EEXIST - Create an existing queue.
*/
- puts( "mq_open - Create an Existing mq (EEXIST)" );
- n_mq2 = mq_open("mq1", O_CREAT | O_EXCL | O_RDONLY, 0x777, NULL);
- fatal_directive_status(
+ puts( "Init: mq_open - Create an Existing mq (EEXIST)" );
+ n_mq2 = mq_open(
+ Build_Queue_Name(0), O_CREAT | O_EXCL | O_RDONLY, 0x777, NULL);
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, EEXIST, "mq_open errno EEXIST");
+ fatal_posix_service_status( errno, EEXIST, "mq_open errno EEXIST");
+ /*
+ * XXX EMFILE - Too many message queues in use by the process
+ */
/*
- * XXX EMFILE - Too many message queues open
+ * ENFILE - Too many message queues open in the system
*/
- puts( "mq_open - system is out of resources (ENFILE)" );
- n_mq2 = mq_open( Get_Queue_Name(i), O_CREAT | O_RDONLY, 0x777, NULL );
- fatal_directive_status(
+ puts( "Init: mq_open - system is out of resources (ENFILE)" );
+ n_mq2 = mq_open( Build_Queue_Name(i), O_CREAT | O_RDONLY, 0x777, NULL );
+ fatal_posix_service_status(
(int) n_mq2, (int ) (-1), "mq_open error return status" );
- fatal_directive_status( errno, ENFILE, "mq_open errno ENFILE");
+ fatal_posix_service_status( errno, ENFILE, "mq_open errno ENFILE");
/*
- * Unlink and Close .
+ * Unlink and Close all queues.
*/
- puts( "mq_close and mq_unlink (mq3...mqn) - SUCCESSFUL" );
- for (i = size; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
+ puts( "Init: mq_close and mq_unlink (mq3...mqn) - SUCCESSFUL" );
+ for (i = 0; i < CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES; i++) {
- status = mq_close( mqs[i] );
- fatal_directive_status( status, 0, "mq_close message queue");
+ status = mq_close( open_mq[i]);
+ fatal_posix_service_status( status, 0, "mq_close message queue");
- status = mq_unlink( Get_Queue_Name(i) );
- fatal_directive_status( status, 0, "mq_unlink message queue");
+ status = mq_unlink( Build_Queue_Name(i) );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
}
}
-void validate_mq_unlink_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_unlink_error_codes()
{
int status;
+ Start_Test( "mq_unlink errors" );
+
/*
* XXX - EACCES Permission Denied
*/
/*
- * XXX ENAMETOOLONG - Not checked in either sem_unlink or mq_unlink is
- * this an error?
+ * ENAMETOOLONG - Give a name greater than PATH_MAX.
*/
- puts( "mq_unlink - mq_unlink with too long of a name (ENAMETOOLONG)" );
+ puts( "Init: mq_unlink - mq_unlink with too long of a name (ENAMETOOLONG)" );
status = mq_unlink( Get_Too_Long_Name() );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENAMETOOLONG, "mq_unlink errno ENAMETOOLONG");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, ENAMETOOLONG, "mq_unlink errno ENAMETOOLONG");
- puts( "mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink(Get_Queue_Name(size));
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_unlink errno ENOENT");
+ /*
+ * XXX - ENAMETOOLONG - Give a name greater than NAME_MAX
+ * Per implementation not possible.
+ */
/*
- * XXX - These errors are not in the POSIX manual but may occur.
+ * ENOENT - Unlink an unopened queue
*/
- puts( "mq_unlink (NULL) - EINVAL" );
+ puts( "Init: mq_unlink - A Queue not opened (ENOENT)" );
+ status = mq_unlink( CLOSED_NAME );
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, ENOENT, "mq_unlink errno ENOENT");
+
+ /*
+ * XXX - The following were not listed in the POSIX document as
+ * possible errors. Under other commands the EINVAL is
+ * given for these conditions.
+ */
+
+ /*
+ * EINVAL - Unlink a queue with no name
+ */
+
+ puts( "Init: mq_unlink (NULL) - EINVAL" );
status = mq_unlink( NULL );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, EINVAL, "mq_unlink errno value");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_unlink errno value");
- puts( "mq_unlink (\"\") - EINVAL" );
+ /*
+ * EINVAL - Unlink a queue with a null name
+ */
+
+ puts( "Init: mq_unlink (\"\") - EINVAL" );
status = mq_unlink( "" );
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, EINVAL, "mq_unlink errno value");
+ fatal_posix_service_status( status, -1, "mq_unlink error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_unlink errno value");
}
-void validate_mq_close_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_close_error_codes()
{
int status;
- puts( "mq_close - UNSUCCESSFUL (EBADF)" );
- status = mq_close(mqs[size]);
- fatal_directive_status( status, -1, "mq_close error return status");
- fatal_directive_status( errno, EBADF, "mq_close errno EBADF");
+ Start_Test( "mq_close errors" );
+
+ /*
+ * EBADF - Close a queue that is not open.
+ */
+
+ puts( "Init: mq_close - unopened queue (EBADF)" );
+ status = mq_close( Test_q[CLOSED].mq );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_close errno EBADF");
}
+
+void validate_mq_getattr_error_codes()
+{
+ struct mq_attr attr;
+ int status;
+
+ Start_Test( "mq_getattr errors" );
+
+ /*
+ * EBADF - Get the attributes from a closed queue.
+ */
+
+ puts( "Init: mq_getattr - unopened queue (EBADF)" );
+ status = mq_getattr( Test_q[CLOSED].mq, &attr );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_close errno EBADF");
+
+ /*
+ * XXX - The following are not listed in the POSIX manual but
+ * may occur.
+ */
+
+ /*
+ * EINVAL - NULL attributes
+ */
+
+ puts( "Init: mq_getattr - NULL attributes (EINVAL)" );
+ status = mq_getattr( Test_q[RW_QUEUE].mq, NULL );
+ fatal_posix_service_status( status, -1, "mq_close error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_close errno EINVAL");
+
+}
+
+
+void Send_msg_to_que(
+ int que,
+ int msg
+)
+{
+ Test_Message_t *ptr = &Predefined_Msgs[msg];
+ int status;
+
+ status = mq_send( Test_q[que].mq, ptr->msg, ptr->size , ptr->priority );
+ fatal_posix_service_status( status, 0, "mq_send valid return status");
+ Test_q[que].count++;
+}
+
+void Show_send_msg_to_que(
+ char *task_name,
+ int que,
+ int msg
+)
+{
+ Test_Message_t *ptr = &Predefined_Msgs[msg];
+ printf( "%s mq_send - to %s msg: %s priority %d\n",
+ task_name, Test_q[que].name, ptr->msg, ptr->priority);
+ Send_msg_to_que( que, msg );
+}
+
+void verify_queues_full(
+ char *task_name
+)
+{
+ int que;
+
+ /*
+ * Validate that the queues are full.
+ */
+
+ printf( "%s Verify Queues are full\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ )
+ Validate_attributes( Test_q[que].mq, Test_q[que].oflag, Test_q[que].count );
+
+}
+void verify_queues_empty(
+ char *task_name
+)
+{
+ int que;
+
+ printf( "%s Verify Queues are empty\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ )
+ Validate_attributes( Test_q[que].mq, Test_q[que].oflag, 0 );
+}
+
+int fill_message_queues(
+ char *task_name
+)
+{
+ int msg;
+ int status;
+ int que;
+
+
+ verify_queues_empty( task_name );
+
+ /*
+ * Fill Queue with predefined messages.
+ */
+
+ printf( "%s Fill Queues with messages\n", task_name );
+ for(msg=0; msg<MAXMSG; msg++){
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ Send_msg_to_que( que, msg );
+ }
+ }
+
+ verify_queues_full( "Init:" );
+ return msg;
+}
+
+
+void Read_msg_from_que(
+ int que,
+ int msg
+)
+{
+ unsigned int priority;
+ Test_Message_t *ptr;
+ int status;
+ char message[100];
+ char err_msg[100];
+
+ ptr = &Predefined_Msgs[msg];
+ status = mq_receive(Test_q[ que ].mq, message, 100, &priority );
+ Test_q[que].count--;
+
+ sprintf( err_msg, "%s msg %s size failure", Test_q[ que ].name, ptr->msg );
+ fatal_int_service_status( status, ptr->size, err_msg );
+
+ assert( !strcmp( message, ptr->msg ) );
+ strcpy( message, "No Message" );
+
+ sprintf( err_msg,"%s msg %s size failure", Test_q[ que ].name, ptr->msg );
+ fatal_int_service_status(priority, ptr->priority, err_msg );
+}
+
+int empty_message_queues(
+ char *task_name
+)
+{
+ int que;
+ int i;
+
+ printf( "%s Empty all Queues\n", task_name );
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ for(i=0; Test_q[que].count != 0; i++ )
+ Read_msg_from_que( que, Priority_Order[i] );
+
+ Validate_attributes( Test_q[ que].mq, Test_q[ que ].oflag, 0 );
+ }
+ return 0;
+}
+
/*
* Returns the number of messages queued after the test on the
* first queue.
*/
-int validate_mq_send_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+int validate_mq_send_error_codes( )
{
int status;
int i;
- mqd_t n_mq1;
- struct mq_attr attr;
+ char *str;
- attr.mq_maxmsg = 3;
- attr.mq_msgsize = 8;
+ Start_Test( "mq_send errors" );
/*
- * XXX - EBADF Not a valid message descriptor.
- * Write to a invalid message descriptor
- * XXX - Write to a read only queue
+ * EBADF - Write to a closed queue.
*/
- puts( "mq_send - Closed message queue (EBADF)" );
- status = mq_send( mqs[size], "", 1, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EBADF, "mq_send errno EBADF");
+ puts( "Init: mq_send - Closed message queue (EBADF)" );
+ status = mq_send( Test_q[CLOSED].mq, "", 1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
- puts( "mq_open - Open a read only queue" );
- n_mq1 = mq_open("read_only", O_CREAT | O_RDONLY, 0x777, &attr);
- assert( n_mq1 != (-1) );
- /*XXX - Isn't there a more general check */
+ /*
+ * EBADF - Write to a read only queue.
+ */
+
+ puts( "Init: mq_send - Read only message queue (EBADF)" );
+ status = mq_send( Test_q[ RD_QUEUE ].mq, "", 1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
- puts( "mq_send - Read only message queue (EBADF)" );
- status = mq_send( n_mq1, "", 1, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EBADF, "mq_send errno EBADF");
+ /*
+ * XXX - EINTR Signal interrupted the call.
+ *
+ puts( "Init: mq_send - UNSUCCESSFUL (EINTR)" );
+ status = mq_send( Test_q, "", 0xffff, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, E, "mq_send errno E");
+ */
+
+ /*
+ * EINVAL priority is out of range.
+ */
+
+ puts( "Init: mq_send - Priority out of range (EINVAL)" );
+ status = mq_send( Test_q[ RW_QUEUE ].mq, "", 1, MQ_PRIO_MAX + 1 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_send errno EINVAL");
+
+ /*
+ * EMSGSIZE - Message size larger than msg_len
+ * Validates that msgsize is stored correctly.
+ */
+
+ puts( "Init: mq_send - Message longer than msg_len (EMSGSIZE)" );
+ status = mq_send( Test_q[ RW_QUEUE ].mq, "", MSGSIZE+1, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EMSGSIZE, "mq_send errno EMSGSIZE");
+
+ i = fill_message_queues( "Init:" );
+
+ /*
+ * ENOSYS - send not supported
+ puts( "Init: mq_send - Blocking Queue overflow (ENOSYS)" );
+ status = mq_send( n_mq1, Predefined_Msgs[i], 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
status = mq_close( n_mq1 );
- fatal_directive_status( status, 0, "mq_close message queue");
+ fatal_posix_service_status( status, 0, "mq_close message queue");
status = mq_unlink( "read_only" );
- fatal_directive_status( status, 0, "mq_unlink message queue");
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+ */
/*
- * XXX - EINTR
- * Signal interrupted the call.
+ * EAGAIN - O_NONBLOCK and message queue is full.
+ */
+
+ puts( "Init: mq_send - on a FULL non-blocking queue with (EAGAIN)" );
+ str = Predefined_Msgs[i].msg;
+ status = mq_send(Test_q[RW_QUEUE].mq, str, 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_send errno EAGAIN");
- puts( "mq_send - UNSUCCESSFUL (EINTR)" );
- status = mq_send( mqs, "", 0xffff, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, E, "mq_send errno E");
+ return i-1;
+}
+
+void validate_mq_receive_error_codes( )
+{
+ int status;
+ char message[100];
+ unsigned int priority;
+ int i;
+
+ Start_Test( "mq_receive errors" );
+
+ /*
+ * EBADF - Not A Valid Message Queue
*/
+ puts( "Init: mq_receive - Unopened message queue (EBADF)" );
+ status = mq_receive( Test_q[CLOSED].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
+
/*
- * XXX - EINVAL priority is out of range.
+ * EBADF - Queue not opened to read
*/
- puts( "mq_send - Priority out of range (EINVAL)" );
- status = mq_send( mqs[0], "", 1, MQ_PRIO_MAX + 1 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EINVAL, "mq_send errno EINVAL");
+ puts( "Init: mq_receive - Write only queue (EBADF)" );
+ status = mq_receive( Test_q[WR_QUEUE].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
/*
- * XXX - EMSGSIZE - Message size larger than msg_len
+ * EMSGSIZE - Size is less than the message size attribute
*/
- puts( "mq_send - Message longer than msg_len (EMSGSIZE)" );
- status = mq_send( mqs[0], "", 0xffff, 0 );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EMSGSIZE, "mq_send errno EMSGSIZE");
+ puts( "Init: mq_receive - Size is less than the message (EMSGSIZE)" );
+ status = mq_receive(
+ Test_q[RW_QUEUE].mq, message, Predefined_Msgs[0].size-1, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EMSGSIZE, "mq_receive errno EMSGSIZE");
+
/*
- * ENOSYS - send is supported should never happen.
+ * EAGAIN - O_NONBLOCK and Queue is empty
*/
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
+ puts( "Init: mq_receive - Queue is empty (EAGAIN)" );
+ status = mq_receive( Test_q[RW_QUEUE].mq, message, 100, &priority );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_receive errno EAGAIN");
/*
- * XXX - EAGAIN
- * O_NONBLOCK and message queue is full.
- * This is validated in the read/write test.
+ * XXX - EINTR - Interrupted by a signal
*/
- i=0;
- do {
- status = mq_send( mqs[0], "", 1, 0 );
- i++;
- } while (status == 0);
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, EAGAIN, "mq_send errno EAGAIN");
+ /*
+ * XXX - EBADMSG - a data corruption problem.
+ */
- return i-1;
+ /*
+ * XXX - ENOSYS - mq_receive not supported
+ */
+}
+
+void verify_open_functionality()
+{
+ mqd_t n_mq;
+
+ Start_Test( "mq_open functionality" );
+
+ /*
+ * Validate a second open returns the same message queue.
+ */
+
+ puts( "Init: mq_open - Open an existing mq ( same id )" );
+ n_mq = mq_open( RD_NAME, 0 );
+ fatal_posix_service_status(
+ (int) n_mq, (int ) Test_q[RD_QUEUE].mq, "mq_open error return status" );
+}
+
+void verify_unlink_functionality()
+{
+ mqd_t n_mq;
+ int status;
+
+ Start_Test( "mq_unlink functionality" );
+
+ /*
+ * Unlink the message queue, then verify an open of the same name produces a
+ * different message queue.
+ */
+
+ puts( "Init: Unlink and Open without closing SUCCESSFUL" );
+ status = mq_unlink( DEFAULT_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink locked message queue");
+
+ n_mq = mq_open( DEFAULT_NAME, DEFAULT_ATTR, 0x777, NULL );
+ assert( n_mq != (-1) );
+ assert( n_mq != Test_q[ DEFAULT_RW ].mq );
+
+
+ status = mq_unlink( DEFAULT_NAME );
+ fatal_posix_service_status( status, 0, "mq_unlink locked message queue");
+ status = mq_close( Test_q[ DEFAULT_RW ].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+
+ Test_q[ DEFAULT_RW ].mq = n_mq;
}
-void validate_mq_receive_error_codes(
- mqd_t *mqs,
- int size /* Number still open in mqs */
+void verify_close_functionality()
+{
+ int i;
+ int status;
+ Start_Test( "Unlink and Close All Files" );
+ for (i=0; i<DEFAULT_RW; i++) {
+
+ status = mq_unlink( Get_Queue_Name(i) );
+ fatal_posix_service_status( status, 0, "mq_unlink message queue");
+
+ status = mq_close( Test_q[i].mq );
+ fatal_posix_service_status( status, 0, "mq_close message queue");
+ }
+}
+
+
+void verify_timed_send_queue(
+ int que,
+ int is_blocking
+)
+{
+ int i;
+ struct timespec timeout;
+ struct timeval tv1, tv2, tv3;
+ struct timezone tz1, tz2;
+ int len;
+ int status;
+ char *msg;
+
+ timeout.tv_sec = 1;
+ timeout.tv_nsec = 0;
+
+ printf( "Init: mq_timedsend - on queue %s ", Test_q[que].name);
+ len = Predefined_Msgs[MAXMSG].size;
+ msg = Predefined_Msgs[MAXMSG].msg;
+ gettimeofday( &tv1, &tz1 );
+ status = mq_timedsend( Test_q[que].mq, msg, len , 0, &timeout );
+ gettimeofday( &tv2, &tz2 );
+ tv3.tv_sec = tv2.tv_sec - tv1.tv_sec;
+ tv3.tv_usec = tv2.tv_usec - tv1.tv_usec;
+
+ if ( is_blocking ) { /* Don't verify the non-blocking queue */
+ fatal_int_service_status( status, -1, "mq_timedsend status");
+ fatal_posix_service_status( errno, ETIMEDOUT, "errno ETIMEDOUT");
+ }
+
+ printf("Init: %d sec %d us\n", tv3.tv_sec, tv3.tv_usec );
+
+ if ( is_blocking ) /* non-blocking queue */
+ assert( tv3.tv_sec == 1 );
+ else
+ assert( tv3.tv_sec == 0 );
+
+ if ( que == DEFAULT_RW )
+ Test_q[que].count++;
+}
+
+void verify_timed_send()
+{
+ int que;
+
+ Start_Test( "mq_timedsend" );
+
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ if ( que == BLOCKING )
+ verify_timed_send_queue( que, 1 );
+ else
+ verify_timed_send_queue( que, 0 );
+ }
+}
+
+void verify_timed_receive_queue(
+ char *task_name,
+ int que,
+ int is_blocking
)
{
+ char message[ 100 ];
+ unsigned int priority;
+ struct timespec tm;
+ struct timeval tv1, tv2, tv3;
+ struct timezone tz1, tz2;
+ int status;
+
+ tm.tv_sec = 1;
+ tm.tv_nsec = 0;
+
+ printf( "Init: %s mq_timedreceive - on queue %s ", task_name, Test_q[que].name);
+
+ gettimeofday( &tv1, &tz1 );
+ status = mq_timedreceive( Test_q[ que ].mq, message, 100, &priority, &tm );
+ gettimeofday( &tv2, &tz2 );
+ tv3.tv_sec = tv2.tv_sec - tv1.tv_sec;
+ tv3.tv_usec = tv2.tv_usec - tv1.tv_usec;
+
+ fatal_int_service_status( status, -1, "mq_timedreceive status");
+ if ( is_blocking )
+ fatal_posix_service_status( errno, ETIMEDOUT, "errno ETIMEDOUT");
+ printf( "Init: %d sec %d us\n", tv3.tv_sec, tv3.tv_usec );
+
+ if ( is_blocking )
+ assert( tv3.tv_sec == 1 );
+ else
+ assert( tv3.tv_sec == 0 );
+}
+
+
+
+void verify_timed_receive()
+{
+ int que;
+
+ Start_Test( "mq_timedreceive" );
+
+ for( que = RW_QUEUE; que < CLOSED; que++ ) {
+ if (( que == BLOCKING ) || ( que == DEFAULT_RW ))
+ verify_timed_receive_queue( "Init:", que, 1 );
+ else
+ verify_timed_receive_queue( "Init:", que, 0 );
+ }
+}
+
+#if (0)
+void verify_set_attr()
+{
+ struct mq_attr save_attr[ NUMBER_OF_TEST_QUEUES ];
+ struct mq_attr attr;
+ int i;
+ int status;
+
+ attr.mq_maxmsg = 0;
+ attr.mq_msgsize = 0;
+
+ Start_Test( "mq_setattr" );
+
+ puts( "Init: set_attr all queues to blocking" );
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &attr, &save_attr[i] );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+
+ Validate_attributes( Test_q[i].mq, attr.mq_flags, 0 );
+ }
+
+ for( i = RW_QUEUE; i < CLOSED; i++ ) {
+ verify_timed_receive_queue( "Init:", i, 1 );
+ }
+
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &save_attr[i], NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+
+ Validate_attributes( Test_q[i].mq, Test_q[i].oflag, 0 );
+ }
+}
+#endif
+
+void wait_for_signal(
+ sigset_t *waitset,
+ int sec,
+ int expect_signal
+)
+{
+ siginfo_t siginfo;
+ int status;
+ struct timespec timeout;
+ int signo;
+
+ siginfo.si_code = -1;
+ siginfo.si_signo = -1;
+ siginfo.si_value.sival_int = -1;
+
+ timeout.tv_sec = sec;
+ timeout.tv_nsec = 0;
+
+ status = sigemptyset( waitset );
+ assert( !status );
+
+ status = sigaddset( waitset, SIGUSR1 );
+ assert( !status );
+
+ printf( "waiting on any signal for %d seconds.\n", sec );
+ signo = sigtimedwait( waitset, &siginfo, &timeout );
+ if (expect_signal) {
+ fatal_int_service_status( signo, SIGUSR1, "got SISUSR1" );
+ } else {
+ fatal_int_service_status( signo, -1, "error return status");
+ fatal_posix_service_status( errno, EAGAIN, "errno EAGAIN");
+ }
+}
+
+void verify_notify()
+{
+ struct sigevent event;
int status;
+ timer_t timer_id;
+ sigset_t set;
+ Test_Message_t *ptr;
+
+ Start_Test( "mq_notify" );
+
+ /* timer create */
+ event.sigev_notify = SIGEV_SIGNAL;
+ event.sigev_signo = SIGUSR1;
+ if (timer_create (CLOCK_REALTIME, &event, &timer_id) == -1)
+ fatal_posix_service_status( errno, 0, "errno ETIMEDOUT");
+
+ /* block the timer signal */
+ sigemptyset( &set );
+ sigaddset( &set, SIGUSR1 );
+ pthread_sigmask( SIG_BLOCK, &set, NULL );
/*
- * EAGAIN -
+ * EBADF - Not A Valid Message Queue
*/
+ puts( "Init: mq_notify - Unopened message queue (EBADF)" );
+ status = mq_notify( Test_q[CLOSED].mq, NULL );
+ fatal_posix_service_status( status, -1, "mq_ error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_receive errno EBADF");
+
/*
- * EBADF -
+ * Create ...
*/
/*
- * EMSGSIZE -
+ * XXX setup notification
+ */
+
+ printf( "_____mq_notify - notify when %s gets a message\n",RW_NAME);
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+ wait_for_signal( &set, 3, 0 );
+
+ /*
+ * Send and verify signal occurs and registration is removed.
+ */
+
+ puts( "Init: Verify Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 1 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+ puts( "Init: Verify No Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 0 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+
+ /*
+ * EBUSY - Already Registered
+ */
+
+ printf( "____mq_notify - notify when %s gets a message\n",RD_NAME);
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+ wait_for_signal( &set, 3, 0 );
+
+ puts( "Init: mq_notify - (EBUSY)" );
+ status = mq_notify( Test_q[RW_QUEUE].mq, &event );
+ fatal_posix_service_status( status, -1, "mq_notify error return status");
+ fatal_posix_service_status( errno, EBUSY, "mq_notify errno EBUSY");
+
+ /*
+ * Verify NULL removes registration.
+ */
+
+ puts( "Init: mq_notify - Remove notification with null" );
+ status = mq_notify( Test_q[RW_QUEUE].mq, NULL );
+ fatal_posix_service_status( status, 0, "mq_notify valid status");
+
+ puts( "Init: Verify No Signal when send" );
+ Show_send_msg_to_que( "Init:", RW_QUEUE, 0 );
+ wait_for_signal( &set, 3, 0 );
+ Read_msg_from_que( RW_QUEUE, 0 );
+
+}
+
+void verify_with_threads()
+{
+ int status;
+ pthread_t id;
+ Test_Message_t *ptr;
+ unsigned int priority;
+ char message[100];
+
+
+ /*
+ * Create a task then block until the task sends the message.
+ * Task tests set attributes so one queue will have a thread
+ * blocked while attributes are changed.
*/
+ Start_Test( "multi-thread Task 4 Receive Test" );
+ status = pthread_create( &id, NULL, Task_4, NULL );
+ assert( !status );
+ puts( "Init: mq_receive - Empty queue changes to non-blocking (EAGAIN)" );
+ status = mq_receive( Test_q[BLOCKING].mq, message, 100, &priority );
+ fatal_int_service_status( status, -1, "mq_receive error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_receive errno EAGAIN");
+ print_current_time( "Init: ", "" );
+
/*
- * EINTR -
+ * Create a task then block until the task sends the message.
+ * Task tests set attributes so one queue will have a thread
+ * blocked while attributes are changed.
*/
+ Start_Test( "multi-thread Task 1 Test" );
+ status = pthread_create( &id, NULL, Task_1, NULL );
+ assert( !status );
+ Read_msg_from_que( BLOCKING, 0 ); /* Block until init writes */
+ print_current_time( "Init: ", "" );
+
/*
- * EBADMSG - a data corruption problem.
- * XXX - Can not cause.
+ * Create a task then block until the task reads a message.
*/
+ Start_Test( "multi-thread Task 4 Send Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_4, NULL );
+ assert( !status );
+ puts( "Init: mq_send - Full queue changes to non-blocking (EAGAIN)" );
+ status = mq_send(Test_q[BLOCKING].mq, message, 0, 0 );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EAGAIN, "mq_send errno EAGAIN");
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
+
/*
- puts( "mq_ - UNSUCCESSFUL ()" );
- status = mq_( );
- fatal_directive_status( status, -1, "mq_ error return status");
- fatal_directive_status( errno, E, "mq_c errno E");
+ * Create a task then block until the task reads a message.
+ */
+
+ Start_Test( "multi-thread Task 2 Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_2, NULL );
+ assert( !status );
+ Show_send_msg_to_que( "Init:", BLOCKING, Priority_Order[0] );
+ print_current_time( "Init: ", "" );
+ verify_queues_full( "Init:" );
+ empty_message_queues( "Init:" );
- */
/*
- * ENOSYS -
+ * Create a task then block until it deletes and closes all queues.
+ * EBADF - Queue unlinked and closed while blocked
*/
+ Start_Test( "multi-thread Task 3 Test" );
+ fill_message_queues( "Init:" );
+ status = pthread_create( &id, NULL, Task_3, NULL );
+ assert( !status );
+ puts( "Init: mq_send - Block while thread deletes queue (EBADF)" );
+ ptr = &Predefined_Msgs[0];
+ status = mq_send( Test_q[BLOCKING].mq, ptr->msg, ptr->size , ptr->priority );
+ fatal_posix_service_status( status, -1, "mq_send error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_send errno EBADF");
+
}
-void non_blocking_mq_read_write(
- mqd_t *mqs,
- int size /* Number still open in mqs */
-)
+void validate_mq_setattr()
{
+ struct mq_attr attr;
+ struct mq_attr save_attr[ NUMBER_OF_TEST_QUEUES ];
+ int status;
+ int i;
+
/*
- int status;
- char *messages[] = {
- "Msg 1",
- "Test 2",
- "12345678901234567890"
- };
+ * EBADF - Get the attributes from a closed queue.
+ */
- status = mq_send( mqs[0], messages[0], strlen( messages[0] ), 0 );
- fatal_directive_status( status, 0, "mq_send error return status" );
-
- puts( "mq_send - UNSUCCESSFUL ()" );
- do {
- status = mq_send( );
- fatal_directive_status( status, -1, "mq_send error return status");
- fatal_directive_status( errno, E, "mq_send errno E");
+ puts( "Task1:mq_setattr - unopened queue (EBADF)" );
+ status = mq_setattr( Test_q[CLOSED].mq, &attr, NULL );
+ fatal_posix_service_status( status, -1, "mq_setattr error return status");
+ fatal_posix_service_status( errno, EBADF, "mq_setattr errno EBADF");
+
+ /*
+ * XXX - The following are not listed in the POSIX manual but
+ * may occur.
+ */
+
+ /*
+ * EINVAL - NULL attributes
+ */
+
+ puts( "Task1:mq_setattr - NULL attributes (EINVAL)" );
+ status = mq_setattr( Test_q[RW_QUEUE].mq, NULL, NULL );
+ fatal_posix_service_status( status, -1, "mq_setattr error return status");
+ fatal_posix_service_status( errno, EINVAL, "mq_setattr errno EINVAL");
+
+ /*
+ * Verify change queues to blocking, by verifying all queues block
+ * for a timed receive.
+ */
+
+ puts( "Init: set_attr all queues to blocking" );
+ for(i=0; i<CLOSED; i++) {
+ attr.mq_flags = Test_q[i].oflag & (~O_NONBLOCK );
+ status = mq_setattr( Test_q[i].mq, &attr, &save_attr[i] );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[i].mq, attr.mq_flags, 0 );
+ }
+ for( i = RW_QUEUE; i < CLOSED; i++ ) {
+ verify_timed_receive_queue( "Init:", i, 1 );
+ }
+
+ /*
+ * Restore restore all queues to their old attribute.
+ */
+
+ for(i=0; i<CLOSED; i++) {
+ status = mq_setattr( Test_q[i].mq, &save_attr[i], NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[i].mq, Test_q[i].oflag, 0 );
}
- */
}
void *POSIX_Init(
@@ -390,95 +1128,184 @@ void *POSIX_Init(
)
{
int status;
- mqd_t mqs[CONFIGURE_MAXIMUM_POSIX_MESSAGE_QUEUES];
- mqd_t n_mq1;
mqd_t n_mq2;
- char *messages[] = {
- "Msg 1",
- "Test 2",
- "12345678901234567890"
- };
puts( "\n\n*** POSIX MESSAGE QUEUE TEST ***" );
- validate_mq_open_error_codes( mqs, 2 );
- validate_mq_unlink_error_codes( mqs, 2 );
- validate_mq_close_error_codes( mqs, 2 );
+ validate_mq_open_error_codes( );
+ open_test_queues();
+ validate_mq_unlink_error_codes();
+ validate_mq_close_error_codes();
+ verify_unlink_functionality();
+ validate_mq_setattr( );
+ validate_mq_send_error_codes();
+ validate_mq_getattr_error_codes();
+ verify_timed_send();
+ validate_mq_receive_error_codes();
+ verify_timed_receive();
+ verify_open_functionality();
+ verify_notify();
+ verify_with_threads();
+
+ puts( "*** END OF POSIX MESSAGE QUEUE TEST ***" );
+ exit( 0 );
+
+ return NULL; /* just so the compiler thinks we returned something */
+}
- validate_mq_send_error_codes( mqs, 2 );
- validate_mq_receive_error_codes( mqs, 2 );
+void *Task_1 (
+ void *argument
+)
+{
+ int status;
+ int count = 0;
+ sigset_t set;
- /*
- * Validate a second open returns the same message queue.
- */
+ /* Block Waiting for a message */
- puts( "mq_open - Open an existing mq ( same id )" );
- n_mq1 = mq_open("mq1", 0 );
- fatal_directive_status(
- (int) n_mq1, (int ) mqs[0], "mq_open error return status" );
-
- /*
- * Unlink the message queue, then verify an open of the same name produces a
- * different message queue.
- */
+ print_current_time( "Task_1: ", "" );
- puts( "mq_unlink - mq1 SUCCESSFUL" );
- status = mq_unlink( "mq1" );
- fatal_directive_status( status, 0, "mq_unlink locked message queue");
+ Show_send_msg_to_que( "Task_1:", BLOCKING, 0 );
- puts( "mq_open - Reopen mq1 SUCCESSFUL with a different id" );
- n_mq2 = mq_open( "mq1", O_CREAT | O_EXCL, 00777, NULL);
- assert( n_mq2 != (-1) );
- assert( n_mq2 != n_mq1 );
+ puts( "Task_1: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ assert( 0 );
+ return NULL; /* just so the compiler thinks we returned something */
+}
+
+void *Task_2(
+ void *argument
+)
+{
+ int status;
+
+
+ print_current_time( "Task_2: ", "" );
+
+
+ /* Block waiting to send a message */
+
+ verify_queues_full( "Task_2:" );
+ Read_msg_from_que( BLOCKING, Priority_Order[0] ); /* Cause context switch */
+
+ puts( "Task_2: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+}
+
+void *Task_3 (
+ void *argument
+)
+{
+
+ print_current_time( "Task_3: ", "" );
/*
- * Validate it "mq1" can be closed and unlinked.
+ * close and unlink all queues.
*/
- puts( "mq_unlink - mq1 SUCCESSFUL" );
- status = mq_unlink( "mq1" );
- fatal_directive_status( status, 0, "mq_unlink locked message queue");
+ verify_close_functionality( "Task_3: " );
+ puts( "Task_3: pthread_exit" );
+ pthread_exit( NULL );
- puts( "mq_close mq1 - SUCCESSFUL" );
- status = mq_close( n_mq2 );
- fatal_directive_status( status, 0, "mq_close message queue");
- status = mq_close( n_mq1 );
- fatal_directive_status( status, 0, "mq_close message queue");
- status = mq_close( mqs[0] );
- fatal_directive_status( status, 0, "mq_close message queue");
+ /* switch to Init */
- puts( "mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink("mq1");
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_close errno EINVAL");
+ return NULL; /* just so the compiler thinks we returned something */
- /*
- * XXX - Cant' create location OBJECTS_ERROR or OBJECTS_REMOTE.
- * mq_close and mq_unlink.
- * XXX - Don't think we need this save until yellow line tested.
- puts( "Init: mq_unlink - UNSUCCESSFUL (ENOENT)" );
- status = mq_unlink("mq3");
- fatal_directive_status( status, -1, "mq_unlink error return status");
- fatal_directive_status( errno, ENOENT, "mq_unlink errno ENOENT");
- assert( (status == -1) && (errno == ENOENT) );
- */
+}
+void *Task_4 (
+ void *argument
+)
+{
+ struct mq_attr attr;
+ int status;
+ int count;
+
+ print_current_time( "Task_4: ", "" );
/*
- * Validate we can wait on a message queue opened with mq_open.
+ * Set the count to the number of messages in the queue.
*/
-#if (0) /* XXX FIX ME */
- puts( "Init: mq_wait on mq1" );
- status = mq_receive(n_mq1);
- fatal_directive_status( status, 0, "mq_wait opened message queue");
-#endif
+ status = mq_getattr( Test_q[BLOCKING].mq, &attr );
+ fatal_posix_service_status( status, 0, "mq_getattr valid return status");
+ count = attr.mq_curmsgs;
- puts( "*** END OF POSIX MESSAGE QUEUE TEST ***" );
- exit( 0 );
+ puts("Task_4: Set queue to non-blocking");
+ attr.mq_flags = Test_q[BLOCKING].oflag | O_NONBLOCK;
+ status = mq_setattr( Test_q[BLOCKING].mq, &attr, NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[BLOCKING].mq, attr.mq_flags, count );
+
+ puts("Task_4: Return queue to blocking");
+ attr.mq_flags = Test_q[BLOCKING].oflag;
+ status = mq_setattr( Test_q[BLOCKING].mq, &attr, NULL );
+ fatal_int_service_status( status, 0, "mq_setattr valid return status");
+ Validate_attributes( Test_q[BLOCKING].mq, attr.mq_flags, count );
+
+ puts( "Task_4: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+
+}
+
+void *Task_5 (
+ void *argument
+)
+{
+
+ print_current_time( "Task_5: ", "" );
+
+ puts( "Task_5: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
+
+ return NULL; /* just so the compiler thinks we returned something */
+
+}
+
+void *Task_ (
+ void *argument
+)
+{
+
+ print_current_time( "Task_: ", "" );
+
+ puts( "Task_: pthread_exit" );
+ pthread_exit( NULL );
+
+ /* switch to Init */
return NULL; /* just so the compiler thinks we returned something */
+
}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+