From 3b438fa4b0284d340b60d865f4afcb76f363bc90 Mon Sep 17 00:00:00 2001 From: Joel Sherrill Date: Thu, 17 Aug 1995 19:39:31 +0000 Subject: variable length messages --- c/src/exec/rtems/src/msg.c | 270 +++++++++++++++++++++++++++++-------------- c/src/exec/rtems/src/msgmp.c | 118 ++++++++++++++----- 2 files changed, 269 insertions(+), 119 deletions(-) (limited to 'c/src/exec/rtems') diff --git a/c/src/exec/rtems/src/msg.c b/c/src/exec/rtems/src/msg.c index 7cfe683ce5..9b1c6acef5 100644 --- a/c/src/exec/rtems/src/msg.c +++ b/c/src/exec/rtems/src/msg.c @@ -35,41 +35,71 @@ * * Input parameters: * maximum_message_queues - number of message queues to initialize - * maximum_message - number of messages per queue * * Output parameters: NONE */ void _Message_queue_Manager_initialization( - unsigned32 maximum_message_queues, - unsigned32 maximum_messages + unsigned32 maximum_message_queues ) { - _Objects_Initialize_information( &_Message_queue_Information, TRUE, maximum_message_queues, sizeof( Message_queue_Control ) ); +} - if ( maximum_messages == 0 ) { - - _Chain_Initialize_empty( &_Message_queue_Inactive_messages ); - - } else { - - - _Chain_Initialize( - &_Message_queue_Inactive_messages, - _Workspace_Allocate_or_fatal_error( - maximum_messages * sizeof( Message_queue_Buffer_control ) - ), - maximum_messages, - sizeof( Message_queue_Buffer_control ) - ); +/*PAGE + * + * _Message_queue_Allocate + * + * Allocate a message queue and the space for its messages + */ - } +Message_queue_Control *_Message_queue_Allocate ( + unsigned32 count, + unsigned32 max_message_size +) +{ + Message_queue_Control *mq = 0; + unsigned32 message_buffering_required; + unsigned32 allocated_message_size; + + mq = (Message_queue_Control *)_Objects_Allocate(&_Message_queue_Information); + if (mq == 0) + goto failed; + + mq->maximum_message_size = max_message_size; + + /* + * round size up to multiple of a ptr for chain init + */ + + allocated_message_size = max_message_size; + if (allocated_message_size & (sizeof(unsigned32) - 1)) + { + allocated_message_size += sizeof(unsigned32); + allocated_message_size &= ~(sizeof(unsigned32) - 1); + } + + message_buffering_required = count * (allocated_message_size + sizeof(Message_queue_Buffer_control)); + + mq->message_buffers = (Message_queue_Buffer *) _Workspace_Allocate(message_buffering_required); + if (mq->message_buffers == 0) + goto failed; + + _Chain_Initialize(&mq->Inactive_messages, + mq->message_buffers, + count, + allocated_message_size + sizeof(Message_queue_Buffer_control)); + return mq; + +failed: + if (mq) + _Message_queue_Free(mq); + return (Message_queue_Control *) 0; } /*PAGE @@ -82,6 +112,7 @@ void _Message_queue_Manager_initialization( * Input parameters: * name - user defined queue name * count - maximum message and reserved buffer count + * max_message_size - maximum size of each message * attribute_set - process method * id - pointer to queue * @@ -94,7 +125,8 @@ void _Message_queue_Manager_initialization( rtems_status_code rtems_message_queue_create( Objects_Name name, unsigned32 count, - rtems_attribute attribute_set, + unsigned32 max_message_size, + rtems_attribute attribute_set, Objects_Id *id ) { @@ -107,9 +139,30 @@ rtems_status_code rtems_message_queue_create( !_Configuration_Is_multiprocessing() ) return( RTEMS_MP_NOT_CONFIGURED ); + if (count == 0) + return RTEMS_INVALID_NUMBER; + + if (max_message_size == 0) + return RTEMS_INVALID_SIZE; + +#if 1 + /* + * I am not 100% sure this should be an error. + * It seems reasonable to create a que with a large max size, + * and then just send smaller msgs from remote (or all) nodes. + */ + + if ( _Attributes_Is_global( attribute_set ) && + _Configuration_MPCI_table && + (_Configuration_MPCI_table->maximum_packet_size < max_message_size)) + { + return RTEMS_INVALID_SIZE; + } +#endif + _Thread_Disable_dispatch(); /* protects object pointer */ - the_message_queue = _Message_queue_Allocate(); + the_message_queue = _Message_queue_Allocate(count, max_message_size); if ( !the_message_queue ) { _Thread_Enable_dispatch(); @@ -121,15 +174,12 @@ rtems_status_code rtems_message_queue_create( the_message_queue->Object.id, FALSE ) ) ) { _Message_queue_Free( the_message_queue ); _Thread_Enable_dispatch(); - return( RTEMS_TOO_MANY ); + return RTEMS_TOO_MANY; } - if ( _Attributes_Is_limit( attribute_set ) ) - the_message_queue->maximum_pending_messages = count; - else - the_message_queue->maximum_pending_messages = 0xffffffff; + the_message_queue->maximum_pending_messages = count; - the_message_queue->attribute_set = attribute_set; + the_message_queue->attribute_set = attribute_set; the_message_queue->number_of_pending_messages = 0; _Chain_Initialize_empty( &the_message_queue->Pending_messages ); @@ -264,15 +314,11 @@ rtems_status_code rtems_message_queue_delete( rtems_status_code rtems_message_queue_send( Objects_Id id, - void *buffer + void *buffer, + unsigned32 size ) { - return( _Message_queue_Submit( - id, - (Message_queue_Buffer *) buffer, - MESSAGE_QUEUE_SEND_REQUEST - ) - ); + return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_SEND_REQUEST); } /*PAGE @@ -293,15 +339,11 @@ rtems_status_code rtems_message_queue_send( rtems_status_code rtems_message_queue_urgent( Objects_Id id, - void *buffer + void *buffer, + unsigned32 size ) { - return( _Message_queue_Submit( - id, - (Message_queue_Buffer *) buffer, - MESSAGE_QUEUE_URGENT_REQUEST - ) - ); + return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_URGENT_REQUEST); } /*PAGE @@ -325,6 +367,7 @@ rtems_status_code rtems_message_queue_urgent( rtems_status_code rtems_message_queue_broadcast( Objects_Id id, void *buffer, + unsigned32 size, unsigned32 *count ) { @@ -344,21 +387,33 @@ rtems_status_code rtems_message_queue_broadcast( _Message_queue_MP_Send_request_packet( MESSAGE_QUEUE_MP_BROADCAST_REQUEST, id, - (Message_queue_Buffer *) buffer, - 0, /* Not used */ + buffer, + &size, + 0, /* option_set not used */ MPCI_DEFAULT_TIMEOUT ); case OBJECTS_LOCAL: + { + Thread_Wait_information *waitp; + unsigned32 constrained_size; + number_broadcasted = 0; while ( (the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue)) ) { + waitp = &the_thread->Wait; number_broadcasted += 1; - _Message_queue_Copy_buffer( - (Message_queue_Buffer *) buffer, - the_thread->Wait.return_argument - ); + constrained_size = size; + if (size > the_message_queue->maximum_message_size) + constrained_size = the_message_queue->maximum_message_size; + + _Message_queue_Copy_buffer(buffer, + waitp->return_argument, + constrained_size); + + *waitp->Extra.message_size_p = constrained_size; + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) { the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL; @@ -372,9 +427,11 @@ rtems_status_code rtems_message_queue_broadcast( _Thread_Enable_dispatch(); *count = number_broadcasted; return( RTEMS_SUCCESSFUL ); - } + } - return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ + default: + return RTEMS_INTERNAL_ERROR; + } } /*PAGE @@ -398,33 +455,42 @@ rtems_status_code rtems_message_queue_broadcast( rtems_status_code rtems_message_queue_receive( Objects_Id id, void *buffer, + unsigned32 *size_p, unsigned32 option_set, - rtems_interval timeout + rtems_interval timeout ) { register Message_queue_Control *the_message_queue; - Objects_Locations location; + Objects_Locations location; the_message_queue = _Message_queue_Get( id, &location ); switch ( location ) { + case OBJECTS_ERROR: return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: _Thread_Executing->Wait.return_argument = buffer; - return - _Message_queue_MP_Send_request_packet( + + return _Message_queue_MP_Send_request_packet( MESSAGE_QUEUE_MP_RECEIVE_REQUEST, id, buffer, + size_p, option_set, timeout ); case OBJECTS_LOCAL: - if ( !_Message_queue_Seize( the_message_queue, option_set, buffer ) ) + if ( ! _Message_queue_Seize(the_message_queue, + option_set, + buffer, + size_p)) + { _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout ); + } _Thread_Enable_dispatch(); - return( _Thread_Executing->Wait.return_code ); + return _Thread_Executing->Wait.return_code; } return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ @@ -467,8 +533,9 @@ rtems_status_code rtems_message_queue_flush( _Message_queue_MP_Send_request_packet( MESSAGE_QUEUE_MP_FLUSH_REQUEST, id, - 0, /* Not used */ - 0, /* Not used */ + 0, /* buffer not used */ + 0, /* size_p */ + 0, /* option_set not used */ MPCI_DEFAULT_TIMEOUT ); @@ -511,7 +578,8 @@ rtems_status_code rtems_message_queue_flush( boolean _Message_queue_Seize( Message_queue_Control *the_message_queue, rtems_option option_set, - Message_queue_Buffer *buffer + void *buffer, + unsigned32 *size_p ) { ISR_Level level; @@ -526,8 +594,9 @@ boolean _Message_queue_Seize( the_message = _Message_queue_Get_pending_message( the_message_queue ); _ISR_Enable( level ); - _Message_queue_Copy_buffer( &the_message->Contents, buffer ); - _Message_queue_Free_message_buffer( the_message ); + *size_p = the_message->Contents.size; + _Message_queue_Copy_buffer( the_message->Contents.buffer, buffer, *size_p ); + _Message_queue_Free_message_buffer(the_message_queue, the_message ); return( TRUE ); } @@ -542,8 +611,9 @@ boolean _Message_queue_Seize( executing->Wait.id = the_message_queue->Object.id; executing->Wait.option_set = option_set; executing->Wait.return_argument = (unsigned32 *)buffer; + executing->Wait.Extra.message_size_p = size_p; _ISR_Enable( level ); - return( FALSE ); + return FALSE; } /*PAGE @@ -567,29 +637,29 @@ unsigned32 _Message_queue_Flush_support( Message_queue_Control *the_message_queue ) { - ISR_Level level; + ISR_Level level; Chain_Node *inactive_first; Chain_Node *message_queue_first; Chain_Node *message_queue_last; - unsigned32 count; + unsigned32 count; _ISR_Disable( level ); - inactive_first = _Message_queue_Inactive_messages.first; + inactive_first = the_message_queue->Inactive_messages.first; message_queue_first = the_message_queue->Pending_messages.first; message_queue_last = the_message_queue->Pending_messages.last; - _Message_queue_Inactive_messages.first = message_queue_first; - message_queue_last->next = inactive_first; - inactive_first->previous = message_queue_last; + the_message_queue->Inactive_messages.first = message_queue_first; + message_queue_last->next = inactive_first; + inactive_first->previous = message_queue_last; message_queue_first->previous = - _Chain_Head( &_Message_queue_Inactive_messages ); + _Chain_Head( &the_message_queue->Inactive_messages ); _Chain_Initialize_empty( &the_message_queue->Pending_messages ); count = the_message_queue->number_of_pending_messages; the_message_queue->number_of_pending_messages = 0; _ISR_Enable( level ); - return( count ); + return count; } /*PAGE @@ -605,17 +675,19 @@ unsigned32 _Message_queue_Flush_support( * * Input parameters: * id - pointer to message queue - * the_buffer - pointer to message buffer + * buffer - pointer to message buffer + * size - size in bytes of message to send * submit_type - send or urgent message * * Output parameters: * RTEMS_SUCCESSFUL - if successful - * error code - if unsuccessful + * error code - if unsuccessful */ rtems_status_code _Message_queue_Submit( Objects_Id id, - Message_queue_Buffer *buffer, + void *buffer, + unsigned32 size, Message_queue_Submit_types submit_type ) { @@ -625,9 +697,11 @@ rtems_status_code _Message_queue_Submit( Message_queue_Buffer_control *the_message; the_message_queue = _Message_queue_Get( id, &location ); - switch ( location ) { + switch ( location ) + { case OBJECTS_ERROR: return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: switch ( submit_type ) { case MESSAGE_QUEUE_SEND_REQUEST: @@ -636,7 +710,8 @@ rtems_status_code _Message_queue_Submit( MESSAGE_QUEUE_MP_SEND_REQUEST, id, buffer, - 0, /* Not used */ + &size, + 0, /* option_set */ MPCI_DEFAULT_TIMEOUT ); @@ -646,20 +721,33 @@ rtems_status_code _Message_queue_Submit( MESSAGE_QUEUE_MP_URGENT_REQUEST, id, buffer, - 0, /* Not used */ + &size, + 0, /* option_set */ MPCI_DEFAULT_TIMEOUT ); } - case OBJECTS_LOCAL: - the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); - if ( the_thread ) { + case OBJECTS_LOCAL: + if (size > the_message_queue->maximum_message_size) + { + _Thread_Enable_dispatch(); + return RTEMS_INVALID_SIZE; + } + /* + * Is there a thread currently waiting on this message queue? + */ + + the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); + if ( the_thread ) + { _Message_queue_Copy_buffer( buffer, - the_thread->Wait.return_argument + the_thread->Wait.return_argument, + size ); - + *the_thread->Wait.Extra.message_size_p = size; + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) { the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL; @@ -674,21 +762,26 @@ rtems_status_code _Message_queue_Submit( return( RTEMS_SUCCESSFUL ); } + /* + * No one waiting on this one currently. + * Allocate a message buffer and store it away + */ + if ( the_message_queue->number_of_pending_messages == the_message_queue->maximum_pending_messages ) { _Thread_Enable_dispatch(); return( RTEMS_TOO_MANY ); } - the_message = _Message_queue_Allocate_message_buffer(); - - if ( !the_message ) { + the_message = _Message_queue_Allocate_message_buffer(the_message_queue); + if ( the_message == 0) { _Thread_Enable_dispatch(); return( RTEMS_UNSATISFIED ); } - _Message_queue_Copy_buffer( buffer, &the_message->Contents ); - + _Message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size ); + the_message->Contents.size = size; + the_message_queue->number_of_pending_messages += 1; switch ( submit_type ) { @@ -702,7 +795,8 @@ rtems_status_code _Message_queue_Submit( _Thread_Enable_dispatch(); return( RTEMS_SUCCESSFUL ); + + default: + return RTEMS_INTERNAL_ERROR; /* And they were such nice boys, too! */ } - - return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ } diff --git a/c/src/exec/rtems/src/msgmp.c b/c/src/exec/rtems/src/msgmp.c index d3a1a02f33..75e83b5bf8 100644 --- a/c/src/exec/rtems/src/msgmp.c +++ b/c/src/exec/rtems/src/msgmp.c @@ -21,6 +21,7 @@ #include #include #include +#include /*PAGE * @@ -85,16 +86,16 @@ void _Message_queue_MP_Send_process_packet ( rtems_status_code _Message_queue_MP_Send_request_packet ( Message_queue_MP_Remote_operations operation, Objects_Id message_queue_id, - Message_queue_Buffer *buffer, - rtems_option option_set, - rtems_interval timeout + void *buffer, + unsigned32 *size_p, + rtems_option option_set, + rtems_interval timeout ) { Message_queue_MP_Packet *the_packet; switch ( operation ) { - case MESSAGE_QUEUE_MP_RECEIVE_REQUEST: case MESSAGE_QUEUE_MP_SEND_REQUEST: case MESSAGE_QUEUE_MP_URGENT_REQUEST: case MESSAGE_QUEUE_MP_BROADCAST_REQUEST: @@ -102,25 +103,69 @@ rtems_status_code _Message_queue_MP_Send_request_packet ( the_packet = _Message_queue_MP_Get_packet(); the_packet->Prefix.the_class = RTEMS_MP_PACKET_MESSAGE_QUEUE; - the_packet->Prefix.length = sizeof ( Message_queue_MP_Packet ); - the_packet->Prefix.to_convert = sizeof ( Message_queue_MP_Packet ) - - sizeof ( Message_queue_Buffer ); + the_packet->Prefix.length = sizeof(Message_queue_MP_Packet); + if ( size_p ) + the_packet->Prefix.length += *size_p; + the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet); + + /* + * make sure message is not too big for our MPCI driver + * We have to check it here instead of waiting for MPCI because + * we are about to slam in the payload + */ + + if (the_packet->Prefix.length > + _Configuration_MPCI_table->maximum_packet_size) + { + _Thread_Enable_dispatch(); + return RTEMS_INVALID_SIZE; + } + if ( ! _Options_Is_no_wait(option_set)) the_packet->Prefix.timeout = timeout; - the_packet->operation = operation; - the_packet->Prefix.id = message_queue_id; - the_packet->option_set = option_set; + the_packet->operation = operation; + the_packet->Prefix.id = message_queue_id; + the_packet->option_set = option_set; + + /* + * Copy the data into place if needed + */ + + if (buffer) + { + the_packet->Buffer.size = *size_p; + _Message_queue_Copy_buffer(buffer, + the_packet->Buffer.buffer, + *size_p); + } + + return _MPCI_Send_request_packet(rtems_get_node(message_queue_id), + &the_packet->Prefix, + STATES_WAITING_FOR_MESSAGE); + break; - if ( buffer ) - _Message_queue_Copy_buffer( buffer, &the_packet->Buffer ); + case MESSAGE_QUEUE_MP_RECEIVE_REQUEST: - return - _MPCI_Send_request_packet( - rtems_get_node( message_queue_id ), - &the_packet->Prefix, - STATES_WAITING_FOR_MESSAGE - ); + the_packet = _Message_queue_MP_Get_packet(); + the_packet->Prefix.the_class = RTEMS_MP_PACKET_MESSAGE_QUEUE; + the_packet->Prefix.length = sizeof(Message_queue_MP_Packet); + the_packet->Prefix.to_convert = sizeof(Message_queue_MP_Packet); + + if ( ! _Options_Is_no_wait(option_set)) + the_packet->Prefix.timeout = timeout; + + the_packet->operation = MESSAGE_QUEUE_MP_RECEIVE_REQUEST; + the_packet->Prefix.id = message_queue_id; + the_packet->option_set = option_set; + the_packet->size = 0; /* just in case of an error */ + + _Thread_Executing->Wait.return_argument = (unsigned32 *)buffer; + _Thread_Executing->Wait.Extra.message_size_p = size_p; + + return _MPCI_Send_request_packet(rtems_get_node(message_queue_id), + &the_packet->Prefix, + STATES_WAITING_FOR_MESSAGE); break; case MESSAGE_QUEUE_MP_ANNOUNCE_CREATE: @@ -132,12 +177,8 @@ rtems_status_code _Message_queue_MP_Send_request_packet ( case MESSAGE_QUEUE_MP_BROADCAST_RESPONSE: case MESSAGE_QUEUE_MP_FLUSH_RESPONSE: break; - } - /* - * The following line is included to satisfy compilers which - * produce warnings when a function does not end with a return. - */ + return RTEMS_SUCCESSFUL; } @@ -168,10 +209,16 @@ void _Message_queue_MP_Send_response_packet ( /* * The packet being returned already contains the class, length, and * to_convert fields, therefore they are not set in this routine. + * + * Exception: MESSAGE_QUEUE_MP_RECEIVE_RESPONSE needs payload length + * added to 'length' */ the_packet->operation = operation; the_packet->Prefix.id = the_packet->Prefix.source_tid; + if (operation == MESSAGE_QUEUE_MP_RECEIVE_RESPONSE) + the_packet->Prefix.length += the_packet->size; + _MPCI_Send_response_packet( rtems_get_node( the_packet->Prefix.source_tid ), &the_packet->Prefix @@ -243,7 +290,8 @@ void _Message_queue_MP_Process_packet ( the_packet->Prefix.return_code = rtems_message_queue_receive( the_packet->Prefix.id, - &the_packet->Buffer, + the_packet->Buffer.buffer, + &the_packet->size, the_packet->option_set, the_packet->Prefix.timeout ); @@ -260,10 +308,15 @@ void _Message_queue_MP_Process_packet ( the_thread = _MPCI_Process_response( the_packet_prefix ); - _Message_queue_Copy_buffer( - &the_packet->Buffer, - (Message_queue_Buffer *) the_thread->Wait.return_argument - ); + if (the_packet->Prefix.return_code == RTEMS_SUCCESSFUL) { + *the_thread->Wait.Extra.message_size_p = the_packet->size; + + _Message_queue_Copy_buffer( + the_packet->Buffer.buffer, + the_thread->Wait.return_argument, + the_packet->size + ); + } _MPCI_Return_packet( the_packet_prefix ); break; @@ -272,7 +325,8 @@ void _Message_queue_MP_Process_packet ( the_packet->Prefix.return_code = rtems_message_queue_send( the_packet->Prefix.id, - &the_packet->Buffer + the_packet->Buffer.buffer, + the_packet->Buffer.size ); _Message_queue_MP_Send_response_packet( @@ -294,7 +348,8 @@ void _Message_queue_MP_Process_packet ( the_packet->Prefix.return_code = rtems_message_queue_urgent( the_packet->Prefix.id, - &the_packet->Buffer + the_packet->Buffer.buffer, + the_packet->Buffer.size ); _Message_queue_MP_Send_response_packet( @@ -308,7 +363,8 @@ void _Message_queue_MP_Process_packet ( the_packet->Prefix.return_code = rtems_message_queue_broadcast( the_packet->Prefix.id, - &the_packet->Buffer, + the_packet->Buffer.buffer, + the_packet->Buffer.size, &the_packet->count ); -- cgit v1.2.3