Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

PostOffice.h

Go to the documentation of this file.
00001 // postoffice.h   - declaration of PostOffice class - general interface
00002 //                  for sending and receiving crypted XML messages between 
00003 //                  computers in a TCP/IP network. Received messages are
00004 //                  queued. The transmission is implemented using UDP protocol
00005 //                  with single confirmation and thus is not 100% reliable
00006 //                  on network connections with congestion.
00007 
00008 #include <string.h>
00009 #include <time.h>
00010 
00011 #ifdef _WIN32
00012 #include <windows.h>
00013 #else
00014 #include <unistd.h>
00015 #endif
00016 
00017 #include "defs.h"
00018 #include "Object.h"
00019 #include "Address.h"
00020 #include "xmldata.h"
00021 #include "List.h"
00022 #include "Syncro.h"
00023 #include "UDPConfirm.h"
00024 #include "crypter/crypter.h"
00025 
00026 
00027 #define SENDER_REMOVE_TIMEOUT   36000        // in seconds
00028 #define DEFAULT_POFFICE_PORT    9171        // default post office listening port
00029 
00030 #define NO_TIMEOUT    0   //indicates wait forever
00031 
00032 #define DONT_DELETE_DATA    1   //pass this to destructor of ItemDataIn to keep the XMLData
00033 #define DELETE_DATA         0   //otherwise pass this
00034 
00035 enum secure_method { sm_plain, sm_crypt, sm_sign };
00036 
00037 class ItemSender;
00038 
00039 //items in the lists stored in individual ItemSenders, in fact
00040 //it's only "Object" encapsulation of XMLData.
00041 class COM_API ItemDataIn: public Object {
00042  public:
00043   XMLData   *data;      // the data received
00044   ItemSender *sender;   // in which ItemSender it is stored
00045 
00046   //constructor
00047   ItemDataIn(XMLData *xmld, ItemSender *s)
00048   {
00049       data = xmld;
00050       sender = s;
00051   }
00052   
00053 };
00054 
00055 //this is an item in the list of addresses of senders where the messages
00056 //are arriving from. It associates the sender address with a semaphore.
00057 //the blocking receive is waiting on this semaphore until a message arrives
00058 //and the receiving thread wakes it up. Each time a message is stored
00059 //on a queue, the semaphore is posted. Each time it is removed from a queue,
00060 //the semaphore is waited on. Non-blocking receive first checks whether
00061 //the queue is empty before waiting on a semaphore
00062 class COM_API ItemSender: public Object {
00063 
00064 public:
00065     Address addr;   // sender address
00066     Semaphore *sem; // number of messages in the queue
00067     List data;      // list of ItemDataIn objects received for thie address
00068     time_t lastUsed;    // when was this sender last used
00069 
00070     //constructs ItemSender with empty list and semaphore initialized to 0
00071     ItemSender(Address &remote) : addr(remote)
00072     {
00073         sem = new Semaphore(0);
00074         time(&lastUsed);
00075     };
00076 
00077     //destructor
00078     ~ItemSender()
00079     {
00080         int i;
00081         //we have to delete all XMLData objects that are stored in the List Data
00082         //(it is not done in destructor of ItemDataIn, because often we need
00083         // to delete it without deleting the actual XMLData message - each time
00084         // a message is returned from PostOffice)
00085         ItemDataIn *item = ((ItemDataIn *)data.Get_First());
00086         for (i = data.Count(); i > 0; i--)
00087         {
00088             delete item->data;
00089             item = ((ItemDataIn *)data.Get_Next());
00090         }
00091         //and the semaphore object
00092         delete sem;
00093     }
00094 };
00095 
00096 
00097 //receiving thread entry point (need a WINAPI type call -> should not be method,
00098 //it just calls the method inbox() of the passed PostOffice object)
00099 #ifdef _WIN32
00100     DWORD WINAPI inbox_entry(LPVOID arg); 
00101 #else
00102     void *inbox_entry(void *arg);
00103 #endif
00104 
00105 class COM_API PostOffice {
00106 
00107 private:
00108     UDPConfirm  udp;         // UDPConfirm that is used for sending/receiving data
00109     List        queue;       // list of Senders
00110     List        receiveOrder; // list of messages from all receive queues in the order they arrived
00111     Semaphore   sem;         // general postoffice semaphore - counts all received messages
00112     Lock        lock;        // unique access lock
00113     thread_handle recThread; // receiving thread
00114     int         finish;      // flag indicating to receiving thread to terminate (when set to 1)
00115     int         localPort;   // on which port is the PostOffice accepting UDP packets
00116     int         interrupted; // set to 1 by stopWait(), otherwise should be kept 0
00117     Crypter    *crypter;     // crypter used for decoding/verifying received messages 
00118                              // and for encoding/signing sent messages
00119     int         noSecPlain;  // set to 1 if plain messages are not accepted
00120     int         catchSignal; // whether we catch SIGSEGV (useful for slave)
00121     int         init_ok;     // set to 0, if the post office construction failed, otherwise set to 1,
00122                              // use initialized() to get its value
00123 
00124     // thread entry point
00125     void inbox();   
00126     
00127     //both blocking and non-blocking calls to send are translated to this call
00128     int _send(XMLData *data, Address *remote, int nonblocking,
00129              secure_method s = sm_plain);
00130 
00131     //both blocking and non-blocking calls to receive are translated to this call
00132     XMLData *_receive(Address *remote, int blocking, int timeout);
00133 
00134     //both blocking and non-blocking calls to receive_any are translated to this call
00135     XMLData *_receive_any(Address &remote, int blocking, int timeout);
00136 
00137     //receiving thread entry point should be friend so that it can call inbox()
00138 #ifdef _WIN32
00139     friend DWORD WINAPI inbox_entry(LPVOID arg); 
00140 #else
00141     friend void *inbox_entry(void *arg);
00142 #endif
00143 
00144 public:
00145 
00146     //inidicates that some other thread called stopWait
00147     static XMLData *ReceiveInterrupted;
00148     
00149     //indicates timeout before any other message have been received
00150     static XMLData *Timeout;
00151 
00152     //initialization of sockets (neccessary to call on WIN32 platform before
00153     //constructing the PostOffice object)
00154     static void init_sockets()
00155     {
00156         // ... winsock init
00157 #ifdef _WIN32
00158         int err;
00159         WSADATA lpWSAData;
00160         if ((err = WSAStartup (0x0101, &lpWSAData)) != 0)
00161         {
00162             DBUG_PRINT("err", ("unable to load WSOCK32: err=%d", GetLastError ()));
00163             exit (1);
00164         }
00165 #endif
00166         Address::init_thishost();
00167     }
00168 
00169     //deallocates the socket library. Must be called before application exits,
00170     //after the PostOffice is destructed and the application doesn't use sockets anymore.
00171     static void release_sockets()
00172     {
00173 #ifdef _WIN32
00174         if (SOCKET_ERROR == WSACleanup())
00175         {
00176             DBUG_PRINT("err", ("WSAClenup() error: err=%d", WSAGetLastError()));
00177         }
00178 #endif
00179     }
00180 
00181     // construct PostOffice, incomming packets will be expected
00182     // at specified port, 0 means use default port
00183     // n specifies how many ports should be used for sending/receiving
00184     // the ports are used in a random manner. The application can
00185     // get the port numbers by calling getlocalPort()
00186     //   catchSignal  - want to catch SEGV type signals
00187     //   noSecPlain   - don't want to accept plain messages
00188     PostOffice(int port=0, int n = 1, int incremental = UDP_PORT_ITERATE,
00189                Crypter *crypter = 0, int noSecPlain = 1, int catchSignal = 0) 
00190       : udp (catchSignal)
00191     {
00192         init_ok = 1;
00193 
00194   if (! crypter)
00195     this->noSecPlain = 0;
00196   else
00197     this->noSecPlain = noSecPlain;
00198 
00199         this->crypter = crypter;
00200 
00201         interrupted = 0;
00202         if (port == 0) port = DEFAULT_POFFICE_PORT;
00203         this->catchSignal = catchSignal;
00204         if (udp.createSocket(port, n, incremental))
00205         {
00206 #ifdef DBUG_PRINT
00207             DBUG_PRINT("err", ("PostOffice: error creating socket"));
00208 #else
00209             cout << "PostOffice: error creating socket\n";
00210 #endif
00211             init_ok = 0;
00212             return;
00213         }
00214         //start receiving thread
00215         finish = 0;
00216         if (startThread(&recThread, inbox_entry, this))
00217         {
00218             cout << "PostOffice: error creating thread\n";
00219             init_ok = 0;
00220         }
00221     };
00222 
00223     // returns 1, if PostOffice was constructed properly, otherwise returns 0
00224     int initialized() { return init_ok; }
00225 
00226     //destructor 
00227     ~PostOffice() 
00228     {
00229         // terminate the receiving thread
00230         DBUG_PRINT("info", ("PostOffice::~PostOffice"));
00231         finish = 1;
00232         udp.stopWait();
00233         DBUG_PRINT("info", ("PostOffice: Waiting for inbox() thread to finish"));
00234         while (finish != 2) SLEEP_MSEC(10);
00235         DBUG_PRINT("info", ("PostOffice: inbox() thread finished"));
00236         //the messages stored in receiveOrder are not deallocated,
00237         //since they are all pointed to also from individual senders list,
00238         //which will deallocate them on their destruction
00239         receiveOrder.Release();
00240         DBUG_PRINT("info", ("PostOffice: leaving ~PostOffice"));
00241     };
00242 
00243     //returns the port numbers on which the PostOffice is accepting the data
00244     //it is either the default port number
00245     uint getLocalPort()
00246     {
00247         return udp.getLocalPort();
00248     }
00249 
00250     void getLocalPort(uint *ports)
00251     {
00252         udp.getLocalPort(ports);
00253     }
00254 
00255     //sends XMLData to a remote, returns 1 only after the remote
00256     //successfully received and confirmed the data, returns 0, if
00257     //there was an error while sending.
00258     //(there is a small chance in case of network congestion
00259     // that the peer received the data but the confirmation
00260     // did not come back)
00261     //if remote is not specified, the default remote is used
00262     int send(XMLData *data, Address *remote = NULL, 
00263              secure_method s = sm_plain);
00264 
00265     //nonblocking version of send: returns a message id which should
00266     //be passed to checkN(messageID) to check the status of message.
00267     //application must call checkN() until it returns send_confirm,
00268     //send_err, or send_timeout.
00269     //returns immediatelly and the message is queued
00270     //for delivery in outgoing messages queue. It will be sent
00271     //with requiring the confirmation from the adressee, but the
00272     //application can continue execution meanwhile.
00273     int sendN(XMLData *data, Address *remote = NULL,
00274              secure_method s = sm_plain);
00275 
00276     //checks the state of the message that was sent using sendN()
00277     //returns send_confirm, after the message was successfully
00278     // sent and confirmed, send_buffered, if the message is still
00279     // waiting for confirmation, send_err, if the send failed or
00280     // send_invalid if the messageID is incorrect, and send_timeout 
00281     // if the confirmation was not received within a timeout period
00282     send_state checkN(int messageID);
00283 
00284     //deletes a message that was sent using sendN()
00285     // that's because the UDPConfirm doesn't know when to do it!
00286     // and more checkN() calls can be made, even if the msg was
00287     // sent ok
00288     void deleteN (int messageID);
00289 
00290     //receives the next XMLData received from a remote. Returns
00291     //newly constructed object. If there is no data in the queue, 
00292     //waits until some data arrives.
00293     //if remote is not specified, the default remote is used.
00294     //Application can call stopWait(remote) with the same address
00295     //to interrupt waiting. In that case receive returns
00296     //PostOffice::ReceiveInterrupted. Be aware that if several of
00297     //your threads are waiting for a message from the same address,
00298     //only one of them is awaken.
00299     //if timeout milliseconds (approx.) pass without receiving
00300     // any message, PostOffice::Timeout is returned
00301     XMLData *receive(Address *remote = NULL, int timeout = NO_TIMEOUT);
00302 
00303     //non-blocking version of receive - returns XMLData::Nil if there
00304     //is no XMLData from remote, in the queue. Otherwise returns a new
00305     //XMLData with the data from a remote which are on top of the queue.
00306     //if remote is not specified, the default remote is used
00307     XMLData *receiveN(Address *remote = NULL);
00308 
00309     //receives the next XMLData object from any remote and stores
00310     //the source address to *remote. If there is no data in the queue,
00311     //waits until some data arrives.
00312     //Application can call stopWait(remote) with the same address
00313     //to interrupt waiting. In that case receive returns
00314     //XMLData::ReceiveInterrupted. Be aware that if several of
00315     //your threads are waiting for a message from the same address,
00316     //only one of them is awaken.    
00317     //if timeout milliseconds (approx.) pass without receiving
00318     // any message, PostOffice::Timeout is returned
00319     XMLData *receive_any(Address &remote, int timeout = NO_TIMEOUT);
00320 
00321     //non-blocking version of receive_any - returns XMLData::Nil, if
00322     //there is no XMLData in the queue. Otherwise returns the first
00323     //XMLData object from the top of the queue. Please note that
00324     //the order of messages doesn't have to be the same
00325     XMLData *receive_anyN(Address &remote);
00326 
00327     //sets the notoriously used remote address.
00328     void setRemote (Address &remote)
00329     {
00330          udp.setRemote(remote);
00331     }
00332 
00333     //puts back the message into inbox (at the end of the queue)
00334     void put_back(Address &addr, XMLData *data);
00335 
00336     //if some other thread of the application is waiting to receive some 
00337     //message from remote, this method will interrupt the block of the
00338     //waiting thread, which will continue out from receive() or receive_any()
00339     //by returning PostOffice::ReceiveInterrupted.
00340     void stopWait(Address &remote);
00341 
00342   void purgeIn (Address &remote);
00343 
00344   void setCrypter(Crypter *crypter);
00345 };

Generated on Mon Nov 25 12:46:30 2002 for qadpz by doxygen1.2.18