00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef PACKETRECEIVER_H
00024 #define PACKETRECEIVER_H
00025
00026 #include <ptypes.h>
00027
00028 #include "framework.h"
00029
00030
00031
00032 #define MAX_UPD_PACKET_SIZE 65536
00033
// Forward declaration only: this header stores a Queue<PacketType> pointer
// and never needs the complete type.
template<typename PacketType> class Queue;
00036
00037
00038
00039
00040
00041
00042
00043
00044 template<class PacketType, class SocketType>
00045 class PacketReceiver : public thread
00046 {
00047 private:
00048 unsigned long packets_received;
00049 unsigned long packets_delivered;
00050 unsigned long bytes_received;
00051 unsigned long packets_unrel_skipped;
00052 unsigned long packets_rel_skipped;
00053 SocketType * socket;
00054 bool running;
00055 char buffer[MAX_UPD_PACKET_SIZE];
00056 Queue<PacketType> * queue;
00057 Framework<PacketType, SocketType> * framework;
00058
00059 public:
00060 PacketReceiver(SocketType * socket, Queue<PacketType> * queue, Framework<PacketType, SocketType> * framework);
00061 ~PacketReceiver();
00062
00063 public:
00064 void startListening();
00065 void stopListening();
00066
00067 public:
00068 virtual void execute();
00069 virtual void cleanup();
00070 void handlePacket(PacketType * packet);
00071 inline unsigned long bytesReceived() const { return bytes_received; };
00072 inline unsigned long packetsReceived() const { return packets_received; };
00073 inline unsigned long packetsDelivered() const { return packets_delivered; };
00074 inline unsigned long packetsUnreliableSkipped() const { return packets_unrel_skipped; };
00075 inline unsigned long packetsReliableSkipped() const { return packets_rel_skipped; };
00076 };
00077
00078
00079
00080 template<class PacketType, class SocketType>
00081 void PacketReceiver<PacketType, SocketType>::handlePacket( PacketType * packet )
00082 {
00083 bool deliver_packet = true;
00084
00085
00086 bool is_reliable = packet->isReliable();
00087 if(is_reliable)
00088 {
00089
00090 framework->ackPacket(packet);
00091 }
00092
00093
00094 if(!packet->isAckPacket())
00095 {
00096
00097 deliver_packet = framework->isPacketReadyForDelivery(packet);
00098 }
00099
00100
00101 if(deliver_packet)
00102 {
00103 while(packet != NULL)
00104 {
00105 packet = framework->deliverPacket(packet);
00106 packets_delivered++;
00107 if(packet)
00108 {
00109
00110 }
00111 }
00112 }
00113 else
00114 {
00115 if(is_reliable)
00116 {
00117 packets_rel_skipped++;
00118 }
00119 else
00120 {
00121 packets_unrel_skipped++;
00122 }
00123 }
00124 }
00125
00126
00127 template<class PacketType, class SocketType>
00128 void PacketReceiver<PacketType, SocketType>::execute( )
00129 {
00130 try
00131 {
00132 while(running)
00133 {
00134 int sender_port = socket->get_port();
00135 ipaddress sender = socket->get_ip();
00136 int res_size = socket->receive(buffer, MAX_UPD_PACKET_SIZE);
00137 if(res_size > 0)
00138 {
00139 PacketType * p = new PacketType(buffer, res_size);
00140 p->setPeer(sender, sender_port);
00141 bytes_received += res_size;
00142 packets_received++;
00143
00144 handlePacket(p);
00145 }
00146 else
00147 {
00148 std::cerr << "Error receiving packet from socket: " << std::endl;
00149 }
00150 }
00151 }
00152 catch(estream * e)
00153 {
00154 perr.putf("Error: %s\n", pconst(e->get_message()));
00155 delete e;
00156 }
00157 }
00158
00159 template<class PacketType, class SocketType>
00160 void PacketReceiver<PacketType, SocketType>::cleanup( )
00161 {
00162 std::cout << "Receiver-thread has been terminated" << std::endl;
00163 }
00164
00165
00166 template<class PacketType, class SocketType>
00167 PacketReceiver<PacketType, SocketType>::PacketReceiver(SocketType * socket, Queue<PacketType> * queue, Framework<PacketType, SocketType> * framework) :
00168 socket(socket),
00169 queue(queue),
00170 framework(framework),
00171 running(false),
00172 packets_received(0),
00173 bytes_received(0),
00174 packets_delivered(0),
00175 packets_unrel_skipped(0),
00176 packets_rel_skipped(0),
00177 thread(false)
00178 {
00179 }
00180
00181 template<class PacketType, class SocketType>
00182 PacketReceiver<PacketType, SocketType>::~PacketReceiver()
00183 {
00184 stopListening( );
00185 }
00186
00187 template<class PacketType, class SocketType>
00188 void PacketReceiver<PacketType, SocketType>::startListening( )
00189 {
00190 running = true;
00191 start();
00192 }
00193
00194 template<class PacketType, class SocketType>
00195 void PacketReceiver<PacketType, SocketType>::stopListening( )
00196 {
00197 running = false;
00198 }
00199
00200
00201
00202
00203
00204
00205 #endif