summaryrefslogtreecommitdiff
path: root/led/complex1/LocalMessageQueuePOSIX10031c.h
blob: 29aa7ca5548d7ec8897ee0588f4bf77e1f7994aa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
//
//
//  $Id$
//
//

/*! @file LocalMessageQueuePOSIX10031c.h
    @brief LocalMessageQueuePOSIX10031c Specification

    This file contains the implementation of the LocalMessageQueuePOSIX10031c
    template class.
 */

#ifndef __LocalMessageQueuePOSIX10031c_h
#define __LocalMessageQueuePOSIX10031c_h

#include <limits.h>
#include <pthread.h>
#include <stddef.h>

#include <iostream>
#include <list>
#include <string>

namespace OSWrapper {
  /*! @class LocalMessageQueuePOSIX10031c
      @brief POSIX10031c Message Queue Class

      Message queues allow application threads to send messages (passed as
      MessageClass pointers) amongst themselves.  A thread may block
      waiting for a message to arrive.

      This is a POSIX 1003.1c (pthreads) implementation of a Message Queue
      class.  It is useful for initial porting efforts and as a template
      for new implementations.  It is intended for internal consumption by
      the implementation and is not expected to be used by applications.

      The pending list and the inactive pool are both protected by a single
      pthread mutex; a pthread condition variable is signalled each time a
      message is queued.

      Failures of the underlying pthread calls are reported on std::cerr
      and, everywhere except the destructor, by throwing a string literal
      (const char *).

      NOTE(review): copying is not disabled even though the class holds a
      pthread mutex and condition variable by value; instances should not
      be copied.
   */
  template<
    class MessageClass
  >
  class LocalMessageQueuePOSIX10031c {
    public:
      /*! This is the constructor for the POSIX10031c Message Queue
          implementation.  It initializes the condition variable and the
          mutex used by the queue.

          @param name is the string name of the message queue
          @param depth is the maximum number of pending messages

          @throws const char * if the condition variable or the mutex
                  cannot be initialized
       */
      LocalMessageQueuePOSIX10031c(
        std::string name,
        size_t      depth = INT_MAX
      ) :
        nameM( name ),
        depthM( depth )
      {
        int status;

        status = pthread_cond_init( &conditionM, NULL );
        if ( status ) {
          std::cerr << "Local MQ " << nameM << " cond init failed\n";
          throw "Local MQ cond init failed";
        }

        status = pthread_mutex_init( &mutexM, NULL );
        if ( status ) {
          // The destructor does not run when a constructor throws, so the
          // condition variable initialized above must be released here.
          (void) pthread_cond_destroy( &conditionM );
          std::cerr << "Local MQ " << nameM << " mutex init failed\n";
          throw "Local MQ mutex init failed";
        }
      }

      /*! This is the destructor for the POSIX10031c Message Queue
          implementation.  It destroys the condition variable and the
          mutex.

          Failures are reported on std::cerr only.  Nothing is thrown
          because an exception escaping a destructor terminates the program
          (destructors are implicitly noexcept since C++11).

          Messages still held in the inactive pool or on the pending list
          are not deleted here.
       */
      ~LocalMessageQueuePOSIX10031c()
      {
        if ( pthread_cond_destroy( &conditionM ) ) {
          std::cerr << "Local MQ " << nameM << " cond destroy failed\n";
        }

        if ( pthread_mutex_destroy( &mutexM ) ) {
          std::cerr << "Local MQ " << nameM << " mutex destroy failed\n";
        }
      }

      /*! This method allocates a message.  A message is taken from the
          inactive pool when one is available; otherwise a new
          MessageClass is default constructed.

          @returns a pointer to the allocated message
       */
      MessageClass *Allocate( void )
      {
        MessageClass *m = NULL;

        lock();
          if ( !inactiveM.empty() ) {
            m = inactiveM.front();
            inactiveM.pop_front();
          }
        unlock();

        // Construct outside the critical section so that a throwing
        // constructor or allocation failure cannot leave the mutex locked.
        if ( !m )
          m = new MessageClass();

        return m;
      }

      /*! This method returns a message to the inactive pool so that a
          later Allocate() can reuse it.

          @param message is a pointer to the message being returned.  A
                 NULL pointer is ignored so that Allocate() never hands
                 one back.
       */
      void Deallocate( MessageClass *message  )
      {
        if ( !message )
          return;

        lock();
          inactiveM.push_back( message );
        unlock();
      }

