Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

Master.h

Go to the documentation of this file.
00001 //Master.h - header file for main master class and data structures used by master
00002 
00003 #ifndef _Master_h_
00004 #define _Master_h_
00005 
00006 #include <stdio.h>
00007 
00008 #include "defs.h"
00009 #include "Object.h"
00010 #include "List.h"
00011 #include "PostOffice.h"
00012 #include "xmldata.h"
00013 #include "Storage.h"
00014 #include "xmlstorage.h"
00015 #include "Address.h"
00016 #include "charstr.h"
00017 #include "userdb/userdb.h"
00018 #ifdef HAVE_OPENSSL
00019 #include "crypter/RSAcrypter.h"
00020 #endif
00021 
00022 #define QUEUE_MSG           0
00023 #define DONT_QUEUE_MSG      1
00024 
00028 class JobEntry : public Object {
00029  private:
00030   CharStr      *_user;     // user who submitted this job
00031  public:
00032   JobId        id;
00033   List         tasks;    // list of *TaskEntry's
00034   client_state cl_stat;
00035   Address      cl_addr;
00036 
00038   JobEntry()
00039   {
00040     _user = CharStr::Error;
00041   }
00042 
00044   ~JobEntry()
00045   {
00046       DBUG_PRINT("dbug", ("~JobEntry Job(%s,%d)", id.name(), id.id));
00047       tasks.Release();  // tasks are deleted through lTasksRun adn lTasksWait
00048       _user->released();
00049   }
00050 
00052   const char *user() 
00053   {
00054     return _user->str;
00055   }
00056 
00058   void user(char *usr)
00059   {
00060     _user = new CharStr(usr);
00061     _user->stored();
00062   }
00063 
00065   CharStr *_cs_user()
00066   {
00067     return _user;
00068   }
00069 
00071   void _cs_user(CharStr *usr)
00072   {
00073     _user->released();
00074     _user = usr;
00075     _user->stored();
00076   }
00077 
00078 };
00079 
00080 
00081 class SlaveEntry;
00082 class TaskEntry : public Object {
00083  private:
00084   CharStr    *_data;
00085  public:
00086   TaskId      id;
00087   List        taskInfos;
00088   JobEntry   *job;
00089   SlaveEntry *slave;
00090   task_state  stat;
00091   ZTime       timeout;
00092   ZTime       timeend;
00093 
00094   TaskEntry (TaskId &tid, JobEntry *job, 
00095        SlaveEntry *slave, task_state stat, CharStr *data)
00096     : id (tid)
00097     {
00098       this->job   = job;
00099       this->slave = slave;
00100       this->stat  = stat;
00101       _data  = data;
00102       _data->stored();
00103     }
00104  
00105   ~TaskEntry()
00106   {
00107     DBUG_PRINT("dbug", ("~TaskEntry Task(%s,%d,%d)", 
00108                job->id.name(), job->id.id, id.id));
00109     _data->released();
00110   }
00111 
00112   void _cs_data(CharStr *newdata)
00113   {
00114     _data->released();
00115     _data = newdata;
00116     _data->stored();
00117   }
00118   
00119   CharStr *_cs_data()
00120   {
00121     return _data;
00122   }
00123 
00124   const char *data()
00125   {
00126     return _data->str;
00127   }
00128 
00129   void data(char *newdata)
00130   {
00131       _data = CharStr::Error;
00132       _cs_data(CharStr::create(newdata));
00133   } 
00134 };
00135 
00136 // used to give the best fit Slave to a task
00137 class SlaveStatistics : public Object {
00138 //TODO
00139 // histogram of availability?
00140 
00141 public:
00142 
00143   //statistical info
00144   ZTime       on_since;
00145   ZTime       last_state_change;   // because this is reset by checkingThread, we need
00146   ZTime       last_state_change_real;   // also this for status output 
00147   double      sec_busy;             //this inculdes reserved & contacted
00148   double      sec_disabled;
00149   //sec_ready = 100% - sec_busy - sec_disabled
00150 
00151   //this is the quantitative measure of this slave's preference, it is computed
00152   //every once in a while based on statistical info in checkingThread()
00153   double      rank;    // 0.0...1.0:  0-bad, 1-good
00154 
00155   //constructor (new slave is being created)
00156   SlaveStatistics()
00157   {
00158      //initialize statistical info
00159      on_since.Now();
00160      last_state_change.Now();
00161      last_state_change_real.Now();
00162      sec_busy = 0.0;
00163      sec_disabled = 0.0;
00164   }
00165 
00166   //this method should be called each time slave changes a state
00167   void stateChange(slave_state state1, slave_state state2)
00168   {
00169       ZTime now;
00170       now.Now();
00171 
00172       if (state1 == state2) return;
00173 
00174       if (state1 == slave_disabled)
00175       {
00176         sec_disabled += ((double)now.sec - (double)last_state_change.sec)
00177              + 0.000001 * ((double)now.usec - (double)last_state_change.usec);
00178         last_state_change.Now();
00179         last_state_change_real = last_state_change;
00180       }
00181       else if ((state1 != slave_ready) && 
00182                ((state2 == slave_ready) || (state2 == slave_disabled)))
00183       {
00184         sec_busy += ((double)now.sec - (double)last_state_change.sec)
00185              + 0.000001 * ((double)now.usec - (double)last_state_change.usec);
00186         last_state_change.Now();
00187         last_state_change_real = last_state_change;
00188       }
00189       else //either was ready and becomes busy or disabled, or remains busy
00190       if (state1 == slave_ready) 
00191       {
00192         last_state_change.Now();
00193         last_state_change_real = last_state_change;
00194       }
00195   }
00196 
00197 };
00198 
00199 class SlaveEntry : public Object {
00200  private:
00201   slave_state state;    //it's private - we want to do statistics     
00202  public:
00203   slave_state prev_stat;
00204   SlaveInfo  *info;
00205   TaskEntry  *task;
00206   SlaveStatistics statistics;
00207   ZTime time_last_status;
00208   ZTime time_reserved;                  // valid only if slave_state is reserved
00209   JobEntry *jobReserved;    // valid only if slave_state is reserved
00210 
00211   SlaveEntry()
00212   {
00213     info = 0;
00214     prev_stat = slave_internal_error;
00215     state = slave_off;
00216   }
00217 
00218   ~SlaveEntry()
00219   {
00220     char _host[26];
00221     info->addr.getIP (_host);
00222     DBUG_PRINT("dbug", ("~SlaveEntry slave %s", _host));
00223     if (info) delete info;
00224   }
00225 
00226   //set new state for slave
00227   slave_state setState(slave_state newState)
00228   {
00229     slave_state oldState = state;
00230     statistics.stateChange(oldState, newState);
00231     state = newState;
00232     return oldState;
00233   }
00234 
00235   void setInfo (SlaveInfo *info)
00236   {
00237     if (this->info)
00238       delete this->info;
00239     this->info = info;
00240   }
00241 
00242   int setInfo (XMLData *xinfo, Address *remote)
00243   {
00244     char _h1[26], _h2[26];
00245     // get the slave info from message   
00246     SlaveInfo *info = new SlaveInfo;   
00247     if (!x2o (xinfo, *info))
00248       return 1;
00249 
00250     if (info->addr != *remote) {
00251       info->addr.getIP (_h1);
00252       remote->getIP (_h2);
00253       /* we log this now only for new slaves, otherwise we'll get a huge log
00254       DBUG_PRINT("warn", ("Master: set info slave with wrong XML address "
00255                           "%s (not %s)", _h1, _h2));
00256       */
00257       // use the address from the packet, not from the XML msg
00258       info->addr = *remote;
00259     }
00260 
00261     setInfo (info);
00262 
00263     return 0;
00264   }
00265 
00266   //get current state of slave
00267   slave_state stat()
00268   {
00269     return state;
00270   }
00271 };
00272 
00273 class ReserveEntry : public Object {
00274  public:
00275   ReserveId id;
00276   JobEntry *job; // the job to which this reserv belongs
00277   int parallel;  // parallel/serial reservation
00278   int slavesRequired;    // number of slaves left to reserve
00279   List slave_infos;    // type of slaves reservation accepts
00280   
00281   ~ReserveEntry()
00282   {
00283       DBUG_PRINT("dbug", ("~ReserveEntry"));
00284   }
00285 };
00286 
00287 class ClientMsgEntry : public Object {
00288  public:
00289   JobEntry *job;
00290   XMLData *msg;
00291   ZTime last_sent;
00292   int count_sent;
00293   uint mId;
00294 
00295   ~ClientMsgEntry()
00296   {
00297     DBUG_PRINT("dbug", ("~ClientMsgEntry"));
00298   }
00299 };
00300 
00301 //this is an element in list of job names and the latest numbers
00302 //that were used for them 
00303 class JobIDTableEntry : public Object {
00304  private:
00305   CharStr *_name;
00306  public:
00307   int id;
00308  JobIDTableEntry() 
00309  {
00310    _name = CharStr::Error;
00311  }
00312  ~JobIDTableEntry()
00313  {
00314    _name->released();
00315  }
00316 
00317  const char *name() { return _name->str; }
00318  CharStr *_cs_name() { return _name; }
00319  void name(char *nm) 
00320  {
00321    _name->released();
00322    _name = new CharStr(nm);
00323    _name->stored();
00324  }
00325  void _cs_name(CharStr *nm)
00326  {
00327    _name->released();
00328    _name = nm;
00329    _name->stored();
00330  }
00331 };
00332 
// Direction tag passed to the Master::log_*_msg functions.
// NOTE(review): the names read as m = master, c = client, s = slave
// ("m2c" = master to client, ...) - confirm against the logging code.
typedef enum {
  mdir_m2c,
  mdir_c2m,
  mdir_m2s,
  mdir_s2m
} msg_direction;
00334 
00335 class Master : public Object {
00336  private:
00337   List lSlaves;     // list of all slaves
00338   List lJobs;       // list of all jobs/clients
00339   List lTasksRun;   // list of running tasks on slaves
00340   List lTasksWait;  // list of waiting tasks at the master
00341   //List lTasksFinish;// list of finished tasks (w/ results)
00342   List lReservs;    // list of all reservations (*ReserveEntry)
00343   List lClientMsg;  // list of messages undelivered to clients
00344   List lJobIDTable; // list of job names and the latest numbers assigned
00345 
00346   PostOffice *po;   // communication post-office
00347 
00348   Lock lock;        // lock for accessing all data structures shared
00349                     // among checking and dispatching threads
00350   Semaphore monitor;    // posted when there's something to update on console monitor
00351   int master_running;   // set to 0 when the master is about to quit - so that threads
00352                         // will quit
00353   int threads_running;  // incremented when new master's thread is started, decremented
00354                         // when it finishes
00355   int shutdown;         // set to 1 when user enters 'q' to shutdown
00356 
00357   qpzuserdb *userdb;    // list of users with passwords
00358   ZTime next_userfile_reload;  //when will the userfile be reloaded next
00359 
00360   //configuration options
00361   long task_launch_timeout;  // time the slave has to start the task [sec]
00362   int max_send_retry;        // number of times messages are resent to
00363                              // client before 
00364   long slave_status_timeout; // after this time slave is killed if it
00365                              // doesn't send status 
00366   long client_resend_timeout; // how long time we wait until a waiting msg
00367                               // is resent to client
00368   long slave_reserve_timeout; // how much time the client has to respond to M_SLAVE_AVAIL msg
00369 
00370   char log_root[250];   // path to directory where all job and slave logs for this session are saved
00371   long log_on;                // whether logging into log_root is used
00372   char *userfile; // name of file with list of users and passwords
00373   long userfile_reload_period;  // how often (sec) is the userdb file reloaded to memory
00374   char *master_privatekeyfile;  // file name of the private key file
00375   char *status_file;            // where the current status is saved
00376   int meta_refresh_period;      // what will be in the meta refresh tag in
00377                                 // html status file (in seconds)
00378   int permanent_upgrade;        // 0/1 flag whether the permanent upgrade service is on
00379   XMLData *pu_config;           // XMLData structure with permanent upgrade configuration
00380   List toUpgradeLater;          // list of SlaveEntries of slaves that were busy
00381                                 // when non-immediate upgrade was received
00382   XMLData *upgradeLaterURLs;    // XML element containing upgrade URLs as its subelements
00383   CharStr *upgradeLaterVersion; // version for non-immediate upgrade
00384   
00385 #ifdef HAVE_OPENSSL
00386   RSAcrypter *master_crypter;   // RSA crypter initialized from the master private key
00387 #endif
00388 
00389   FILE *console;        // where the console output is sent - default stdout
00390   char *prompt;         // input console prompt (dispayed by output console)
00391   int inputPrompt;      // whehter to update the master output: if set to 1,
00392                         // user is entering data and output is not updated
00393   char *master_addr;    // master's address represented as string
00394   char *status_msg;     // new status msg
00395   char *last_status_msg; // last status msg
00396   int update_status;    // when set to 0, the status_msg is not updated to 'ready'
00397   int no_input;   // set to 1 if output console file is used - then
00398                         // there is no input console for master
00399   int init_ok;          // set to 0 if master was not constructed properly, otherwise 1
00400                         // application can check this with initialized()
00401   ZTime running_since;  // the time when the master was started
00402   double running_since_as_double;   // the same in seconds
00403 
00404   //---------------statistical information
00405   int totalTasks, totalJobs, totalReservations;
00406 
00407  public:
00408   // port zero means use default master port
00409   // if console_file is nonzero, it should contain path to a filename
00410   //  where the console output will be redirected
00411   Master (int port = 0, const char *console_file = 0);
00412 
00413   // returns 1, if master was constructed properly, otherwise returns 0
00414   int initialized() { return init_ok; }
00415 
00416   ~Master ();
00417 
00418   // the main thread which listens for messages on the post-office
00419   void startMaster ();
00420 
00421   // this thread should check periodicaly the lists for timeouts
00422   void checkingThread ();
00423 
00424   // console thread - prints info on console - possibly interactive in the future
00425   void consoleOutThread ();
00426   void consoleInThread ();
00427 
00428   // http server thread - starts the http server and handles requests
00429   void httpServerThread ();
00430 
00431  private:
00432   // when a msg is received by the post-office, this
00433   // function must be called to handle the message
00434   // * the msg will be destroyed by this function
00435   // * shouldn't waste time in this function (return asap)
00436   void dispatchMsg (XMLData *msg, Address *remote);
00437 
00438   // messages from the client
00439   void msgJobCtrl (XMLData *msg, Address *remote, CharStr *user);
00440   void msgJobCtrl_Stop (JobEntry *job, Address *remote);
00441   void msgJobCtrl_StopAllName (JobEntry *job, Address *remote);
00442   void msgJobCtrl_StopAllUser (JobEntry *job, Address *remote, CharStr *user);
00443   void msgJobCtrl_GetStatus (JobEntry *job, Address *remote);
00444 
00445   void msgTaskInit (XMLData *msg, Address *remote, CharStr *user);
00446   void msgTaskInit_Reserve (TaskEntry *task, SlaveEntry *slave);
00447   void msgTaskInit_Normal (TaskEntry *task);
00448 
00449   void msgClientStatus (XMLData *msg, Address *remote, CharStr *user);
00450   void msgClientStatus_On(JobId &jid, Address &clientAddress, 
00451                           XMLData *msg, CharStr *user);
00452   void msgClientStatus_Off(Address *remote, JobId &jid, XMLData *msg);
00453 
00454   void msgTaskCtrl (XMLData *msg, Address *remote, CharStr *user);
00455   void msgTaskCtrl_Stop(TaskEntry *task);
00456   void msgTaskCtrl_Ctrl(TaskEntry *task, CharStr *arg);
00457 
00458   void msgSlaveReserve (XMLData *msg, Address *remote, CharStr *user);
00459 
00460   void msgSlaveCtrl (XMLData *msg, Address *remote, CharStr *user);
00461   void msgSlaveCtrl_Upgrade(XMLData *msg, Address *remote);
00462 
00463   // messages sent to the client
00464   int  sndMessageToClient (Address *remote, JobEntry *job, XMLData *msg, int dont_queue = QUEUE_MSG);
00465   int  sndStatusError (Address *remote, char *err, XMLData *jobId);
00466   int  sndJobStatus (Address *remote, JobEntry *job, 
00467          JobId &jobId, job_state stat, char *err, XMLData *reserveInfo = XMLData::Nil, int dont_queue_msg = QUEUE_MSG);
00468   int  sndTaskStatus (Address *remote, JobEntry *job, TaskId &taskId, 
00469           task_state stat, const char *error, int dont_queue_msg = QUEUE_MSG);
00470   int  sndSlaveStatus (Address *remote, int ok, const char *error = 0);
00471   int  sndTaskFinish ();
00472   int  sndSlaveAvail(Address *remote, JobEntry *job, int slavesRequired, XMLData *slaveInfos, ReserveId resID);
00473 
00474   // messages from the slave
00475   void msgTaskStatus (XMLData *msg, Address *remote);
00476   void msgTaskStatus_Refused (TaskEntry *task, XMLData *msg, int &forwarded);
00477   void msgTaskStatus_Started (TaskEntry *task, XMLData *msg, int &forwarded);
00478   void msgTaskStatus_Ok (TaskEntry *task, XMLData *msg, int &forwarded);
00479   void msgTaskStatus_Crashed (TaskEntry *task, XMLData *msg, int &forwarded);
00480 
00481   void msgTaskFinish (XMLData *msg, Address *remote);
00482 
00483   void msgTaskMove (XMLData *msg, Address *remote);
00484 
00485   void msgSlaveStatus (XMLData *msg, Address *remote);
00486   void msgSlaveStatus_Off (SlaveEntry *slave);
00487   int msgSlaveStatus_Ready (SlaveEntry *&slave, Address *remote, XMLData *info, slave_state stat);
00488   void msgSlaveStatus_Busy (SlaveEntry *slave);
00489   void msgSlaveStatus_Disabled (SlaveEntry *slave);
00490 
00491   // messages sent to the slave
00492   int  sndMessageToSlave (Address *remote, XMLData *msg);
00493   int  sndTaskCtrl (Address *remote,
00494         TaskId &taskId, CharStr *action, CharStr *arg = CharStr::Error);
00495   int  sndTaskInit (Address *remote, TaskId &taskId, 
00496         CharStr *taskURL, long taskTimeout, CharStr *data, CharStr *userData);
00497   int  sndSlaveCtrl (Address *remote, CharStr *action, CharStr *URL = CharStr::Error);
00498 
00499 
00500  private:
00501   void slaveSetState (SlaveEntry *slave, slave_state state);
00502 
00503   JobEntry*     srchJob (JobId &jobId);
00504   JobEntry*     srchJob (XMLData *msg);
00505   TaskEntry*    srchTask (TaskId taskId);
00506   ReserveEntry* srchReserve (ReserveId &resID);
00507   TaskInfo*     slaveMatchTasks (SlaveInfo *slaveInfo, List *taskInfos);
00508   int           slaveMatchSlaves (SlaveInfo *slaveInfo, List *slaveInfos);
00509   int           slaveMatchSlave(SlaveInfo *info1, SlaveInfo *info2);
00510   SlaveEntry*   srchSlaveAddr (Address &addr); 
00511   SlaveEntry*   srchSlaveTaskInfos (slave_state state, 
00512             List *taskInfos,
00513             CharStr *&matchedURL, long &taskTimeout, CharStr *&userData);
00514   SlaveEntry*   srchSlaveSlaveInfos(slave_state state, List *slaveInfos);
00515   CharStr *matchURL(SlaveInfo *sInfo, XMLData *msg);
00516 
00517   int verifyUserPswd(CharStr *user, CharStr *pswd, Address *client_addr);
00518   int checkUser (CharStr *user1, CharStr *user2);
00519   int checkUser (CharStr *user1, char *user2);
00520 
00521   void checkBufferedTasks ();
00522   void checkReserves ();
00523   void startTask (TaskEntry *task, SlaveEntry *slave, CharStr *taskURL, 
00524                   long taskTimeout, CharStr *userData);
00525   void deleteTask (TaskEntry *task);
00526   void moveTask (TaskEntry *task, List &lSrc, List &lDst);
00527   void upgradeSlave(SlaveEntry *s, int immediate, CharStr *newVersion, CharStr *url);
00528 
00529   // prints the message to appropriate log file for the given task
00530   void log_task_msg(TaskEntry *task, XMLData *msg, msg_direction dir);
00531   // prints the message to appropriate log file for the given job
00532   void log_job_msg(JobEntry *slave, XMLData *msg, msg_direction dir);
00533   // prints the message to appropriate log file for the given slave
00534   void log_slave_msg(SlaveEntry *slave, XMLData *msg, msg_direction dir);
00535   // prints the message to other log file
00536   void log_other_msg(XMLData *msg, msg_direction dir);
00537 
00538   void log_slave_state_change(SlaveEntry *slave, slave_state state1, slave_state state2);
00539   void log_slave_total(ZTime &now, int sReady, int sDisabled, int sBusy, int sReserved);
00540 
00541   void make_dir(char *dir_name);   // aux. function: just creates directory
00542 
00543   //prints the status to a file either as text or html
00544   void print_status(FILE *f, int html);
00545 
00546   //writes status to status file
00547   void write_status();
00548 };
00549 
00550 #endif  // _Master_h_
00551 
00552 
00553 

Generated on Mon Nov 25 12:46:30 2002 for qadpz by doxygen1.2.18