00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef QUEUE_H
00024 #define QUEUE_H
00025
00026 #include <ptypes.h>
00027 #include <set>
00028 #include <iterator>
00029 #include <ostream>
00030 #include <limits>
00031
00032 #include "packetchainstep.h"
00033
00034
00035
00036
00037
00038
/// Snapshot of the fields a packet is ordered by while it is stored in a queue.
///
/// All key fields are read from the packet once, in the constructor; the
/// element keeps its own copies, so ordering never has to dereference the
/// packet again.
template<class PacketType>
struct QueueElement
{
    /// Per-PacketType counter, defined below with an initial value of 0.
    /// It is not referenced anywhere else in the visible part of this header.
    static unsigned int next_sequence_number;

    PacketType * pack;      ///< The packet this element refers to.
    unsigned char priority; ///< Copy of pack->getPriority().
    unsigned int seqno;     ///< Sequence number, or UINT max (see constructor).
    unsigned char type;     ///< Copy of pack->getType().
    unsigned int objid;     ///< Copy of pack->getReceiver().
    unsigned int user_id;   ///< Copy of pack->getUserID().

    /// Builds the element from @p _pack, which must not be null.
    ///
    /// The constructor is named with the plain injected class name: writing
    /// it as `QueueElement<PacketType>(...)` is rejected by C++20 compilers.
    ///
    /// seqno takes the packet's sequence number only when the packet is
    /// incoming or reliable; every other packet gets the largest unsigned
    /// value.
    QueueElement(PacketType * _pack)
        : pack(_pack),
          priority(_pack->getPriority()),
          seqno((_pack->isIncomming() || _pack->isReliable())
                    ? _pack->getSequenceNumber()
                    : std::numeric_limits<unsigned int>::max()),
          type(_pack->getType()),
          objid(_pack->getReceiver()),
          user_id(_pack->getUserID())
    {
    }
};

template<class PacketType>
unsigned int QueueElement<PacketType>::next_sequence_number = 0;
00068
00069
00070
00071
00072
00073
00074 template<class PacketType>
00075 struct QueueComparer
00076 {
00077 bool operator() (const QueueElement<PacketType> & p1, const QueueElement<PacketType> & p2) const
00078 {
00079 if(p1.priority < p2.priority)
00080 return true;
00081 if(p1.priority > p2.priority)
00082 return false;
00083
00084 if(p1.seqno < p2.seqno)
00085 return true;
00086 if(p1.seqno > p2.seqno)
00087 return false;
00088
00089 if(p1.type < p2.type)
00090 return true;
00091 if(p1.type > p2.type)
00092 return false;
00093
00094 if(p1.objid < p2.objid)
00095 return true;
00096 if(p1.objid > p2.objid)
00097 return false;
00098
00099 if(p1.user_id < p2.user_id)
00100 return true;
00101 if(p1.user_id > p2.user_id)
00102 return false;
00103
00104
00105 return false;
00106 }
00107 };
00108
00109
00110
00111
00112
00113
00114 template<class PacketType>
00115 class Queue : public PacketChainStep<PacketType>, public semaphore
00116 {
00117 protected:
00118 typedef typename std::set< QueueElement<PacketType> , QueueComparer<PacketType> > QueueSet;
00119 typedef typename QueueSet::iterator QueueSetIt;
00120 QueueSet queue;
00121 mutex m_mutex;
00122
00123 public:
00124 Queue();
00125 ~Queue();
00126
00127 public:
00128 unsigned int size() { return queue.size(); };
00129 void insert( PacketType * p );
00130 PacketType * next();
00131 PacketType * peek();
00132
00133 public:
00134
00135 inline void pack(PacketType * packet){ insert(packet); };
00136
00137 inline void unpack(PacketType * packet){ insert(packet); };
00138
00139 template<class Comp>
00140 void removeWithComparer( const Comp & comp );
00141 void clear();
00142 };
00143
00144
00145 template<class PacketType>
00146 template<class Comp>
00147 void Queue<PacketType>::removeWithComparer( const Comp & comp )
00148 {
00149 m_mutex.enter();
00150 for(QueueSetIt it = queue.begin(); it != queue.end(); ++it)
00151 {
00152 PacketType * packet = it->pack;
00153 if(comp(packet))
00154 {
00155 wait();
00156
00157 queue.erase(it);
00158 delete packet;
00159 }
00160 else
00161 {
00162
00163 }
00164 }
00165 m_mutex.leave();
00166 }
00167
00168 template<class PacketType>
00169 void Queue<PacketType>::clear( )
00170 {
00171 m_mutex.enter();
00172 for(QueueSetIt it = queue.begin(); it != queue.end(); ++it)
00173 {
00174 PacketType * packet = it->pack;
00175 wait();
00176
00177 queue.erase(it);
00178 delete packet;
00179 }
00180 m_mutex.leave();
00181 }
00182
00183
00184
00185 template<class PacketType>
00186 Queue<PacketType>::Queue() : semaphore(0)
00187 {
00188 queue.clear();
00189 }
00190
00191
00192 template<class PacketType>
00193 Queue<PacketType>::~Queue()
00194 {
00195 }
00196
00197 template<class PacketType>
00198 void Queue<PacketType>::insert( PacketType * p )
00199 {
00200 bool inserted = false;
00201 m_mutex.enter();
00202 if(!p->isQueued())
00203 {
00204 p->setStateQueued();
00205 std::pair<QueueSetIt, bool> insert_result = queue.insert(QueueElement<PacketType>(p));
00206 if(insert_result.second)
00207 {
00208 inserted = true;
00209 }
00210 else
00211 {
00212
00213 QueueSetIt next_pos = insert_result.first;
00214 next_pos++;
00215
00216
00217 delete insert_result.first->pack;
00218
00219 queue.erase(insert_result.first);
00220
00221 queue.insert(next_pos, QueueElement<PacketType>(p));
00222 }
00223 }
00224 else
00225 {
00226
00227 }
00228 m_mutex.leave();
00229 if(inserted)
00230 post();
00231 }
00232
00233 template<class PacketType>
00234 PacketType * Queue<PacketType>::next( )
00235 {
00236 PacketType * p;
00237 wait();
00238 m_mutex.enter();
00239 {
00240 if(queue.begin() == queue.end())
00241 std::cout << "QUEUE: no elements when next(...) was allowed to bypass wait(...)"<<std::endl;
00242 QueueSetIt tmp_pit = queue.begin();
00243 p = tmp_pit->pack;
00244 queue.erase(tmp_pit);
00245 }
00246 m_mutex.leave();
00247 return p;
00248 }
00249
00250 template<class PacketType>
00251 PacketType * Queue<PacketType>::peek( )
00252 {
00253 PacketType * p;
00254 m_mutex.enter();
00255 if(size() == 0)
00256 {
00257 p = NULL;
00258 }
00259 else
00260 {
00261 QueueSetIt pit = queue.begin();
00262 p = pit->pack;
00263 }
00264 m_mutex.leave();
00265 return p;
00266 }
00267
00268
00269
00270
00271
00272 #endif