      /*! This method sends the specified message to the queue and
          signals the condition variable to wake a waiting receiver.

          @param message is a pointer to the message buffer.

          @return true if message successfully sent; false if message is
                  NULL or the queue already holds its maximum number of
                  pending messages
      */
      bool Send( MessageClass *message )
      {
        if ( !message )
          return false;

        bool queued = false;

        lock();
          if ( pendingM.size() < depthM ) {
            pendingM.push_back( message );
            signal();
            queued = true;
          }
        unlock();

        return queued;
      }

      /*! This method is invoked to block indefinitely waiting for
          a message to arrive at this queue.  When a message is available,
          it is removed from the queue and returned to the caller.

          @returns the message received
       */
      MessageClass *Receive( void )
      {
        MessageClass *m;

        lock();
          // pthread_cond_wait() may return without a message having been
          // queued (spurious wakeup, or another receiver got there first),
          // so the predicate is re-checked after every wakeup.
          while ( pendingM.empty() ) {
            wait();
          }
          m = pendingM.front();
          pendingM.pop_front();
        unlock();

        return m;
      }

      /*! This method is invoked to poll for a message pending at the queue.
          If a message is available, it is removed from the queue and
          returned to the caller.  This method does not wait on the
          condition variable.

          @returns the message received or NULL if none available.
       */
      MessageClass *ReceivePoll( void )
      {
        MessageClass *m = NULL;

        lock();
          if ( !pendingM.empty() ) {
            m = pendingM.front();
            pendingM.pop_front();
          }
        unlock();

        return m;
      }

      /*! This method returns true if a message is available.

          @returns true if at least one message is pending
       */
      bool IsAvailable( void )
      {
        bool retval;

        lock();
          retval = !pendingM.empty();
        unlock();

        return retval;
      }

      /*! This method returns a string indicating the implementation of
          this Message Queue.

          @returns the string "POSIX10031c"
       */
      std::string Flavor(void)
      {
        return "POSIX10031c";
      }

      /*! This method returns the name of this Message Queue.

          @returns a copy of the name given to the constructor
       */
      std::string Name(void)
      {
        return nameM;
      }


      /*! This method returns the number of messages pending on this
          Message Queue.

          @returns the number of pending messages
       */
      unsigned int getNumberPending(void)
      {
        size_t size;

        lock();
          size = pendingM.size();
        unlock();

        // The list size is a size_t; the narrowing to the declared
        // return type is made explicit.
        return static_cast<unsigned int>( size );
      }

    protected:

      /*!  This method locks the mutex protecting the Message Queue.

           @throws const char * if pthread_mutex_lock() fails
       */
      void lock(void)
      {
        int status;
        status = pthread_mutex_lock( &mutexM );
        if ( status ) {
          std::cerr << "Local MQ " << nameM << " mutex lock failed\n";
          throw "Local MQ mutex lock failed";
        }
      }

      /*!  This method unlocks the mutex protecting the Message Queue.

           @throws const char * if pthread_mutex_unlock() fails
       */
      void unlock(void)
      {
        int status;
        status = pthread_mutex_unlock( &mutexM );
        if ( status ) {
          std::cerr << "Local MQ " << nameM << " mutex unlock failed\n";
          throw "Local MQ mutex unlock failed";
        }
      }

      /*!  This method signals the condition variable for the Message Queue.

           @throws const char * if pthread_cond_signal() fails
       */
      void signal(void)
      {
        int status;
        status = pthread_cond_signal( &conditionM );
        if ( status ) {
          std::cerr << "Local MQ " << nameM << " condvar signal failed\n";
          throw "Local MQ condvar signal failed";
        }
      }

      /*!  This method waits on the condition variable for the Message
           Queue.  The caller must hold the mutex; pthread_cond_wait()
           releases it while waiting and re-acquires it before returning.

           @throws const char * if pthread_cond_wait() fails
       */
      void wait(void)
      {
        int status;
        status = pthread_cond_wait( &conditionM, &mutexM );
        if ( status ) {
          std::cerr << "Local MQ " << nameM << " condvar wait failed\n";
          throw "Local MQ condvar wait failed";
        }
      }

      /*! This variable holds the string name that the user has
          associated with this message queue.  Within this class it is
          returned by Name() and included in error messages.
       */
      std::string nameM;

      /*! This variable contains the maximum number of pending messages
          for this Message Queue.
       */
      size_t depthM;

      /*! This variable contains the set of unused messages.
       */
      std::list<MessageClass *> inactiveM;

      /*! This variable contains the set of pending messages, oldest
          first.
       */
      std::list<MessageClass *> pendingM;

      /*! This variable is the handle for the condition variable used to
       *  signal when messages are available.
       */
      pthread_cond_t  conditionM;

      /*! This variable is the handle for the mutex used to protect the
       *  Message Queue.
       */
      pthread_mutex_t mutexM;

  };
}

#endif