From ac7d5ef06a6d6e8d84abbd1f0b82162725f98326 Mon Sep 17 00:00:00 2001 From: Joel Sherrill Date: Thu, 11 May 1995 17:39:37 +0000 Subject: Initial revision --- cpukit/rtems/src/msg.c | 708 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 708 insertions(+) create mode 100644 cpukit/rtems/src/msg.c (limited to 'cpukit/rtems/src/msg.c') diff --git a/cpukit/rtems/src/msg.c b/cpukit/rtems/src/msg.c new file mode 100644 index 0000000000..7cfe683ce5 --- /dev/null +++ b/cpukit/rtems/src/msg.c @@ -0,0 +1,708 @@ +/* + * Message Queue Manager + * + * + * COPYRIGHT (c) 1989, 1990, 1991, 1992, 1993, 1994. + * On-Line Applications Research Corporation (OAR). + * All rights assigned to U.S. Government, 1994. + * + * This material may be reproduced by or for the U.S. Government pursuant + * to the copyright license under the clause at DFARS 252.227-7013. This + * notice must appear in all copies of this file and its derivatives. + * + * $Id$ + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/*PAGE + * + * _Message_queue_Manager_initialization + * + * This routine initializes all message queue manager related + * data structures. + * + * Input parameters: + * maximum_message_queues - number of message queues to initialize + * maximum_message - number of messages per queue + * + * Output parameters: NONE + */ + +void _Message_queue_Manager_initialization( + unsigned32 maximum_message_queues, + unsigned32 maximum_messages +) +{ + + _Objects_Initialize_information( + &_Message_queue_Information, + TRUE, + maximum_message_queues, + sizeof( Message_queue_Control ) + ); + + if ( maximum_messages == 0 ) { + + _Chain_Initialize_empty( &_Message_queue_Inactive_messages ); + + } else { + + + _Chain_Initialize( + &_Message_queue_Inactive_messages, + _Workspace_Allocate_or_fatal_error( + maximum_messages * sizeof( Message_queue_Buffer_control ) + ), + maximum_messages, + sizeof( Message_queue_Buffer_control ) + ); + + } +} + +/*PAGE + * + * rtems_message_queue_create + * + * This directive creates a message queue by allocating and initializing + * a message queue data structure. + * + * Input parameters: + * name - user defined queue name + * count - maximum message and reserved buffer count + * attribute_set - process method + * id - pointer to queue + * + * Output parameters: + * id - queue id + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code rtems_message_queue_create( + Objects_Name name, + unsigned32 count, + rtems_attribute attribute_set, + Objects_Id *id +) +{ + register Message_queue_Control *the_message_queue; + + if ( !_Objects_Is_name_valid( name ) ) + return ( RTEMS_INVALID_NAME ); + + if ( _Attributes_Is_global( attribute_set ) && + !_Configuration_Is_multiprocessing() ) + return( RTEMS_MP_NOT_CONFIGURED ); + + _Thread_Disable_dispatch(); /* protects object pointer */ + + the_message_queue = _Message_queue_Allocate(); + + if ( !the_message_queue ) { + _Thread_Enable_dispatch(); + return( RTEMS_TOO_MANY ); + } + + if ( _Attributes_Is_global( attribute_set ) && + !( _Objects_MP_Open( &_Message_queue_Information, name, + the_message_queue->Object.id, FALSE ) ) ) { + _Message_queue_Free( the_message_queue ); + _Thread_Enable_dispatch(); + return( RTEMS_TOO_MANY ); + } + + if ( _Attributes_Is_limit( attribute_set ) ) + the_message_queue->maximum_pending_messages = count; + else + the_message_queue->maximum_pending_messages = 0xffffffff; + + the_message_queue->attribute_set = attribute_set; + the_message_queue->number_of_pending_messages = 0; + + _Chain_Initialize_empty( &the_message_queue->Pending_messages ); + + _Thread_queue_Initialize( &the_message_queue->Wait_queue, attribute_set, + STATES_WAITING_FOR_MESSAGE ); + + _Objects_Open( &_Message_queue_Information, + &the_message_queue->Object, name ); + + *id = the_message_queue->Object.id; + + if ( _Attributes_Is_global( attribute_set ) ) + _Message_queue_MP_Send_process_packet( + MESSAGE_QUEUE_MP_ANNOUNCE_CREATE, + the_message_queue->Object.id, + name, + 0 + ); + + _Thread_Enable_dispatch(); + return( RTEMS_SUCCESSFUL ); +} + +/*PAGE + * + * rtems_message_queue_ident + * + * This directive returns the system ID associated with + * the message queue name. + * + * Input parameters: + * name - user defined message queue name + * node - node(s) to be searched + * id - pointer to message queue id + * + * Output parameters: + * *id - message queue id + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code rtems_message_queue_ident( + Objects_Name name, + unsigned32 node, + Objects_Id *id +) +{ + return( _Objects_Name_to_id( &_Message_queue_Information, name, + node, id ) ); +} + +/*PAGE + * + * rtems_message_queue_delete + * + * This directive allows a thread to delete the message queue specified + * by the given queue identifier. + * + * Input parameters: + * id - queue id + * + * Output parameters: + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code rtems_message_queue_delete( + Objects_Id id +) +{ + register Message_queue_Control *the_message_queue; + Objects_Locations location; + + the_message_queue = _Message_queue_Get( id, &location ); + switch ( location ) { + case OBJECTS_ERROR: + return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: + _Thread_Dispatch(); + return( RTEMS_ILLEGAL_ON_REMOTE_OBJECT ); + case OBJECTS_LOCAL: + _Objects_Close( &_Message_queue_Information, + &the_message_queue->Object ); + + if ( the_message_queue->number_of_pending_messages != 0 ) + (void) _Message_queue_Flush_support( the_message_queue ); + else + _Thread_queue_Flush( + &the_message_queue->Wait_queue, + _Message_queue_MP_Send_object_was_deleted + ); + + _Message_queue_Free( the_message_queue ); + + if ( _Attributes_Is_global( the_message_queue->attribute_set ) ) { + _Objects_MP_Close( + &_Message_queue_Information, + the_message_queue->Object.id + ); + + _Message_queue_MP_Send_process_packet( + MESSAGE_QUEUE_MP_ANNOUNCE_DELETE, + the_message_queue->Object.id, + 0, /* Not used */ + MPCI_DEFAULT_TIMEOUT + ); + } + + _Thread_Enable_dispatch(); + return( RTEMS_SUCCESSFUL ); + } + + return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ +} + +/*PAGE + * + * rtems_message_queue_send + * + * This routine implements the directives q_send. It sends a + * message to the specified message queue. + * + * Input parameters: + * id - pointer to message queue + * buffer - pointer to message buffer + * + * Output parameters: + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code rtems_message_queue_send( + Objects_Id id, + void *buffer +) +{ + return( _Message_queue_Submit( + id, + (Message_queue_Buffer *) buffer, + MESSAGE_QUEUE_SEND_REQUEST + ) + ); +} + +/*PAGE + * + * rtems_message_queue_urgent + * + * This routine implements the directives q_urgent. It urgents a + * message to the specified message queue. + * + * Input parameters: + * id - pointer to message queue + * buffer - pointer to message buffer + * + * Output parameters: + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code rtems_message_queue_urgent( + Objects_Id id, + void *buffer +) +{ + return( _Message_queue_Submit( + id, + (Message_queue_Buffer *) buffer, + MESSAGE_QUEUE_URGENT_REQUEST + ) + ); +} + +/*PAGE + * + * rtems_message_queue_broadcast + * + * This directive sends a message for every thread waiting on the queue + * designated by id. + * + * Input parameters: + * id - pointer to message queue + * buffer - pointer to message buffer + * count - pointer to area to store number of threads made ready + * + * Output parameters: + * count - number of threads made ready + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code rtems_message_queue_broadcast( + Objects_Id id, + void *buffer, + unsigned32 *count +) +{ + register Message_queue_Control *the_message_queue; + Objects_Locations location; + Thread_Control *the_thread; + unsigned32 number_broadcasted; + + the_message_queue = _Message_queue_Get( id, &location ); + switch ( location ) { + case OBJECTS_ERROR: + return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: + _Thread_Executing->Wait.return_argument = count; + + return + _Message_queue_MP_Send_request_packet( + MESSAGE_QUEUE_MP_BROADCAST_REQUEST, + id, + (Message_queue_Buffer *) buffer, + 0, /* Not used */ + MPCI_DEFAULT_TIMEOUT + ); + + case OBJECTS_LOCAL: + number_broadcasted = 0; + while ( (the_thread = + _Thread_queue_Dequeue(&the_message_queue->Wait_queue)) ) { + number_broadcasted += 1; + _Message_queue_Copy_buffer( + (Message_queue_Buffer *) buffer, + the_thread->Wait.return_argument + ); + + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) { + the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL; + + _Message_queue_MP_Send_response_packet( + MESSAGE_QUEUE_MP_RECEIVE_RESPONSE, + id, + the_thread + ); + } + } + _Thread_Enable_dispatch(); + *count = number_broadcasted; + return( RTEMS_SUCCESSFUL ); + } + + return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ +} + +/*PAGE + * + * rtems_message_queue_receive + * + * This directive dequeues a message from the designated message queue + * and copies it into the requesting thread's buffer. + * + * Input parameters: + * id - queue id + * buffer - pointer to message buffer + * option_set - options on receive + * timeout - number of ticks to wait + * + * Output parameters: + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code rtems_message_queue_receive( + Objects_Id id, + void *buffer, + unsigned32 option_set, + rtems_interval timeout +) +{ + register Message_queue_Control *the_message_queue; + Objects_Locations location; + + the_message_queue = _Message_queue_Get( id, &location ); + switch ( location ) { + case OBJECTS_ERROR: + return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: + _Thread_Executing->Wait.return_argument = buffer; + return + _Message_queue_MP_Send_request_packet( + MESSAGE_QUEUE_MP_RECEIVE_REQUEST, + id, + buffer, + option_set, + timeout + ); + + case OBJECTS_LOCAL: + if ( !_Message_queue_Seize( the_message_queue, option_set, buffer ) ) + _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout ); + _Thread_Enable_dispatch(); + return( _Thread_Executing->Wait.return_code ); + } + + return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ +} + +/*PAGE + * + * rtems_message_queue_flush + * + * This directive removes all pending messages from a queue and returns + * the number of messages removed. If no messages were present then + * a count of zero is returned. + * + * Input parameters: + * id - queue id + * count - return area for count + * + * Output parameters: + * count - number of messages removed ( 0 = empty queue ) + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code rtems_message_queue_flush( + Objects_Id id, + unsigned32 *count +) +{ + register Message_queue_Control *the_message_queue; + Objects_Locations location; + + the_message_queue = _Message_queue_Get( id, &location ); + switch ( location ) { + case OBJECTS_ERROR: + return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: + _Thread_Executing->Wait.return_argument = count; + + return + _Message_queue_MP_Send_request_packet( + MESSAGE_QUEUE_MP_FLUSH_REQUEST, + id, + 0, /* Not used */ + 0, /* Not used */ + MPCI_DEFAULT_TIMEOUT + ); + + case OBJECTS_LOCAL: + if ( the_message_queue->number_of_pending_messages != 0 ) + *count = _Message_queue_Flush_support( the_message_queue ); + else + *count = 0; + _Thread_Enable_dispatch(); + return( RTEMS_SUCCESSFUL ); + } + + return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ +} + +/*PAGE + * + * _Message_queue_Seize + * + * This kernel routine dequeues a message, copies the message buffer to + * a given destination buffer, and frees the message buffer to the + * inactive message pool. + * + * Input parameters: + * the_message_queue - pointer to message queue + * option_set - options on receive + * the_buffer - pointer to message buffer to be filled + * + * Output parameters: + * TRUE - if message received or RTEMS_NO_WAIT and no message + * FALSE - if thread is to block + * + * NOTE: Dependent on BUFFER_LENGTH + * + * INTERRUPT LATENCY: + * available + * wait + */ + +boolean _Message_queue_Seize( + Message_queue_Control *the_message_queue, + rtems_option option_set, + Message_queue_Buffer *buffer +) +{ + ISR_Level level; + Message_queue_Buffer_control *the_message; + Thread_Control *executing; + + executing = _Thread_Executing; + executing->Wait.return_code = RTEMS_SUCCESSFUL; + _ISR_Disable( level ); + if ( the_message_queue->number_of_pending_messages != 0 ) { + the_message_queue->number_of_pending_messages -= 1; + + the_message = _Message_queue_Get_pending_message( the_message_queue ); + _ISR_Enable( level ); + _Message_queue_Copy_buffer( &the_message->Contents, buffer ); + _Message_queue_Free_message_buffer( the_message ); + return( TRUE ); + } + + if ( _Options_Is_no_wait( option_set ) ) { + _ISR_Enable( level ); + executing->Wait.return_code = RTEMS_UNSATISFIED; + return( TRUE ); + } + + the_message_queue->Wait_queue.sync = TRUE; + executing->Wait.queue = &the_message_queue->Wait_queue; + executing->Wait.id = the_message_queue->Object.id; + executing->Wait.option_set = option_set; + executing->Wait.return_argument = (unsigned32 *)buffer; + _ISR_Enable( level ); + return( FALSE ); +} + +/*PAGE + * + * _Message_queue_Flush_support + * + * This message manager routine removes all messages from a message queue + * and returns them to the inactive message pool. + * + * Input parameters: + * the_message_queue - pointer to message queue + * + * Output parameters: + * returns - number of messages placed on inactive chain + * + * INTERRUPT LATENCY: + * only case + */ + +unsigned32 _Message_queue_Flush_support( + Message_queue_Control *the_message_queue +) +{ + ISR_Level level; + Chain_Node *inactive_first; + Chain_Node *message_queue_first; + Chain_Node *message_queue_last; + unsigned32 count; + + _ISR_Disable( level ); + inactive_first = _Message_queue_Inactive_messages.first; + message_queue_first = the_message_queue->Pending_messages.first; + message_queue_last = the_message_queue->Pending_messages.last; + + _Message_queue_Inactive_messages.first = message_queue_first; + message_queue_last->next = inactive_first; + inactive_first->previous = message_queue_last; + message_queue_first->previous = + _Chain_Head( &_Message_queue_Inactive_messages ); + + _Chain_Initialize_empty( &the_message_queue->Pending_messages ); + + count = the_message_queue->number_of_pending_messages; + the_message_queue->number_of_pending_messages = 0; + _ISR_Enable( level ); + return( count ); +} + +/*PAGE + * + * _Message_queue_Submit + * + * This routine implements the directives q_send and q_urgent. It + * processes a message that is to be submitted to the designated + * message queue. The message will either be processed as a send + * send message which it will be inserted at the rear of the queue + * or it will be processed as an urgent message which will be inserted + * at the front of the queue. + * + * Input parameters: + * id - pointer to message queue + * the_buffer - pointer to message buffer + * submit_type - send or urgent message + * + * Output parameters: + * RTEMS_SUCCESSFUL - if successful + * error code - if unsuccessful + */ + +rtems_status_code _Message_queue_Submit( + Objects_Id id, + Message_queue_Buffer *buffer, + Message_queue_Submit_types submit_type +) +{ + register Message_queue_Control *the_message_queue; + Objects_Locations location; + Thread_Control *the_thread; + Message_queue_Buffer_control *the_message; + + the_message_queue = _Message_queue_Get( id, &location ); + switch ( location ) { + case OBJECTS_ERROR: + return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: + switch ( submit_type ) { + case MESSAGE_QUEUE_SEND_REQUEST: + return + _Message_queue_MP_Send_request_packet( + MESSAGE_QUEUE_MP_SEND_REQUEST, + id, + buffer, + 0, /* Not used */ + MPCI_DEFAULT_TIMEOUT + ); + + case MESSAGE_QUEUE_URGENT_REQUEST: + return + _Message_queue_MP_Send_request_packet( + MESSAGE_QUEUE_MP_URGENT_REQUEST, + id, + buffer, + 0, /* Not used */ + MPCI_DEFAULT_TIMEOUT + ); + } + case OBJECTS_LOCAL: + the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); + + if ( the_thread ) { + + _Message_queue_Copy_buffer( + buffer, + the_thread->Wait.return_argument + ); + + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) { + the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL; + + _Message_queue_MP_Send_response_packet( + MESSAGE_QUEUE_MP_RECEIVE_RESPONSE, + id, + the_thread + ); + + } + _Thread_Enable_dispatch(); + return( RTEMS_SUCCESSFUL ); + } + + if ( the_message_queue->number_of_pending_messages == + the_message_queue->maximum_pending_messages ) { + _Thread_Enable_dispatch(); + return( RTEMS_TOO_MANY ); + } + + the_message = _Message_queue_Allocate_message_buffer(); + + if ( !the_message ) { + _Thread_Enable_dispatch(); + return( RTEMS_UNSATISFIED ); + } + + _Message_queue_Copy_buffer( buffer, &the_message->Contents ); + + the_message_queue->number_of_pending_messages += 1; + + switch ( submit_type ) { + case MESSAGE_QUEUE_SEND_REQUEST: + _Message_queue_Append( the_message_queue, the_message ); + break; + case MESSAGE_QUEUE_URGENT_REQUEST: + _Message_queue_Prepend( the_message_queue, the_message ); + break; + } + + _Thread_Enable_dispatch(); + return( RTEMS_SUCCESSFUL ); + } + + return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ +} -- cgit v1.2.3