Main Page | Modules | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | Related Pages | Examples

pendingacklist.h

00001 /***************************************************************************
00002  * The contents of this file are subject to the Mozilla Public             *
00003  * License Version 1.1 (the "License"); you may not use this file          *
00004  * except in compliance with the License. You may obtain a copy of         *
00005  * the License at http://www.mozilla.org/MPL/                              *
00006  *                                                                         *
00007  * Software distributed under the License is distributed on an "AS         *
00008  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or              *
00009  * implied. See the License for the specific language governing            *
00010  * rights and limitations under the License.                               *
00011  *                                                                         *
00012  * The Original Code is Game Network Framework (GaNeF).                    *
00013  *                                                                         *
00014  * The Initial Developers of the Original Code are                         *
00015  * Lars Langer and Emanuel Greisen                                         *
00016  * Copyright (C) 2005. Lars Langer & Emanuel Greisen                       *
00017  * All Rights Reserved.                                                    *
00018  *                                                                         *
00019  * Contributor(s):                                                         *
00020  *   none yet....                                                          *
00021  *                                                                         *
00022  ***************************************************************************/
00023 #ifndef PENDINGACKLIST_H
00024 #define PENDINGACKLIST_H
00025 
00026 #include <ptypes.h>
00027 #include "oakbinheap.h"
00028 #include "pendingack.h"
00029 #include <set>
00030 
00031 template<class PacketType, class SocketType>
00032 class Framework;
00033 
00034 
00035 /**
00036  * @ingroup Common
00037  * @brief This comparerer structure is used to sort the PendingAck in the heap.
00038 */
00039 template<class PacketType, class SocketType>
00040 struct PendingAckTimtoutCompare
00041 {
00042    bool operator() (const PendingAck<PacketType, SocketType> &a1,const PendingAck<PacketType, SocketType> &a2)
00043    {
00044       return a1.timeout < a2.timeout;
00045    };
00046 };
00047 
00048 /**
00049  * @ingroup Common
00050  * @brief This struct can sort the PendingAck for easy retrieval of a certain one, when we get an Ack
00051 */
00052 template<class PacketType, class SocketType>
00053 struct PendingAckKeyCompare
00054 {
00055    bool operator() (const PendingAck<PacketType, SocketType> * pendack1, const PendingAck<PacketType, SocketType> * pendack2)
00056   {
00057       if(pendack1->seq == pendack2->seq)
00058       {
00059          if(pendack1->pri == pendack2->pri)
00060          {
00061             return pendack1->user_id < pendack2->user_id;
00062          }
00063          else
00064          {
00065             return pendack1->pri < pendack2->pri;
00066          }
00067       }
00068       else
00069       {
00070          return (pendack1->seq < pendack2->seq);
00071       }
00072   }
00073 };
00074 
00075 /**
00076  * @ingroup Common
00077  * @brief This is the class keeping the list of pending acks.
00078  * It runs as its own thread, hence it can resend messages that are not Acknowledged in time.
00079  *
00080  * @author Lars Langer & Emanuel Greisen
00081 */
00082 template<class PacketType,class SocketType>
00083 class PendingAckList : public thread, public mutex
00084 {
00085 private:
00086    typedef typename std::set<PendingAck<PacketType, SocketType> *, PendingAckKeyCompare<PacketType, SocketType> >::iterator AckMapIt;
00087 private:
00088    Framework<PacketType, SocketType> * framework;
00089    unsigned int minRTT;
00090    double RTT_multiplier;
00091 
00092 protected:
00093    OakBinaryMinHeap<PendingAck<PacketType, SocketType>, PendingAckTimtoutCompare<PacketType, SocketType> > timeoutheap;
00094    std::set<PendingAck<PacketType, SocketType> *, PendingAckKeyCompare<PacketType, SocketType> > ackmap;
00095 private:
00096    void setPendingAckAcked(PendingAck<PacketType, SocketType> * penack);
00097 
00098 public:
00099    PendingAckList(Framework<PacketType, SocketType> * framework);
00100    ~PendingAckList();
00101 
00102 public:
00103    void addPendingAck(unsigned int packet_seq, unsigned short packet_pri, unsigned int user_id, unsigned int timeout, PacketType * pack, bool haslock);
00104    /// This method will remove a pending ack, and return the RTT for this ack.
00105    unsigned int registerAck(unsigned int packet_seq, unsigned char packet_pri, unsigned int user_id);
00106    /// The run-method of this thread.
00107    void execute();
00108    /// Returns the number of pending acks.
00109    inline unsigned int size() const { return ackmap.size(); };
00110    /// Will set all pending-acks that whose userid matches to state cancelled
00111    template<typename Comp>
00112    void removeWithComparer( const Comp & comp );
00113    /// Will simply clear the map.
00114    void clear();
00115 };
00116 
00117 
00118 template<class PacketType, class SocketType>
00119 PendingAckList<PacketType, SocketType>::PendingAckList( Framework<PacketType, SocketType> * framework ) :
00120 framework(framework),
00121 minRTT(60),
00122 RTT_multiplier(1.25),
00123 thread(false)
00124 {
00125    ackmap.clear();
00126 }
00127 
00128 template<class PacketType, class SocketType>
00129 PendingAckList<PacketType, SocketType>::~PendingAckList()
00130 {
00131 
00132 }
00133 
00134 template<class PacketType, class SocketType>
00135 template<typename Comp>
00136 void PendingAckList<PacketType, SocketType>::removeWithComparer( const Comp & comp )
00137 {
00138    enter();
00139    for(AckMapIt it = ackmap.begin(); it != ackmap.end(); ++it)
00140    {
00141       PendingAck<PacketType, SocketType> * penack = *it;
00142       if(comp(penack->user_id) && !penack->acked)
00143       {
00144          std::cout << "Erasing pending Ack: " << penack->packet << std::endl;
00145          setPendingAckAcked( penack );
00146       }
00147       else
00148       {
00149          //std::cout << "Skipping pending Ack: " << penack->user_id << std::endl;
00150       }
00151    }
00152    leave();
00153 }
00154 
00155 template<class PacketType, class SocketType>
00156 void PendingAckList<PacketType, SocketType>::clear( )
00157 {
00158    enter();
00159    for(AckMapIt it = ackmap.begin(); it != ackmap.end(); ++it)
00160    {
00161       PendingAck<PacketType, SocketType> * penack = *it;
00162       setPendingAckAcked( penack );
00163    }
00164    leave();
00165 }
00166 
00167 
00168 template<class PacketType, class SocketType>
00169 void PendingAckList<PacketType, SocketType>::addPendingAck(unsigned int packet_seq, unsigned short packet_pri, unsigned int user_id, unsigned int timeout, PacketType * pack, bool haslock)
00170 {
00171    // Insert data in to a map with unique key so we can ack it
00172    PendingAck<PacketType, SocketType> key(packet_seq, packet_pri, user_id, 0, 0);
00173    key.sent_time = Framework<PacketType, SocketType>::currentTimeMillis();
00174    timeout = (unsigned int)(timeout * RTT_multiplier);
00175 
00176    if(!haslock)
00177       enter();
00178    // Replace old - if any exist
00179    if(ackmap.find(&key) != ackmap.end())
00180    {
00181       PendingAck<PacketType, SocketType> * penAck = *ackmap.find(&key);
00182       //std::cout << "Rep PendingAck:("<<timeoutheap.size()<<"/"<<ackmap.size()<<")["<<packet_seq<<","<<packet_pri<<","<<user_id<<"]";
00183       //std::cout << "["<<penAck->seq<<","<<penAck->pri<<","<<penAck->user_id<<"]\n";
00184       penAck->timeout = timeout + key.sent_time;
00185       timeoutheap.heapify(penAck); // Make sure it finds it way down
00186    }
00187    else
00188    {
00189       // Inserting the packet in the minheap to be able to spot timeouts
00190       //std::cout << "New PendingAck:("<<timeoutheap.size()<<"/"<<ackmap.size()<<")["<<packet_seq<<","<<packet_pri<<","<<user_id<<"]" << std::endl;
00191 
00192       PendingAck<PacketType, SocketType> * penAck = new PendingAck<PacketType, SocketType>(packet_seq, packet_pri, user_id, (timeout+key.sent_time), pack);
00193       penAck->sent_time = key.sent_time;
00194       timeoutheap.insert(penAck);
00195       ackmap.insert(penAck);
00196    }
00197    // And now insert in the ackmap
00198 
00199    if(!haslock)
00200       leave();
00201 }
00202 
00203 template<class PacketType, class SocketType>
00204 unsigned int PendingAckList<PacketType, SocketType>::registerAck( unsigned int seq, unsigned char pri, unsigned int user_id )
00205 {
00206    unsigned int rtt_for_this_ack = std::numeric_limits<unsigned int>::max();
00207    //std::cout << "registerAck( " << seq << ", " << pri << ", " << user_id << " )\n";
00208    enter();
00209    PendingAck<PacketType, SocketType> key(seq, pri, user_id, 0, 0);
00210    if(ackmap.find(&key) == ackmap.end())
00211    {
00212       //std::cout << "No! PendingAck<PacketType, SocketType>:("<<timeoutheap.size()<<"/"<<ackmap.size()<<")["<<seq<<","<<pri<<","<<user_id<<"]\n";
00213    }
00214    else
00215    {
00216       PendingAck<PacketType, SocketType> * penAck = *ackmap.find(&key);
00217       //std::cout << "Ack PendingAck<PacketType, SocketType>:("<<timeoutheap.size()<<"/"<<ackmap.size()<<")["<<seq<<","<<pri<<","<<user_id<<"]:";
00218       //std::cout << "["<<penAck->seq<<","<<penAck->pri<<","<<penAck->user_id<<"]\n";
00219       if(!penAck->acked)
00220       {
00221          // The pending ack is there so we get it
00222          rtt_for_this_ack = Framework<PacketType, SocketType>::currentTimeMillis() - penAck->sent_time;
00223          /*
00224          if(penAck->seq != seq || penAck->pri != pri || penAck->user_id != user_id)
00225          {
00226          std::cout << "ERROR:\n";
00227          std::cout << "GOT PendingAck<PacketType, SocketType>:("<<timeoutheap.size()<<"/"<<ackmap.size()<<")["<<seq<<","<<pri<<","<<user_id<<"]\n";
00228          std::cout << "HAD PendingAck<PacketType, SocketType>:("<<timeoutheap.size()<<"/"<<ackmap.size()<<")["<<penAck->seq<<","<<penAck->pri<<","<<penAck->user_id<<"]\n";
00229       }
00230          */
00231          setPendingAckAcked(penAck);
00232       }
00233       //std::cout << "Ack PendingAck:("<<timeoutheap.size()<<"/"<<ackmap.size()<<")["<<seq<<","<<pri<<","<<user_id<<"]:";
00234       //std::cout << "["<<penAck->seq<<","<<penAck->pri<<","<<penAck->user_id<<"]\n";
00235    }
00236    leave();
00237    return rtt_for_this_ack;
00238 }
00239 
00240 
00241 template<class PacketType, class SocketType>
00242 void PendingAckList<PacketType, SocketType>::execute( )
00243 {
00244    while(true)
00245    {
00246       PendingAck<PacketType, SocketType> * tmpAck;
00247 
00248       // There has to be at least one element due to the semaphore wait()
00249       timeoutheap.peek();
00250       // Now that we know there is one, lets enter the Mutex and handle it here.
00251       enter();
00252       tmpAck = timeoutheap.peek();
00253       if( tmpAck->acked )
00254       {
00255          // Its been acked, remove it from both list and heap
00256          //std::cout << "Rem PendingAck:("<<timeoutheap.size()<<"/"<<ackmap.size()<<")["<<tmpAck->seq<<","<<tmpAck->pri<<","<<tmpAck->user_id<<"]\n";
00257 
00258          timeoutheap.getMin();
00259          ackmap.erase(tmpAck);
00260          delete tmpAck;
00261       }
00262       else if(tmpAck->timeout < Framework<PacketType, SocketType>::currentTimeMillis())
00263       {
00264          // Make sure it stays down in the heap
00265          tmpAck->timeout = Framework<PacketType, SocketType>::currentTimeMillis() + (unsigned int)(minRTT * RTT_multiplier);
00266          timeoutheap.heapify(tmpAck); // Move the element in the heap
00267 
00268          // Schedule for Retransmission (if it's already queued the queue will discard the queing of this pack):
00269          framework->retransmitPacket(tmpAck->packet);
00270       }
00271       else
00272       {
00273          leave();
00274          unsigned int time_to_sleep = std::min((unsigned int)(minRTT * RTT_multiplier), tmpAck->timeout - Framework<PacketType, SocketType>::currentTimeMillis());
00275          //std::cout << "PendingAckList.sleep(" << time_to_sleep << ")\n";
00276          thread::relax(time_to_sleep);
00277          continue;
00278       }
00279       leave();
00280    }
00281 }
00282 
00283 template<class PacketType, class SocketType>
00284 void PendingAckList<PacketType, SocketType>::setPendingAckAcked( PendingAck<PacketType, SocketType> * penAck )
00285 {
00286    //std::cout << "PendingAck on Pack:" << penAck->packet << std::endl;
00287    penAck->acked = true; // execute() now know that it can remove the ack from the heap
00288    penAck->timeout = 0; // Make sure it goes to the top of the heap
00289    timeoutheap.heapify(penAck);
00290    if(penAck->packet->hasBeenSent())
00291    {
00292       delete penAck->packet; // Since it't not in the queue, we can delete it
00293       penAck->packet = 0;
00294    }
00295    else
00296    {
00297       penAck->packet->setStateAborted(); // We must wait untill the sender deletes it.
00298    }
00299 }
00300 
00301 
00302 
00303 
00304 
00305 
00306 #endif

Generated on Mon Feb 6 12:24:50 2006 for Ganef by  doxygen 1.4.4