00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 #ifndef PACKETSENDER_H
00024 #define PACKETSENDER_H
00025 
00026 #include <ptypes.h>
00027 #include <pinet.h>
00028 
00029 USING_PTYPES;
00030 
00031 
// Forward declarations of project types that are defined elsewhere.
// The PacketSender class declaration below only stores pointers to
// Queue and Framework, so a declaration is enough at this point.
template<class PacketType>
class Queue;
class FrameworkPacket;
template<class PacketType, class SocketType>
class Framework;
// Generic declaration of the low-level send helper. Overloads taking
// ipmessage* and ipmsgserver* sockets are defined further down in this header.
template<class PacketType, class SocketType>
int sendTo(PacketType * pack, SocketType * socket);
00039 
00040 
00041 
00042 
00043 
00044 
00045 
00046 
00047 template<class PacketType, class SocketType>
00048 class PacketSender : public thread
00049 {
00050 private:
00051    Framework<PacketType, SocketType> * framework;
00052    unsigned long packets_sent, bytes_sent;
00053    Queue<PacketType> * queue;
00054    SocketType * socket;
00055    bool _running;
00056    mutex & m_mutex;
00057 
00058 public:
00059    PacketSender(SocketType * socket, Queue<PacketType> * queue, mutex & pendingackmutex, Framework<PacketType, SocketType> * framework);
00060    ~PacketSender();
00061 
00062 private:
00063    void sendPacket(PacketType * pack);
00064 
00065 public:
00066    void startSending();
00067    inline unsigned int bytesSent() const { return bytes_sent; };
00068    inline unsigned int packetsSent() const { return packets_sent; };
00069    void sendPacketSync(PacketType * pack);
00070 
00071 public:
00072    virtual void execute();
00073    virtual void cleanup();
00074 
00075 };
00076 
00077 
00078 template<class PacketType, class SocketType>
00079 PacketSender<PacketType, SocketType>::PacketSender(SocketType * socket, Queue<PacketType> * queue, mutex & pendingackmutex, Framework<PacketType, SocketType> * framework) :
00080    socket(socket),
00081    queue(queue),
00082    m_mutex(pendingackmutex),
00083    _running(false),
00084    packets_sent(0),
00085    bytes_sent(0),
00086    framework(framework),
00087    thread(false)
00088 {
00089 
00090 }
00091 
00092 template<class PacketType, class SocketType>
00093 PacketSender<PacketType, SocketType>::~PacketSender()
00094 {
00095 }
00096 
00097 template<class PacketType, class SocketType>
00098 void PacketSender<PacketType, SocketType>::sendPacketSync( PacketType * pack )
00099 {
00100    if(get_running())
00101       throw new FrameworkError("You cannot use sendPacketSync when the sender is running");
00102    std::cout << "About to send NOW" << std::endl;
00103    sendPacket(pack);
00104 }
00105 
00106 
00107 template<class PacketType, class SocketType>
00108 void PacketSender<PacketType, SocketType>::sendPacket( PacketType * pack )
00109 {
00110    scopelock mutexed_method(m_mutex); 
00111 
00112    if(!pack->peer.initialized)
00113    {
00114       std::cerr << "Packet " << * pack << " does not have a valid peer.host\n";
00115    }
00116 
00117    
00118    if(pack->isAborted())
00119    {
00120       delete pack;
00121    }
00122    else
00123    {
00124       
00125       if(pack->isReliable())
00126       {
00127          
00128          
00129          if(pack->isPluginPacket())
00130          {
00131             framework->addPluginSequenceNumber(pack);
00132          }
00133 
00134          
00135          framework->addPendingAck(pack, true);
00136       }
00137 
00138       
00139       pack->makeSendReady();
00140       if(!pack->isReliable())
00141       {
00142          
00143       }
00144       int bs = sendTo(pack, socket);
00145       
00146       if(bs != -1)
00147       {
00148          if(pack->isReliable())
00149          {
00150             pack->setStateSent();
00151             framework->sentReliablePacket(pack); 
00152          }
00153          else
00154          {
00155             
00156             
00157             delete pack;
00158          }
00159          bytes_sent += bs;
00160       }
00161       else
00162       {
00163          std::cerr << "PANIX: we could not send a packet, we got -1 for return value on bytes sent"  << std::endl;
00164       }
00165    }
00166 }
00167 
00168 
00169 template<class PacketType>
00170 int sendTo(PacketType * pack, ipmessage * socket)
00171 {
00172    int bs;
00173    try
00174    {
00175       socket->send(pack->getData(), pack->getSize());
00176       bs = pack->getSize();
00177    }
00178    catch(estream * err)
00179    {
00180       perr.putf("Error Sending: %s\n", pconst(err->get_message()));
00181       delete err;
00182       return -1;
00183    }
00184    return bs;
00185 }
00186 
00187 
00188 template<class PacketType>
00189 int sendTo(PacketType * pack, ipmsgserver * socket)
00190 {
00191    int bs;
00192    try
00193    {
00194       socket->sendto(pack->getData(), pack->getSize(), pack->peer.host, pack->peer.port);
00195       bs = pack->getSize();
00196    }
00197    catch(estream * err)
00198    {
00199       std::cerr << "Error Sending> " << pconst(err->get_message()) << std::endl;
00200       std::cerr << "Packet> " << *pack << std::endl;
00201       std::cerr << "Socket is set to " << pconst(socket->get_host()) << ":" << socket->get_port() << std::endl;
00202       delete err;
00203       return -1;
00204    }
00205    return bs;
00206 }
00207 
00208 template<class PacketType, class SocketType>
00209 void PacketSender<PacketType, SocketType>::startSending( )
00210 {
00211    _running = true;
00212    start();
00213 }
00214 
00215 template<class PacketType, class SocketType>
00216 void PacketSender<PacketType, SocketType>::execute( )
00217 {
00218    while(_running)
00219    {
00220       
00221       PacketType * p = queue->next();
00222       
00223       sendPacket(p);
00224       packets_sent++;
00225    }
00226 }
00227 
00228 template<class PacketType, class SocketType>
00229 void PacketSender<PacketType, SocketType>::cleanup( )
00230 {
00231    std::cout << "Sender has been terminated" << std::endl;
00232 }
00233 
00234 
00235 
00236 
00237 
00238 #endif