summaryrefslogtreecommitdiffstats
path: root/cpukit
diff options
context:
space:
mode:
authorJoel Sherrill <joel.sherrill@OARcorp.com>2000-01-13 19:25:15 +0000
committerJoel Sherrill <joel.sherrill@OARcorp.com>2000-01-13 19:25:15 +0000
commit53fb837afc4285486e318bcb614c911bbe9b1348 (patch)
treecdd9b6ff2a66e8f5c746a06d1e639be4c01c6941 /cpukit
parentee4ddd83616637c4fbbafe7cbb6148c08832309b (diff)
downloadrtems-53fb837afc4285486e318bcb614c911bbe9b1348.tar.bz2
POSIX message queues now include complete functionality including
blocking sends when the queue is full. The SuperCore was enhanced to support blocking on send. The existing POSIX API was debugged and numerous test cases were added to psxmsgq01 by Jennifer Averett. SuperCore enhancements and resulting modifications to other APIs were done by Joel. There is one significant point of interpretation for the POSIX API. What happens to threads already blocked on a message queue when the mode of that same message queue is changed from blocking to non-blocking? We decided to unblock all waiting tasks with an EAGAIN error just as if a non-blocking version of the same operation had returned unsatisfied. This case is not discussed in the POSIX standard and other implementations may have chosen differently.
Diffstat (limited to '')
-rw-r--r--cpukit/itron/src/cre_mbf.c10
-rw-r--r--cpukit/itron/src/snd_mbx.c12
-rw-r--r--cpukit/itron/src/trcv_mbf.c2
-rw-r--r--cpukit/itron/src/tsnd_mbf.c14
-rw-r--r--cpukit/posix/include/rtems/posix/mqueue.h2
-rw-r--r--cpukit/rtems/src/msgqcreate.c14
-rw-r--r--cpukit/rtems/src/msgqreceive.c6
-rw-r--r--cpukit/rtems/src/msgqsubmit.c41
-rw-r--r--cpukit/score/include/rtems/score/coremsg.h44
-rw-r--r--cpukit/score/inline/rtems/score/coremsg.inl24
-rw-r--r--cpukit/score/macros/rtems/score/coremsg.inl10
-rw-r--r--cpukit/score/src/coremsgbroadcast.c19
-rw-r--r--cpukit/score/src/coremsgsubmit.c146
13 files changed, 204 insertions, 140 deletions
diff --git a/cpukit/itron/src/cre_mbf.c b/cpukit/itron/src/cre_mbf.c
index ca139d53ae..0db738435e 100644
--- a/cpukit/itron/src/cre_mbf.c
+++ b/cpukit/itron/src/cre_mbf.c
@@ -25,7 +25,7 @@ ER cre_mbf(
T_CMBF *pk_cmbf
)
{
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
ITRON_Message_buffer_Control *the_message_buffer;
/*
@@ -57,16 +57,14 @@ ER cre_mbf(
}
if ( pk_cmbf->mbfatr & TA_TPRI )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
_CORE_message_queue_Initialize(
&the_message_buffer->message_queue,
OBJECTS_ITRON_MESSAGE_BUFFERS,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
pk_cmbf->bufsz / pk_cmbf->maxmsz,
pk_cmbf->maxmsz,
NULL /* Multiprocessing not supported */
diff --git a/cpukit/itron/src/snd_mbx.c b/cpukit/itron/src/snd_mbx.c
index f674583e7b..e02714465e 100644
--- a/cpukit/itron/src/snd_mbx.c
+++ b/cpukit/itron/src/snd_mbx.c
@@ -27,7 +27,6 @@ ER snd_msg(
{
register ITRON_Mailbox_Control *the_mailbox;
Objects_Locations location;
- CORE_message_queue_Status status = E_OK;
unsigned32 message_priority;
void *message_contents;
@@ -47,17 +46,22 @@ ER snd_msg(
message_priority = CORE_MESSAGE_QUEUE_SEND_REQUEST;
message_contents = pk_msg;
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_mailbox->message_queue,
&message_contents,
sizeof(T_MSG *),
the_mailbox->Object.id,
NULL, /* multiprocessing not supported */
- message_priority
+ message_priority,
+ FALSE, /* do not allow sender to block */
+ 0 /* no timeout */
);
break;
}
_ITRON_return_errorno(
- _ITRON_Mailbox_Translate_core_message_queue_return_code(status) );
+ _ITRON_Mailbox_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ )
+ );
}
diff --git a/cpukit/itron/src/trcv_mbf.c b/cpukit/itron/src/trcv_mbf.c
index a63f2a6748..0b2b89e612 100644
--- a/cpukit/itron/src/trcv_mbf.c
+++ b/cpukit/itron/src/trcv_mbf.c
@@ -32,7 +32,6 @@ ER trcv_mbf(
CORE_message_queue_Status status;
boolean wait;
Watchdog_Interval interval;
- CORE_message_queue_Submit_types core_priority;
interval = 0;
if (tmout == TMO_POL) {
@@ -62,7 +61,6 @@ ER trcv_mbf(
msg,
p_msgsz,
wait,
- &core_priority,
interval
);
_Thread_Enable_dispatch();
diff --git a/cpukit/itron/src/tsnd_mbf.c b/cpukit/itron/src/tsnd_mbf.c
index bc609dd298..0ed3b4f90e 100644
--- a/cpukit/itron/src/tsnd_mbf.c
+++ b/cpukit/itron/src/tsnd_mbf.c
@@ -33,7 +33,6 @@ ER tsnd_mbf(
Objects_Locations location;
Watchdog_Interval interval;
boolean wait;
- CORE_message_queue_Status status;
if (msgsz <= 0 || !msg)
return E_PAR;
@@ -50,8 +49,6 @@ ER tsnd_mbf(
if ( wait && _ITRON_Is_in_non_task_state() )
return E_CTX;
- assert( wait == FALSE );
-
the_message_buffer = _ITRON_Message_buffer_Get(mbfid, &location);
switch (location) {
case OBJECTS_REMOTE:
@@ -60,17 +57,20 @@ ER tsnd_mbf(
case OBJECTS_LOCAL:
/* XXX Submit needs to take into account blocking */
- status = _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
&the_message_buffer->message_queue,
msg,
msgsz,
the_message_buffer->Object.id,
NULL,
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ interval /* timeout interval */
);
_Thread_Enable_dispatch();
- return
- _ITRON_Message_buffer_Translate_core_message_buffer_return_code(status);
+ return _ITRON_Message_buffer_Translate_core_message_buffer_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
/*
diff --git a/cpukit/posix/include/rtems/posix/mqueue.h b/cpukit/posix/include/rtems/posix/mqueue.h
index 1c1201fef1..e3f7a2f073 100644
--- a/cpukit/posix/include/rtems/posix/mqueue.h
+++ b/cpukit/posix/include/rtems/posix/mqueue.h
@@ -32,10 +32,8 @@ extern "C" {
typedef struct {
Objects_Control Object;
int process_shared;
- int flags;
boolean named;
boolean linked;
- boolean blocking;
int oflag;
unsigned32 open_count;
CORE_message_queue_Control Message_queue;
diff --git a/cpukit/rtems/src/msgqcreate.c b/cpukit/rtems/src/msgqcreate.c
index 395cbf3a02..24bc35993d 100644
--- a/cpukit/rtems/src/msgqcreate.c
+++ b/cpukit/rtems/src/msgqcreate.c
@@ -59,7 +59,7 @@ rtems_status_code rtems_message_queue_create(
)
{
register Message_queue_Control *the_message_queue;
- CORE_message_queue_Attributes the_message_queue_attributes;
+ CORE_message_queue_Attributes the_msgq_attributes;
void *handler;
#if defined(RTEMS_MULTIPROCESSING)
boolean is_global;
@@ -74,10 +74,10 @@ rtems_status_code rtems_message_queue_create(
return RTEMS_MP_NOT_CONFIGURED;
#endif
- if (count == 0)
+ if ( count == 0 )
return RTEMS_INVALID_NUMBER;
- if (max_message_size == 0)
+ if ( max_message_size == 0 )
return RTEMS_INVALID_SIZE;
#if defined(RTEMS_MULTIPROCESSING)
@@ -115,11 +115,9 @@ rtems_status_code rtems_message_queue_create(
the_message_queue->attribute_set = attribute_set;
if (_Attributes_Is_priority( attribute_set ) )
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
- the_message_queue_attributes.discipline =
- CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
+ the_msgq_attributes.discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
handler = NULL;
#if defined(RTEMS_MULTIPROCESSING)
@@ -129,7 +127,7 @@ rtems_status_code rtems_message_queue_create(
if ( ! _CORE_message_queue_Initialize(
&the_message_queue->message_queue,
OBJECTS_RTEMS_MESSAGE_QUEUES,
- &the_message_queue_attributes,
+ &the_msgq_attributes,
count,
max_message_size,
handler ) ) {
diff --git a/cpukit/rtems/src/msgqreceive.c b/cpukit/rtems/src/msgqreceive.c
index 1338216c6b..77fcadc313 100644
--- a/cpukit/rtems/src/msgqreceive.c
+++ b/cpukit/rtems/src/msgqreceive.c
@@ -92,12 +92,12 @@ rtems_status_code rtems_message_queue_receive(
buffer,
size,
wait,
- &core_priority,
timeout
);
_Thread_Enable_dispatch();
- return( _Message_queue_Translate_core_message_queue_return_code(
- _Thread_Executing->Wait.return_code ) );
+ return _Message_queue_Translate_core_message_queue_return_code(
+ _Thread_Executing->Wait.return_code
+ );
}
diff --git a/cpukit/rtems/src/msgqsubmit.c b/cpukit/rtems/src/msgqsubmit.c
index 5a03f6409a..16f1c50266 100644
--- a/cpukit/rtems/src/msgqsubmit.c
+++ b/cpukit/rtems/src/msgqsubmit.c
@@ -61,7 +61,6 @@ rtems_status_code _Message_queue_Submit(
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
- CORE_message_queue_Status core_status;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location )
@@ -98,39 +97,43 @@ rtems_status_code _Message_queue_Submit(
case OBJECTS_LOCAL:
switch ( submit_type ) {
case MESSAGE_QUEUE_SEND_REQUEST:
- core_status = _CORE_message_queue_Send(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Send(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
case MESSAGE_QUEUE_URGENT_REQUEST:
- core_status = _CORE_message_queue_Urgent(
- &the_message_queue->message_queue,
- buffer,
- size,
- id,
+ _CORE_message_queue_Urgent(
+ &the_message_queue->message_queue,
+ buffer,
+ size,
+ id,
#if defined(RTEMS_MULTIPROCESSING)
- _Message_queue_Core_message_queue_mp_support
+ _Message_queue_Core_message_queue_mp_support,
#else
- NULL
+ NULL,
#endif
- );
+ FALSE, /* sender does not block */
+ 0 /* no timeout */
+ );
break;
default:
- core_status = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
return RTEMS_INTERNAL_ERROR; /* should never get here */
}
_Thread_Enable_dispatch();
return _Message_queue_Translate_core_message_queue_return_code(
- core_status );
+ _Thread_Executing->Wait.return_code
+ );
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
diff --git a/cpukit/score/include/rtems/score/coremsg.h b/cpukit/score/include/rtems/score/coremsg.h
index 6ffedee0af..818b1e8c18 100644
--- a/cpukit/score/include/rtems/score/coremsg.h
+++ b/cpukit/score/include/rtems/score/coremsg.h
@@ -166,13 +166,12 @@ void _CORE_message_queue_Close(
);
/*
- *
* _CORE_message_queue_Flush
*
* DESCRIPTION:
*
- * This function flushes the message_queue's task wait queue. The number
- * messages flushed from the queue is returned.
+ * This function flushes the message_queue's pending message queue. The
+ * number of messages flushed from the queue is returned.
*
*/
@@ -194,7 +193,20 @@ unsigned32 _CORE_message_queue_Flush_support(
);
/*
+ * _CORE_message_queue_Flush_waiting_threads
+ *
+ * DESCRIPTION:
*
+ * This function flushes the threads which are blocked on this
+ * message_queue's pending message queue. They are unblocked whether
+ * blocked sending or receiving.
+ */
+
+void _CORE_message_queue_Flush_waiting_threads(
+ CORE_message_queue_Control *the_message_queue
+);
+
+/*
* _CORE_message_queue_Broadcast
*
* DESCRIPTION:
@@ -214,7 +226,6 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
);
/*
- *
* _CORE_message_queue_Submit
*
* DESCRIPTION:
@@ -228,17 +239,18 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
*
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
);
/*
- *
* _CORE_message_queue_Seize
*
* DESCRIPTION:
@@ -248,6 +260,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* inactive message pool. The thread will be blocked if wait is TRUE,
* otherwise an error will be given to the thread if no messages are available.
*
+ * NOTE: Returns message priority via return are in TCB.
*/
void _CORE_message_queue_Seize(
@@ -256,10 +269,25 @@ void _CORE_message_queue_Seize(
void *buffer,
unsigned32 *size,
boolean wait,
- CORE_message_queue_Submit_types *priority,
Watchdog_Interval timeout
);
+/*
+ * _CORE_message_queue_Insert_message
+ *
+ * DESCRIPTION:
+ *
+ * This kernel routine inserts the specified message into the
+ * message queue. It is assumed that the message has been filled
+ * in before this routine is called.
+ */
+
+void _CORE_message_queue_Insert_message(
+ CORE_message_queue_Control *the_message_queue,
+ CORE_message_queue_Buffer_control *the_message,
+ CORE_message_queue_Submit_types submit_type
+);
+
#ifndef __RTEMS_APPLICATION__
#include <rtems/score/coremsg.inl>
#endif
diff --git a/cpukit/score/inline/rtems/score/coremsg.inl b/cpukit/score/inline/rtems/score/coremsg.inl
index af16fbd4aa..7356ce9537 100644
--- a/cpukit/score/inline/rtems/score/coremsg.inl
+++ b/cpukit/score/inline/rtems/score/coremsg.inl
@@ -27,15 +27,17 @@
* This routine sends a message to the end of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Send(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -45,7 +47,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_SEND_REQUEST
+ CORE_MESSAGE_QUEUE_SEND_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
@@ -58,15 +62,17 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
* This routine sends a message to the front of the specified message queue.
*/
-RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Urgent(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
- CORE_message_queue_API_mp_support_callout api_message_queue_mp_support
+ CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
- return _CORE_message_queue_Submit(
+ _CORE_message_queue_Submit(
the_message_queue,
buffer,
size,
@@ -76,7 +82,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
#else
NULL,
#endif
- CORE_MESSAGE_QUEUE_URGENT_REQUEST
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST,
+ wait, /* sender may block */
+ timeout /* timeout interval */
);
}
diff --git a/cpukit/score/macros/rtems/score/coremsg.inl b/cpukit/score/macros/rtems/score/coremsg.inl
index bc45a51ac3..0717b3eea3 100644
--- a/cpukit/score/macros/rtems/score/coremsg.inl
+++ b/cpukit/score/macros/rtems/score/coremsg.inl
@@ -23,9 +23,10 @@
*/
#define _CORE_message_queue_Send( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_SEND_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_SEND_REQUEST, (_wait), (_timeout)
/*PAGE
*
@@ -34,9 +35,10 @@ _id, _api_message_queue_mp_support ) \
*/
#define _CORE_message_queue_Urgent( _the_message_queue, _buffer, _size, \
-_id, _api_message_queue_mp_support ) \
+ _id, _api_message_queue_mp_support, _wait, _timeout ) \
_CORE_message_queue_Submit( (_the_message_queue), (_buffer), (_size), \
- (_id), (_api_message_queue_mp_support), CORE_MESSAGE_QUEUE_URGENT_REQUEST )
+ (_id), (_api_message_queue_mp_support), \
+ CORE_MESSAGE_QUEUE_URGENT_REQUEST, (_wait), (_timeout)
/*PAGE
*
diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c
index 2e6f649545..18e148ab1c 100644
--- a/cpukit/score/src/coremsgbroadcast.c
+++ b/cpukit/score/src/coremsgbroadcast.c
@@ -64,6 +64,25 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
Thread_Wait_information *waitp;
unsigned32 constrained_size;
+ /*
+ * If there are pending messages, then there can't be threads
+ * waiting for us to send them a message.
+ *
+ * NOTE: This check is critical because threads can block on
+ * send and receive and this ensures that we are broadcasting
+ * the message to threads waiting to receive -- not to send.
+ */
+
+ if ( the_message_queue->number_of_pending_messages != 0 ) {
+ *count = 0;
+ return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ }
+
+ /*
+ * There must be no pending messages if there is a thread waiting to
+ * receive a message.
+ */
+
number_broadcasted = 0;
while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
waitp = &the_thread->Wait;
diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c
index 4e15ab5bc0..6829351c18 100644
--- a/cpukit/score/src/coremsgsubmit.c
+++ b/cpukit/score/src/coremsgsubmit.c
@@ -53,102 +53,110 @@
* error code - if unsuccessful
*/
-CORE_message_queue_Status _CORE_message_queue_Submit(
+void _CORE_message_queue_Submit(
CORE_message_queue_Control *the_message_queue,
void *buffer,
unsigned32 size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- CORE_message_queue_Submit_types submit_type
+ CORE_message_queue_Submit_types submit_type,
+ boolean wait,
+ Watchdog_Interval timeout
)
{
+ ISR_Level level;
CORE_message_queue_Buffer_control *the_message;
Thread_Control *the_thread;
+ Thread_Control *executing;
- if ( size > the_message_queue->maximum_message_size )
- return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+
+ if ( size > the_message_queue->maximum_message_size ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
+ return;
+ }
/*
- * Is there a thread currently waiting on this message queue?
+ * Is there a thread currently waiting on this message queue?
*/
- the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
- if ( the_thread ) {
- _CORE_message_queue_Copy_buffer(
- buffer,
- the_thread->Wait.return_argument,
- size
- );
- *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
- the_thread->Wait.count = submit_type;
+ if ( the_message_queue->number_of_pending_messages == 0 ) {
+ the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
+ if ( the_thread ) {
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_thread->Wait.return_argument,
+ size
+ );
+ *(unsigned32 *)the_thread->Wait.return_argument_1 = size;
+ the_thread->Wait.count = submit_type;
#if defined(RTEMS_MULTIPROCESSING)
- if ( !_Objects_Is_local_id( the_thread->Object.id ) )
- (*api_message_queue_mp_support) ( the_thread, id );
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+ (*api_message_queue_mp_support) ( the_thread, id );
#endif
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ return;
+ }
}
/*
- * No one waiting on this one currently.
- * Allocate a message buffer and store it away
+ * No one waiting on the message queue at this time, so attempt to
+ * queue the message up for a future receive.
*/
- if ( the_message_queue->number_of_pending_messages ==
+ if ( the_message_queue->number_of_pending_messages <
the_message_queue->maximum_pending_messages ) {
- return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
- }
- the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue);
- if ( the_message == 0 )
- return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
-
- _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
- the_message->Contents.size = size;
- the_message->priority = submit_type;
-
- the_message_queue->number_of_pending_messages += 1;
-
- switch ( submit_type ) {
- case CORE_MESSAGE_QUEUE_SEND_REQUEST:
- _CORE_message_queue_Append( the_message_queue, the_message );
- break;
- case CORE_MESSAGE_QUEUE_URGENT_REQUEST:
- _CORE_message_queue_Prepend( the_message_queue, the_message );
- break;
- default:
- /* XXX interrupt critical section needs to be addressed */
- {
- CORE_message_queue_Buffer_control *this_message;
- Chain_Node *the_node;
-
- the_message->priority = submit_type;
- for ( the_node = the_message_queue->Pending_messages.first ;
- !_Chain_Is_tail( &the_message_queue->Pending_messages, the_node ) ;
- the_node = the_node->next ) {
-
- this_message = (CORE_message_queue_Buffer_control *) the_node;
-
- if ( this_message->priority >= the_message->priority )
- continue;
-
- _Chain_Insert( the_node, &the_message->Node );
- break;
- }
- }
- break;
+ the_message =
+ _CORE_message_queue_Allocate_message_buffer( the_message_queue );
+
+ /*
+ * NOTE: If the system is consistent, this error should never occur.
+ */
+ if ( !the_message ) {
+ _Thread_Executing->Wait.return_code =
+ CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
+ return;
+ }
+
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_message->Contents.buffer,
+ size
+ );
+ the_message->Contents.size = size;
+ the_message->priority = submit_type;
+
+ _CORE_message_queue_Insert_message(
+ the_message_queue,
+ the_message,
+ submit_type
+ );
+ return;
}
/*
- * According to POSIX, does this happen before or after the message
- * is actually enqueued. It is logical to think afterwards, because
- * the message is actually in the queue at this point.
+ * No message buffers were available so we may need to return an
+ * overflow error or block the sender until the message is placed
+ * on the queue.
*/
- if ( the_message_queue->number_of_pending_messages == 1 &&
- the_message_queue->notify_handler )
- (*the_message_queue->notify_handler)( the_message_queue->notify_argument );
-
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
+ if ( !wait ) {
+ _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
+ return;
+ }
+
+ executing = _Thread_Executing;
+
+ _ISR_Disable( level );
+ _Thread_queue_Enter_critical_section( &the_message_queue->Wait_queue );
+ executing->Wait.queue = &the_message_queue->Wait_queue;
+ executing->Wait.id = id;
+ executing->Wait.return_argument = (void *)buffer;
+ executing->Wait.return_argument_1 = (void *)size;
+ executing->Wait.count = submit_type;
+ _ISR_Enable( level );
+
+ _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
}