Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

PostOffice.cpp

Go to the documentation of this file.
00001 //postoffice.cpp  - implementation of PostOffice class
00002 
00003 #include "PostOffice.h"
00004 
00005 static int init_charStr = CharStr::init_static();   // perform static initialization in CharStr
00006 
00007 static XMLData recInterrupted("PostOffice::ReceiveInterrupted", XMLData::Nil);
00008 XMLData *PostOffice::ReceiveInterrupted = &recInterrupted;
00009 
00010 static XMLData timout("PostOffice::Timeout", XMLData::Nil);
00011 XMLData *PostOffice::Timeout = &timout;
00012 
00013 int PostOffice::_send(XMLData *data, Address *remote, int nonblocking, 
00014                       secure_method sec)
00015 {    
00016   // 3+1 means => 3="XML", 1="c| |s" (pretty ugly :(, but easy)
00017 
00018     int maxlen = 2048;
00019     int msglen;
00020     uint msglen2=0;    
00021     Buffer *msg;
00022     uchar *b;
00023     char sec_code;
00024     int retval;
00025 
00026     //print the XMLData to buffer
00027     do {
00028       maxlen <<= 1;
00029       b = new uchar[maxlen+1];
00030 
00031       msglen = data->print((char *)b, maxlen);
00032 
00033       //if the buffer was too small try again with double buffer
00034       if (!msglen) 
00035         delete[] b;  
00036     } while (!msglen);  // Iterate until buffer is large enough...
00037 
00038     Crypter::buffer *cryptbuf = 0;
00039 
00040     // see what kind of encoding we do to the message
00041     switch (sec) 
00042     {
00043       case sm_plain:
00044         sec_code = ' ';
00045         break;
00046 
00047       case sm_crypt:
00048         sec_code = 'c';
00049         cryptbuf = this->crypter->public_encrypt(b,strlen((char *)b));
00050   if (cryptbuf==0) {
00051           DBUG_PRINT("err", ("PostOffice: error unable to encrypt message!"));
00052           return 0;
00053         }
00054         delete[] b;
00055   b=cryptbuf->data;
00056   msglen=cryptbuf->size;
00057         break;
00058 
00059       case sm_sign:
00060         sec_code = 's';
00061         cryptbuf = this->crypter->private_sign(b,strlen((char *)b));
00062   if (cryptbuf==0){
00063           DBUG_PRINT("err", ("PostOffice: error unable to sign message!"));
00064           return 0;
00065         }
00066   msglen = 2 + cryptbuf->size + strlen((char *)b);
00067 #ifdef DBUG_POSTOFFICE
00068         DBUG_PRINT("info", ("PostOffice: message %d bytes '%s', "
00069                                          "signature %d bytes '%s'", 
00070                             msglen, b, cryptbuf->size, cryptbuf->data));
00071 #endif
00072         uchar *bb = new uchar[msglen];
00073         msglen2 = cryptbuf->size;
00074         bb[0] = (char)(msglen2 / 256);
00075         bb[1] = (char)(msglen2 % 256);
00076         memcpy (bb + 2, cryptbuf->data, cryptbuf->size);
00077         memcpy (bb + 2 + cryptbuf->size, b, strlen((char *)b));
00078         delete[] b;
00079         delete cryptbuf; cryptbuf = 0;
00080         b = bb;
00081         break;
00082     }
00083 
00084     msg = new Buffer(msglen + 5, NULL);
00085     uchar *msgbuf;
00086     msg->buffer(msglen2, msgbuf);
00087 
00088     memcpy ((char *)msgbuf + 3+1, (char *)b, msglen); 
00089     
00090     //attach XML header
00091     msgbuf[0] = 'X'; 
00092     msgbuf[1] = 'M'; 
00093     msgbuf[2] = 'L';
00094     msgbuf[3] = sec_code;
00095 
00096     msg->setCrt (msglen + 3+1);
00097 
00098     // if cryptbuff is not null, then b is pointing to it
00099     if (cryptbuf!=0)
00100       delete cryptbuf;
00101     else
00102       delete[] b;
00103 
00104     //and send
00105     if (strcmp(data->tag()->str, "Data") == 0)
00106     {
00107 #ifdef DBUG_POSTOFFICE
00108        DBUG_PRINT("info", ("Postoffice: sending message %s (w/ user+pswd)", 
00109                   data->sub("Message")->getAttrib("Type")->str));
00110 #endif
00111        data->reset();
00112     }
00113     else
00114     {
00115 #ifdef DBUG_POSTOFFICE
00116        DBUG_PRINT("info", ("Postoffice: sending message %s", 
00117                   data->getAttrib("Type")->str));
00118 #endif
00119     }
00120      
00121     if (nonblocking) 
00122         if (remote) retval = udp.sendDataN(msg, *remote);
00123         else retval = udp.sendDataN(msg);
00124     else 
00125         if (remote) retval = udp.sendData(msg, *remote);
00126         else retval = udp.sendData(msg);
00127 
00128 #ifdef DBUG_POSTOFFICE
00129     DBUG_PRINT("info", ("PostOffice: sent message (id=%d)", retval));
00130 #endif
00131 
00132     return retval;
00133 }
00134 
00135 int PostOffice::send(XMLData *data, Address *remote, secure_method s)
00136 {
00137     return _send(data, remote, 0, s);
00138 }
00139 
00140 int PostOffice::sendN(XMLData *data, Address *remote, secure_method s)
00141 {
00142     return _send(data, remote, 1, s);
00143 }
00144 
00145 send_state PostOffice::checkN(int messageID)
00146 {
00147     return udp.checkN(messageID);
00148 }
00149 
00150 void PostOffice::deleteN(int messageID)
00151 {
00152     udp.deleteN(messageID);
00153     return;
00154 }
00155 
00156 XMLData *PostOffice::receiveN(Address *remote)
00157 {
00158   //DBUG_PRINT("info", ("PostOffice: receiving non-blocking"))
00159   return _receive(remote, 0, 0);
00160 }
00161 
00162 
00163 XMLData *PostOffice::receive(Address *remote, int timeout)
00164 {
00165   if (timeout == NO_TIMEOUT)
00166   {
00167     //DBUG_PRINT("info", ("PostOffice: receiving blocking"))
00168   }
00169   return _receive(remote, 1, timeout);
00170 }
00171 
00172 
00173 XMLData *PostOffice::_receive(Address *remote, int blocking, int timeout)
00174 {
00175     ItemSender *sender;
00176     ItemDataIn *msg;
00177     XMLData *rv;
00178     time_t t;
00179 
00180     //obtain lock
00181     lock.lock();
00182     time(&t);
00183     
00184     //locate the sender in the list of senders
00185     sender = (ItemSender *)queue.Get_First();
00186     while (sender != NULL)
00187     {
00188         //found -> exit loop
00189         if (sender->addr.equals(*remote)) break;
00190         // check for sender removal timeout
00191         if (sender->lastUsed)
00192             if ((t - sender->lastUsed > SENDER_REMOVE_TIMEOUT) && 
00193                 (sender->data.Count() == 0) &&
00194                 (sender->sem->value() > 0))
00195             {   // if timeout exceeded and queue is empty, remove from the list of senders
00196                 ItemSender *to_remove = sender;
00197                 sender = (ItemSender *)queue.Get_Next();
00198                 queue.Remove(to_remove);
00199                 delete to_remove;
00200                 continue;
00201             }
00202 
00203         sender = (ItemSender *)queue.Get_Next();
00204     }
00205 
00206     if (blocking)
00207     {       
00208         if (sender == NULL) // not found -> have to create new item with this address
00209         {
00210             sender = new ItemSender(*remote);
00211             queue.Add(sender);
00212         }
00213 
00214         //make sure the sender won't be removed before releasing lock
00215         sender->lastUsed = 0;
00216 
00217         do {
00218             //have sender: release lock and wait for a new message
00219             lock.unlock();
00220             if (SEM_TIMEOUT == sender->sem->wait(timeout))
00221                 return PostOffice::Timeout;
00222 
00223             //if PostOffice is being destructed...
00224             if (finish) return XMLData::Nil;
00225 
00226             //if some other thread called stopWait()
00227             if (interrupted) 
00228             {
00229                 interrupted = 0;
00230                 return PostOffice::ReceiveInterrupted;
00231             }
00232 
00233             //message arrived (or was already in the queue): obtain lock again
00234             lock.lock();
00235         }
00236         //repeat waiting for the message, if the arrived message was removed
00237         //from the queue before we locked the lock - it could have been fetched
00238         //by other non-blocking receive meanwhile
00239         while (NULL == (msg = (ItemDataIn *)sender->data.Get_First()));
00240 
00241     }
00242     else    //non-blocking
00243     {
00244         if (sender == NULL)     // not found -> no message in queue
00245         {
00246             lock.unlock();
00247             DBUG_PRINT("warn", ("PostOffice: sender not known, no message in queue"))
00248             return XMLData::Nil;
00249         }
00250 
00251         //sender in the list of senders: is its data list empty?
00252         if (NULL == (msg = (ItemDataIn *)sender->data.Get_First()))
00253         {
00254             lock.unlock();
00255             DBUG_PRINT("warn", ("PostOffice: sender known, no message in queue"))
00256             return XMLData::Nil;
00257         }
00258         
00259         //decrement semaphore
00260         sender->sem->wait();
00261     }
00262 
00263     //there is a new message and we have lock: remove it from the queue
00264     sender->data.Remove(msg);
00265     receiveOrder.Remove(msg);
00266     sem.wait();  // also decrement global postoffice semaphore
00267 
00268     //release lock and return the first message from the list
00269     time(&sender->lastUsed);
00270     
00271     lock.unlock();
00272     rv = msg->data;
00273     delete msg;
00274     //DBUG_PRINT("info", ("PostOffice: new message"))
00275     return rv;
00276 }
00277 
00278 XMLData *PostOffice::receive_any(Address &remote, int timeout)
00279 {
00280   if (timeout == NO_TIMEOUT)
00281   {
00282     //DBUG_PRINT("info", ("PostOffice: receiving blocking"))
00283   }
00284   return _receive_any(remote, 1, timeout);
00285 }
00286 
00287 XMLData *PostOffice::receive_anyN(Address &remote)
00288 {
00289   //DBUG_PRINT("info", ("PostOffice:receiving non-blocking"))
00290   return _receive_any(remote, 0, 0);
00291 }
00292 
00293 XMLData *PostOffice::_receive_any(Address &remote, int blocking, int timeout)
00294 {
00295     ItemSender *sender;
00296     ItemDataIn *msg;
00297     XMLData *rv;
00298     time_t t;
00299 
00300     if (blocking)
00301     {
00302         do {
00303             //wait for message
00304             if (SEM_TIMEOUT == sem.wait(timeout))
00305                 return PostOffice::Timeout;
00306 
00307             //if PostOffice is being destructed...
00308             if (finish) return XMLData::Nil;
00309 
00310             //if some other thread called stopWait()
00311             if (interrupted) 
00312             {
00313                 interrupted = 0;
00314                 return PostOffice::ReceiveInterrupted;
00315             }
00316 
00317             //obtain lock
00318             lock.lock();        
00319     
00320             //fetch the message
00321             msg = (ItemDataIn *)receiveOrder.Get_First();
00322     
00323             if (msg == NULL) lock.unlock();
00324         } while (msg == NULL);
00325     }
00326     else
00327     {
00328         //obtain a lock
00329         lock.lock();
00330 
00331         //see if there is a message
00332         msg = (ItemDataIn *)receiveOrder.Get_First();
00333     
00334         //no message -> return
00335         if (msg == NULL)
00336         {
00337             lock.unlock();
00338             //DBUG_PRINT("info", ("PostOffice: nothing in queue"))
00339             return XMLData::Nil;
00340         }
00341     }
00342 
00343     //remove message from sender queue and decrement its semaphore
00344     msg->sender->data.Remove(msg);
00345     msg->sender->sem->wait();
00346     time(&msg->sender->lastUsed);
00347 
00348     //store the sender's address (heavy object copy assignment)
00349     remote = msg->sender->addr;
00350 
00351     //see if senders are timeouted
00352     time(&t);
00353     sender = (ItemSender *)queue.Get_First();
00354     while (sender != NULL)
00355     {
00356         if (sender->lastUsed)
00357             if ((t - sender->lastUsed > SENDER_REMOVE_TIMEOUT) && 
00358                 (sender->data.Count() == 0) &&
00359                 (sender->sem->value() > 0))
00360             {   // if timeout exceeded and nothing on the queue, 
00361                 //   remove the sender from the list of senders
00362                 ItemSender *to_remove = sender;
00363                 sender = (ItemSender *)queue.Get_Next();
00364                 queue.Remove(to_remove);
00365                 delete to_remove;
00366                 continue;
00367             }
00368         sender = (ItemSender *)queue.Get_Next();
00369     }
00370 
00371     //remove message from global postoffice queue
00372     receiveOrder.Remove(msg);
00373 
00374     //release lock and return the first message from the list
00375     lock.unlock();
00376     rv = msg->data;
00377     delete msg;
00378     //DBUG_PRINT("info", ("PostOffice: receive returns new message"))
00379     return rv;
00380 }
00381 
00382 void PostOffice::stopWait(Address &remote)
00383 {
00384     ItemSender *sender;
00385 
00386     //obtain a lock
00387     lock.lock();
00388 
00389     //nobody waiting?
00390     if (sem.value() > 0) 
00391     {
00392         lock.unlock();
00393         return;
00394     }
00395 
00396     //locate the sender in the list of senders
00397     sender = (ItemSender *)queue.Get_First();
00398     while (sender != NULL)
00399     {
00400         //found -> exit loop
00401         if (sender->addr.equals(remote)) break;
00402         sender = (ItemSender *)queue.Get_Next();
00403     }
00404 
00405     interrupted = 1;
00406     sem.post();
00407 
00408     //if sender found, iterrupt also the specific receive()
00409     if (sender != NULL) sender->sem->post();
00410         
00411     //release lock
00412     lock.unlock();
00413 }
00414 
00415 
00416 THRD_RETURN inbox_entry(THRD_ARG arg)
00417 {
00418     ((PostOffice *)arg)->inbox();
00419     return 0;
00420 }
00421 
00422 void PostOffice::inbox()
00423 {
00424     uchar *b = 0, *bb = 0;
00425     uint len;
00426     Buffer *buf = new Buffer();
00427     Address addr;
00428     
00429     if (catchSignal)
00430       sigThread();
00431 
00432     DBUG_PRINT("info", ("PostOffice: inboxThread started"));
00433 
00434     Crypter::buffer *cryptbuf = 0;
00435 
00436     while (1)
00437     {
00438         cryptbuf = 0;
00439 
00440         //**** receive new UDP packet ****
00441         udp.waitData(buf, addr);
00442 
00443         //is PostOffice being destructed?
00444         if (finish) break;
00445 
00446         buf->buffer (len, b);
00447 
00448         //DBUG_PRINT("info", ("PostOffice: new message detected"));
00449         
00450         //test XML header
00451         if (strncmp((char *)b, "XML", 3)) {
00452           DBUG_PRINT("err", ("PostOffice: not XML message!"));
00453           continue;
00454         }
00455         b[len] = '\0';
00456         
00457         // plain message
00458         if (b[3] == ' ') {
00459           bb = b + 4;
00460     if (this->noSecPlain) {
00461             DBUG_PRINT("err", ("PostOffice: plain message not accepted!"));
00462             continue; 
00463     }
00464         }
00465 
00466         // signed message
00467         else if (b[3] == 's')
00468         { 
00469           if (! crypter) {
00470             DBUG_PRINT("err", ("PostOffice: no crypter for signed message!"));
00471             continue; 
00472           }
00473 
00474           int msglen; uint msglen2;    
00475           msglen = len - 4 - 2;
00476           // size of signature
00477           msglen2 = 256*b[4] + b[5];
00478 
00479           // length of the real message
00480           msglen = msglen - msglen2;
00481           bb = b + 4 + 2 + msglen2;
00482     if ((msglen <=0) || 
00483               (! crypter->public_verify (bb, msglen, b+4+2))) {
00484             DBUG_PRINT("err", ("PostOffice: invalid signature "
00485                                "message %d bytes '%s', signature %d bytes '%s'!",
00486                                msglen + 2 + msglen2, bb, msglen2, b+4+2));
00487             continue; 
00488           }
00489         }
00490 
00491         // crypted message
00492         else if (b[3] == 'c')
00493         {
00494           if (! crypter) {
00495             DBUG_PRINT("err", ("PostOffice: no crypter for crypted message!"));
00496             continue; 
00497           }
00498 
00499           cryptbuf = crypter->private_decrypt (b+4, len-4);
00500           if (cryptbuf == 0) {
00501             DBUG_PRINT("err", ("PostOffice: invalid encryption!"));
00502             continue; 
00503           }
00504           bb = cryptbuf->data;
00505           bb[cryptbuf->size] = 0;
00506         }
00507 
00508         // unknown coding
00509         else {
00510           DBUG_PRINT("err", ("PostOffice: invalid encoding type=%c!", b[3]));
00511           continue;
00512         }
00513 
00514         //header OK, parse XML data
00515         XMLData *data = new XMLData((char *)bb);
00516 
00517         //parse with error?
00518         if (data->tag() == XMLData::ReadError)
00519         {
00520           DBUG_PRINT("err", ("PostOffice: message xml parse error, throwing away\n%s\n%s",
00521                      bb, data->getString()->str))
00522           delete data;
00523         } else {
00524           //parse OK, store the new message into the queue
00525           put_back(addr, data);
00526         }
00527 
00528         // crypted message requires new storage buffer, delete it
00529         if (b[3] == 'c') {
00530           delete cryptbuf;
00531         }
00532 
00533     }  // while(1)
00534     //signal that the thread finished
00535     finish = 2;
00536     delete buf;
00537 
00538     DBUG_PRINT("info", ("PostOffice: inboxThread terminated"));
00539 }
00540 
00541 void PostOffice::put_back(Address &addr, XMLData *data)
00542 {
00543         lock.lock();
00544 
00545         //locate sender 
00546         ItemSender *sender = (ItemSender *)queue.Get_First();
00547         while (sender != NULL)
00548         {
00549             if (sender->addr.equals(addr)) break;
00550             sender = (ItemSender *)queue.Get_Next();
00551         }
00552 
00553         if (sender == NULL) // not found -> have to create new item with this address
00554         {
00555             sender = new ItemSender(addr);
00556             queue.Add(sender);
00557         }
00558 
00559         //add message to the queue
00560         ItemDataIn *msg = new ItemDataIn(data, sender);
00561         sender->data.Add(msg);
00562         receiveOrder.Add(msg);
00563         //DBUG_PRINT("info", ("PostOffice: new message in the queue"))
00564         //debug
00565         if (strcmp(msg->data->tag()->str, "Data") == 0)
00566         {
00567 #ifdef DBUG_POSTOFFICE
00568             DBUG_PRINT("info", ("PostOffice: message %s (w/ user+pswd)", 
00569               msg->data->sub("Message")->getAttrib("Type")->str));
00570 #endif
00571             msg->data->reset();
00572         }
00573         else 
00574         {
00575 #ifdef DBUG_POSTOFFICE
00576             DBUG_PRINT("info", ("PostOffice: message %s", 
00577               msg->data->getAttrib("Type")->str));
00578 #endif
00579         }
00580         
00581         //post (increment) semaphore (i.e. wake up waiting thread if any)
00582         sender->sem->post();
00583         sem.post();
00584 
00585         //release lock
00586         lock.unlock();
00587 }
00588 
00589 
00590 void PostOffice::purgeIn (Address &remote)
00591 {
00592   udp.purgeIn (remote);
00593 }
00594 
00595 void PostOffice::setCrypter(Crypter *crypter){
00596   this->crypter = crypter;
00597 }
00598 

Generated on Mon Nov 25 12:46:30 2002 for qadpz by doxygen1.2.18