00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef PACKETSENDER_H
00024 #define PACKETSENDER_H
00025
00026 #include <ptypes.h>
00027 #include <pinet.h>
00028
00029 USING_PTYPES;
00030
00031
00032 template<class PacketType>
00033 class Queue;
00034 class FrameworkPacket;
00035 template<class PacketType, class SocketType>
00036 class Framework;
00037 template<class PacketType, class SocketType>
00038 int sendTo(PacketType * pack, SocketType * socket);
00039
00040
00041
00042
00043
00044
00045
00046
00047 template<class PacketType, class SocketType>
00048 class PacketSender : public thread
00049 {
00050 private:
00051 Framework<PacketType, SocketType> * framework;
00052 unsigned long packets_sent, bytes_sent;
00053 Queue<PacketType> * queue;
00054 SocketType * socket;
00055 bool _running;
00056 mutex & m_mutex;
00057
00058 public:
00059 PacketSender(SocketType * socket, Queue<PacketType> * queue, mutex & pendingackmutex, Framework<PacketType, SocketType> * framework);
00060 ~PacketSender();
00061
00062 private:
00063 void sendPacket(PacketType * pack);
00064
00065 public:
00066 void startSending();
00067 inline unsigned int bytesSent() const { return bytes_sent; };
00068 inline unsigned int packetsSent() const { return packets_sent; };
00069 void sendPacketSync(PacketType * pack);
00070
00071 public:
00072 virtual void execute();
00073 virtual void cleanup();
00074
00075 };
00076
00077
00078 template<class PacketType, class SocketType>
00079 PacketSender<PacketType, SocketType>::PacketSender(SocketType * socket, Queue<PacketType> * queue, mutex & pendingackmutex, Framework<PacketType, SocketType> * framework) :
00080 socket(socket),
00081 queue(queue),
00082 m_mutex(pendingackmutex),
00083 _running(false),
00084 packets_sent(0),
00085 bytes_sent(0),
00086 framework(framework),
00087 thread(false)
00088 {
00089
00090 }
00091
00092 template<class PacketType, class SocketType>
00093 PacketSender<PacketType, SocketType>::~PacketSender()
00094 {
00095 }
00096
00097 template<class PacketType, class SocketType>
00098 void PacketSender<PacketType, SocketType>::sendPacketSync( PacketType * pack )
00099 {
00100 if(get_running())
00101 throw new FrameworkError("You cannot use sendPacketSync when the sender is running");
00102 std::cout << "About to send NOW" << std::endl;
00103 sendPacket(pack);
00104 }
00105
00106
00107 template<class PacketType, class SocketType>
00108 void PacketSender<PacketType, SocketType>::sendPacket( PacketType * pack )
00109 {
00110 scopelock mutexed_method(m_mutex);
00111
00112 if(!pack->peer.initialized)
00113 {
00114 std::cerr << "Packet " << * pack << " does not have a valid peer.host\n";
00115 }
00116
00117
00118 if(pack->isAborted())
00119 {
00120 delete pack;
00121 }
00122 else
00123 {
00124
00125 if(pack->isReliable())
00126 {
00127
00128
00129 if(pack->isPluginPacket())
00130 {
00131 framework->addPluginSequenceNumber(pack);
00132 }
00133
00134
00135 framework->addPendingAck(pack, true);
00136 }
00137
00138
00139 pack->makeSendReady();
00140 if(!pack->isReliable())
00141 {
00142
00143 }
00144 int bs = sendTo(pack, socket);
00145
00146 if(bs != -1)
00147 {
00148 if(pack->isReliable())
00149 {
00150 pack->setStateSent();
00151 framework->sentReliablePacket(pack);
00152 }
00153 else
00154 {
00155
00156
00157 delete pack;
00158 }
00159 bytes_sent += bs;
00160 }
00161 else
00162 {
00163 std::cerr << "PANIX: we could not send a packet, we got -1 for return value on bytes sent" << std::endl;
00164 }
00165 }
00166 }
00167
00168
00169 template<class PacketType>
00170 int sendTo(PacketType * pack, ipmessage * socket)
00171 {
00172 int bs;
00173 try
00174 {
00175 socket->send(pack->getData(), pack->getSize());
00176 bs = pack->getSize();
00177 }
00178 catch(estream * err)
00179 {
00180 perr.putf("Error Sending: %s\n", pconst(err->get_message()));
00181 delete err;
00182 return -1;
00183 }
00184 return bs;
00185 }
00186
00187
00188 template<class PacketType>
00189 int sendTo(PacketType * pack, ipmsgserver * socket)
00190 {
00191 int bs;
00192 try
00193 {
00194 socket->sendto(pack->getData(), pack->getSize(), pack->peer.host, pack->peer.port);
00195 bs = pack->getSize();
00196 }
00197 catch(estream * err)
00198 {
00199 std::cerr << "Error Sending> " << pconst(err->get_message()) << std::endl;
00200 std::cerr << "Packet> " << *pack << std::endl;
00201 std::cerr << "Socket is set to " << pconst(socket->get_host()) << ":" << socket->get_port() << std::endl;
00202 delete err;
00203 return -1;
00204 }
00205 return bs;
00206 }
00207
00208 template<class PacketType, class SocketType>
00209 void PacketSender<PacketType, SocketType>::startSending( )
00210 {
00211 _running = true;
00212 start();
00213 }
00214
00215 template<class PacketType, class SocketType>
00216 void PacketSender<PacketType, SocketType>::execute( )
00217 {
00218 while(_running)
00219 {
00220
00221 PacketType * p = queue->next();
00222
00223 sendPacket(p);
00224 packets_sent++;
00225 }
00226 }
00227
00228 template<class PacketType, class SocketType>
00229 void PacketSender<PacketType, SocketType>::cleanup( )
00230 {
00231 std::cout << "Sender has been terminated" << std::endl;
00232 }
00233
00234
00235
00236
00237
00238 #endif