From 53fb837afc4285486e318bcb614c911bbe9b1348 Mon Sep 17 00:00:00 2001 From: Joel Sherrill Date: Thu, 13 Jan 2000 19:25:15 +0000 Subject: POSIX message queues now include complete functionality including blocking sends when the queue is full. The SuperCore was enhanced to support blocking on send. The existing POSIX API was debugged and numerous test cases were added to psxmsgq01 by Jennifer Averett. SuperCore enhancements and resulting modifications to other APIs were done by Joel. There is one significant point of interpretation for the POSIX API. What happens to threads already blocked on a message queue when the mode of that same message queue is changed from blocking to non-blocking? We decided to unblock all waiting tasks with an EAGAIN error just as if a non-blocking version of the same operation had returned unsatisfied. This case is not discussed in the POSIX standard and other implementations may have chosen differently. --- cpukit/score/src/coremsgbroadcast.c | 19 +++++ cpukit/score/src/coremsgsubmit.c | 146 +++++++++++++++++++----------------- 2 files changed, 96 insertions(+), 69 deletions(-) (limited to 'cpukit/score/src') diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c index 2e6f649545..18e148ab1c 100644 --- a/cpukit/score/src/coremsgbroadcast.c +++ b/cpukit/score/src/coremsgbroadcast.c @@ -64,6 +64,25 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast( Thread_Wait_information *waitp; unsigned32 constrained_size; + /* + * If there are pending messages, then there can't be threads + * waiting for us to send them a message. + * + * NOTE: This check is critical because threads can block on + * send and receive and this ensures that we are broadcasting + * the message to threads waiting to receive -- not to send. + */ + + if ( the_message_queue->number_of_pending_messages != 0 ) { + *count = 0; + return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + } + + /* + * There must be no pending messages if there is a thread waiting to + * receive a message. + */ + number_broadcasted = 0; while ((the_thread = _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) { waitp = &the_thread->Wait; diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c index 4e15ab5bc0..6829351c18 100644 --- a/cpukit/score/src/coremsgsubmit.c +++ b/cpukit/score/src/coremsgsubmit.c @@ -53,102 +53,110 @@ * error code - if unsuccessful */ -CORE_message_queue_Status _CORE_message_queue_Submit( +void _CORE_message_queue_Submit( CORE_message_queue_Control *the_message_queue, void *buffer, unsigned32 size, Objects_Id id, CORE_message_queue_API_mp_support_callout api_message_queue_mp_support, - CORE_message_queue_Submit_types submit_type + CORE_message_queue_Submit_types submit_type, + boolean wait, + Watchdog_Interval timeout ) { + ISR_Level level; CORE_message_queue_Buffer_control *the_message; Thread_Control *the_thread; + Thread_Control *executing; - if ( size > the_message_queue->maximum_message_size ) - return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE; + _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + + if ( size > the_message_queue->maximum_message_size ) { + _Thread_Executing->Wait.return_code = + CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE; + return; + } /* - * Is there a thread currently waiting on this message queue? + * Is there a thread currently waiting on this message queue? */ - the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); - if ( the_thread ) { - _CORE_message_queue_Copy_buffer( - buffer, - the_thread->Wait.return_argument, - size - ); - *(unsigned32 *)the_thread->Wait.return_argument_1 = size; - the_thread->Wait.count = submit_type; + if ( the_message_queue->number_of_pending_messages == 0 ) { + the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue ); + if ( the_thread ) { + _CORE_message_queue_Copy_buffer( + buffer, + the_thread->Wait.return_argument, + size + ); + *(unsigned32 *)the_thread->Wait.return_argument_1 = size; + the_thread->Wait.count = submit_type; #if defined(RTEMS_MULTIPROCESSING) - if ( !_Objects_Is_local_id( the_thread->Object.id ) ) - (*api_message_queue_mp_support) ( the_thread, id ); + if ( !_Objects_Is_local_id( the_thread->Object.id ) ) + (*api_message_queue_mp_support) ( the_thread, id ); #endif - - return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + return; + } } /* - * No one waiting on this one currently. - * Allocate a message buffer and store it away + * No one waiting on the message queue at this time, so attempt to + * queue the message up for a future receive. */ - if ( the_message_queue->number_of_pending_messages == + if ( the_message_queue->number_of_pending_messages < the_message_queue->maximum_pending_messages ) { - return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY; - } - the_message = _CORE_message_queue_Allocate_message_buffer(the_message_queue); - if ( the_message == 0 ) - return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED; - - _CORE_message_queue_Copy_buffer( buffer, the_message->Contents.buffer, size ); - the_message->Contents.size = size; - the_message->priority = submit_type; - - the_message_queue->number_of_pending_messages += 1; - - switch ( submit_type ) { - case CORE_MESSAGE_QUEUE_SEND_REQUEST: - _CORE_message_queue_Append( the_message_queue, the_message ); - break; - case CORE_MESSAGE_QUEUE_URGENT_REQUEST: - _CORE_message_queue_Prepend( the_message_queue, the_message ); - break; - default: - /* XXX interrupt critical section needs to be addressed */ - { - CORE_message_queue_Buffer_control *this_message; - Chain_Node *the_node; - - the_message->priority = submit_type; - for ( the_node = the_message_queue->Pending_messages.first ; - !_Chain_Is_tail( &the_message_queue->Pending_messages, the_node ) ; - the_node = the_node->next ) { - - this_message = (CORE_message_queue_Buffer_control *) the_node; - - if ( this_message->priority >= the_message->priority ) - continue; - - _Chain_Insert( the_node, &the_message->Node ); - break; - } - } - break; + the_message = + _CORE_message_queue_Allocate_message_buffer( the_message_queue ); + + /* + * NOTE: If the system is consistent, this error should never occur. + */ + if ( !the_message ) { + _Thread_Executing->Wait.return_code = + CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED; + return; + } + + _CORE_message_queue_Copy_buffer( + buffer, + the_message->Contents.buffer, + size + ); + the_message->Contents.size = size; + the_message->priority = submit_type; + + _CORE_message_queue_Insert_message( + the_message_queue, + the_message, + submit_type + ); + return; } /* - * According to POSIX, does this happen before or after the message - * is actually enqueued. It is logical to think afterwards, because - * the message is actually in the queue at this point. + * No message buffers were available so we may need to return an + * overflow error or block the sender until the message is placed + * on the queue. */ - if ( the_message_queue->number_of_pending_messages == 1 && - the_message_queue->notify_handler ) - (*the_message_queue->notify_handler)( the_message_queue->notify_argument ); - - return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL; + if ( !wait ) { + _Thread_Executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_TOO_MANY; + return; + } + + executing = _Thread_Executing; + + _ISR_Disable( level ); + _Thread_queue_Enter_critical_section( &the_message_queue->Wait_queue ); + executing->Wait.queue = &the_message_queue->Wait_queue; + executing->Wait.id = id; + executing->Wait.return_argument = (void *)buffer; + executing->Wait.return_argument_1 = (void *)size; + executing->Wait.count = submit_type; + _ISR_Enable( level ); + + _Thread_queue_Enqueue( &the_message_queue->Wait_queue, timeout ); } -- cgit v1.2.3