Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

UDPConfirm.cpp

Go to the documentation of this file.
00001 /*
00002   UDPConfirm - reliable udp communication functions
00003  
00004   Module:    $RCSfile: UDPConfirm.cpp,v $
00005   Date:      $Date: 2002/08/26 23:29:52 $
00006   Version:   $Revision: 1.47 $
00007   ID:        $Id: UDPConfirm.cpp,v 1.47 2002/08/26 23:29:52 zoran Exp $
00008   Authors:   Zoran Constantinescu <zoranc@acm.org>
00009 */
00010 
00011 
00012 #include <string.h>
00013 
00014 #include "defs.h"
00015 #include "UDPConfirm.h"
00016 
00017 
00018 THRD_RETURN outbox_entry (THRD_ARG arg)
00019 {
00020   ((UDPConfirm *)arg)->check ();
00021 
00022   return 0;
00023 }
00024 
00025 
00026 UDPConfirm::UDPConfirm (int catchSignal)
00027 {
00028   idCrt   = 0;
00029   sockn   = 0;
00030   sockfd  = 0;
00031   sock    = 0;
00032   sockmax = 0;
00033   this->catchSignal = catchSignal;
00034 }
00035 
00036 
00037 UDPConfirm::~UDPConfirm ()
00038 {
00039   closeSocket ();
00040 }
00041 
00042 
00043 int UDPConfirm::createSocket (int port, int n, int flag)
00044 {
00045   sockmax = 0;
00046   sockn   = n;
00047   sockfd  = new int[n];
00048   sock    = new UDPSocket[n];
00049 
00050   // create a number of 'n' communication sockets
00051   for (int i = 0; i < n; i++) {
00052     // only the first port is according to 'flag', the rest are iterated
00053     if (i > 0)
00054       flag = UDP_PORT_ITERATE;
00055 
00056     // create the sockets
00057     sockfd[i] = sock[i].createSocket (port, flag);
00058     if (sockfd[i] < 0) {
00059       DBUG_PRINT("err", ("UDPConfirm: error creating socket"));
00060       return -1;
00061     }
00062     if (sockfd[i] > sockmax)
00063       sockmax = sockfd[i];
00064   }
00065 
00066   // the thread should stop when finish=1
00067   finish = 0;
00068   //start sending thread
00069   if (startThread (&sendThread, outbox_entry, this)) {
00070     DBUG_PRINT("err", ("UDPConfirm: error starting outbox thread"));
00071     return -1;
00072   }
00073 
00074   return 0;
00075 }
00076 
00077 
00078 void UDPConfirm::closeSocket ()
00079 {
00080   // stoping (re)sending thread
00081   finish = 1;
00082   while (finish != 0)
00083     SLEEP_MSEC(100);
00084 
00085   // destroying all communication sockets
00086   if (sock) {
00087     for (int i = 0; i < sockn; i++)
00088       sock[i].closeSocket ();
00089     delete[] sock;
00090     sock = 0;
00091   }
00092   if (sockfd) {
00093     delete[] sockfd; 
00094     sockfd = 0;
00095   }
00096   sockn = 0;
00097 }
00098 
00099 
00100 void UDPConfirm::setRemote (Address &remote)
00101 {
00102   for (int i = 0; i < sockn; i++)
00103     sock[i].setRemote (remote.getHost (), remote.getPort ());
00104 }
00105  
00106 
00107 void UDPConfirm::getLocalPort (uint *ports)
00108 {
00109   for (int i = 0; i < sockn; i++)
00110     ports[i] = sock[i].getLocalPort ();
00111 }
00112 
00113 
00114 int UDPConfirm::getLocalPort ()
00115 {
00116   return sock[0].getLocalPort ();
00117 }
00118 
00119 
00120 uint UDPConfirm::getLocalAddr ()
00121 {
00122   return sock[0].getLocalAddr ();
00123 }
00124 
00125 
00126 // non-blocking send to default remote
00127 uint UDPConfirm::sendDataN (Buffer *data)
00128 {
00129   uint host;
00130   int  port;
00131 
00132   // get default remote address
00133   sock[0].getRemote (host, port);
00134   Address remote (host, port);
00135 
00136   return sendDataN (data, remote);
00137 }
00138 
00139 
00140 // non-blocking send
00141 // returns the udp id of the sent message
00142 uint UDPConfirm::sendDataN (Buffer *data, Address &remote)
00143 {
00144   ItemDataOut *item;
00145 
00146 #ifdef DBUG_UDPCONFIRM
00147   DBUG_PRINT("info", ("UDPConfirm: sendDataN"));
00148 #endif
00149 
00150   item = sendIt (data, remote);
00151 
00152   return item->id;
00153 }
00154 
00155 
00156 // status of message for a non-blocking send
00157 send_state UDPConfirm::checkN (uint msgId)
00158 {
00159   ItemDataOut *item;
00160   send_state stat = send_invalid;
00161 
00162   if (lOut.Count () == 0)
00163     return stat;
00164 
00165   lockO.lock ();
00166   {
00167     lOut.Start_Get();
00168     while ((item = (ItemDataOut *) lOut.Get_Next ()))
00169       if (item->id == msgId)
00170       {
00171         stat = item->stat;
00172         break;
00173       }
00174   }
00175   lockO.unlock ();
00176 
00177   if ((stat == send_buffered_new) || 
00178       (stat == send_buffered_sent))
00179     stat = send_buffered;
00180 
00181   return stat;
00182 }
00183 
00184 
00185 // delete the message with this id (for non-blocking send)
00186 void UDPConfirm::deleteN (uint msgId)
00187 {
00188   ItemDataOut *item;
00189 
00190   if (lOut.Count () == 0)
00191     return;
00192 
00193   lockO.lock ();
00194   {
00195     lOut.Start_Get();
00196     while ((item = (ItemDataOut *) lOut.Get_Next ()))
00197       if (item->id == msgId)
00198       {
00199         deleteItem (item);
00200         break;
00201       }
00202   }
00203   lockO.unlock ();
00204 
00205   return;
00206 }
00207 
00208 
00209 // blocking send
00210 int UDPConfirm::sendData (Buffer *data)
00211 {
00212   uint host;
00213   int  port;
00214 
00215   // get default remote address
00216   sock[0].getRemote (host, port);
00217   Address remote (host, port);
00218 
00219   return sendData (data, remote);
00220 }
00221 
00222 
00223 // blocking send
00224 int UDPConfirm::sendData (Buffer *data, Address &remote)
00225 {
00226   int err;
00227   ItemDataOut *item;
00228 
00229 #ifdef DBUG_UDPCONFIRM
00230   DBUG_PRINT("info", ("UDPConfirm: sendData"));
00231 #endif
00232 
00233   item = sendIt (data, remote);
00234 
00235   // wait on semaphore until the confirmation or timeout
00236   item->block.wait ();
00237 
00238   if (item->stat == send_confirm)
00239     err = item->id;  // sent with confirm
00240   else 
00241     err = 0;  // something went wrong
00242 
00243 /* TODO -> all outbox items should be deleted!!!!
00244            including also sendDataN()
00245 */
00246   lockO.lock ();
00247   {
00248     deleteItem (item);
00249   }
00250   lockO.unlock ();
00251 
00252   return err;
00253 }
00254 
00255 
00256 // creates an entry in the list and sends data
00257 ItemDataOut* UDPConfirm::sendIt (Buffer *data, Address &remote)
00258 {
00259   char _host[26];
00260   ItemDataOut *item;
00261   
00262   // create a new out-item
00263   item = new ItemDataOut (data, remote, nextId ());
00264 
00265   item->addr.getIP (_host);
00266 
00267 #ifdef DBUG_UDPCONFIRM
00268   DBUG_PRINT("info", ("UDPConfirm: send data id=%08x to %s (1st)", item->id, _host));
00269 #endif
00270 
00271   // ...lock the list (so the sending thread won't conflict)
00272   lockO.lock ();
00273   {
00274     // insert into list
00275     insertItem (item);
00276     // send out message
00277     sendItem (item);
00278   }
00279   lockO.unlock ();
00280 
00281   return item;
00282 }
00283 
00284 
00285 // returns a (pseudo)random port index for communication
00286 // (a number between 0 and sockn-1)
00287 int UDPConfirm::getRandPort ()
00288 {
00289   static int socki = 0;
00290 
00291   socki++;
00292   if (socki == sockn)
00293     socki = 0;
00294 
00295   return socki;
00296 }
00297 
00298 
00299 int UDPConfirm::sendItem (ItemDataOut *item)
00300 {
00301   int err;
00302 
00303   item->data->setId (item->id);
00304   err = sock[getRandPort ()].
00305           sendData (item->data, 
00306                     item->addr.getHost (),
00307                     item->addr.getPort ());
00308   if (! err)
00309     item->stat = send_buffered_sent;
00310   else
00311     item->stat = send_err ;
00312 
00313   item->timesent.Now ();
00314   item->countsent--;
00315 
00316   return err; 
00317 }
00318 
00319 
00320 int UDPConfirm::sendConfirm (uint id, Address &remote)
00321 {
00322   int err;
00323 
00324   Buffer data (3, 0);
00325   data.setId (id);
00326   data << (uchar)'O';
00327   data << (uchar)'K';
00328 
00329 #ifdef DBUG_UDPCONFIRM
00330   DBUG_PRINT("info", ("UDPConfirm: send conf id=%08x", id));
00331 #endif
00332 
00333   // ...lock for sending
00334   lockO.lock ();
00335   {
00336     err = sock[getRandPort ()].
00337             sendData (&data, 
00338                       remote.getHost (),
00339                       remote.getPort ());
00340   }
00341   lockO.unlock ();
00342   
00343   return err;
00344 }
00345 
00346 
00347 void UDPConfirm::sendStop ()
00348 {
00349 //  int err;
00350 
00351   Buffer data (5, 0);
00352   data.setId (0);
00353   data << (uchar)'S';
00354   data << (uchar)'T';
00355   data << (uchar)'O';
00356   data << (uchar)'P';
00357 
00358 #ifdef DBUG_UDPCONFIRM
00359   DBUG_PRINT("info", ("UDPConfirm: send self-stop"));
00360 #endif
00361 
00362   // ...lock for sending
00363   lockO.lock ();
00364   {
00365     sock[0].
00366       sendData (&data, 
00367                 getLocalAddr (),
00368                 getLocalPort ());
00369   }
00370   lockO.unlock ();
00371 
00372   return;
00373 }
00374 
00375 
00376 // blocking receive
00377 int UDPConfirm::waitData (Buffer *data, Address &remote)
00378 {
00379   uint host;
00380   int  port;
00381   int  err = 0;
00382   uint id;
00383   uchar *tmp;
00384   fd_set rfds;
00385   static int rfd = 0;
00386 
00387   if (sockn == 0)
00388     err = -1;
00389   else while (1) {
00390 
00391     FD_ZERO (&rfds);
00392     for (int i = 0; i < sockn; i++)
00393       FD_SET (sockfd[i], &rfds);
00394 
00395     // wait forever on receive from the sockets
00396     err = select (sockmax + 1, &rfds, 0, 0, 0);
00397 
00398     if (! err)
00399       continue;
00400 
00401     // see which socket has the data
00402     if (FD_ISSET(0, &rfds))
00403       // ...sock[0] has priority
00404       rfd = 0;
00405     else
00406       // ...cycle through all sockfd-s, allways starting with
00407       // the socket next to the previously receiving socket
00408       for (int j = 0; j < sockn; j++)
00409         if (FD_ISSET((rfd + j + 1) % sockn, &rfds))
00410           { rfd = (rfd + j + 1) % sockn; break; }
00411 
00412     // receive the data from the selected socket
00413     err = sock[rfd].waitData (data, host, port);
00414     if (err)
00415       continue;
00416 
00417     remote.setHost (host);
00418     remote.setPort (port);
00419 
00420     tmp = (uchar *)*data;
00421     id = data->getId ();
00422 
00423     // it is a fake msg to stop the recving thread
00424     if (! memcmp ((char *)(tmp), "STOP", 4))
00425     {
00426 #ifdef DBUG_UDPCONFIRM
00427       DBUG_PRINT("info", ("UDPConfirm: recv self-stop"));
00428 #endif
00429       break;
00430     }
00431 
00432     // it is a confirmation
00433     if (! memcmp ((char *)(tmp), "OK", 2)) 
00434     {
00435 #ifdef DBUG_UDPCONFIRM
00436       DBUG_PRINT("info", ("UDPConfirm: recv conf id=%08x", id));
00437 #endif
00438 
00439       // need to unblock any sending threads waiting
00440       ItemDataOut *item = isItem (id, remote);
00441       if (item) {
00442         item->stat = send_confirm;
00443         item->block.post ();
00444       }
00445       continue;
00446     }
00447   
00448     // it is a message...
00449     //if (memcmp ((char *)(tmp), "OK", 2)) 
00450     {
00451 #ifdef DBUG_UDPCONFIRM
00452       DBUG_PRINT("info", ("UDPConfirm: recv data id=%08x", id));
00453 #endif
00454 
00455       // ...first confirm it
00456       sendConfirm (id, remote);
00457 
00458       // ...if not duplicate...
00459       if (! isConfirm (id, remote)) {
00460         // ...add entry and return
00461         insertConfirm (id, remote);
00462         break;
00463       } else {
00464 #ifdef DBUG_UDPCONFIRM
00465         DBUG_PRINT("warn", ("UDPConfirm: already received id=%08x", id));
00466 #endif
00467       }
00468     }
00469   }  // while()
00470 
00471   return err;
00472 }
00473 
00474 
00475 uint UDPConfirm::nextId ()
00476 {
00477   idCrt++;
00478 
00479   return idCrt;
00480 }
00481 
00482 
00483 int UDPConfirm::check ()
00484 {
00485   DBUG_PRINT("info", ("UDPConfirm: outboxThread started"));
00486 
00487   if (catchSignal)
00488     sigThread();
00489 
00490   // infinite loop
00491   while (1) {
00492     // check timed-out messages
00493     checkList ();
00494 
00495     // see if need to stop the thread
00496     if (finish == 1) {
00497       finish = 0;
00498       //exitThread ();
00499       break;
00500     }
00501 
00502     // wait a little bit (0.1 sec)
00503     SLEEP_MSEC(100);
00504   }  // while(1)
00505 
00506   DBUG_PRINT("info", ("UDPConfirm: outboxThread terminated"));
00507 
00508   return 0;
00509 }
00510 
00511 
00512 //TODO: we need some sleeping in this procedure!! it can eat CPU!
00513 //      
00514 int UDPConfirm::checkList ()
00515 {
00516   char _host[26];
00517   ItemDataOut *item;
00518   ItemDataInR *itemr;
00519 
00520   lockO.lock ();
00521   if (lOut.Count () != 0) 
00522   {
00523     ZTime now; now.Now ();
00524     {
00525       // go through all the items in the list...
00526       lOut.Start_Get();
00527       while ((item = (ItemDataOut *) lOut.Get_Next ())) 
00528       {
00529         // ...check if resent enough
00530         if (item->countsent == 0) {
00531           item->stat = send_timeout;   //TODO was send_err!!
00532           item->block.post ();
00533           item->countsent = -1;
00534           item->addr.getIP (_host);
00535 #ifdef DBUG_UDPCONFIRM
00536           DBUG_PRINT("info", ("UDPConfirm: send data id=%08x to %s (timeout)", item->id, _host));
00537 #endif
00538           continue;
00539         }
00540 
00541         // ...check the timeouts
00542         if ((item->stat != send_confirm) &&
00543             (item->countsent > 0))
00544           if ((now - item->timesent) > item->timeout) {
00545             item->addr.getIP (_host);
00546 #ifdef DBUG_UDPCONFIRM
00547             DBUG_PRINT("info", ("UDPConfirm: send data id=%08x to %s (re)", item->id, _host));
00548 #endif
00549             sendItem (item);
00550           }
00551       }  // while()
00552     }
00553   } // if()
00554   lockO.unlock ();
00555 
00556   lockI.lock ();
00557   if (lIn.Count () != 0) 
00558   {
00559     ZTime now; now.Now ();
00560     {
00561       lIn.Start_Get();
00562       while ((itemr = (ItemDataInR *) lIn.Get_Next ())) 
00563       {
00564         if ((now - itemr->timerecv) > itemr->timeout) {
00565 #ifdef DBUG_UDPCONFIRM
00566           DBUG_PRINT("info", ("UDPConfirm: recv data id=%08x (del)", itemr->id));
00567 #endif
00568           lIn.Remove (itemr);
00569           delete itemr;
00570         }
00571       }  // while()
00572     }
00573   } // if()
00574   lockI.unlock ();
00575 
00576   return 0;
00577 }
00578 
00579 
00580 // destroys all entries from the list
00581 int UDPConfirm::deleteList ()
00582 {
00583   lockO.lock ();
00584   {
00585     lOut.Destroy ();
00586   }
00587   lockO.unlock ();
00588   return 0;
00589 }
00590 
00591 
00592 // new entry in the list (! need explicit lock!)
00593 int UDPConfirm::insertItem (ItemDataOut *item)
00594 {
00595   lOut.Add (item);
00596 
00597   return 0;
00598 }
00599 
00600 
00601 // delets an entry from the list  
00602 int UDPConfirm::deleteItem (ItemDataOut *item)
00603 {
00604   lOut.Remove (item);
00605   delete item;
00606 
00607   return 0;
00608 }
00609 
00610 
00611 int UDPConfirm::deleteId (uint id)
00612 {
00613   ItemDataOut *item;
00614 
00615   if (lOut.Count () == 0)
00616     return 0;
00617  
00618   lockO.lock ();
00619   {
00620     lOut.Start_Get();
00621     while ((item = (ItemDataOut *) lOut.Get_Next ()))
00622       if (item->id == id)
00623       {
00624         lOut.Remove (item);
00625         delete item;
00626       }
00627   }
00628   lockO.unlock ();
00629 
00630   return 0;
00631 }
00632 
00633 
00634 ItemDataOut* UDPConfirm::isItem (uint id, Address &remote)
00635 {
00636   ItemDataOut *item = 0;
00637 
00638   if (lOut.Count () == 0)
00639     return 0;
00640  
00641   lockO.lock ();
00642   {
00643     lOut.Start_Get();
00644     while ((item = (ItemDataOut *) lOut.Get_Next ()))
00645       if ((item->id == id) && (item->addr == remote))
00646         break;
00647   }
00648   lockO.unlock ();
00649 
00650   return item;
00651 }
00652 
00653 
00654 int UDPConfirm::isConfirm (uint id, Address &remote)
00655 {
00656   ItemDataInR *item;
00657   int res = 0;
00658 
00659   if (lIn.Count () == 0)
00660     return res;
00661  
00662   lockI.lock ();
00663   {
00664     lIn.Start_Get();
00665     while ((item = (ItemDataInR *) lIn.Get_Next ()))
00666       if ((item->id == id) && (item->addr == remote)) 
00667       { // got a data duplicate
00668         res = 1; 
00669         item->timerecv.Now ();
00670         break;
00671       }
00672   }
00673   lockI.unlock ();
00674 
00675   return res;
00676 }
00677 
00678 
00679 int UDPConfirm::insertConfirm (uint id, Address &remote)
00680 {
00681   ItemDataInR *item;
00682 
00683   lockI.lock ();
00684   {
00685     item = new ItemDataInR (remote, id);
00686     item->timerecv.Now ();
00687     lIn.Add (item);
00688   }
00689   lockI.unlock ();
00690 
00691   return 0;
00692 }
00693 
00694 
00695 void UDPConfirm::stopWait()
00696 {
00697   // need for unblocking receiving threads when finishing
00698   // ... just send a very short message to the udp port to unlock
00699   sendStop ();
00700 }
00701 
00702 
00703 // remove all items from the incomming list for the specified remote
00704 void UDPConfirm::purgeIn (Address &remote)
00705 {
00706   ItemDataInR *item;
00707 
00708   char _host[26];
00709   remote.getIP (_host);
00710   DBUG_PRINT("info", ("UDPConfirm: purging all listIn for %s", _host));
00711 
00712   lockI.lock ();
00713   {
00714     lIn.Start_Get();
00715     while ((item = (ItemDataInR *) lIn.Get_Next ()))
00716       if (item->addr == remote)
00717       {
00718         lIn.Remove (item);
00719         delete item;
00720       }
00721   }
00722   lockI.unlock ();
00723 }
00724 

Generated on Mon Nov 25 12:46:32 2002 for qadpz by doxygen1.2.18