00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "serverframework.h"
00026
00027 #include "../common/pendingacklist.h"
00028 #include "../common/queue.h"
00029 #include "../common/packetreceiver.h"
00030 #include "../common/frameworkpacket.h"
00031 #include "../common/packetsender.h"
00032 #include "serverdata.h"
00033 #include "../common/plugin.h"
00034 #include "../common/presendingchain.h"
00035
00036 ServerFramework::ServerFramework( int port ) :
00037 Framework<ServerPacket,ipmsgserver>(port),
00038 connection_timeout_length(2000)
00039 {
00040 connected_clients.clear();
00041 clients.clear();
00042
00043
00044
00045
00046
00047 default_pre_send_chain = new PreSendingChain<ServerPacket>( std::vector<Plugin<ServerPacket> *>(), outqueue );
00048 setDefaultPreSendingChain( );
00049 }
00050
00051
00052 ServerFramework::~ServerFramework()
00053 {
00054
00055 delete default_pre_send_chain;
00056 }
00057
00058 void ServerFramework::registerData( ServerData * data )
00059 {
00060 server_data.insert( std::make_pair( data->getFrameworkId(), data ) );
00061 }
00062
00063 void ServerFramework::unregisterData( FrameworkData * data )
00064 {
00065 if ( server_data.find( data->getFrameworkId() ) == server_data.end() )
00066 {
00067 std::cout << "WARNING: Remove data not in serverframework, id:" << data->getFrameworkId() << std::endl;
00068 }
00069 else
00070 {
00071 server_data.erase( data->getFrameworkId() );
00072 }
00073 }
00074
00075 ServerData * ServerFramework::getDataObject( unsigned int objid )
00076 {
00077 if ( server_data.find( objid ) == server_data.end() )
00078 return NULL;
00079 return server_data[ objid ];
00080 }
00081
00082 void ServerFramework::start( )
00083 {
00084 pending_acks->start();
00085 receiver->startListening();
00086 sender->startSending();
00087 }
00088
00089 void ServerFramework::enqueuePacket( ServerPacket * packet, const std::map< unsigned int, Client * > & clientlist )
00090 {
00091 if(clientlist.size() == 1)
00092 {
00093 std::map< unsigned int, Client * >::const_iterator it = clientlist.begin();
00094 enqueuePacket(packet, it->second);
00095 }
00096 else
00097 {
00098 for ( std::map< unsigned int, Client * >::const_iterator it = clientlist.begin(); it != clientlist.end(); ++it )
00099 {
00100 ServerPacket * p = new ServerPacket( * packet );
00101 p->copyData(packet);
00102 enqueuePacket(p, it->second);
00103 }
00104 delete packet;
00105 }
00106 }
00107
00108 void ServerFramework::enqueuePacket( ServerPacket * packet, Client * client )
00109 {
00110 packet->client = client;
00111 packet->setPeer( packet->client->getAddress(), packet->client->getPort() );
00112 if(packet->isReliable())
00113 {
00114 packet->setSequenceNumber(packet->client->getNextOutgoingSeqNo(packet));
00115 }
00116 current_pre_send_chain->pack( packet );
00117 }
00118
00119
00120 void ServerFramework::registerPreSendingChain( const std::string & chainname, const std::vector<Plugin<ServerPacket> *> & stations )
00121 {
00122 if ( pre_sending_chains.find( chainname ) != pre_sending_chains.end() )
00123 throw FrameworkError( "Overwriting Presend Chain not allowed" );
00124
00125 pre_sending_chains.insert( std::make_pair( chainname, new PreSendingChain<ServerPacket>( stations, outqueue ) ) );
00126 }
00127
00128 void ServerFramework::setPreSendingChain( const std::string & chainname )
00129 {
00130 if ( pre_sending_chains.find( chainname ) == pre_sending_chains.end() )
00131 throw FrameworkError( std::string( "Unknown Presend Chain: " ) + chainname );
00132
00133 current_pre_send_chain = pre_sending_chains[ chainname ];
00134 }
00135
00136 void ServerFramework::purgeCurrentPreSendingChain( )
00137 {
00138 current_pre_send_chain->purge();
00139 }
00140
00141 void ServerFramework::setDefaultPreSendingChain( )
00142 {
00143 current_pre_send_chain = default_pre_send_chain;
00144 }
00145
00146 void ServerFramework::registerClient( Client * client )
00147 {
00148
00149 clients.insert( std::make_pair( client->getId(), client ) );
00150 if(client->getState() == Client::CONNECTED && client->hasCalledOnConnect())
00151 {
00152 connected_clients.insert( std::make_pair( client->getId(), client ) );
00153 }
00154 }
00155
00156 void ServerFramework::unregisterClient( Client * client )
00157 {
00158 clients.erase(client->getId());
00159
00160
00161 pending_acks->removeWithComparer(QueueDeletor(client));
00162 outqueue->removeWithComparer(QueueDeletor(client));
00163 }
00164
00165 void ServerFramework::handleSystemPacket( ServerPacket * p)
00166 {
00167
00168
00169 switch ( p->getReceiver() )
00170 {
00171
00172 case ServerPacket::PACKET_ACK:
00173 {
00174 p->client = getClientFromPacket(p);
00175 if ( p->client )
00176 {
00177 p->client->registerRTT(handleAckPacket(p, p->client->getId()));
00178 }
00179 else
00180 {
00181 std::cout << "We got an ackpacket from an unknown client: " << *p << std::endl;
00182 }
00183 }
00184 break;
00185 case ServerPacket::PACKET_HEARTBEAT:
00186 {
00187 if(p->client)
00188 {
00189
00190 }
00191 }
00192 break;
00193
00194 case ServerPacket::HANDSHAKE_INIT:
00195 {
00196 if(p->client->getState() == Client::NEWCLIENT)
00197 {
00198 ServerPacket * dummy_reply = ServerPacket::createDummyPacket();
00199 bool success = onInitialHandshake(p, dummy_reply);
00200
00201 ServerPacket * reply;
00202 if(success)
00203 {
00204
00205 reply = ServerPacket::createSystemPacket(ServerPacket::SETUP_DESCRIPTION);
00206
00207
00208 reply->writeUInt32(connection_timeout_length);
00209
00210
00211 std::set<unsigned int> pluginids;
00212 for(std::map<std::string, PreSendingChain<ServerPacket> *>::iterator it = pre_sending_chains.begin(); it != pre_sending_chains.end(); ++it)
00213 {
00214 it->second->appendUniqueIds(pluginids);
00215 }
00216
00217 reply->writeUInt32(pluginids.size());
00218 for(std::set<unsigned int>::iterator it = pluginids.begin(); it != pluginids.end(); ++it)
00219 {
00220
00221 reply->writeUInt32(*it);
00222 }
00223 }
00224 else
00225 {
00226 reply = ServerPacket::createSystemPacket(ServerPacket::DISCONTINUE_CLIENT);
00227 }
00228
00229 reply->appendData(dummy_reply);
00230 delete dummy_reply;
00231
00232 reply->setPeerFromPacket(p);
00233 enqueuePacket(reply, p->client);
00234 }
00235 else
00236 {
00237
00238 std::cerr << "Received INIT_HANDSHAKE from a known client[state:" << p->client->getState() << "]" << std::endl;
00239 }
00240 }
00241 break;
00242 case ServerPacket::DISCONTINUE_SERVER:
00243 {
00244
00245 if(p->client)
00246 {
00247 if(p->client->getState() == Client::NEWCLIENT)
00248 {
00249 unregisterClient(p->client);
00250 delete p->client;
00251 }
00252 else
00253 {
00254 std::cerr << "We received a DISCONTINUE_SERVER call from a client in state: " << p->client->getState() << std::endl;
00255 }
00256 }
00257 }
00258 break;
00259 case ServerPacket::READY_CLIENT:
00260 {
00261 if(p->client && p->client->getState() == Client::NEWCLIENT)
00262 {
00263
00264 p->client->setState(Client::CONNECTED);
00265 }
00266 else
00267 {
00268 std::cerr << "We received a READY_CLIENT call from a client in state: " << p->client->getState() << std::endl;
00269 }
00270 }
00271 break;
00272
00273 default:
00274 std::cerr << "Error unknown system-packet type: " << p->getReceiver() << std::endl;
00275 break;
00276 }
00277
00278 delete p;
00279 }
00280
00281 Client * ServerFramework::getClientFromPacket( ServerPacket * packet )
00282 {
00283
00284 for( ConstClientIterator it = clients.begin(); it != clients.end(); ++it )
00285 {
00286 Client * c = it->second;
00287 if ( packet->peer.host == c->getAddress() && packet->peer.port == c->getPort() )
00288 return c;
00289 }
00290 return NULL;
00291 }
00292
00293 void ServerFramework::keepAlive( unsigned int sleepmillis )
00294 {
00295
00296 unsigned int currentmillis = Framework<ServerPacket,ipmsgserver>::currentTimeMillis();
00297 for(ConstClientIterator it = clients.begin(); it != clients.end(); ++it)
00298 {
00299 if(it->second->getLastPacketReceivedTime() + connection_timeout_length < currentmillis)
00300 {
00301 std::cout << "Client: " << it->second->getId() << " timed out"<<std::endl;
00302 if(it->second->getState() == Client::CONNECTED)
00303 {
00304 it->second->setState(Client::DICONNECTED);
00305 }
00306 else
00307 {
00308 it->second->setState(Client::CONNECTION_ABORTED);
00309 }
00310 }
00311 }
00312
00313
00314
00315 std::vector<Client *> disconnectedclients;
00316 disconnectedclients.clear();
00317 for(std::map<unsigned int, Client *>::iterator it = clients.begin(); it != clients.end(); ++it)
00318 {
00319 Client * client = it->second;
00320 if(client)
00321 {
00322 if(client->getState() == Client::CONNECTED && !client->hasCalledOnConnect())
00323 {
00324 client->setCalledOnConnect(true);
00325
00326
00327 onClientConnected(client);
00328
00329
00330 registerClient(client);
00331 }
00332 else if((client->getState() == Client::DICONNECTED || client->getState() == Client::CONNECTION_ABORTED) && !client->hasCalledOnDisconnect())
00333 {
00334 disconnectedclients.push_back(client);
00335
00336 connected_clients.erase(client->getId());
00337 }
00338 }
00339 }
00340
00341 std::vector<Client *>::const_iterator it;
00342 for(it = disconnectedclients.begin(); it != disconnectedclients.end(); ++it)
00343 {
00344 Client * client = * it;
00345 unregisterClient(client);
00346 client->setCalledOnDisconnect(true);
00347 if(client->getState() == Client::DICONNECTED)
00348 {
00349
00350 onClientDisconnected(client);
00351 }
00352 delete client;
00353 }
00354
00355
00356 while(inqueue->size() > 0)
00357 {
00358 ServerPacket * p = inqueue->next();
00359 if(p->client)
00360 {
00361 ServerData * receiver = getDataObject(p->getReceiver());
00362 if(receiver)
00363 {
00364 receiver->clientPacket(p->client, p->getType(), p);
00365 }
00366 else
00367 {
00368 std::cerr << "Packet for non-existing data:" << p->getReceiver() << std::endl;
00369 }
00370 }
00371 else
00372 {
00373 std::cerr << "Throw away packet " << *p << " from non-existing client" << std::endl;
00374 }
00375 delete p;
00376 }
00377
00378
00379 if(sleepmillis != 0)
00380 {
00381
00382 std::cout << "TODO:: create a static sleep method"<<std::endl;
00383 }
00384 }
00385
00386 ServerPacket * ServerFramework::deliverPacket( ServerPacket * packet )
00387 {
00388
00389 ServerPacket * next_packet = NULL;
00390
00391
00392 if(packet->client)
00393 packet->client->registerLastPacketReceivedTime(Framework<ServerPacket,ipmsgserver>::currentTimeMillis());
00394
00395
00396 if(packet->isReliable())
00397 {
00398 next_packet = packet->client->incrementNextIngoingSeqNo(packet);
00399 }
00400
00401 if(packet->getType() == 0)
00402 {
00403 handleSystemPacket(packet);
00404 }
00405 else
00406 {
00407 inqueue->insert(packet);
00408 }
00409
00410 return next_packet;
00411 }
00412
00413 bool ServerFramework::isPacketReadyForDelivery( ServerPacket * packet )
00414 {
00415
00416 packet->client = getClientFromPacket( packet );
00417 if(packet->isHandshakeInitPacket() && packet->client == 0)
00418 {
00419
00420
00421 packet->client = createClient(packet->peer.host, packet->peer.port);
00422 registerClient(packet->client);
00423 }
00424
00425 return (packet->client == 0 ? false : packet->client->isPacketReadyForDelivery(packet));
00426 }
00427
00428 void ServerFramework::addPendingAck( ServerPacket * packet, bool haslock)
00429 {
00430
00431 if(packet->client)
00432 {
00433 pending_acks->addPendingAck(packet->getSequenceNumber(), packet->getPriority(), packet->client->getId(), packet->client->getRTT(), packet, haslock);
00434 }
00435 else
00436 {
00437 std::cerr << "Crap tried to add pending ack to non-existing client\n";
00438 throw new FrameworkError("Tried to add a pending ACK on a packet with no Client as receiver");
00439 }
00440 }
00441
00442 unsigned int ServerFramework::bufferedPackets( ) const
00443 {
00444 unsigned int buff_p_count = 0;
00445 for(ConstClientIterator it = clients.begin(); it != clients.end(); ++it)
00446 {
00447 buff_p_count += it->second->bufferedPackets( );
00448 }
00449 return buff_p_count;
00450 }
00451
00452 void ServerFramework::sentReliablePacket( ServerPacket * packet )
00453 {
00454
00455
00456
00457
00458
00459 }
00460
00461 void ServerFramework::addPluginSequenceNumber( ServerPacket * packet )
00462 {
00463 if(packet->getSequenceNumber() == 0)
00464 {
00465 packet->setSequenceNumber(packet->client->getNextOutgoingSeqNo(packet));
00466 }
00467 }
00468
00469