00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef PENDINGACKLIST_H
00024 #define PENDINGACKLIST_H
00025
00026 #include <ptypes.h>
00027 #include "oakbinheap.h"
00028 #include "pendingack.h"
00029 #include <set>
00030
00031 template<class PacketType, class SocketType>
00032 class Framework;
00033
00034
00035
00036
00037
00038
00039 template<class PacketType, class SocketType>
00040 struct PendingAckTimtoutCompare
00041 {
00042 bool operator() (const PendingAck<PacketType, SocketType> &a1,const PendingAck<PacketType, SocketType> &a2)
00043 {
00044 return a1.timeout < a2.timeout;
00045 };
00046 };
00047
00048
00049
00050
00051
00052 template<class PacketType, class SocketType>
00053 struct PendingAckKeyCompare
00054 {
00055 bool operator() (const PendingAck<PacketType, SocketType> * pendack1, const PendingAck<PacketType, SocketType> * pendack2)
00056 {
00057 if(pendack1->seq == pendack2->seq)
00058 {
00059 if(pendack1->pri == pendack2->pri)
00060 {
00061 return pendack1->user_id < pendack2->user_id;
00062 }
00063 else
00064 {
00065 return pendack1->pri < pendack2->pri;
00066 }
00067 }
00068 else
00069 {
00070 return (pendack1->seq < pendack2->seq);
00071 }
00072 }
00073 };
00074
00075
00076
00077
00078
00079
00080
00081
00082 template<class PacketType,class SocketType>
00083 class PendingAckList : public thread, public mutex
00084 {
00085 private:
00086 typedef typename std::set<PendingAck<PacketType, SocketType> *, PendingAckKeyCompare<PacketType, SocketType> >::iterator AckMapIt;
00087 private:
00088 Framework<PacketType, SocketType> * framework;
00089 unsigned int minRTT;
00090 double RTT_multiplier;
00091
00092 protected:
00093 OakBinaryMinHeap<PendingAck<PacketType, SocketType>, PendingAckTimtoutCompare<PacketType, SocketType> > timeoutheap;
00094 std::set<PendingAck<PacketType, SocketType> *, PendingAckKeyCompare<PacketType, SocketType> > ackmap;
00095 private:
00096 void setPendingAckAcked(PendingAck<PacketType, SocketType> * penack);
00097
00098 public:
00099 PendingAckList(Framework<PacketType, SocketType> * framework);
00100 ~PendingAckList();
00101
00102 public:
00103 void addPendingAck(unsigned int packet_seq, unsigned short packet_pri, unsigned int user_id, unsigned int timeout, PacketType * pack, bool haslock);
00104
00105 unsigned int registerAck(unsigned int packet_seq, unsigned char packet_pri, unsigned int user_id);
00106
00107 void execute();
00108
00109 inline unsigned int size() const { return ackmap.size(); };
00110
00111 template<typename Comp>
00112 void removeWithComparer( const Comp & comp );
00113
00114 void clear();
00115 };
00116
00117
00118 template<class PacketType, class SocketType>
00119 PendingAckList<PacketType, SocketType>::PendingAckList( Framework<PacketType, SocketType> * framework ) :
00120 framework(framework),
00121 minRTT(60),
00122 RTT_multiplier(1.25),
00123 thread(false)
00124 {
00125 ackmap.clear();
00126 }
00127
00128 template<class PacketType, class SocketType>
00129 PendingAckList<PacketType, SocketType>::~PendingAckList()
00130 {
00131
00132 }
00133
00134 template<class PacketType, class SocketType>
00135 template<typename Comp>
00136 void PendingAckList<PacketType, SocketType>::removeWithComparer( const Comp & comp )
00137 {
00138 enter();
00139 for(AckMapIt it = ackmap.begin(); it != ackmap.end(); ++it)
00140 {
00141 PendingAck<PacketType, SocketType> * penack = *it;
00142 if(comp(penack->user_id) && !penack->acked)
00143 {
00144 std::cout << "Erasing pending Ack: " << penack->packet << std::endl;
00145 setPendingAckAcked( penack );
00146 }
00147 else
00148 {
00149
00150 }
00151 }
00152 leave();
00153 }
00154
00155 template<class PacketType, class SocketType>
00156 void PendingAckList<PacketType, SocketType>::clear( )
00157 {
00158 enter();
00159 for(AckMapIt it = ackmap.begin(); it != ackmap.end(); ++it)
00160 {
00161 PendingAck<PacketType, SocketType> * penack = *it;
00162 setPendingAckAcked( penack );
00163 }
00164 leave();
00165 }
00166
00167
00168 template<class PacketType, class SocketType>
00169 void PendingAckList<PacketType, SocketType>::addPendingAck(unsigned int packet_seq, unsigned short packet_pri, unsigned int user_id, unsigned int timeout, PacketType * pack, bool haslock)
00170 {
00171
00172 PendingAck<PacketType, SocketType> key(packet_seq, packet_pri, user_id, 0, 0);
00173 key.sent_time = Framework<PacketType, SocketType>::currentTimeMillis();
00174 timeout = (unsigned int)(timeout * RTT_multiplier);
00175
00176 if(!haslock)
00177 enter();
00178
00179 if(ackmap.find(&key) != ackmap.end())
00180 {
00181 PendingAck<PacketType, SocketType> * penAck = *ackmap.find(&key);
00182
00183
00184 penAck->timeout = timeout + key.sent_time;
00185 timeoutheap.heapify(penAck);
00186 }
00187 else
00188 {
00189
00190
00191
00192 PendingAck<PacketType, SocketType> * penAck = new PendingAck<PacketType, SocketType>(packet_seq, packet_pri, user_id, (timeout+key.sent_time), pack);
00193 penAck->sent_time = key.sent_time;
00194 timeoutheap.insert(penAck);
00195 ackmap.insert(penAck);
00196 }
00197
00198
00199 if(!haslock)
00200 leave();
00201 }
00202
00203 template<class PacketType, class SocketType>
00204 unsigned int PendingAckList<PacketType, SocketType>::registerAck( unsigned int seq, unsigned char pri, unsigned int user_id )
00205 {
00206 unsigned int rtt_for_this_ack = std::numeric_limits<unsigned int>::max();
00207
00208 enter();
00209 PendingAck<PacketType, SocketType> key(seq, pri, user_id, 0, 0);
00210 if(ackmap.find(&key) == ackmap.end())
00211 {
00212
00213 }
00214 else
00215 {
00216 PendingAck<PacketType, SocketType> * penAck = *ackmap.find(&key);
00217
00218
00219 if(!penAck->acked)
00220 {
00221
00222 rtt_for_this_ack = Framework<PacketType, SocketType>::currentTimeMillis() - penAck->sent_time;
00223
00224
00225
00226
00227
00228
00229
00230
00231 setPendingAckAcked(penAck);
00232 }
00233
00234
00235 }
00236 leave();
00237 return rtt_for_this_ack;
00238 }
00239
00240
00241 template<class PacketType, class SocketType>
00242 void PendingAckList<PacketType, SocketType>::execute( )
00243 {
00244 while(true)
00245 {
00246 PendingAck<PacketType, SocketType> * tmpAck;
00247
00248
00249 timeoutheap.peek();
00250
00251 enter();
00252 tmpAck = timeoutheap.peek();
00253 if( tmpAck->acked )
00254 {
00255
00256
00257
00258 timeoutheap.getMin();
00259 ackmap.erase(tmpAck);
00260 delete tmpAck;
00261 }
00262 else if(tmpAck->timeout < Framework<PacketType, SocketType>::currentTimeMillis())
00263 {
00264
00265 tmpAck->timeout = Framework<PacketType, SocketType>::currentTimeMillis() + (unsigned int)(minRTT * RTT_multiplier);
00266 timeoutheap.heapify(tmpAck);
00267
00268
00269 framework->retransmitPacket(tmpAck->packet);
00270 }
00271 else
00272 {
00273 leave();
00274 unsigned int time_to_sleep = std::min((unsigned int)(minRTT * RTT_multiplier), tmpAck->timeout - Framework<PacketType, SocketType>::currentTimeMillis());
00275
00276 thread::relax(time_to_sleep);
00277 continue;
00278 }
00279 leave();
00280 }
00281 }
00282
00283 template<class PacketType, class SocketType>
00284 void PendingAckList<PacketType, SocketType>::setPendingAckAcked( PendingAck<PacketType, SocketType> * penAck )
00285 {
00286
00287 penAck->acked = true;
00288 penAck->timeout = 0;
00289 timeoutheap.heapify(penAck);
00290 if(penAck->packet->hasBeenSent())
00291 {
00292 delete penAck->packet;
00293 penAck->packet = 0;
00294 }
00295 else
00296 {
00297 penAck->packet->setStateAborted();
00298 }
00299 }
00300
00301
00302
00303
00304
00305
00306 #endif