From 5e9b32b439627068a0292370fe595220dbfc95a0 Mon Sep 17 00:00:00 2001 From: Joel Sherrill Date: Tue, 26 Sep 1995 19:27:15 +0000 Subject: posix support initially added --- cpukit/posix/src/mqueue.c | 713 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 713 insertions(+) create mode 100644 cpukit/posix/src/mqueue.c (limited to 'cpukit/posix/src/mqueue.c') diff --git a/cpukit/posix/src/mqueue.c b/cpukit/posix/src/mqueue.c new file mode 100644 index 0000000000..f531b94754 --- /dev/null +++ b/cpukit/posix/src/mqueue.c @@ -0,0 +1,713 @@ +/* mqueue.c + * + * NOTE: The structure of the routines is identical to that of POSIX + * Message_queues to leave the option of having unnamed message + * queues at a future date. They are currently not part of the + * POSIX standard but unnamed message_queues are. This is also + * the reason for the apparently unnecessary tracking of + * the process_shared attribute. [In addition to the fact that + * it would be trivial to add pshared to the mq_attr structure + * and have process private message queues.] + * + * This code ignores the O_RDONLY/O_WRONLY/O_RDWR flag at open + * time. + */ + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/*PAGE + * + * _POSIX_Message_queue_Manager_initialization + * + * This routine initializes all message_queue manager related data structures. + * + * Input parameters: + * maximum_message_queues - maximum configured message_queues + * + * Output parameters: NONE + */ + +void _POSIX_Message_queue_Manager_initialization( + unsigned32 maximum_message_queues +) +{ + _Objects_Initialize_information( + &_POSIX_Message_queue_Information, + OBJECTS_POSIX_MESSAGE_QUEUES, + TRUE, + maximum_message_queues, + sizeof( POSIX_Message_queue_Control ), + TRUE, + _POSIX_PATH_MAX, + FALSE + ); +} + +/*PAGE + * + * _POSIX_Message_queue_Create_support + */ + +int _POSIX_Message_queue_Create_support( + const char *name, + int pshared, + unsigned int oflag, + struct mq_attr *attr, + POSIX_Message_queue_Control **message_queue +) +{ + POSIX_Message_queue_Control *the_mq; + + _Thread_Disable_dispatch(); + + the_mq = _POSIX_Message_queue_Allocate(); + + if ( !the_mq ) { + _Thread_Enable_dispatch(); + seterrno( ENFILE ); + return -1; + } + + if ( pshared == PTHREAD_PROCESS_SHARED && + !( _Objects_MP_Allocate_and_open( &_POSIX_Message_queue_Information, 0, + the_mq->Object.id, FALSE ) ) ) { + _POSIX_Message_queue_Free( the_mq ); + _Thread_Enable_dispatch(); + seterrno( ENFILE ); + return -1; + } + + the_mq->process_shared = pshared; + + if ( name ) { + the_mq->named = TRUE; + the_mq->open_count = 1; + the_mq->linked = TRUE; + } + else + the_mq->named = FALSE; + + if ( oflag & O_NONBLOCK ) + the_mq->blocking = FALSE; + else + the_mq->blocking = TRUE; + + /* XXX + * + * Note that this should be based on the current scheduling policy. + */ + + /* XXX + * + * Message and waiting disciplines are not distinguished. + */ +/* + the_mq_attr->message_discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO; + the_mq_attr->waiting_discipline = CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO; + */ + + the_mq->Message_queue.Attributes.discipline = + CORE_MESSAGE_QUEUE_DISCIPLINES_FIFO; + + if ( ! _CORE_message_queue_Initialize( + &the_mq->Message_queue, + OBJECTS_POSIX_MESSAGE_QUEUES, + &the_mq->Message_queue.Attributes, + attr->mq_maxmsg, + attr->mq_msgsize, + _POSIX_Message_queue_MP_Send_extract_proxy ) ) { + + if ( pshared == PTHREAD_PROCESS_SHARED ) + _Objects_MP_Close( &_POSIX_Message_queue_Information, the_mq->Object.id ); + + _POSIX_Message_queue_Free( the_mq ); + _Thread_Enable_dispatch(); + seterrno( ENOSPC ); + return -1; + } + + + /* XXX - need Names to be a string!!! */ + _Objects_Open( + &_POSIX_Message_queue_Information, + &the_mq->Object, + (char *) name + ); + + *message_queue = the_mq; + + if ( pshared == PTHREAD_PROCESS_SHARED ) + _POSIX_Message_queue_MP_Send_process_packet( + POSIX_MESSAGE_QUEUE_MP_ANNOUNCE_CREATE, + the_mq->Object.id, + (char *) name, + 0 /* Not used */ + ); + + _Thread_Enable_dispatch(); + return 0; +} + +/*PAGE + * + * 15.2.2 Open a Message Queue, P1003.1b-1993, p. 272 + */ + +mqd_t mq_open( + const char *name, + int oflag, + ... + /* mode_t mode, */ + /* struct mq_attr attr */ +) +{ + va_list arg; + mode_t mode; + struct mq_attr *attr; + int status; + Objects_Id the_mq_id; + POSIX_Message_queue_Control *the_mq; + + if ( oflag & O_CREAT ) { + va_start(arg, oflag); + mode = (mode_t) va_arg( arg, mode_t * ); + attr = (struct mq_attr *) va_arg( arg, struct mq_attr ** ); + va_end(arg); + } + + status = _POSIX_Message_queue_Name_to_id( name, &the_mq_id ); + + /* + * If the name to id translation worked, then the message queue exists + * and we can just return a pointer to the id. Otherwise we may + * need to check to see if this is a "message queue does not exist" + * or some other miscellaneous error on the name. + */ + + if ( status ) { + + if ( status == EINVAL ) { /* name -> ID translation failed */ + if ( !(oflag & O_CREAT) ) { /* willing to create it? */ + seterrno( ENOENT ); + return (mqd_t) -1; + } + /* we are willing to create it */ + } + seterrno( status ); /* some type of error */ + return (mqd_t) -1; + + } else { /* name -> ID translation succeeded */ + + if ( (oflag & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL) ) { + seterrno( EEXIST ); + return (mqd_t) -1; + } + + /* + * XXX In this case we need to do an ID->pointer conversion to + * check the mode. This is probably a good place for a subroutine. + */ + + the_mq->open_count += 1; + + return (mqd_t)&the_mq->Object.id; + + } + + /* XXX verify this comment... + * + * At this point, the message queue does not exist and everything has been + * checked. We should go ahead and create a message queue. + */ + + status = _POSIX_Message_queue_Create_support( + name, + TRUE, /* shared across processes */ + oflag, + attr, + &the_mq + ); + + if ( status == -1 ) + return (mqd_t) -1; + + return (mqd_t) &the_mq->Object.id; +} + +/*PAGE + * + * _POSIX_Message_queue_Delete + */ + +void _POSIX_Message_queue_Delete( + POSIX_Message_queue_Control *the_mq +) +{ + if ( !the_mq->linked && !the_mq->open_count ) { + _POSIX_Message_queue_Free( the_mq ); + + if ( the_mq->process_shared == PTHREAD_PROCESS_SHARED ) { + + _Objects_MP_Close( + &_POSIX_Message_queue_Information, + the_mq->Object.id + ); + + _POSIX_Message_queue_MP_Send_process_packet( + POSIX_MESSAGE_QUEUE_MP_ANNOUNCE_DELETE, + the_mq->Object.id, + 0, /* Not used */ + 0 /* Not used */ + ); + } + + } +} + +/*PAGE + * + * 15.2.2 Close a Message Queue, P1003.1b-1993, p. 275 + */ + +int mq_close( + mqd_t mqdes +) +{ + register POSIX_Message_queue_Control *the_mq; + Objects_Locations location; + + the_mq = _POSIX_Message_queue_Get( mqdes, &location ); + switch ( location ) { + case OBJECTS_ERROR: + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_REMOTE: + _Thread_Dispatch(); + return POSIX_MP_NOT_IMPLEMENTED(); + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_LOCAL: + the_mq->open_count -= 1; + _POSIX_Message_queue_Delete( the_mq ); + _Thread_Enable_dispatch(); + return 0; + } + return POSIX_BOTTOM_REACHED(); +} + +/*PAGE + * + * 15.2.2 Remove a Message Queue, P1003.1b-1993, p. 276 + */ + +int mq_unlink( + const char *name +) +{ + int status; + register POSIX_Message_queue_Control *the_mq; + Objects_Id the_mq_id; + Objects_Locations location; + + status = _POSIX_Message_queue_Name_to_id( name, &the_mq_id ); + + if ( !status ) { + seterrno( status ); + return -1; + } + + the_mq = _POSIX_Message_queue_Get( the_mq_id, &location ); + switch ( location ) { + case OBJECTS_ERROR: + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_REMOTE: + _Thread_Dispatch(); + return POSIX_MP_NOT_IMPLEMENTED(); + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_LOCAL: + + _Objects_MP_Close( + &_POSIX_Message_queue_Information, + the_mq->Object.id + ); + + the_mq->linked = FALSE; + + _POSIX_Message_queue_Delete( the_mq ); + + _Thread_Enable_dispatch(); + return 0; + } + return POSIX_BOTTOM_REACHED(); +} + +/*PAGE + * + * _POSIX_Message_queue_Send_support + */ + +int _POSIX_Message_queue_Send_support( + mqd_t mqdes, + const char *msg_ptr, + unsigned32 msg_len, + Priority_Control msg_prio, + Watchdog_Interval timeout +) +{ + register POSIX_Message_queue_Control *the_mq; + Objects_Locations location; + + the_mq = _POSIX_Message_queue_Get( mqdes, &location ); + switch ( location ) { + case OBJECTS_ERROR: + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_REMOTE: + _Thread_Dispatch(); + return POSIX_MP_NOT_IMPLEMENTED(); + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_LOCAL: + /* XXX must add support for timeout and priority */ + _CORE_message_queue_Send( + &the_mq->Message_queue, + (void *) msg_ptr, + msg_len, + mqdes, + NULL /* XXX _POSIX_Message_queue_Core_message_queue_mp_support*/ + ); + _Thread_Enable_dispatch(); + return _Thread_Executing->Wait.return_code; + } + return POSIX_BOTTOM_REACHED(); +} + +/*PAGE + * + * 15.2.4 Send a Message to a Message Queue, P1003.1b-1993, p. 277 + * + * NOTE: P1003.4b/D8, p. 45 adds mq_timedsend(). + */ + +int mq_send( + mqd_t mqdes, + const char *msg_ptr, + size_t msg_len, + unsigned int msg_prio +) +{ + return _POSIX_Message_queue_Send_support( + mqdes, + msg_ptr, + msg_len, + msg_prio, + THREAD_QUEUE_WAIT_FOREVER + ); +} + +/*PAGE + * + * 15.2.4 Send a Message to a Message Queue, P1003.1b-1993, p. 277 + * + * NOTE: P1003.4b/D8, p. 45 adds mq_timedsend(). + */ + +int mq_timedsend( + mqd_t mqdes, + const char *msg_ptr, + size_t msg_len, + unsigned int msg_prio, + const struct timespec *timeout +) +{ + return _POSIX_Message_queue_Send_support( + mqdes, + msg_ptr, + msg_len, + msg_prio, + _POSIX_Time_Spec_to_interval( timeout ) + ); +} + +/*PAGE + * + * _POSIX_Message_queue_Receive_support + */ + +/* XXX be careful ... watch the size going through all the layers ... */ + +ssize_t _POSIX_Message_queue_Receive_support( + mqd_t mqdes, + char *msg_ptr, + size_t msg_len, + unsigned int *msg_prio, + Watchdog_Interval timeout +) +{ + register POSIX_Message_queue_Control *the_mq; + Objects_Locations location; + unsigned32 status = 0; + unsigned32 length_out; + + the_mq = _POSIX_Message_queue_Get( mqdes, &location ); + switch ( location ) { + case OBJECTS_ERROR: + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_REMOTE: + _Thread_Dispatch(); + return POSIX_MP_NOT_IMPLEMENTED(); + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_LOCAL: + /* XXX need to define the options argument to this */ + length_out = msg_len; + _CORE_message_queue_Seize( + &the_mq->Message_queue, + mqdes, + msg_ptr, + &length_out, + /* msg_prio, XXXX */ + the_mq->blocking, + timeout + ); + _Thread_Enable_dispatch(); + if ( !status ) + return length_out; + /* XXX --- the return codes gotta be looked at .. fix this */ + return _Thread_Executing->Wait.return_code; + } + return POSIX_BOTTOM_REACHED(); +} + +/*PAGE + * + * 15.2.5 Receive a Message From a Message Queue, P1003.1b-1993, p. 279 + * + * NOTE: P1003.4b/D8, p. 45 adds mq_timedreceive(). + */ + +ssize_t mq_receive( + mqd_t mqdes, + char *msg_ptr, + size_t msg_len, + unsigned int *msg_prio +) +{ + return _POSIX_Message_queue_Receive_support( + mqdes, + msg_ptr, + msg_len, + msg_prio, + THREAD_QUEUE_WAIT_FOREVER + ); +} + +/*PAGE + * + * 15.2.5 Receive a Message From a Message Queue, P1003.1b-1993, p. 279 + * + * NOTE: P1003.4b/D8, p. 45 adds mq_timedreceive(). + */ + +int mq_timedreceive( /* XXX: should this be ssize_t */ + mqd_t mqdes, + char *msg_ptr, + size_t msg_len, + unsigned int *msg_prio, + const struct timespec *timeout +) +{ + return _POSIX_Message_queue_Receive_support( + mqdes, + msg_ptr, + msg_len, + msg_prio, + _POSIX_Time_Spec_to_interval( timeout ) + ); +} + +/*PAGE + * + * _POSIX_Message_queue_Notify_handler + * + */ + +void _POSIX_Message_queue_Notify_handler( + void *user_data +) +{ + POSIX_Message_queue_Control *the_mq; + + the_mq = user_data; + + /* XXX do something with signals here!!!! */ +} + +/*PAGE + * + * 15.2.6 Notify Process that a Message is Available on a Queue, + * P1003.1b-1993, p. 280 + */ + +int mq_notify( + mqd_t mqdes, + const struct sigevent *notification +) +{ + register POSIX_Message_queue_Control *the_mq; + Objects_Locations location; + + the_mq = _POSIX_Message_queue_Get( mqdes, &location ); + switch ( location ) { + case OBJECTS_ERROR: + seterrno( EBADF ); + return( -1 ); + case OBJECTS_REMOTE: + _Thread_Dispatch(); + return POSIX_MP_NOT_IMPLEMENTED(); + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_LOCAL: + if ( notification ) { + if ( _CORE_message_queue_Is_notify_enabled( &the_mq->Message_queue ) ) { + _Thread_Enable_dispatch(); + seterrno( EBUSY ); + return( -1 ); + } + + _CORE_message_queue_Set_notify( &the_mq->Message_queue, NULL, NULL ); + + the_mq->notification = *notification; + + _CORE_message_queue_Set_notify( + &the_mq->Message_queue, + _POSIX_Message_queue_Notify_handler, + the_mq + ); + } else { + + _CORE_message_queue_Set_notify( &the_mq->Message_queue, NULL, NULL ); + + } + + _Thread_Enable_dispatch(); + return 0; + } + return POSIX_BOTTOM_REACHED(); +} + +/*PAGE + * + * 15.2.7 Set Message Queue Attributes, P1003.1b-1993, p. 281 + */ + +int mq_setattr( + mqd_t mqdes, + const struct mq_attr *mqstat, + struct mq_attr *omqstat +) +{ + register POSIX_Message_queue_Control *the_mq; + Objects_Locations location; + CORE_message_queue_Attributes *the_mq_attr; + + the_mq = _POSIX_Message_queue_Get( mqdes, &location ); + switch ( location ) { + case OBJECTS_ERROR: + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_REMOTE: + _Thread_Dispatch(); + return POSIX_MP_NOT_IMPLEMENTED(); + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_LOCAL: + /* + * Return the old values. + */ + + /* XXX this is the same stuff as is in mq_getattr... and probably */ + /* XXX should be in an inlined private routine */ + + the_mq_attr = &the_mq->Message_queue.Attributes; + + omqstat->mq_flags = the_mq->flags; + omqstat->mq_msgsize = the_mq->Message_queue.maximum_message_size; + omqstat->mq_maxmsg = the_mq->Message_queue.maximum_pending_messages; + omqstat->mq_curmsgs = the_mq->Message_queue.number_of_pending_messages; + + /* + * Ignore everything except the O_NONBLOCK bit. + */ + + if ( mqstat->mq_flags & O_NONBLOCK ) + the_mq->blocking = FALSE; + else + the_mq->blocking = TRUE; + + the_mq->flags = mqstat->mq_flags; + + _Thread_Enable_dispatch(); + return 0; + } + return POSIX_BOTTOM_REACHED(); +} + +/*PAGE + * + * 15.2.8 Get Message Queue Attributes, P1003.1b-1993, p. 283 + */ + +int mq_getattr( + mqd_t mqdes, + struct mq_attr *mqstat +) +{ + register POSIX_Message_queue_Control *the_mq; + Objects_Locations location; + CORE_message_queue_Attributes *the_mq_attr; + + the_mq = _POSIX_Message_queue_Get( mqdes, &location ); + switch ( location ) { + case OBJECTS_ERROR: + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_REMOTE: + _Thread_Dispatch(); + return POSIX_MP_NOT_IMPLEMENTED(); + seterrno( EINVAL ); + return( -1 ); + case OBJECTS_LOCAL: + /* + * Return the old values. + */ + + /* XXX this is the same stuff as is in mq_setattr... and probably */ + /* XXX should be in an inlined private routine */ + + the_mq_attr = &the_mq->Message_queue.Attributes; + + mqstat->mq_flags = the_mq->flags; + mqstat->mq_msgsize = the_mq->Message_queue.maximum_message_size; + mqstat->mq_maxmsg = the_mq->Message_queue.maximum_pending_messages; + mqstat->mq_curmsgs = the_mq->Message_queue.number_of_pending_messages; + + _Thread_Enable_dispatch(); + return 0; + } + return POSIX_BOTTOM_REACHED(); +} -- cgit v1.2.3