From 3b438fa4b0284d340b60d865f4afcb76f363bc90 Mon Sep 17 00:00:00 2001 From: Joel Sherrill Date: Thu, 17 Aug 1995 19:39:31 +0000 Subject: variable length messages --- cpukit/rtems/src/msg.c | 270 +++++++++++++++++++++++++++++++++---------------- 1 file changed, 182 insertions(+), 88 deletions(-) (limited to 'cpukit/rtems/src/msg.c') diff --git a/cpukit/rtems/src/msg.c b/cpukit/rtems/src/msg.c index 7cfe683ce5..9b1c6acef5 100644 --- a/cpukit/rtems/src/msg.c +++ b/cpukit/rtems/src/msg.c @@ -35,41 +35,71 @@ * * Input parameters: * maximum_message_queues - number of message queues to initialize - * maximum_message - number of messages per queue * * Output parameters: NONE */ void _Message_queue_Manager_initialization( - unsigned32 maximum_message_queues, - unsigned32 maximum_messages + unsigned32 maximum_message_queues ) { - _Objects_Initialize_information( &_Message_queue_Information, TRUE, maximum_message_queues, sizeof( Message_queue_Control ) ); +} - if ( maximum_messages == 0 ) { - - _Chain_Initialize_empty( &_Message_queue_Inactive_messages ); - - } else { - - - _Chain_Initialize( - &_Message_queue_Inactive_messages, - _Workspace_Allocate_or_fatal_error( - maximum_messages * sizeof( Message_queue_Buffer_control ) - ), - maximum_messages, - sizeof( Message_queue_Buffer_control ) - ); +/*PAGE + * + * _Message_queue_Allocate + * + * Allocate a message queue and the space for its messages + */ - } +Message_queue_Control *_Message_queue_Allocate ( + unsigned32 count, + unsigned32 max_message_size +) +{ + Message_queue_Control *mq = 0; + unsigned32 message_buffering_required; + unsigned32 allocated_message_size; + + mq = (Message_queue_Control *)_Objects_Allocate(&_Message_queue_Information); + if (mq == 0) + goto failed; + + mq->maximum_message_size = max_message_size; + + /* + * round size up to multiple of a ptr for chain init + */ + + allocated_message_size = max_message_size; + if (allocated_message_size & (sizeof(unsigned32) - 1)) + { + allocated_message_size += sizeof(unsigned32); + allocated_message_size &= ~(sizeof(unsigned32) - 1); + } + + message_buffering_required = count * (allocated_message_size + sizeof(Message_queue_Buffer_control)); + + mq->message_buffers = (Message_queue_Buffer *) _Workspace_Allocate(message_buffering_required); + if (mq->message_buffers == 0) + goto failed; + + _Chain_Initialize(&mq->Inactive_messages, + mq->message_buffers, + count, + allocated_message_size + sizeof(Message_queue_Buffer_control)); + return mq; + +failed: + if (mq) + _Message_queue_Free(mq); + return (Message_queue_Control *) 0; } /*PAGE @@ -82,6 +112,7 @@ void _Message_queue_Manager_initialization( * Input parameters: * name - user defined queue name * count - maximum message and reserved buffer count + * max_message_size - maximum size of each message * attribute_set - process method * id - pointer to queue * @@ -94,7 +125,8 @@ void _Message_queue_Manager_initialization( rtems_status_code rtems_message_queue_create( Objects_Name name, unsigned32 count, - rtems_attribute attribute_set, + unsigned32 max_message_size, + rtems_attribute attribute_set, Objects_Id *id ) { @@ -107,9 +139,30 @@ rtems_status_code rtems_message_queue_create( !_Configuration_Is_multiprocessing() ) return( RTEMS_MP_NOT_CONFIGURED ); + if (count == 0) + return RTEMS_INVALID_NUMBER; + + if (max_message_size == 0) + return RTEMS_INVALID_SIZE; + +#if 1 + /* + * I am not 100% sure this should be an error. + * It seems reasonable to create a que with a large max size, + * and then just send smaller msgs from remote (or all) nodes. + */ + + if ( _Attributes_Is_global( attribute_set ) && + _Configuration_MPCI_table && + (_Configuration_MPCI_table->maximum_packet_size < max_message_size)) + { + return RTEMS_INVALID_SIZE; + } +#endif + _Thread_Disable_dispatch(); /* protects object pointer */ - the_message_queue = _Message_queue_Allocate(); + the_message_queue = _Message_queue_Allocate(count, max_message_size); if ( !the_message_queue ) { _Thread_Enable_dispatch(); @@ -121,15 +174,12 @@ rtems_status_code rtems_message_queue_create( the_message_queue->Object.id, FALSE ) ) ) { _Message_queue_Free( the_message_queue ); _Thread_Enable_dispatch(); - return( RTEMS_TOO_MANY ); + return RTEMS_TOO_MANY; } - if ( _Attributes_Is_limit( attribute_set ) ) - the_message_queue->maximum_pending_messages = count; - else - the_message_queue->maximum_pending_messages = 0xffffffff; + the_message_queue->maximum_pending_messages = count; - the_message_queue->attribute_set = attribute_set; + the_message_queue->attribute_set = attribute_set; the_message_queue->number_of_pending_messages = 0; _Chain_Initialize_empty( &the_message_queue->Pending_messages ); @@ -264,15 +314,11 @@ rtems_status_code rtems_message_queue_delete( rtems_status_code rtems_message_queue_send( Objects_Id id, - void *buffer + void *buffer, + unsigned32 size ) { - return( _Message_queue_Submit( - id, - (Message_queue_Buffer *) buffer, - MESSAGE_QUEUE_SEND_REQUEST - ) - ); + return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_SEND_REQUEST); } /*PAGE @@ -293,15 +339,11 @@ rtems_status_code rtems_message_queue_send( rtems_status_code rtems_message_queue_urgent( Objects_Id id, - void *buffer + void *buffer, + unsigned32 size ) { - return( _Message_queue_Submit( - id, - (Message_queue_Buffer *) buffer, - MESSAGE_QUEUE_URGENT_REQUEST - ) - ); + return _Message_queue_Submit(id, buffer, size, MESSAGE_QUEUE_URGENT_REQUEST); } /*PAGE @@ -325,6 +367,7 @@ rtems_status_code rtems_message_queue_urgent( rtems_status_code rtems_message_queue_broadcast( Objects_Id id, void *buffer, + unsigned32 size, unsigned32 *count ) { @@ -344,21 +387,33 @@ rtems_status_code rtems_message_queue_broadcast( _Message_queue_MP_Send_request_packet( MESSAGE_QUEUE_MP_BROADCAST_REQUEST, id, - (Message_queue_Buffer *) buffer, - 0, /* Not used */ + buffer, + &size, + 0, /* option_set not used */ MPCI_DEFAULT_TIMEOUT ); case OBJECTS_LOCAL: + { + Thread_Wait_information *waitp; + unsigned32 constrained_size; + number_broadcasted = 0; while ( (the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue)) ) { + waitp = &the_thread->Wait; number_broadcasted += 1; - _Message_queue_Copy_buffer( - (Message_queue_Buffer *) buffer, - the_thread->Wait.return_argument - ); + constrained_size = size; + if (size > the_message_queue->maximum_message_size) + constrained_size = the_message_queue->maximum_message_size; + + _Message_queue_Copy_buffer(buffer, + waitp->return_argument, + constrained_size); + + *waitp->Extra.message_size_p = constrained_size; + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) { the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL; @@ -372,9 +427,11 @@ rtems_status_code rtems_message_queue_broadcast( _Thread_Enable_dispatch(); *count = number_broadcasted; return( RTEMS_SUCCESSFUL ); - } + } - return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ + default: + return RTEMS_INTERNAL_ERROR; + } } /*PAGE @@ -398,33 +455,42 @@ rtems_status_code rtems_message_queue_broadcast( rtems_status_code rtems_message_queue_receive( Objects_Id id, void *buffer, + unsigned32 *size_p, unsigned32 option_set, - rtems_interval timeout + rtems_interval timeout ) { register Message_queue_Control *the_message_queue; - Objects_Locations location; + Objects_Locations location; the_message_queue = _Message_queue_Get( id, &location ); switch ( location ) { + case OBJECTS_ERROR: return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: _Thread_Executing->Wait.return_argument = buffer; - return - _Message_queue_MP_Send_request_packet( + + return _Message_queue_MP_Send_request_packet( MESSAGE_QUEUE_MP_RECEIVE_REQUEST, id, buffer, + size_p, option_set, timeout ); case OBJECTS_LOCAL: - if ( !_Message_queue_Seize( the_message_queue, option_set, buffer ) ) + if ( ! _Message_queue_Seize(the_message_queue, + option_set, + buffer, + size_p)) + { _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout ); + } _Thread_Enable_dispatch(); - return( _Thread_Executing->Wait.return_code ); + return _Thread_Executing->Wait.return_code; } return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ @@ -467,8 +533,9 @@ rtems_status_code rtems_message_queue_flush( _Message_queue_MP_Send_request_packet( MESSAGE_QUEUE_MP_FLUSH_REQUEST, id, - 0, /* Not used */ - 0, /* Not used */ + 0, /* buffer not used */ + 0, /* size_p */ + 0, /* option_set not used */ MPCI_DEFAULT_TIMEOUT ); @@ -511,7 +578,8 @@ rtems_status_code rtems_message_queue_flush( boolean _Message_queue_Seize( Message_queue_Control *the_message_queue, rtems_option option_set, - Message_queue_Buffer *buffer + void *buffer, + unsigned32 *size_p ) { ISR_Level level; @@ -526,8 +594,9 @@ boolean _Message_queue_Seize( the_message = _Message_queue_Get_pending_message( the_message_queue ); _ISR_Enable( level ); - _Message_queue_Copy_buffer( &the_message->Contents, buffer ); - _Message_queue_Free_message_buffer( the_message ); + *size_p = the_message->Contents.size; + _Message_queue_Copy_buffer( the_message->Contents.buffer, buffer, *size_p ); + _Message_queue_Free_message_buffer(the_message_queue, the_message ); return( TRUE ); } @@ -542,8 +611,9 @@ boolean _Message_queue_Seize( executing->Wait.id = the_message_queue->Object.id; executing->Wait.option_set = option_set; executing->Wait.return_argument = (unsigned32 *)buffer; + executing->Wait.Extra.message_size_p = size_p; _ISR_Enable( level ); - return( FALSE ); + return FALSE; } /*PAGE @@ -567,29 +637,29 @@ unsigned32 _Message_queue_Flush_support( Message_queue_Control *the_message_queue ) { - ISR_Level level; + ISR_Level level; Chain_Node *inactive_first; Chain_Node *message_queue_first; Chain_Node *message_queue_last; - unsigned32 count; + unsigned32 count; _ISR_Disable( level ); - inactive_first = _Message_queue_Inactive_messages.first; + inactive_first = the_message_queue->Inactive_messages.first; message_queue_first = the_message_queue->Pending_messages.first; message_queue_last = the_message_queue->Pending_messages.last; - _Message_queue_Inactive_messages.first = message_queue_first; - message_queue_last->next = inactive_first; - inactive_first->previous = message_queue_last; + the_message_queue->Inactive_messages.first = message_queue_first; + message_queue_last->next = inactive_first; + inactive_first->previous = message_queue_last; message_queue_first->previous = - _Chain_Head( &_Message_queue_Inactive_messages ); + _Chain_Head( &the_message_queue->Inactive_messages ); _Chain_Initialize_empty( &the_message_queue->Pending_messages ); count = the_message_queue->number_of_pending_messages; the_message_queue->number_of_pending_messages = 0; _ISR_Enable( level ); - return( count ); + return count; } /*PAGE @@ -605,17 +675,19 @@ unsigned32 _Message_queue_Flush_support( * * Input parameters: * id - pointer to message queue - * the_buffer - pointer to message buffer + * buffer - pointer to message buffer + * size - size in bytes of message to send * submit_type - send or urgent message * * Output parameters: * RTEMS_SUCCESSFUL - if successful - * error code - if unsuccessful + * error code - if unsuccessful */ rtems_status_code _Message_queue_Submit( Objects_Id id, - Message_queue_Buffer *buffer, + void *buffer, + unsigned32 size, Message_queue_Submit_types submit_type ) { @@ -625,9 +697,11 @@ rtems_status_code _Message_queue_Submit( Message_queue_Buffer_control *the_message; the_message_queue = _Message_queue_Get( id, &location ); - switch ( location ) { + switch ( location ) + { case OBJECTS_ERROR: return( RTEMS_INVALID_ID ); + case OBJECTS_REMOTE: switch ( submit_type ) { case MESSAGE_QUEUE_SEND_REQUEST: @@ -636,7 +710,8 @@ rtems_status_code _Message_queue_Submit( MESSAGE_QUEUE_MP_SEND_REQUEST, id, buffer, - 0, /* Not used */ + &size, + 0, /* option_set */ MPCI_DEFAULT_TIMEOUT ); @@ -646,20 +721,33 @@ rtems_status_code _Message_queue_Submit( MESSAGE_QUEUE_MP_URGENT_REQUEST, id, buffer, - 0, /* Not used */ + &size, + 0, /* option_set */ MPCI_DEFAULT_TIMEOUT ); } - case OBJECTS_LOCAL: - the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); - if ( the_thread ) { + case OBJECTS_LOCAL: + if (size > the_message_queue->maximum_message_size) + { + _Thread_Enable_dispatch(); + return RTEMS_INVALID_SIZE; + } + /* + * Is there a thread currently waiting on this message queue? + */ + + the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); + if ( the_thread ) + { _Message_queue_Copy_buffer( buffer, - the_thread->Wait.return_argument + the_thread->Wait.return_argument, + size ); - + *the_thread->Wait.Extra.message_size_p = size; + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) { the_thread->receive_packet->return_code = RTEMS_SUCCESSFUL; @@ -674,21 +762,26 @@ rtems_status_code _Message_queue_Submit( return( RTEMS_SUCCESSFUL ); } + /* + * No one waiting on this one currently. + * Allocate a message buffer and store it away + */ + if ( the_message_queue->number_of_pending_messages == the_message_queue->maximum_pending_messages ) { _Thread_Enable_dispatch(); return( RTEMS_TOO_MANY ); } - the_message = _Message_queue_Allocate_message_buffer(); - - if ( !the_message ) { + the_message = _Message_queue_Allocate_message_buffer(the_message_queue); + if ( the_message == 0) { _Thread_Enable_dispatch(); return( RTEMS_UNSATISFIED ); } - _Message_queue_Copy_buffer( buffer, &the_message->Contents ); - + _Message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size ); + the_message->Contents.size = size; + the_message_queue->number_of_pending_messages += 1; switch ( submit_type ) { @@ -702,7 +795,8 @@ rtems_status_code _Message_queue_Submit( _Thread_Enable_dispatch(); return( RTEMS_SUCCESSFUL ); + + default: + return RTEMS_INTERNAL_ERROR; /* And they were such nice boys, too! */ } - - return( RTEMS_INTERNAL_ERROR ); /* unreached - only to remove warnings */ } -- cgit v1.2.3