/*
* Message Queue Manager
*
*
* COPYRIGHT (c) 1989, 1990, 1991, 1992, 1993, 1994.
* On-Line Applications Research Corporation (OAR).
* All rights assigned to U.S. Government, 1994.
*
* This material may be reproduced by or for the U.S. Government pursuant
* to the copyright license under the clause at DFARS 252.227-7013. This
* notice must appear in all copies of this file and its derivatives.
*
* $Id$
*/
#include <rtems/system.h>
#include <rtems/sysstate.h>
#include <rtems/core/chain.h>
#include <rtems/core/isr.h>
#include <rtems/core/coremsg.h>
#include <rtems/core/object.h>
#include <rtems/core/states.h>
#include <rtems/core/thread.h>
#include <rtems/core/wkspace.h>
#include <rtems/core/mpci.h>
#include <rtems/rtems/status.h>
#include <rtems/rtems/attr.h>
#include <rtems/rtems/message.h>
#include <rtems/rtems/options.h>
#include <rtems/rtems/support.h>
/*PAGE
*
* _Message_queue_Manager_initialization
*
* This routine initializes all message queue manager related
* data structures.
*
* Input parameters:
* maximum_message_queues - number of message queues to initialize
*
* Output parameters: NONE
*/
void _Message_queue_Manager_initialization(
unsigned32 maximum_message_queues
)
{
_Objects_Initialize_information(
&_Message_queue_Information,
OBJECTS_RTEMS_MESSAGE_QUEUES,
TRUE,
maximum_message_queues,
sizeof( Message_queue_Control ),
FALSE,
RTEMS_MAXIMUM_NAME_LENGTH,
FALSE
);
/*
* Register the MP Process Packet routine.
*/
_MPCI_Register_packet_processor(
MP_PACKET_MESSAGE_QUEUE,
_Message_queue_MP_Process_packet
);
}
/*PAGE
*
* _Message_queue_Allocate
*
* Allocate a message queue and the space for its messages
*
* Input parameters:
* the_message_queue - the message queue to allocate message buffers
* count - maximum message and reserved buffer count
* max_message_size - maximum size of each message
*
* Output parameters:
* the_message_queue - set if successful, NULL otherwise
*/
Message_queue_Control *_Message_queue_Allocate (
unsigned32 count,
unsigned32 max_message_size
)
{
return
(Message_queue_Control *)_Objects_Allocate(&_Message_queue_Information);
}
/*PAGE
*
* rtems_message_queue_create
*
* This directive creates a message queue by allocating and initializing
* a message queue data structure.
*
* Input parameters:
* name - user defined queue name
* count - maximum message and reserved buffer count
* max_message_size - maximum size of each message
* attribute_set - process method
* id - pointer to queue
*
* Output parameters:
* id - queue id
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code rtems_message_queue_create(
rtems_name name,
unsigned32 count,
unsigned32 max_message_size,
rtems_attribute attribute_set,
Objects_Id *id
)
{
register Message_queue_Control *the_message_queue;
CORE_message_queue_Attributes the_message_queue_attributes;
boolean is_global;
if ( !rtems_is_name_valid( name ) )
return RTEMS_INVALID_NAME;
if ( (is_global = _Attributes_Is_global( attribute_set ) ) &&
!_System_state_Is_multiprocessing )
return RTEMS_MP_NOT_CONFIGURED;
if (count == 0)
return RTEMS_INVALID_NUMBER;
if (max_message_size == 0)
return RTEMS_INVALID_SIZE;
#if 1
/*
* I am not 100% sure this should be an error.
* It seems reasonable to create a que with a large max size,
* and then just send smaller msgs from remote (or all) nodes.
*/
if ( is_global && (_MPCI_table->maximum_packet_size < max_message_size) )
return RTEMS_INVALID_SIZE;
#endif
_Thread_Disable_dispatch(); /* protects object pointer */
the_message_queue = _Message_queue_Allocate( count, max_message_size );
if ( !the_message_queue ) {
_Thread_Enable_dispatch();
return RTEMS_TOO_MANY;
}
if ( is_global &&
!( _Objects_MP_Allocate_and_open( &_Message_queue_Information,
name, the_message_queue->Object.id, FALSE ) ) ) {
_Message_queue_Free( the_message_queue );
_Thread_Enable_dispatch();
return RTEMS_TOO_MANY;
}
the_message_queue->attribute_set = attribute_set;
if (_Attributes_Is_priority( attribute_set ) )
the_message_queue_attributes.discipline =
CORE_MESSAGE_QUEUE_DISCIPLINES_PRIORITY;
else
the_message_queue_attributes.discipline =
CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO;
if ( ! _CORE_message_queue_Initialize(
&the_message_queue->message_queue,
OBJECTS_RTEMS_MESSAGE_QUEUES,
&the_message_queue_attributes,
count,
max_message_size,
_Message_queue_MP_Send_extract_proxy ) ) {
if ( is_global )
_Objects_MP_Close(
&_Message_queue_Information, the_message_queue->Object.id);
_Message_queue_Free( the_message_queue );
_Thread_Enable_dispatch();
return RTEMS_TOO_MANY;
}
_Objects_Open(
&_Message_queue_Information,
&the_message_queue->Object,
&name
);
*id = the_message_queue->Object.id;
if ( is_global )
_Message_queue_MP_Send_process_packet(
MESSAGE_QUEUE_MP_ANNOUNCE_CREATE,
the_message_queue->Object.id,
name,
0
);
_Thread_Enable_dispatch();
return RTEMS_SUCCESSFUL;
}
/*PAGE
*
* rtems_message_queue_ident
*
* This directive returns the system ID associated with
* the message queue name.
*
* Input parameters:
* name - user defined message queue name
* node - node(s) to be searched
* id - pointer to message queue id
*
* Output parameters:
* *id - message queue id
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code rtems_message_queue_ident(
rtems_name name,
unsigned32 node,
Objects_Id *id
)
{
Objects_Name_to_id_errors status;
status = _Objects_Name_to_id(
&_Message_queue_Information,
&name,
node,
id
);
return _Status_Object_name_errors_to_status[ status ];
}
/*PAGE
*
* rtems_message_queue_delete
*
* This directive allows a thread to delete the message queue specified
* by the given queue identifier.
*
* Input parameters:
* id - queue id
*
* Output parameters:
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code rtems_message_queue_delete(
Objects_Id id
)
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location ) {
case OBJECTS_ERROR:
return RTEMS_INVALID_ID;
case OBJECTS_REMOTE:
_Thread_Dispatch();
return RTEMS_ILLEGAL_ON_REMOTE_OBJECT;
case OBJECTS_LOCAL:
_Objects_Close( &_Message_queue_Information,
&the_message_queue->Object );
_CORE_message_queue_Close(
&the_message_queue->message_queue,
_Message_queue_MP_Send_object_was_deleted,
CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED
);
_Message_queue_Free( the_message_queue );
if ( _Attributes_Is_global( the_message_queue->attribute_set ) ) {
_Objects_MP_Close(
&_Message_queue_Information,
the_message_queue->Object.id
);
_Message_queue_MP_Send_process_packet(
MESSAGE_QUEUE_MP_ANNOUNCE_DELETE,
the_message_queue->Object.id,
0, /* Not used */
0
);
}
_Thread_Enable_dispatch();
return RTEMS_SUCCESSFUL;
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
}
/*PAGE
*
* rtems_message_queue_send
*
* This routine implements the directives q_send. It sends a
* message to the specified message queue.
*
* Input parameters:
* id - pointer to message queue
* buffer - pointer to message buffer
* size - size of message to sent urgently
*
* Output parameters:
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code rtems_message_queue_send(
Objects_Id id,
void *buffer,
unsigned32 size
)
{
return( _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_SEND_REQUEST) );
}
/*PAGE
*
* rtems_message_queue_urgent
*
* This routine implements the directives q_urgent. It urgents a
* message to the specified message queue.
*
* Input parameters:
* id - pointer to message queue
* buffer - pointer to message buffer
* size - size of message to sent urgently
*
* Output parameters:
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code rtems_message_queue_urgent(
Objects_Id id,
void *buffer,
unsigned32 size
)
{
return(_Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_URGENT_REQUEST));
}
/*PAGE
*
* rtems_message_queue_broadcast
*
* This directive sends a message for every thread waiting on the queue
* designated by id.
*
* Input parameters:
* id - pointer to message queue
* buffer - pointer to message buffer
* size - size of message to broadcast
* count - pointer to area to store number of threads made ready
*
* Output parameters:
* count - number of threads made ready
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code rtems_message_queue_broadcast(
Objects_Id id,
void *buffer,
unsigned32 size,
unsigned32 *count
)
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
CORE_message_queue_Status core_status;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location ) {
case OBJECTS_ERROR:
return RTEMS_INVALID_ID;
case OBJECTS_REMOTE:
_Thread_Executing->Wait.return_argument = count;
return
_Message_queue_MP_Send_request_packet(
MESSAGE_QUEUE_MP_BROADCAST_REQUEST,
id,
buffer,
&size,
0, /* option_set not used */
MPCI_DEFAULT_TIMEOUT
);
case OBJECTS_LOCAL:
core_status = _CORE_message_queue_Broadcast(
&the_message_queue->message_queue,
buffer,
size,
id,
_Message_queue_Core_message_queue_mp_support,
count
);
_Thread_Enable_dispatch();
return
_Message_queue_Translate_core_message_queue_return_code( core_status );
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
}
/*PAGE
*
* rtems_message_queue_receive
*
* This directive dequeues a message from the designated message queue
* and copies it into the requesting thread's buffer.
*
* Input parameters:
* id - queue id
* buffer - pointer to message buffer
* size - size of message receive
* option_set - options on receive
* timeout - number of ticks to wait
*
* Output parameters:
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code rtems_message_queue_receive(
Objects_Id id,
void *buffer,
unsigned32 *size,
unsigned32 option_set,
rtems_interval timeout
)
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
boolean wait;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location ) {
case OBJECTS_ERROR:
return RTEMS_INVALID_ID;
case OBJECTS_REMOTE:
return _Message_queue_MP_Send_request_packet(
MESSAGE_QUEUE_MP_RECEIVE_REQUEST,
id,
buffer,
size,
option_set,
timeout
);
case OBJECTS_LOCAL:
if ( _Options_Is_no_wait( option_set ) )
wait = FALSE;
else
wait = TRUE;
_CORE_message_queue_Seize(
&the_message_queue->message_queue,
the_message_queue->Object.id,
buffer,
size,
wait,
timeout
);
_Thread_Enable_dispatch();
return( _Message_queue_Translate_core_message_queue_return_code(
_Thread_Executing->Wait.return_code ) );
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
}
/*PAGE
*
* rtems_message_queue_flush
*
* This directive removes all pending messages from a queue and returns
* the number of messages removed. If no messages were present then
* a count of zero is returned.
*
* Input parameters:
* id - queue id
* count - return area for count
*
* Output parameters:
* count - number of messages removed ( 0 = empty queue )
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code rtems_message_queue_flush(
Objects_Id id,
unsigned32 *count
)
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location ) {
case OBJECTS_ERROR:
return RTEMS_INVALID_ID;
case OBJECTS_REMOTE:
_Thread_Executing->Wait.return_argument = count;
return
_Message_queue_MP_Send_request_packet(
MESSAGE_QUEUE_MP_FLUSH_REQUEST,
id,
0, /* buffer not used */
0, /* size */
0, /* option_set not used */
MPCI_DEFAULT_TIMEOUT
);
case OBJECTS_LOCAL:
*count = _CORE_message_queue_Flush( &the_message_queue->message_queue );
_Thread_Enable_dispatch();
return RTEMS_SUCCESSFUL;
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
}
/*PAGE
*
* _Message_queue_Submit
*
* This routine implements the directives rtems_message_queue_send
* and rtems_message_queue_urgent. It processes a message that is
* to be submitted to the designated message queue. The message will
* either be processed as a send send message which it will be inserted
* at the rear of the queue or it will be processed as an urgent message
* which will be inserted at the front of the queue.
*
* Input parameters:
* id - pointer to message queue
* buffer - pointer to message buffer
* size - size in bytes of message to send
* submit_type - send or urgent message
*
* Output parameters:
* RTEMS_SUCCESSFUL - if successful
* error code - if unsuccessful
*/
rtems_status_code _Message_queue_Submit(
Objects_Id id,
void *buffer,
unsigned32 size,
Message_queue_Submit_types submit_type
)
{
register Message_queue_Control *the_message_queue;
Objects_Locations location;
CORE_message_queue_Status core_status;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location )
{
case OBJECTS_ERROR:
return RTEMS_INVALID_ID;
case OBJECTS_REMOTE:
switch ( submit_type ) {
case MESSAGE_QUEUE_SEND_REQUEST:
return
_Message_queue_MP_Send_request_packet(
MESSAGE_QUEUE_MP_SEND_REQUEST,
id,
buffer,
&size,
0, /* option_set */
MPCI_DEFAULT_TIMEOUT
);
case MESSAGE_QUEUE_URGENT_REQUEST:
return
_Message_queue_MP_Send_request_packet(
MESSAGE_QUEUE_MP_URGENT_REQUEST,
id,
buffer,
&size,
0, /* option_set */
MPCI_DEFAULT_TIMEOUT
);
}
case OBJECTS_LOCAL:
switch ( submit_type ) {
case MESSAGE_QUEUE_SEND_REQUEST:
core_status = _CORE_message_queue_Send(
&the_message_queue->message_queue,
buffer,
size,
id,
_Message_queue_Core_message_queue_mp_support
);
break;
case MESSAGE_QUEUE_URGENT_REQUEST:
core_status = _CORE_message_queue_Urgent(
&the_message_queue->message_queue,
buffer,
size,
id,
_Message_queue_Core_message_queue_mp_support
);
break;
default:
core_status = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
return RTEMS_INTERNAL_ERROR; /* should never get here */
}
_Thread_Enable_dispatch();
return _Message_queue_Translate_core_message_queue_return_code(
core_status );
}
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
}
/*PAGE
*
* _Message_queue_Translate_core_message_queue_return_code
*
* Input parameters:
* the_message_queue_status - message_queue status code to translate
*
* Output parameters:
* rtems status code - translated RTEMS status code
*
*/
rtems_status_code _Message_queue_Translate_core_message_queue_return_code (
unsigned32 the_message_queue_status
)
{
switch ( the_message_queue_status ) {
case CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL:
return RTEMS_SUCCESSFUL;
case CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE:
return RTEMS_INVALID_SIZE;
case CORE_MESSAGE_QUEUE_STATUS_TOO_MANY:
return RTEMS_TOO_MANY;
case CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED:
return RTEMS_UNSATISFIED;
case CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_NOWAIT:
return RTEMS_UNSATISFIED;
case CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED:
return RTEMS_OBJECT_WAS_DELETED;
case CORE_MESSAGE_QUEUE_STATUS_TIMEOUT:
return RTEMS_TIMEOUT;
case THREAD_STATUS_PROXY_BLOCKING:
return THREAD_STATUS_PROXY_BLOCKING;
}
_Internal_error_Occurred( /* XXX */
INTERNAL_ERROR_RTEMS_API,
TRUE,
the_message_queue_status
);
return RTEMS_INTERNAL_ERROR; /* unreached - only to remove warnings */
}
/*PAGE
*
* _Message_queue_Core_message_queue_mp_support
*
* Input parameters:
* the_thread - the remote thread the message was submitted to
* id - id of the message queue
*
* Output parameters: NONE
*/
void _Message_queue_Core_message_queue_mp_support (
Thread_Control *the_thread,
Objects_Id id
)
{
the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
_Message_queue_MP_Send_response_packet(
MESSAGE_QUEUE_MP_RECEIVE_RESPONSE,
id,
the_thread
);
}