summaryrefslogtreecommitdiffstats
path: root/c/src/exec/rtems/src/msg.c
diff options
context:
space:
mode:
Diffstat (limited to 'c/src/exec/rtems/src/msg.c')
-rw-r--r--c/src/exec/rtems/src/msg.c708
1 files changed, 708 insertions, 0 deletions
diff --git a/c/src/exec/rtems/src/msg.c b/c/src/exec/rtems/src/msg.c
new file mode 100644
index 0000000000..7cfe683ce5
--- /dev/null
+++ b/c/src/exec/rtems/src/msg.c
@@ -0,0 +1,708 @@
+/*
+ * Message Queue Manager
+ *
+ *
+ * COPYRIGHT (c) 1989, 1990, 1991, 1992, 1993, 1994.
+ * On-Line Applications Research Corporation (OAR).
+ * All rights assigned to U.S. Government, 1994.
+ *
+ * This material may be reproduced by or for the U.S. Government pursuant
+ * to the copyright license under the clause at DFARS 252.227-7013. This
+ * notice must appear in all copies of this file and its derivatives.
+ *
+ * $Id$
+ */
+
+#include <rtems/system.h>
+#include <rtems/attr.h>
+#include <rtems/chain.h>
+#include <rtems/config.h>
+#include <rtems/isr.h>
+#include <rtems/message.h>
+#include <rtems/object.h>
+#include <rtems/options.h>
+#include <rtems/states.h>
+#include <rtems/thread.h>
+#include <rtems/wkspace.h>
+#include <rtems/mpci.h>
+
+/*PAGE
+ *
+ * _Message_queue_Manager_initialization
+ *
+ * This routine initializes all message queue manager related
+ * data structures.
+ *
+ * Input parameters:
+ * maximum_message_queues - number of message queues to initialize
+ * maximum_message - number of messages per queue
+ *
+ * Output parameters: NONE
+ */
+
+void _Message_queue_Manager_initialization(
+ unsigned32 maximum_message_queues,
+ unsigned32 maximum_messages
+)
+{
+
+ _Objects_Initialize_information(
+ &_Message_queue_Information,
+ TRUE,
+ maximum_message_queues,
+ sizeof( Message_queue_Control )
+ );
+
+ if ( maximum_messages == 0 ) {
+
+ _Chain_Initialize_empty( &_Message_queue_Inactive_messages );
+
+ } else {
+
+
+ _Chain_Initialize(
+ &_Message_queue_Inactive_messages,
+ _Workspace_Allocate_or_fatal_error(
+ maximum_messages * sizeof( Message_queue_Buffer_control )
+ ),
+ maximum_messages,
+ sizeof( Message_queue_Buffer_control )
+ );
+
+ }
+}
+
+/*PAGE
+ *
+ * rtems_message_queue_create
+ *
+ * This directive creates a message queue by allocating and initializing
+ * a message queue data structure.
+ *
+ * Input parameters:
+ * name - user defined queue name
+ * count - maximum message and reserved buffer count
+ * attribute_set - process method
+ * id - pointer to queue
+ *
+ * Output parameters:
+ * id - queue id
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code rtems_message_queue_create(
+ Objects_Name name,
+ unsigned32 count,
+ rtems_attribute attribute_set,
+ Objects_Id *id
+)
+{
+ register Message_queue_Control *the_message_queue;
+
+ if ( !_Objects_Is_name_valid( name ) )
+ return ( RTEMS_INVALID_NAME );
+
+ if ( _Attributes_Is_global( attribute_set ) &&
+ !_Configuration_Is_multiprocessing() )
+ return( RTEMS_MP_NOT_CONFIGURED );
+
+ _Thread_Disable_dispatch(); /* protects object pointer */
+
+ the_message_queue = _Message_queue_Allocate();
+
+ if ( !the_message_queue ) {
+ _Thread_Enable_dispatch();
+ return( RTEMS_TOO_MANY );
+ }
+
+ if ( _Attributes_Is_global( attribute_set ) &&
+ !( _Objects_MP_Open( &_Message_queue_Information, name,
+ the_message_queue->Object.id, FALSE ) ) ) {
+ _Message_queue_Free( the_message_queue );
+ _Thread_Enable_dispatch();
+ return( RTEMS_TOO_MANY );
+ }
+
+ if ( _Attributes_Is_limit( attribute_set ) )
+ the_message_queue->maximum_pending_messages = count;
+ else
+ the_message_queue->maximum_pending_messages = 0xffffffff;
+
+ the_message_queue->attribute_set = attribute_set;
+ the_message_queue->number_of_pending_messages = 0;
+
+ _Chain_Initialize_empty( &the_message_queue->Pending_messages );
+
+ _Thread_queue_Initialize( &the_message_queue->Wait_queue, attribute_set,
+ STATES_WAITING_FOR_MESSAGE );
+
+ _Objects_Open( &_Message_queue_Information,
+ &the_message_queue->Object, name );
+
+ *id = the_message_queue->Object.id;
+
+ if ( _Attributes_Is_global( attribute_set ) )
+ _Message_queue_MP_Send_process_packet(
+ MESSAGE_QUEUE_MP_ANNOUNCE_CREATE,
+ the_message_queue->Object.id,
+ name,
+ 0
+ );
+
+ _Thread_Enable_dispatch();
+ return( RTEMS_SUCCESSFUL );
+}
+
+/*PAGE
+ *
+ * rtems_message_queue_ident
+ *
+ * This directive returns the system ID associated with
+ * the message queue name.
+ *
+ * Input parameters:
+ * name - user defined message queue name
+ * node - node(s) to be searched
+ * id - pointer to message queue id
+ *
+ * Output parameters:
+ * *id - message queue id
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code rtems_message_queue_ident(
+ Objects_Name name,
+ unsigned32 node,
+ Objects_Id *id
+)
+{
+ return( _Objects_Name_to_id( &_Message_queue_Information, name,
+ node, id ) );
+}
+
+/*PAGE
+ *
+ * rtems_message_queue_delete
+ *
+ * This directive allows a thread to delete the message queue specified
+ * by the given queue identifier.
+ *
+ * Input parameters:
+ * id - queue id
+ *
+ * Output parameters:
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code rtems_message_queue_delete(
+ Objects_Id id
+)
+{
+ register Message_queue_Control *the_message_queue;
+ Objects_Locations location;
+
+ the_message_queue = _Message_queue_Get( id, &location );
+ switch ( location ) {
+ case OBJECTS_ERROR:
+ return( RTEMS_INVALID_ID );
+ case OBJECTS_REMOTE:
+ _Thread_Dispatch();
+ return( RTEMS_ILLEGAL_ON_REMOTE_OBJECT );
+ case OBJECTS_LOCAL:
+ _Objects_Close( &_Message_queue_Information,
+ &the_message_queue->Object );
+
+ if ( the_message_queue->number_of_pending_messages != 0 )
+ (void) _Message_queue_Flush_support( the_message_queue );
+ else
+ _Thread_queue_Flush(
+ &the_message_queue->Wait_queue,
+ _Message_queue_MP_Send_object_was_deleted
+ );
+
+ _Message_queue_Free( the_message_queue );
+
+ if ( _Attributes_Is_global( the_message_queue->attribute_set ) ) {
+ _Objects_MP_Close(
+ &_Message_queue_Information,
+ the_message_queue->Object.id
+ );
+
+ _Message_queue_MP_Send_process_packet(
+ MESSAGE_QUEUE_MP_ANNOUNCE_DELETE,
+ the_message_queue->Object.id,
+ 0, /* Not used */
+ MPCI_DEFAULT_TIMEOUT
+ );
+ }
+
+ _Thread_Enable_dispatch();
+ return( RTEMS_SUCCESSFUL );
+ }
+
+ return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */
+}
+
+/*PAGE
+ *
+ * rtems_message_queue_send
+ *
+ * This routine implements the directives q_send. It sends a
+ * message to the specified message queue.
+ *
+ * Input parameters:
+ * id - pointer to message queue
+ * buffer - pointer to message buffer
+ *
+ * Output parameters:
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code rtems_message_queue_send(
+ Objects_Id id,
+ void *buffer
+)
+{
+ return( _Message_queue_Submit(
+ id,
+ (Message_queue_Buffer *) buffer,
+ MESSAGE_QUEUE_SEND_REQUEST
+ )
+ );
+}
+
+/*PAGE
+ *
+ * rtems_message_queue_urgent
+ *
+ * This routine implements the directives q_urgent. It urgents a
+ * message to the specified message queue.
+ *
+ * Input parameters:
+ * id - pointer to message queue
+ * buffer - pointer to message buffer
+ *
+ * Output parameters:
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code rtems_message_queue_urgent(
+ Objects_Id id,
+ void *buffer
+)
+{
+ return( _Message_queue_Submit(
+ id,
+ (Message_queue_Buffer *) buffer,
+ MESSAGE_QUEUE_URGENT_REQUEST
+ )
+ );
+}
+
+/*PAGE
+ *
+ * rtems_message_queue_broadcast
+ *
+ * This directive sends a message for every thread waiting on the queue
+ * designated by id.
+ *
+ * Input parameters:
+ * id - pointer to message queue
+ * buffer - pointer to message buffer
+ * count - pointer to area to store number of threads made ready
+ *
+ * Output parameters:
+ * count - number of threads made ready
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code rtems_message_queue_broadcast(
+ Objects_Id id,
+ void *buffer,
+ unsigned32 *count
+)
+{
+ register Message_queue_Control *the_message_queue;
+ Objects_Locations location;
+ Thread_Control *the_thread;
+ unsigned32 number_broadcasted;
+
+ the_message_queue = _Message_queue_Get( id, &location );
+ switch ( location ) {
+ case OBJECTS_ERROR:
+ return( RTEMS_INVALID_ID );
+ case OBJECTS_REMOTE:
+ _Thread_Executing->Wait.return_argument = count;
+
+ return
+ _Message_queue_MP_Send_request_packet(
+ MESSAGE_QUEUE_MP_BROADCAST_REQUEST,
+ id,
+ (Message_queue_Buffer *) buffer,
+ 0, /* Not used */
+ MPCI_DEFAULT_TIMEOUT
+ );
+
+ case OBJECTS_LOCAL:
+ number_broadcasted = 0;
+ while ( (the_thread =
+ _Thread_queue_Dequeue(&the_message_queue->Wait_queue)) ) {
+ number_broadcasted += 1;
+ _Message_queue_Copy_buffer(
+ (Message_queue_Buffer *) buffer,
+ the_thread->Wait.return_argument
+ );
+
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
+ the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
+
+ _Message_queue_MP_Send_response_packet(
+ MESSAGE_QUEUE_MP_RECEIVE_RESPONSE,
+ id,
+ the_thread
+ );
+ }
+ }
+ _Thread_Enable_dispatch();
+ *count = number_broadcasted;
+ return( RTEMS_SUCCESSFUL );
+ }
+
+ return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */
+}
+
+/*PAGE
+ *
+ * rtems_message_queue_receive
+ *
+ * This directive dequeues a message from the designated message queue
+ * and copies it into the requesting thread's buffer.
+ *
+ * Input parameters:
+ * id - queue id
+ * buffer - pointer to message buffer
+ * option_set - options on receive
+ * timeout - number of ticks to wait
+ *
+ * Output parameters:
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code rtems_message_queue_receive(
+ Objects_Id id,
+ void *buffer,
+ unsigned32 option_set,
+ rtems_interval timeout
+)
+{
+ register Message_queue_Control *the_message_queue;
+ Objects_Locations location;
+
+ the_message_queue = _Message_queue_Get( id, &location );
+ switch ( location ) {
+ case OBJECTS_ERROR:
+ return( RTEMS_INVALID_ID );
+ case OBJECTS_REMOTE:
+ _Thread_Executing->Wait.return_argument = buffer;
+ return
+ _Message_queue_MP_Send_request_packet(
+ MESSAGE_QUEUE_MP_RECEIVE_REQUEST,
+ id,
+ buffer,
+ option_set,
+ timeout
+ );
+
+ case OBJECTS_LOCAL:
+ if ( !_Message_queue_Seize( the_message_queue, option_set, buffer ) )
+ _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
+ _Thread_Enable_dispatch();
+ return( _Thread_Executing->Wait.return_code );
+ }
+
+ return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */
+}
+
+/*PAGE
+ *
+ * rtems_message_queue_flush
+ *
+ * This directive removes all pending messages from a queue and returns
+ * the number of messages removed. If no messages were present then
+ * a count of zero is returned.
+ *
+ * Input parameters:
+ * id - queue id
+ * count - return area for count
+ *
+ * Output parameters:
+ * count - number of messages removed ( 0 = empty queue )
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code rtems_message_queue_flush(
+ Objects_Id id,
+ unsigned32 *count
+)
+{
+ register Message_queue_Control *the_message_queue;
+ Objects_Locations location;
+
+ the_message_queue = _Message_queue_Get( id, &location );
+ switch ( location ) {
+ case OBJECTS_ERROR:
+ return( RTEMS_INVALID_ID );
+ case OBJECTS_REMOTE:
+ _Thread_Executing->Wait.return_argument = count;
+
+ return
+ _Message_queue_MP_Send_request_packet(
+ MESSAGE_QUEUE_MP_FLUSH_REQUEST,
+ id,
+ 0, /* Not used */
+ 0, /* Not used */
+ MPCI_DEFAULT_TIMEOUT
+ );
+
+ case OBJECTS_LOCAL:
+ if ( the_message_queue->number_of_pending_messages != 0 )
+ *count = _Message_queue_Flush_support( the_message_queue );
+ else
+ *count = 0;
+ _Thread_Enable_dispatch();
+ return( RTEMS_SUCCESSFUL );
+ }
+
+ return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */
+}
+
+/*PAGE
+ *
+ * _Message_queue_Seize
+ *
+ * This kernel routine dequeues a message, copies the message buffer to
+ * a given destination buffer, and frees the message buffer to the
+ * inactive message pool.
+ *
+ * Input parameters:
+ * the_message_queue - pointer to message queue
+ * option_set - options on receive
+ * the_buffer - pointer to message buffer to be filled
+ *
+ * Output parameters:
+ * TRUE - if message received or RTEMS_NO_WAIT and no message
+ * FALSE - if thread is to block
+ *
+ * NOTE: Dependent on BUFFER_LENGTH
+ *
+ * INTERRUPT LATENCY:
+ * available
+ * wait
+ */
+
+boolean _Message_queue_Seize(
+ Message_queue_Control *the_message_queue,
+ rtems_option option_set,
+ Message_queue_Buffer *buffer
+)
+{
+ ISR_Level level;
+ Message_queue_Buffer_control *the_message;
+ Thread_Control *executing;
+
+ executing = _Thread_Executing;
+ executing->Wait.return_code = RTEMS_SUCCESSFUL;
+ _ISR_Disable( level );
+ if ( the_message_queue->number_of_pending_messages != 0 ) {
+ the_message_queue->number_of_pending_messages -= 1;
+
+ the_message = _Message_queue_Get_pending_message( the_message_queue );
+ _ISR_Enable( level );
+ _Message_queue_Copy_buffer( &the_message->Contents, buffer );
+ _Message_queue_Free_message_buffer( the_message );
+ return( TRUE );
+ }
+
+ if ( _Options_Is_no_wait( option_set ) ) {
+ _ISR_Enable( level );
+ executing->Wait.return_code = RTEMS_UNSATISFIED;
+ return( TRUE );
+ }
+
+ the_message_queue->Wait_queue.sync = TRUE;
+ executing->Wait.queue = &the_message_queue->Wait_queue;
+ executing->Wait.id = the_message_queue->Object.id;
+ executing->Wait.option_set = option_set;
+ executing->Wait.return_argument = (unsigned32 *)buffer;
+ _ISR_Enable( level );
+ return( FALSE );
+}
+
+/*PAGE
+ *
+ * _Message_queue_Flush_support
+ *
+ * This message manager routine removes all messages from a message queue
+ * and returns them to the inactive message pool.
+ *
+ * Input parameters:
+ * the_message_queue - pointer to message queue
+ *
+ * Output parameters:
+ * returns - number of messages placed on inactive chain
+ *
+ * INTERRUPT LATENCY:
+ * only case
+ */
+
+unsigned32 _Message_queue_Flush_support(
+ Message_queue_Control *the_message_queue
+)
+{
+ ISR_Level level;
+ Chain_Node *inactive_first;
+ Chain_Node *message_queue_first;
+ Chain_Node *message_queue_last;
+ unsigned32 count;
+
+ _ISR_Disable( level );
+ inactive_first = _Message_queue_Inactive_messages.first;
+ message_queue_first = the_message_queue->Pending_messages.first;
+ message_queue_last = the_message_queue->Pending_messages.last;
+
+ _Message_queue_Inactive_messages.first = message_queue_first;
+ message_queue_last->next = inactive_first;
+ inactive_first->previous = message_queue_last;
+ message_queue_first->previous =
+ _Chain_Head( &_Message_queue_Inactive_messages );
+
+ _Chain_Initialize_empty( &the_message_queue->Pending_messages );
+
+ count = the_message_queue->number_of_pending_messages;
+ the_message_queue->number_of_pending_messages = 0;
+ _ISR_Enable( level );
+ return( count );
+}
+
+/*PAGE
+ *
+ * _Message_queue_Submit
+ *
+ * This routine implements the directives q_send and q_urgent. It
+ * processes a message that is to be submitted to the designated
+ * message queue. The message will either be processed as a send
+ * send message which it will be inserted at the rear of the queue
+ * or it will be processed as an urgent message which will be inserted
+ * at the front of the queue.
+ *
+ * Input parameters:
+ * id - pointer to message queue
+ * the_buffer - pointer to message buffer
+ * submit_type - send or urgent message
+ *
+ * Output parameters:
+ * RTEMS_SUCCESSFUL - if successful
+ * error code - if unsuccessful
+ */
+
+rtems_status_code _Message_queue_Submit(
+ Objects_Id id,
+ Message_queue_Buffer *buffer,
+ Message_queue_Submit_types submit_type
+)
+{
+ register Message_queue_Control *the_message_queue;
+ Objects_Locations location;
+ Thread_Control *the_thread;
+ Message_queue_Buffer_control *the_message;
+
+ the_message_queue = _Message_queue_Get( id, &location );
+ switch ( location ) {
+ case OBJECTS_ERROR:
+ return( RTEMS_INVALID_ID );
+ case OBJECTS_REMOTE:
+ switch ( submit_type ) {
+ case MESSAGE_QUEUE_SEND_REQUEST:
+ return
+ _Message_queue_MP_Send_request_packet(
+ MESSAGE_QUEUE_MP_SEND_REQUEST,
+ id,
+ buffer,
+ 0, /* Not used */
+ MPCI_DEFAULT_TIMEOUT
+ );
+
+ case MESSAGE_QUEUE_URGENT_REQUEST:
+ return
+ _Message_queue_MP_Send_request_packet(
+ MESSAGE_QUEUE_MP_URGENT_REQUEST,
+ id,
+ buffer,
+ 0, /* Not used */
+ MPCI_DEFAULT_TIMEOUT
+ );
+ }
+ case OBJECTS_LOCAL:
+ the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
+
+ if ( the_thread ) {
+
+ _Message_queue_Copy_buffer(
+ buffer,
+ the_thread->Wait.return_argument
+ );
+
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
+ the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
+
+ _Message_queue_MP_Send_response_packet(
+ MESSAGE_QUEUE_MP_RECEIVE_RESPONSE,
+ id,
+ the_thread
+ );
+
+ }
+ _Thread_Enable_dispatch();
+ return( RTEMS_SUCCESSFUL );
+ }
+
+ if ( the_message_queue->number_of_pending_messages ==
+ the_message_queue->maximum_pending_messages ) {
+ _Thread_Enable_dispatch();
+ return( RTEMS_TOO_MANY );
+ }
+
+ the_message = _Message_queue_Allocate_message_buffer();
+
+ if ( !the_message ) {
+ _Thread_Enable_dispatch();
+ return( RTEMS_UNSATISFIED );
+ }
+
+ _Message_queue_Copy_buffer( buffer, &the_message->Contents );
+
+ the_message_queue->number_of_pending_messages += 1;
+
+ switch ( submit_type ) {
+ case MESSAGE_QUEUE_SEND_REQUEST:
+ _Message_queue_Append( the_message_queue, the_message );
+ break;
+ case MESSAGE_QUEUE_URGENT_REQUEST:
+ _Message_queue_Prepend( the_message_queue, the_message );
+ break;
+ }
+
+ _Thread_Enable_dispatch();
+ return( RTEMS_SUCCESSFUL );
+ }
+
+ return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */
+}