summaryrefslogtreecommitdiffstats
path: root/cpukit
diff options
context:
space:
mode:
Diffstat (limited to 'cpukit')
-rw-r--r--cpukit/rtems/src/msg.c270
-rw-r--r--cpukit/rtems/src/msgmp.c118
2 files changed, 269 insertions, 119 deletions
diff --git a/cpukit/rtems/src/msg.c b/cpukit/rtems/src/msg.c
index 7cfe683ce5..9b1c6acef5 100644
--- a/cpukit/rtems/src/msg.c
+++ b/cpukit/rtems/src/msg.c
@@ -35,41 +35,71 @@
*
* Input parameters:
* maximum_message_queues - number of message queues to initialize
- * maximum_message - number of messages per queue
*
* Output parameters: NONE
*/
void _Message_queue_Manager_initialization(
- unsigned32 maximum_message_queues,
- unsigned32 maximum_messages
+ unsigned32 maximum_message_queues
)
{
-
_Objects_Initialize_information(
&_Message_queue_Information,
TRUE,
maximum_message_queues,
sizeof( Message_queue_Control )
);
+}
- if ( maximum_messages == 0 ) {
-
- _Chain_Initialize_empty( &_Message_queue_Inactive_messages );
-
- } else {
-
-
- _Chain_Initialize(
- &_Message_queue_Inactive_messages,
- _Workspace_Allocate_or_fatal_error(
- maximum_messages * sizeof( Message_queue_Buffer_control )
- ),
- maximum_messages,
- sizeof( Message_queue_Buffer_control )
- );
+/*PAGE
+ *
+ * _Message_queue_Allocate
+ *
+ * Allocate a message queue and the space for its messages
+ */
- }
+Message_queue_Control *_Message_queue_Allocate (
+ unsigned32 count,
+ unsigned32 max_message_size
+)
+{
+ Message_queue_Control *mq = 0;
+ unsigned32 message_buffering_required;
+ unsigned32 allocated_message_size;
+
+ mq = (Message_queue_Control *)_Objects_Allocate(&_Message_queue_Information);
+ if (mq == 0)
+ goto failed;
+
+ mq->maximum_message_size = max_message_size;
+
+ /*
+ * round size up to multiple of a ptr for chain init
+ */
+
+ allocated_message_size = max_message_size;
+ if (allocated_message_size & (sizeof(unsigned32) - 1))
+ {
+ allocated_message_size += sizeof(unsigned32);
+ allocated_message_size &= ~(sizeof(unsigned32) - 1);
+ }
+
+ message_buffering_required = count * (allocated_message_size + sizeof(Message_queue_Buffer_control));
+
+ mq->message_buffers = (Message_queue_Buffer *) _Workspace_Allocate(message_buffering_required);
+ if (mq->message_buffers == 0)
+ goto failed;
+
+ _Chain_Initialize(&mq->Inactive_messages,
+ mq->message_buffers,
+ count,
+ allocated_message_size + sizeof(Message_queue_Buffer_control));
+ return mq;
+
+failed:
+ if (mq)
+ _Message_queue_Free(mq);
+ return (Message_queue_Control *) 0;
}
/*PAGE
@@ -82,6 +112,7 @@ void _Message_queue_Manager_initialization(
* Input parameters:
* name - user defined queue name
* count - maximum message and reserved buffer count
+ * max_message_size - maximum size of each message
* attribute_set - process method
* id - pointer to queue
*
@@ -94,7 +125,8 @@ void _Message_queue_Manager_initialization(
rtems_status_code rtems_message_queue_create(
Objects_Name name,
unsigned32 count,
- rtems_attribute attribute_set,
+ unsigned32 max_message_size,
+ rtems_attribute attribute_set,
Objects_Id *id
)
{
@@ -107,9 +139,30 @@ rtems_status_code rtems_message_queue_create(
!_Configuration_Is_multiprocessing() )
return( RTEMS_MP_NOT_CONFIGURED );
+ if (count == 0)
+ return RTEMS_INVALID_NUMBER;
+
+ if (max_message_size == 0)
+ return RTEMS_INVALID_SIZE;
+
+#if 1
+ /*
+ * I am not 100% sure this should be an error.
+ * It seems reasonable to create a que with a large max size,
+ * and then just send smaller msgs from remote (or all) nodes.
+ */
+
+ if ( _Attributes_Is_global( attribute_set ) &&
+ _Configuration_MPCI_table &&
+ (_Configuration_MPCI_table->maximum_packet_size < max_message_size))
+ {
+ return RTEMS_INVALID_SIZE;
+ }
+#endif
+
_Thread_Disable_dispatch(); /* protects object pointer */
- the_message_queue = _Message_queue_Allocate();
+ the_message_queue = _Message_queue_Allocate(count, max_message_size);
if ( !the_message_queue ) {
_Thread_Enable_dispatch();
@@ -121,15 +174,12 @@ rtems_status_code rtems_message_queue_create(
the_message_queue->Object.id, FALSE ) ) ) {
_Message_queue_Free( the_message_queue );
_Thread_Enable_dispatch();
- return( RTEMS_TOO_MANY );
+ return RTEMS_TOO_MANY;
}
- if ( _Attributes_Is_limit( attribute_set ) )
- the_message_queue->maximum_pending_messages = count;
- else
- the_message_queue->maximum_pending_messages = 0xffffffff;
+ the_message_queue->maximum_pending_messages = count;
- the_message_queue->attribute_set = attribute_set;
+ the_message_queue->attribute_set = attribute_set;
the_message_queue->number_of_pending_messages = 0;
_Chain_Initialize_empty( &the_message_queue->Pending_messages );
@@ -264,15 +314,11 @@ rtems_status_code rtems_message_queue_delete(
rtems_status_code rtems_message_queue_send(
Objects_Id id,
- void *buffer
+ void *buffer,
+ unsigned32 size
)
{
- return( _Message_queue_Submit(
- id,
- (Message_queue_Buffer *) buffer,
- MESSAGE_QUEUE_SEND_REQUEST
- )
- );
+ return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_SEND_REQUEST);
}
/*PAGE
@@ -293,15 +339,11 @@ rtems_status_code rtems_message_queue_send(
rtems_status_code rtems_message_queue_urgent(
Objects_Id id,
- void *buffer
+ void *buffer,
+ unsigned32 size
)
{
- return( _Message_queue_Submit(
- id,
- (Message_queue_Buffer *) buffer,
- MESSAGE_QUEUE_URGENT_REQUEST
- )
- );
+ return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_URGENT_REQUEST);
}
/*PAGE
@@ -325,6 +367,7 @@ rtems_status_code rtems_message_queue_urgent(
rtems_status_code rtems_message_queue_broadcast(
Objects_Id id,
void *buffer,
+ unsigned32 size,
unsigned32 *count
)
{
@@ -344,21 +387,33 @@ rtems_status_code rtems_message_queue_broadcast(
_Message_queue_MP_Send_request_packet(
MESSAGE_QUEUE_MP_BROADCAST_REQUEST,
id,
- (Message_queue_Buffer *) buffer,
- 0, /* Not used */
+ buffer,
+ &size,
+ 0, /* option_set not used */
MPCI_DEFAULT_TIMEOUT
);
case OBJECTS_LOCAL:
+ {
+ Thread_Wait_information *waitp;
+ unsigned32 constrained_size;
+
number_broadcasted = 0;
while ( (the_thread =
_Thread_queue_Dequeue(&the_message_queue->Wait_queue)) ) {
+ waitp = &the_thread->Wait;
number_broadcasted += 1;
- _Message_queue_Copy_buffer(
- (Message_queue_Buffer *) buffer,
- the_thread->Wait.return_argument
- );
+ constrained_size = size;
+ if (size > the_message_queue->maximum_message_size)
+ constrained_size = the_message_queue->maximum_message_size;
+
+ _Message_queue_Copy_buffer(buffer,
+ waitp->return_argument,
+ constrained_size);
+
+ *waitp->Extra.message_size_p = constrained_size;
+
if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
@@ -372,9 +427,11 @@ rtems_status_code rtems_message_queue_broadcast(
_Thread_Enable_dispatch();
*count = number_broadcasted;
return( RTEMS_SUCCESSFUL );
- }
+ }
- return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */
+ default:
+ return RTEMS_INTERNAL_ERROR;
+ }
}
/*PAGE
@@ -398,33 +455,42 @@ rtems_status_code rtems_message_queue_broadcast(
rtems_status_code rtems_message_queue_receive(
Objects_Id id,
void *buffer,
+ unsigned32 *size_p,
unsigned32 option_set,
- rtems_interval timeout
+ rtems_interval timeout
)
{
register Message_queue_Control *the_message_queue;
- Objects_Locations location;
+ Objects_Locations location;
the_message_queue = _Message_queue_Get( id, &location );
switch ( location ) {
+
case OBJECTS_ERROR:
return( RTEMS_INVALID_ID );
+
case OBJECTS_REMOTE:
_Thread_Executing->Wait.return_argument = buffer;
- return
- _Message_queue_MP_Send_request_packet(
+
+ return _Message_queue_MP_Send_request_packet(
MESSAGE_QUEUE_MP_RECEIVE_REQUEST,
id,
buffer,
+ size_p,
option_set,
timeout
);
case OBJECTS_LOCAL:
- if ( !_Message_queue_Seize( the_message_queue, option_set, buffer ) )
+ if ( ! _Message_queue_Seize(the_message_queue,
+ option_set,
+ buffer,
+ size_p))
+ {
_Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout );
+ }
_Thread_Enable_dispatch();
- return( _Thread_Executing->Wait.return_code );
+ return _Thread_Executing->Wait.return_code;
}
return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */
@@ -467,8 +533,9 @@ rtems_status_code rtems_message_queue_flush(
_Message_queue_MP_Send_request_packet(
MESSAGE_QUEUE_MP_FLUSH_REQUEST,
id,
- 0, /* Not used */
- 0, /* Not used */
+ 0, /* buffer not used */
+ 0, /* size_p */
+ 0, /* option_set not used */
MPCI_DEFAULT_TIMEOUT
);
@@ -511,7 +578,8 @@ rtems_status_code rtems_message_queue_flush(
boolean _Message_queue_Seize(
Message_queue_Control *the_message_queue,
rtems_option option_set,
- Message_queue_Buffer *buffer
+ void *buffer,
+ unsigned32 *size_p
)
{
ISR_Level level;
@@ -526,8 +594,9 @@ boolean _Message_queue_Seize(
the_message = _Message_queue_Get_pending_message( the_message_queue );
_ISR_Enable( level );
- _Message_queue_Copy_buffer( &the_message->Contents, buffer );
- _Message_queue_Free_message_buffer( the_message );
+ *size_p = the_message->Contents.size;
+ _Message_queue_Copy_buffer( the_message->Contents.buffer, buffer, *size_p );
+ _Message_queue_Free_message_buffer(the_message_queue, the_message );
return( TRUE );
}
@@ -542,8 +611,9 @@ boolean _Message_queue_Seize(
executing->Wait.id = the_message_queue->Object.id;
executing->Wait.option_set = option_set;
executing->Wait.return_argument = (unsigned32 *)buffer;
+ executing->Wait.Extra.message_size_p = size_p;
_ISR_Enable( level );
- return( FALSE );
+ return FALSE;
}
/*PAGE
@@ -567,29 +637,29 @@ unsigned32 _Message_queue_Flush_support(
Message_queue_Control *the_message_queue
)
{
- ISR_Level level;
+ ISR_Level level;
Chain_Node *inactive_first;
Chain_Node *message_queue_first;
Chain_Node *message_queue_last;
- unsigned32 count;
+ unsigned32 count;
_ISR_Disable( level );
- inactive_first = _Message_queue_Inactive_messages.first;
+ inactive_first = the_message_queue->Inactive_messages.first;
message_queue_first = the_message_queue->Pending_messages.first;
message_queue_last = the_message_queue->Pending_messages.last;
- _Message_queue_Inactive_messages.first = message_queue_first;
- message_queue_last->next = inactive_first;
- inactive_first->previous = message_queue_last;
+ the_message_queue->Inactive_messages.first = message_queue_first;
+ message_queue_last->next = inactive_first;
+ inactive_first->previous = message_queue_last;
message_queue_first->previous =
- _Chain_Head( &_Message_queue_Inactive_messages );
+ _Chain_Head( &the_message_queue->Inactive_messages );
_Chain_Initialize_empty( &the_message_queue->Pending_messages );
count = the_message_queue->number_of_pending_messages;
the_message_queue->number_of_pending_messages = 0;
_ISR_Enable( level );
- return( count );
+ return count;
}
/*PAGE
@@ -605,17 +675,19 @@ unsigned32 _Message_queue_Flush_support(
*
* Input parameters:
* id - pointer to message queue
- * the_buffer - pointer to message buffer
+ * buffer - pointer to message buffer
+ * size - size in bytes of message to send
* submit_type - send or urgent message
*
* Output parameters:
* RTEMS_SUCCESSFUL - if successful
- * error code - if unsuccessful
+ * error code - if unsuccessful
*/
rtems_status_code _Message_queue_Submit(
Objects_Id id,
- Message_queue_Buffer *buffer,
+ void *buffer,
+ unsigned32 size,
Message_queue_Submit_types submit_type
)
{
@@ -625,9 +697,11 @@ rtems_status_code _Message_queue_Submit(
Message_queue_Buffer_control *the_message;
the_message_queue = _Message_queue_Get( id, &location );
- switch ( location ) {
+ switch ( location )
+ {
case OBJECTS_ERROR:
return( RTEMS_INVALID_ID );
+
case OBJECTS_REMOTE:
switch ( submit_type ) {
case MESSAGE_QUEUE_SEND_REQUEST:
@@ -636,7 +710,8 @@ rtems_status_code _Message_queue_Submit(
MESSAGE_QUEUE_MP_SEND_REQUEST,
id,
buffer,
- 0, /* Not used */
+ &size,
+ 0, /* option_set */
MPCI_DEFAULT_TIMEOUT
);
@@ -646,20 +721,33 @@ rtems_status_code _Message_queue_Submit(
MESSAGE_QUEUE_MP_URGENT_REQUEST,
id,
buffer,
- 0, /* Not used */
+ &size,
+ 0, /* option_set */
MPCI_DEFAULT_TIMEOUT
);
}
- case OBJECTS_LOCAL:
- the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
- if ( the_thread ) {
+ case OBJECTS_LOCAL:
+ if (size > the_message_queue->maximum_message_size)
+ {
+ _Thread_Enable_dispatch();
+ return RTEMS_INVALID_SIZE;
+ }
+ /*
+ * Is there a thread currently waiting on this message queue?
+ */
+
+ the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
+ if ( the_thread )
+ {
_Message_queue_Copy_buffer(
buffer,
- the_thread->Wait.return_argument
+ the_thread->Wait.return_argument,
+ size
);
-
+ *the_thread->Wait.Extra.message_size_p = size;
+
if ( !_Objects_Is_local_id( the_thread->Object.id ) ) {
the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL;
@@ -674,21 +762,26 @@ rtems_status_code _Message_queue_Submit(
return( RTEMS_SUCCESSFUL );
}
+ /*
+ * No one waiting on this one currently.
+ * Allocate a message buffer and store it away
+ */
+
if ( the_message_queue->number_of_pending_messages ==
the_message_queue->maximum_pending_messages ) {
_Thread_Enable_dispatch();
return( RTEMS_TOO_MANY );
}
- the_message = _Message_queue_Allocate_message_buffer();
-
- if ( !the_message ) {
+ the_message = _Message_queue_Allocate_message_buffer(the_message_queue);
+ if ( the_message == 0) {
_Thread_Enable_dispatch();
return( RTEMS_UNSATISFIED );
}
- _Message_queue_Copy_buffer( buffer, &the_message->Contents );
-
+ _Message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size );
+ the_message->Contents.size = size;
+
the_message_queue->number_of_pending_messages += 1;
switch ( submit_type ) {
@@ -702,7 +795,8 @@ rtems_status_code _Message_queue_Submit(
_Thread_Enable_dispatch();
return( RTEMS_SUCCESSFUL );
+
+ default:
+ return RTEMS_INTERNAL_ERROR; /* And they were such nice boys, too! */
}
-
- return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */
}
diff --git a/cpukit/rtems/src/msgmp.c b/cpukit/rtems/src/msgmp.c
index d3a1a02f33..75e83b5bf8 100644
--- a/cpukit/rtems/src/msgmp.c
+++ b/cpukit/rtems/src/msgmp.c
@@ -21,6 +21,7 @@
#include <rtems/options.h>
#include <rtems/thread.h>
#include <rtems/watchdog.h>
+#include <rtems/config.h>
/*PAGE
*
@@ -85,16 +86,16 @@ void _Message_queue_MP_Send_process_packet (
rtems_status_code _Message_queue_MP_Send_request_packet (
Message_queue_MP_Remote_operations operation,
Objects_Id message_queue_id,
- Message_queue_Buffer *buffer,
- rtems_option option_set,
- rtems_interval timeout
+ void *buffer,
+ unsigned32 *size_p,
+ rtems_option option_set,
+ rtems_interval timeout
)
{
Message_queue_MP_Packet *the_packet;
switch ( operation ) {
- case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
case MESSAGE_QUEUE_MP_SEND_REQUEST:
case MESSAGE_QUEUE_MP_URGENT_REQUEST:
case MESSAGE_QUEUE_MP_BROADCAST_REQUEST:
@@ -102,25 +103,69 @@ rtems_status_code _Message_queue_MP_Send_request_packet (
the_packet = _Message_queue_MP_Get_packet();
the_packet->Prefix.the_class = RTEMS_MP_PACKET_MESSAGE_QUEUE;
- the_packet->Prefix.length = sizeof ( Message_queue_MP_Packet );
- the_packet->Prefix.to_convert = sizeof ( Message_queue_MP_Packet ) -
- sizeof ( Message_queue_Buffer );
+ the_packet->Prefix.length = sizeof(Message_queue_MP_Packet);
+ if ( size_p )
+ the_packet->Prefix.length += *size_p;
+ the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet);
+
+ /*
+ * make sure message is not too big for our MPCI driver
+ * We have to check it here instead of waiting for MPCI because
+ * we are about to slam in the payload
+ */
+
+ if (the_packet->Prefix.length >
+ _Configuration_MPCI_table->maximum_packet_size)
+ {
+ _Thread_Enable_dispatch();
+ return RTEMS_INVALID_SIZE;
+ }
+
if ( ! _Options_Is_no_wait(option_set))
the_packet->Prefix.timeout = timeout;
- the_packet->operation = operation;
- the_packet->Prefix.id = message_queue_id;
- the_packet->option_set = option_set;
+ the_packet->operation = operation;
+ the_packet->Prefix.id = message_queue_id;
+ the_packet->option_set = option_set;
+
+ /*
+ * Copy the data into place if needed
+ */
+
+ if (buffer)
+ {
+ the_packet->Buffer.size = *size_p;
+ _Message_queue_Copy_buffer(buffer,
+ the_packet->Buffer.buffer,
+ *size_p);
+ }
+
+ return _MPCI_Send_request_packet(rtems_get_node(message_queue_id),
+ &the_packet->Prefix,
+ STATES_WAITING_FOR_MESSAGE);
+ break;
- if ( buffer )
- _Message_queue_Copy_buffer( buffer, &the_packet->Buffer );
+ case MESSAGE_QUEUE_MP_RECEIVE_REQUEST:
- return
- _MPCI_Send_request_packet(
- rtems_get_node( message_queue_id ),
- &the_packet->Prefix,
- STATES_WAITING_FOR_MESSAGE
- );
+ the_packet = _Message_queue_MP_Get_packet();
+ the_packet->Prefix.the_class = RTEMS_MP_PACKET_MESSAGE_QUEUE;
+ the_packet->Prefix.length = sizeof(Message_queue_MP_Packet);
+ the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet);
+
+ if ( ! _Options_Is_no_wait(option_set))
+ the_packet->Prefix.timeout = timeout;
+
+ the_packet->operation = MESSAGE_QUEUE_MP_RECEIVE_REQUEST;
+ the_packet->Prefix.id = message_queue_id;
+ the_packet->option_set = option_set;
+ the_packet->size = 0; /* just in case of an error */
+
+ _Thread_Executing->Wait.return_argument = (unsigned32 *)buffer;
+ _Thread_Executing->Wait.Extra.message_size_p = size_p;
+
+ return _MPCI_Send_request_packet(rtems_get_node(message_queue_id),
+ &the_packet->Prefix,
+ STATES_WAITING_FOR_MESSAGE);
break;
case MESSAGE_QUEUE_MP_ANNOUNCE_CREATE:
@@ -132,12 +177,8 @@ rtems_status_code _Message_queue_MP_Send_request_packet (
case MESSAGE_QUEUE_MP_BROADCAST_RESPONSE:
case MESSAGE_QUEUE_MP_FLUSH_RESPONSE:
break;
-
}
- /*
- * The following line is included to satisfy compilers which
- * produce warnings when a function does not end with a return.
- */
+
return RTEMS_SUCCESSFUL;
}
@@ -168,10 +209,16 @@ void _Message_queue_MP_Send_response_packet (
/*
* The packet being returned already contains the class, length, and
* to_convert fields, therefore they are not set in this routine.
+ *
+ * Exception: MESSAGE_QUEUE_MP_RECEIVE_RESPONSE needs payload length
+ * added to 'length'
*/
the_packet->operation = operation;
the_packet->Prefix.id = the_packet->Prefix.source_tid;
+ if (operation == MESSAGE_QUEUE_MP_RECEIVE_RESPONSE)
+ the_packet->Prefix.length += the_packet->size;
+
_MPCI_Send_response_packet(
rtems_get_node( the_packet->Prefix.source_tid ),
&the_packet->Prefix
@@ -243,7 +290,8 @@ void _Message_queue_MP_Process_packet (
the_packet->Prefix.return_code = rtems_message_queue_receive(
the_packet->Prefix.id,
- &the_packet->Buffer,
+ the_packet->Buffer.buffer,
+ &the_packet->size,
the_packet->option_set,
the_packet->Prefix.timeout
);
@@ -260,10 +308,15 @@ void _Message_queue_MP_Process_packet (
the_thread = _MPCI_Process_response( the_packet_prefix );
- _Message_queue_Copy_buffer(
- &the_packet->Buffer,
- (Message_queue_Buffer *) the_thread->Wait.return_argument
- );
+ if (the_packet->Prefix.return_code == RTEMS_SUCCESSFUL) {
+ *the_thread->Wait.Extra.message_size_p = the_packet->size;
+
+ _Message_queue_Copy_buffer(
+ the_packet->Buffer.buffer,
+ the_thread->Wait.return_argument,
+ the_packet->size
+ );
+ }
_MPCI_Return_packet( the_packet_prefix );
break;
@@ -272,7 +325,8 @@ void _Message_queue_MP_Process_packet (
the_packet->Prefix.return_code = rtems_message_queue_send(
the_packet->Prefix.id,
- &the_packet->Buffer
+ the_packet->Buffer.buffer,
+ the_packet->Buffer.size
);
_Message_queue_MP_Send_response_packet(
@@ -294,7 +348,8 @@ void _Message_queue_MP_Process_packet (
the_packet->Prefix.return_code = rtems_message_queue_urgent(
the_packet->Prefix.id,
- &the_packet->Buffer
+ the_packet->Buffer.buffer,
+ the_packet->Buffer.size
);
_Message_queue_MP_Send_response_packet(
@@ -308,7 +363,8 @@ void _Message_queue_MP_Process_packet (
the_packet->Prefix.return_code = rtems_message_queue_broadcast(
the_packet->Prefix.id,
- &the_packet->Buffer,
+ the_packet->Buffer.buffer,
+ the_packet->Buffer.size,
&the_packet->count
);