00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "clientframework.h"
00028
00029 #include "../common/packetsender.h"
00030 #include "../common/packetreceiver.h"
00031 #include "../common/queue.h"
00032 #include "../common/pendingacklist.h"
00033 #include "../common/plugin.h"
00034 #include "clientdatamap.h"
00035 #include "clientdata.h"
00036
00037
00038
00039 ClientFramework::ClientFramework(int local_port) : Framework<ClientPacket,ipmessage>(local_port),
00040 last_received_pack(Framework<ClientPacket,ipmessage>::currentTimeMillis()),
00041 last_reliable_sent_packet_time(Framework<ClientPacket,ipmessage>::currentTimeMillis()),
00042 client_data_map(new ClientDataMap()),
00043 connection_timeout_length(2000),
00044 state(NOT_STARTED)
00045 {
00046 }
00047
00048
00049 ClientFramework::~ClientFramework()
00050 {
00051 delete client_data_map;
00052 }
00053
00054
00055 void ClientFramework::terminateServerConnection( )
00056 {
00057 if(receiver->get_running() && !receiver->get_finished())
00058 {
00059 std::cout << "[terminate] Receiver is running...." << std::endl;
00060 if(!receiver->get_signaled())
00061 {
00062 std::cout << "[terminate] Receiver will get the signal...." << std::endl;
00063 receiver->signal();
00064 }
00065 std::cout << "[terminate] Receiver is trying to terminate, we wait for it...." << std::endl;
00066
00067 receiver->waitfor();
00068 }
00069
00070 if(sender->get_running() && !sender->get_finished())
00071 {
00072 std::cout << "[terminate] Sender is running...." << std::endl;
00073 if(!sender->get_signaled())
00074 {
00075 std::cout << "[terminate] Sender will get the signal...." << std::endl;
00076 sender->signal();
00077 }
00078 std::cout << "[terminate] Sender is trying to terminate, we wait for it...." << std::endl;
00079
00080 sender->waitfor();
00081 }
00082
00083 if(pending_acks->get_running() && !pending_acks->get_finished())
00084 {
00085 std::cout << "[terminate] Pending_acks is running...." << std::endl;
00086 if(!pending_acks->get_signaled())
00087 {
00088 std::cout << "[terminate] Pending_acks will get the signal...." << std::endl;
00089 pending_acks->signal();
00090 }
00091 std::cout << "[terminate] Pending_acks is trying to terminate, we wait for it...." << std::endl;
00092
00093 pending_acks->waitfor();
00094 }
00095
00096 state = NOT_STARTED;
00097 }
00098
00099
00100 void ClientFramework::initServerConnection(const std::string & _server_host, int _server_port)
00101 {
00102
00103 terminateServerConnection();
00104
00105
00106 last_reliable_sent_packet_time = last_received_pack = msecs(now());
00107
00108
00109 server_host = phostbyname(_server_host.c_str());
00110 server_port = _server_port;
00111
00112 socket.set_ip(server_host);
00113 socket.set_port(server_port);
00114
00115
00116
00117
00118
00119
00120 ClientPacket * dummy_inittag = ClientPacket::createDummyPacket();
00121
00122 writeHandshakeInitMessage( dummy_inittag );
00123
00124 ClientPacket * request = ClientPacket::createSystemPacket(this, ClientPacket::HANDSHAKE_INIT );
00125 request->appendData( dummy_inittag );
00126 delete( dummy_inittag );
00127
00128
00129 state = HANDSHAKING_INIT;
00130 enqueuePacket(request, true);
00131
00132
00133 {
00134 pending_acks->start();
00135 sender->startSending();
00136 receiver->startListening();
00137 }
00138 }
00139
00140 void ClientFramework::registerPlugin(Plugin<ClientPacket> * plugin)
00141 {
00142 plugin->setNext(inqueue);
00143 plugins.insert( std::make_pair( plugin->getUniqueId(), plugin ));
00144 }
00145
00146 void ClientFramework::registerData( ClientData * data )
00147 {
00148 if(!data)
00149 throw new FrameworkError("Data pointer is null, this cannot be registered");
00150
00151
00152 if(client_data.find(data->getFrameworkId()) == client_data.end())
00153 {
00154
00155
00156 client_data[data->getFrameworkId()] = data;
00157
00158
00159 if(client_data_map->pointee_map.find(data->getFrameworkId()) != client_data_map->pointee_map.end())
00160 {
00161 PPMap & ppmap = client_data_map->pointee_map[data->getFrameworkId()];
00162 unsigned int i ;
00163 for (PPMap::iterator it = ppmap.begin() ; it != ppmap.end(); it++)
00164 {
00165 PtrPtrVec & ptrptrvec = it->second;
00166 for(PtrPtrVec::iterator vit = ptrptrvec.begin(); vit != ptrptrvec.end(); vit++)
00167 {
00168 ClientData ** ptrptr = *vit;
00169 if(*ptrptr == NULL)
00170 *ptrptr = data;
00171 else
00172 throw new FrameworkError("Data pointer has already been set");
00173 }
00174 }
00175 }
00176 }
00177 else
00178 {
00179 std::cout << "Data object already exist: registerData( ClientData * data : " << data->getFrameworkId() << ")" << std::endl;
00180 }
00181 }
00182
00183 void ClientFramework::unregisterData( ClientData * data )
00184 {
00185 if(!data)
00186 throw new FrameworkError("Data pointer is null, this cannot be unregistered");
00187
00188
00189 if(getDataObject(data->getFrameworkId()) != data)
00190 throw new FrameworkError("Unknown data - cannot be unregistered.");
00191
00192 client_data_map->unregisterData(data);
00193 client_data.erase(data->getFrameworkId());
00194 }
00195
00196 ClientData * ClientFramework::getDataObject( unsigned int objid )
00197 {
00198 if(client_data.find(objid) != client_data.end())
00199 {
00200 return client_data[objid];
00201 }
00202
00203 return 0;
00204 }
00205
00206 void ClientFramework::registerPointer( unsigned int receiver, unsigned int missing, ClientData ** ptrptr )
00207 {
00208 client_data_map->registerPointer(receiver, missing, ptrptr);
00209 }
00210
00211 bool ClientFramework::unregisterPointer(unsigned int receiver, unsigned int new_objid, ClientData **ptrptr)
00212 {
00213 return client_data_map->unregisterPointer(receiver, new_objid, ptrptr);
00214 }
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232 void ClientFramework::registerConstructor(unsigned int class_id, ClientData* (*func)(ClientFramework *, ClientPacket *))
00233 {
00234 if(constructors.find(class_id) != constructors.end())
00235 throw new FrameworkError("You cannot overwrite registered constructor functions");
00236
00237 constructors.insert(std::make_pair(class_id, func));
00238 }
00239
00240 void ClientFramework::handleSystemPacket( ClientPacket * p )
00241 {
00242
00243
00244 switch ( p->getReceiver() )
00245 {
00246 case ClientPacket::DISCONTINUE_CLIENT:
00247 if(state == HANDSHAKING_INIT)
00248 {
00249 onDisconnectCalled = false;
00250 onConnectCalled = false;
00251 state = HANDSHAKING_ABORTED_BY_SERVER;
00252
00253
00254
00255 onHandshakingDiscontinue(p);
00256 }
00257 else
00258 {
00259 std::cout << "Received discontinue when not Handshaking [state: "<<state<<"]\n";
00260 }
00261 break;
00262 case ClientPacket::SETUP_DESCRIPTION:
00263 {
00264 ClientPacket * reply;
00265
00266 if(state == HANDSHAKING_INIT)
00267 {
00268 bool success = true;
00269 bool plugins_ok = true;
00270
00271
00272 connection_timeout_length = p->readUInt32();
00273
00274
00275 unsigned int num_of_plugins = p->readUInt32();
00276
00277 for( int i = 0 ; i < num_of_plugins ; i++ )
00278 {
00279 if(!checkPlugin( p->readUInt32() ))
00280 {
00281 plugins_ok = false;
00282 break;
00283 }
00284 }
00285
00286 ClientPacket * dummy_reply = ClientPacket::createDummyPacket();
00287
00288
00289 success = onHandshakeDescription(p, dummy_reply);
00290
00291 if( success && plugins_ok )
00292 {
00293 reply = ClientPacket::createSystemPacket(this, ClientPacket::READY_CLIENT);
00294 state = CONNECTED;
00295 }
00296 else
00297 {
00298 reply = ClientPacket::createSystemPacket(this, ClientPacket::DISCONTINUE_SERVER);
00299 state = HANDSHAKING_ABORTED_BY_CLIENT;
00300 }
00301 reply->appendData( dummy_reply );
00302 delete dummy_reply;
00303
00304 onDisconnectCalled = false;
00305 onConnectCalled = false;
00306 enqueuePacket( reply );
00307 }
00308 }
00309 break;
00310
00311 case ClientPacket::PACKET_ACK:
00312 {
00313 if ( state == HANDSHAKING_INIT || state == CONNECTED )
00314 {
00315 avg_rtt.registerValue(handleAckPacket(p, 0));
00316 }
00317 }
00318 break;
00319 default:
00320 std::cerr << "Error unknown system-packet type: " << p->getReceiver() << std::endl;
00321 break;
00322 }
00323
00324 delete p;
00325 }
00326
00327
00328 void ClientFramework::keepAlive( unsigned int sleepmillis )
00329 {
00330
00331 if(state == NOT_STARTED)
00332 throw new FrameworkError("You should not try to call keepAlive(...) before calling initServerConnection() at least once");
00333
00334
00335 if( state == CONNECTED && !onConnectCalled)
00336 {
00337
00338 onConnect();
00339 onConnectCalled = true;
00340 }
00341 if( (state == DISCONNECTED || state == HANDSHAKING_ABORTED_BY_SERVER || state == HANDSHAKING_ABORTED_BY_CLIENT )&& !onDisconnectCalled )
00342 {
00343
00344 onDisconnect();
00345 onDisconnectCalled = true;
00346 disconnectionCleanup( );
00347
00348 }
00349
00350 while( inqueue->size() > 0 )
00351 {
00352 ClientPacket * inpacket = inqueue->next();
00353
00354 switch ( inpacket->getType() )
00355 {
00356 case 1:
00357 {
00358 if(plugins.find(inpacket->getReceiver()) != plugins.end())
00359 {
00360 plugins[inpacket->getReceiver()]->unpack(inpacket);
00361 }
00362 else
00363 {
00364 std::cerr << "ERROR: Unknown plugin: " << inpacket->getReceiver() << std::endl;
00365 }
00366
00367
00368 }
00369 break;
00370 case 2:
00371 {
00372 if(constructors.find(inpacket->getReceiver()) != constructors.end())
00373 {
00374
00375 constructors[inpacket->getReceiver()](this, inpacket);
00376 }
00377 else
00378 {
00379 std::cerr << "[Create]Constructor with id: " << inpacket->getReceiver() << " does not exist" << std::endl;
00380 }
00381 delete inpacket;
00382 }
00383 break;
00384 case 3:
00385 {
00386 ClientData * dataobj = getDataObject(inpacket->getReceiver());
00387 if(dataobj)
00388 {
00389
00390 delete dataobj;
00391 }
00392 else
00393 {
00394 std::cout << "[Destructor]DataObject("<<inpacket->getReceiver()<<") does not exist"<< std::endl;
00395 }
00396 delete inpacket;
00397 }
00398 break;
00399 default:
00400 {
00401 ClientData * dataobj = getDataObject(inpacket->getReceiver());
00402 if(dataobj)
00403 {
00404 dataobj->updatePacket(this, inpacket->getType(), inpacket);
00405 }
00406 else
00407 {
00408 std::cout << "[Update]DataObject("<<inpacket->getReceiver()<<") does not exist"<< std::endl;
00409 }
00410 delete inpacket;
00411 }
00412 break;
00413 }
00414 }
00415
00416
00417
00418 heartBeat();
00419
00420
00421
00422 if(sleepmillis != 0)
00423 {
00424 psleep(sleepmillis);
00425 }
00426 }
00427
00428 void ClientFramework::enqueuePacket(ClientPacket * packet, bool sendnow)
00429 {
00430 packet->setPeer(server_host, server_port);
00431 if(packet->isReliable())
00432 {
00433 packet->setSequenceNumber(sequencenumbers.getNextOutgoingSeqNo(packet));
00434
00435 }
00436 if(sendnow)
00437 {
00438 sender->sendPacketSync(packet);
00439 }
00440 else
00441 {
00442 outqueue->insert( packet );
00443 }
00444 }
00445
00446 bool ClientFramework::checkPlugin( unsigned int plug_id )
00447 {
00448 std::map<unsigned int, Plugin<ClientPacket> *>::iterator theIterator;
00449 theIterator = plugins.find(plug_id);
00450 if(theIterator == plugins.end())
00451 return false;
00452
00453 return true;
00454 }
00455
00456 bool ClientFramework::isPacketReadyForDelivery( ClientPacket * packet )
00457 {
00458 if(state == CONNECTED || state == HANDSHAKING_INIT)
00459 {
00460 if(packet->isReliable())
00461 {
00462 if(sequencenumbers.isNext(packet))
00463 {
00464 return true;
00465 }
00466 else if(sequencenumbers.isFuture(packet))
00467 {
00468
00469 sequencenumbers.bufferPacket(packet);
00470 return false;
00471 }
00472 else
00473 {
00474
00475
00476 delete packet;
00477 return false;
00478 }
00479 }
00480 else
00481 {
00482 if(packet->getSequenceNumber() >= sequencenumbers.getNextIngoingSeqNo(packet))
00483 {
00484 sequencenumbers.incrementNextIngoingSeqNo(packet);
00485 return true;
00486 }
00487
00488 delete packet;
00489 return false;
00490 }
00491 }
00492 else
00493 {
00494 std::cout << "Received packet: " << *packet << " while not connected\n";
00495
00496 if(packet->isReliable())
00497 delete packet;
00498 return false;
00499 }
00500 }
00501
00502 ClientPacket * ClientFramework::deliverPacket( ClientPacket * packet )
00503 {
00504
00505 last_received_pack = msecs(now());
00506
00507
00508 ClientPacket * next_pack = packet->isReliable() ? sequencenumbers.incrementNextIngoingSeqNo(packet) : 0;
00509
00510
00511 if(packet->getType() == 0)
00512 {
00513 handleSystemPacket(packet);
00514 }
00515 else
00516 {
00517 inqueue->insert(packet);
00518 }
00519
00520 return next_pack;
00521 }
00522
00523 void ClientFramework::addPendingAck( ClientPacket * packet, bool haslock)
00524 {
00525 pending_acks->addPendingAck(packet->getSequenceNumber(), packet->getPriority(), 0, avg_rtt.getAvg(), packet, haslock);
00526 }
00527
00528 unsigned int ClientFramework::bufferedPackets( ) const
00529 {
00530 return sequencenumbers.bufferePacketsTotal();
00531 }
00532
00533 void ClientFramework::sentReliablePacket( ClientPacket * packet )
00534 {
00535 last_reliable_sent_packet_time = msecs(now());
00536 }
00537
00538 void ClientFramework::heartBeat( )
00539 {
00540 if(state == CONNECTED || state == HANDSHAKING_INIT)
00541 {
00542 unsigned int curmillis = msecs(now());
00543 if(last_received_pack + connection_timeout_length < curmillis)
00544 {
00545
00546 state = DISCONNECTED;
00547 }
00548 else if(last_reliable_sent_packet_time + (connection_timeout_length/2) < curmillis)
00549 {
00550
00551
00552 enqueuePacket(ClientPacket::createSystemPacket(this, ClientPacket::PACKET_HEARTBEAT));
00553 }
00554 }
00555 }
00556
00557
00558 unsigned long ClientFramework::bytesReceived( ) const
00559 {
00560 return receiver ? receiver->bytesReceived() : 0;
00561 }
00562
00563 unsigned long ClientFramework::packetsReceived( ) const
00564 {
00565 return receiver ? receiver->packetsReceived() : 0;
00566 }
00567
00568 unsigned long ClientFramework::packetsDelivered( ) const
00569 {
00570 return receiver ? receiver->packetsDelivered() : 0;
00571 }
00572
00573 unsigned long ClientFramework::getSkippedUnreliablePackets( ) const
00574 {
00575 return receiver ? receiver->packetsUnreliableSkipped() : 0;
00576 }
00577
00578 unsigned long ClientFramework::getSkippedReliablePackets( ) const
00579 {
00580 return receiver ? receiver->packetsReliableSkipped() : 0;
00581 }
00582
00583 unsigned long ClientFramework::bytesSent( ) const
00584 {
00585 return sender ? sender->bytesSent() : 0;
00586 }
00587
00588 unsigned long ClientFramework::packetsSent( ) const
00589 {
00590 return sender ? sender->packetsSent() : 0;
00591 }
00592
00593 unsigned long ClientFramework::inQueueSize( ) const
00594 {
00595 return inqueue ? inqueue->size() : 0;
00596 }
00597
00598 unsigned long ClientFramework::outQueueSize( ) const
00599 {
00600 return outqueue ? outqueue->size() : 0;
00601 }
00602
00603 unsigned long ClientFramework::getPendingAckCount( ) const
00604 {
00605 return pending_acks ? pending_acks->size() : 0;
00606 }
00607
00608 void ClientFramework::retransmitPacket( FrameworkPacket * packet )
00609 {
00610 outqueue->insert(static_cast<ClientPacket *>(packet));
00611 }
00612
00613
00614
00615 void ClientFramework::disconnectionCleanup( )
00616 {
00617
00618 struct QueueDeleteAll
00619 {
00620 bool operator() ( const ClientPacket * packet ) const
00621 {
00622 return true;
00623 }
00624 bool operator() ( unsigned int userid ) const
00625 {
00626 return true;
00627 }
00628 };
00629
00630
00631 pending_acks->clear();
00632
00633
00634 outqueue->clear();
00635
00636
00637 inqueue->clear();
00638 }
00639
00640 unsigned int ClientFramework::getNextUnreliablePacketSeqNumber( ClientPacket * packet )
00641 {
00642 return sequencenumbers.getNextOutgoingSeqNo(packet);
00643 }
00644
00645 void ClientFramework::addPluginSequenceNumber( ClientPacket * packet )
00646 {
00647 if(packet->getSequenceNumber() == 0)
00648 {
00649 packet->setSequenceNumber(sequencenumbers.getNextOutgoingSeqNo(packet));
00650 }
00651 }
00652