Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

Master.cpp

Go to the documentation of this file.
00001 // Master.cpp - implementation of the QADPZ master
00002 #include <stdio.h>  // console monitor
00003 #include <fstream.h> 
00004 #include <errno.h>
00005 
00006 #ifndef _WIN32
00007 
00008 #include <sys/types.h>   // mkdir()
00009 #include <sys/stat.h>
00010 #include <unistd.h>      // link, unlink
00011 
00012 #else
00013 
00014 #include <direct.h>      // _mkdir()
00015 
00016 #endif
00017 
00018 #include "defs.h"
00019 #include "Master.h"
00020 #include "xmlstorage.h"
00021 #include "messages.h"
00022 #include "List.h"
00023 #include "keywords.h"
00024 #include "other.h"
00025 
00026 
//CONFIGURATION FILE SETTINGS
//
// The master reads DEFAULT_MASTER_CONFIG_FILE with read_config() in the
// Master constructor; every recognised keyword is described by one entry of
// the kws[] table below.

//default values (presumably what read_config() leaves in place when a
//keyword is absent from the config file -- TODO confirm in read_config)
#define DEFAULT_SLAVE_STATUS_TIMEOUT     80  // in sec
#define DEFAULT_CLIENT_RESEND_TIMEOUT    30
#define DEFAULT_TASK_LAUNCH_TIMEOUT      16
#define DEFAULT_SLAVE_RESERVE_TIMEOUT    30
#define DEFAULT_MAX_SEND_RETRY           20
#define DEFAULT_LOG_ROOT                 "log"
#define DEFAULT_LOG_ON                   0
#define DEFAULT_DBUG_LOG_ON              0
#define DEFAULT_USER_FILE                "users.txt"
#define DEFAULT_USERFILE_RELOAD_PERIOD   300 
#define DEFAULT_MASTER_PRIV_KEYFILE      "privkey"
#define DEFAULT_MASTER_CONFIG_FILE       "master.cfg"
#define DEFAULT_RANDOM_FILE              "random.bin"
#define DEFAULT_PERMANENT_UPGRADE        "off"
#define DEFAULT_STATUS_FILE              "no"
#define DEFAULT_MASTER_HTTP_ROOT         "data"
#define DEFAULT_META_REFRESH_PERIOD      10

//indexes for access to individual keywords: each K_* value must equal the
//position of the corresponding entry in kws[] below (these, not the
//DEFAULT_* values above, are what kws[] may be subscripted with), and
//N_MASTER_CFG_KWS is the number of entries in the table
#define K_MASTER_PORT           0
#define K_TASK_LAUNCH_TIMEOUT   1
#define K_MAX_SEND_RETRY        2
#define K_SLAVE_STATUS_TIMEOUT  3
#define K_CLIENT_RESEND_TIMEOUT 4
#define K_LOG_ROOT              5
#define K_USERFILE              6
#define K_USERFILE_RELOAD_T     7
#define K_MASTER_PRIVATEKEY_FILE 8
#define K_RANDOM_FILE           9
#define K_PERMANENT_UPGRADE     10
#define K_SLAVE_RESERVE_TIMEOUT 11
#define K_STATUS_FILE           12
#define K_META_REFRESH_PERIOD   13
#define K_LOG_ON                14
#define K_DBUG_LOG_ON           15
#define K_MASTER_HTTP_PORT      16
#define K_MASTER_HTTP_ROOT      17
#define N_MASTER_CFG_KWS        18
00068 
// Table of master.cfg keywords, in K_* index order.  Each entry gives the
// keyword name and its type; the two trailing initializers appear to be the
// parsed value and the default value (the constructor reads .value and
// writes .default_value) -- TODO confirm against keyword_str in keywords.h.
// STRING_KEYWORD entries are left with a 0 default here; the Master
// constructor fills in their DEFAULT_* strings before calling read_config().
static keyword_str kws[N_MASTER_CFG_KWS] = {
    // master port
    { "master_port", LONG_KEYWORD, 0, {0}, {MASTER_PORT} },  
    // time the slave has to start the task [sec]
    { "task_launch_timeout", LONG_KEYWORD, 0, {0}, {DEFAULT_TASK_LAUNCH_TIMEOUT} }, 
    // maximum # of resends to client
    { "max_send_retry", LONG_KEYWORD, 0, {0}, {DEFAULT_MAX_SEND_RETRY} },
    // after this time the slave is removed if it doesn't send status
    { "slave_status_timeout", LONG_KEYWORD, 0, {0}, {DEFAULT_SLAVE_STATUS_TIMEOUT} },
    // client resend timeout (presumably seconds between resends of an
    // unacknowledged message to a client -- TODO confirm)
    { "client_resend_timeout", LONG_KEYWORD, 0, {0}, {DEFAULT_CLIENT_RESEND_TIMEOUT} },
    // directory where the messages are logged into subdirectories
    { "log_root", STRING_KEYWORD, 0, {0}, {0} },
    // file with the user database
    { "user_file", STRING_KEYWORD, 0, {0}, {0} },
    // how often is the user database reloaded from file
    // so that the master doesn't have to be restarted (TODO: is this already implemented?)
    { "userfile_reload_period", LONG_KEYWORD, 0, {0}, {DEFAULT_USERFILE_RELOAD_PERIOD} },
    // file with the master's private key for the RSA crypter (HAVE_OPENSSL builds)
    { "master_privatekey_file", STRING_KEYWORD, 0, {0}, {0} },
    // random data file handed to the RSA crypter on platforms other than
    // Linux and Windows
    { "random_file", STRING_KEYWORD, 0, {0}, {0} },
    // permanent upgrade service is either "on" or "off"
    { "permanent_upgrade", STRING_KEYWORD, 0, {0}, {0} },
    // how much time the client has to respond to M_SLAVE_AVAIL
    { "slave_reserve_timeout", LONG_KEYWORD, 0, {0}, {DEFAULT_SLAVE_RESERVE_TIMEOUT} },
    // where master saves current status info ("no" disables the status file)
    { "status_file", STRING_KEYWORD, 0, {0}, {0} },
    // what will be the refresh period in seconds for the html status file 
    { "meta_refresh_period", LONG_KEYWORD, 0, {0}, {DEFAULT_META_REFRESH_PERIOD} },
    // whether session logging under log_root (slaves/, jobs/ subdirectories)
    // is enabled; 0 = off
    { "log_on", LONG_KEYWORD, 0, {0}, {DEFAULT_LOG_ON} },
    // whether the master.log debug file is generated; 0 = off
    { "dbug_log_on", LONG_KEYWORD, 0, {0}, {DEFAULT_DBUG_LOG_ON} },
    // tcp/ip port for data sharing web server
    { "master_http_port", LONG_KEYWORD, 0, {0}, {MASTER_HTTP_PORT} },
    // root directory for data sharing web server
    { "master_http_root", STRING_KEYWORD, 0, {0}, {0} }
};
00108 
//END CONFIGURATION FILE SETTINGS

//input console thread prompt: shown in the main menu state; lists the
//single-letter commands handled by Master::consoleInThread()
#define DEFAULT_PROMPT  "[cmd:q,k,K,d,e,?]>"
// master stores information about permanent upgrade here (parsed as XML by
// the constructor when permanent_upgrade is "on")
#define PU_FILENAME     "permanent_upgrade.xml"
00115 
00116 
00117 static THRD_RETURN master_console_out_entry (THRD_ARG arg)
00118 {
00119   ((Master *)arg)->consoleOutThread ();
00120 
00121   return 0;
00122 }
00123 
00124 
00125 static THRD_RETURN master_console_in_entry (THRD_ARG arg)
00126 {
00127   ((Master *)arg)->consoleInThread ();
00128   
00129   return 0;
00130 }
00131 
00132 
00133 static THRD_RETURN master_entry (THRD_ARG arg)
00134 {
00135   ((Master *)arg)->checkingThread ();
00136 
00137   return 0;
00138 }
00139 
00140 
00141 static THRD_RETURN master_http_server (THRD_ARG arg)
00142 {
00143   ((Master *)arg)->httpServerThread ();
00144 
00145   return 0;
00146 }
00147 
00148 
00149 Master::Master (int port, const char *console_file)
00150 {
00151   init_ok = 1;
00152   shutdown = 0;
00153   no_input = 0;
00154   prompt = DEFAULT_PROMPT;
00155   inputPrompt = 0;
00156   status_msg = "QADPZ master started";
00157 
00158   //initialize statistical information
00159   totalTasks = totalJobs = totalReservations = 0;
00160 
00161   PostOffice::init_sockets();
00162 
00163   THRD_HANDLE  consThread;   // console monitor thread
00164   THRD_HANDLE  consThread2;  // input console thread
00165   THRD_HANDLE  httpThread;   // http server thread
00166 
00167   master_running = 1;   // master is constructed
00168   threads_running = 0;  // no threads are running yet
00169   running_since.Now();
00170   running_since_as_double = (double)running_since.sec 
00171                              + 0.000001 * (double)running_since.usec;
00172 
00173   next_userfile_reload.GetTimeOfDay();
00174  
00175   // read the master config file
00176   kws[K_LOG_ROOT].default_value.as_string = DEFAULT_LOG_ROOT;
00177   kws[K_USERFILE].default_value.as_string = DEFAULT_USER_FILE;
00178   kws[K_RANDOM_FILE].default_value.as_string = DEFAULT_RANDOM_FILE;
00179   kws[K_MASTER_PRIVATEKEY_FILE].default_value.as_string = DEFAULT_MASTER_PRIV_KEYFILE;
00180   kws[K_PERMANENT_UPGRADE].default_value.as_string = DEFAULT_PERMANENT_UPGRADE;
00181   kws[K_STATUS_FILE].default_value.as_string = DEFAULT_STATUS_FILE;
00182   kws[K_MASTER_HTTP_ROOT].default_value.as_string = DEFAULT_MASTER_HTTP_ROOT;
00183   read_config(DEFAULT_MASTER_CONFIG_FILE, N_MASTER_CFG_KWS, kws);  
00184 
00185   //load variables
00186   if (kws[K_DBUG_LOG_ON].value.as_long)
00187     DBUG_SETFILE("master.log");
00188   task_launch_timeout = kws[K_TASK_LAUNCH_TIMEOUT].value.as_long;
00189   max_send_retry = kws[DEFAULT_MAX_SEND_RETRY].value.as_long;
00190   slave_status_timeout = kws[K_SLAVE_STATUS_TIMEOUT].value.as_long;
00191   client_resend_timeout = kws[K_CLIENT_RESEND_TIMEOUT].value.as_long;
00192   userfile_reload_period = kws[K_USERFILE_RELOAD_T].value.as_long;
00193   slave_reserve_timeout = kws[K_SLAVE_RESERVE_TIMEOUT].value.as_long;
00194 
00195   ZTime now; now.GetTimeOfDay();
00196   sprintf(log_root, "%s/%d", kws[K_LOG_ROOT].value.as_string, now.sec);
00197   log_on = kws[K_LOG_ON].value.as_long;
00198   userfile = kws[K_USERFILE].value.as_string;
00199   master_privatekeyfile = kws[K_MASTER_PRIVATEKEY_FILE].value.as_string;
00200   permanent_upgrade = strcmp(kws[K_PERMANENT_UPGRADE].value.as_string, "on") == 0;
00201   if (strcmp(kws[K_STATUS_FILE].value.as_string, "no") == 0) 
00202   {
00203       status_file = 0;
00204       DBUG_PRINT("warn", ("Master: no status file."));
00205   }
00206   else 
00207   {
00208       status_file = kws[K_STATUS_FILE].value.as_string;
00209       DBUG_PRINT("info", ("Master: status file is %s", status_file));
00210   }
00211   meta_refresh_period = (int)kws[K_META_REFRESH_PERIOD].value.as_long;
00212   //read configuration of permanent upgrade if on
00213   pu_config = XMLData::Nil;
00214   while (permanent_upgrade)
00215   {
00216       ifstream puf(PU_FILENAME);
00217 #ifdef sgi
00218       if (!puf.good())
00219 #else
00220       if (!puf.is_open())
00221 #endif
00222       {
00223           DBUG_PRINT("warn", ("Master: permanent upgrade is on, but cannot open config file, errno=", errno));
00224           break;
00225       }
00226       pu_config = new XMLData(puf);
00227       puf.close();
00228       if (pu_config->tag() == XMLData::ReadError)
00229       {
00230           DBUG_PRINT("warn", ("Master: error parsing permanent upgrade config file"));
00231           delete pu_config;
00232           pu_config = XMLData::Nil;
00233           break;
00234       }
00235       break;
00236   }
00237   //initiallize non-immediate upgrade globals
00238   upgradeLaterURLs = XMLData::Nil;
00239   upgradeLaterVersion = CharStr::Error;
00240 
00241   //if port came in argument, override the config file setting
00242   if (!port) port = kws[K_MASTER_PORT].value.as_long;
00243 
00244   //remember the master address represented as string (for status output)
00245   char *tmp = new char[80];
00246   gethostname(tmp, 78);
00247   master_addr = new char[strlen(tmp) + 8];
00248   sprintf(master_addr, "%s:%d", tmp, port);
00249   delete[] tmp;
00250 
00251   //open output console file, if any
00252   if (console_file) 
00253   {
00254     console = fopen(console_file, "w+");
00255     if (!console) 
00256     {
00257       DBUG_PRINT("err", ("Master: error openning console file, using stdout"));
00258       console = stdout;
00259     }
00260     else no_input = 1;
00261   }
00262   else console = stdout;
00263   
00264   //create session log directories
00265   if (log_on) 
00266   {
00267       make_dir(log_root);
00268       char tmpname[300];
00269       sprintf(tmpname, "%s/slaves", log_root);
00270       make_dir(tmpname);
00271       sprintf(tmpname, "%s/slaves/stats", log_root);
00272       make_dir(tmpname);
00273       sprintf(tmpname, "%s/jobs", log_root);
00274       make_dir(tmpname);
00275 
00276   // on UNIX platforms, create "last" link for convenience
00277 #ifndef _WIN32
00278       sprintf(tmpname, "%s/last", kws[K_LOG_ROOT].value.as_string);
00279       unlink(tmpname);
00280       symlink(log_root, tmpname);
00281 #endif
00282   }
00283 
00284 #ifdef HAVE_OPENSSL
00285   // initialize the crypter 
00286   master_crypter = new RSAcrypter(RSAcrypter::privatekey,
00287                           master_privatekeyfile
00288 #if !defined(HAVE_OS_Linux) && !defined(_WIN32)
00289                          ,kws[K_RANDOM_FILE].value.as_string
00290 #endif
00291                          );
00292 
00293   // read the userdb file
00294   userdb = new qpzuserdb(userfile, master_crypter);
00295 #else
00296   // read the userdb file
00297   userdb = new qpzuserdb(userfile);
00298 #endif
00299   userdb->save_on_delete = 0;
00300 
00301   // initialize the post-office
00302   //DBUG_PRINT("info", ("Master: initializing PostOffice"));
00303 #ifdef HAVE_OPENSSL
00304   // noSecPlain=0 only temporarily until slave will use crypter!!
00305   po = (PostOffice *) new PostOffice (port, 1, UDP_PORT_STRICT, master_crypter, 0);
00306 #else
00307   po = (PostOffice *) new PostOffice (port, 1, UDP_PORT_STRICT);
00308 #endif
00309 
00310   if (!po->initialized())
00311   {
00312     DBUG_PRINT("err", ("Master: error initializing PostOffice"));
00313     po = 0;
00314     shutdown = 1;
00315     init_ok = 0;
00316   }
00317 
00318   // start output console thread
00319   if (startThread (&consThread, master_console_out_entry, this)) {
00320     DBUG_PRINT("err", ("Master: error starting monitor thread"));
00321     fflush(stderr);
00322     fflush(stdout);
00323     exit(1);
00324   }
00325 
00326   // start input console thread
00327   if (!no_input)
00328     if (startThread (&consThread2, master_console_in_entry, this)) {
00329       DBUG_PRINT("err", ("Master: error starting input console thread"));
00330       fflush(stderr);
00331       fflush(stdout);
00332       exit(1);
00333     }
00334 
00335   // init out console
00336   monitor.post();
00337 
00338   // start http server
00339   if (startThread (&httpThread, master_http_server, this)) {
00340     DBUG_PRINT("err", ("Master: error starting http server thread"));
00341     fflush(stderr);
00342     fflush(stdout);
00343     exit(1);
00344   }
00345 }
00346 
00347 
00348 Master::~Master ()
00349 {
00350   //let all threads know about shutdown
00351   master_running = 0;
00352 
00353   //wake up the output console thread to make it terminate
00354   monitor.post();
00355   
00356   //wait for all threads to terminate
00357   DBUG_PRINT("info", ("Master: waiting for all threads to terminate..."));
00358   while (threads_running)
00359     SLEEP_MSEC(1);
00360   DBUG_PRINT("info", ("Master: all threads terminated, cleaning up..."));
00361 
00362   //delete pu_config XMLData, if any
00363   if (pu_config != XMLData::Nil) delete pu_config;
00364   //release non-immediate upgrade variables
00365   toUpgradeLater.Release();
00366   if (upgradeLaterURLs != XMLData::Nil) delete upgradeLaterURLs;
00367   upgradeLaterVersion->released();
00368 
00369   //delete postoffice
00370   if (po)
00371     delete po;
00372 
00373   //delete user database
00374   delete userdb;
00375 
00376   //delete crypter if used
00377 #ifdef HAVE_OPENSSL
00378   delete master_crypter;
00379 #endif
00380 
00381   //release the socket library (mainly for windows)
00382   PostOffice::release_sockets();
00383 
00384   //close the output console if used
00385   if (console != stdout) fclose(console);
00386 
00387   //deallocate string with master address used by print_status()
00388   delete [] master_addr;
00389 
00390   //deallocate static messages
00391   free_messages();
00392 
00393   DBUG_PRINT("info", ("Master: finished."));
00394 
00395   FILE *f = fopen(status_file, "a+");
00396   if (f) 
00397   {
00398       fprintf(f, "\nmaster terminated\n");
00399       fclose(f);
00400   }
00401 }
00402 
00403 
00404 // the main thread which listens for messages on the post-office
00405 void Master::startMaster ()
00406 {
00407   THRD_HANDLE  sendThread; // (re)sending thread
00408   
00409   //start sending thread
00410   if (startThread (&sendThread, master_entry, this)) {
00411     DBUG_PRINT("err", ("Master: error starting checking thread"));
00412     fflush(stderr);
00413     fflush(stdout);
00414     exit(1);
00415   }
00416 
00417   XMLData *data;
00418   Address *remote = new Address;
00419 
00420   DBUG_PRINT("info", ("Master: waiting for messages..."));
00421 
00422   // receive messages from clients/slaves
00423   while (!shutdown) {
00424 
00425     // see if new message
00426     data = po->receive_any (*remote);
00427 
00428     if ((data == XMLData::Nil) || (data == PostOffice::ReceiveInterrupted))
00429       continue;
00430 
00431     // dispatch the message
00432     dispatchMsg (data, remote);
00433 
00434     // update monitor
00435     monitor.post();
00436   }
00437 
00438   delete remote;
00439 
00440   DBUG_PRINT("info", ("Master: terminated"));
00441 }
00442 
00443 // this thread reads user commands from console
00444 //TODO: make this code accessible from web form interface
00445 void Master::consoleInThread()
00446 {
00447    Address *a;
00448    char s[60];
00449    char jname[52];
00450    int jid, tid;
00451    enum {
00452        main_menu, 
00453        kill_task, 
00454        kill_job, 
00455        disable_slave, 
00456        enable_slave, 
00457        show_help
00458    } state = main_menu;
00459 
00460    DBUG_PRINT("info", ("Master: consoleInThread started"));
00461    threads_running++;
00462 
00463    do {
00464      fgets(s, 58, stdin);
00465      switch (state)
00466      {
00467        case main_menu:
00468          switch (s[0])
00469          {
00470            case 'q':  //quit master
00471                   lock.lock();
00472                     shutdown = 1;
00473                     a = Address::getAddress(kws[K_MASTER_PORT].value.as_long);
00474                     po->stopWait(*a);
00475                     delete a;
00476                   lock.unlock();
00477                   break;
00478            case 'k':  //kill task
00479                   prompt = "Kill task: [job_name job_number task_id]>";
00480                   state = kill_task;
00481                   monitor.post();
00482                   inputPrompt = 2;
00483                   break;
00484            case 'K':  //kill job
00485                   prompt = "Kill job: [job_name job_number]>";
00486                   state = kill_job;
00487                   monitor.post();
00488                   inputPrompt = 2;
00489                   break;
00490            case 'd':  //disable slave
00491                   prompt = "Disable slave: [slave IP:PORT]>";
00492                   state = disable_slave;
00493                   monitor.post();
00494                   inputPrompt = 2;
00495                   break;
00496            case 'e':  //enable slave
00497                   prompt = "Enable slave: [slave IP:PORT]>";
00498                   state = enable_slave;
00499                   monitor.post();
00500                   inputPrompt = 2;
00501                   break;
00502            case '?':  //help
00503                   prompt = "\n\tq\t\tquit master\n\tk\t\tkill task\n\tK\t\tkill job\n\td\tdisable slave\n\te\tenable slave\n\t?\t\thelp\n";
00504                   state = show_help;
00505                   monitor.post();
00506                   break;
00507            default: break;
00508          }
00509          break;
00510        case kill_task: 
00511        {
00512          sscanf(s, "%50s %d %d", jname, &jid, &tid);
00513          lock.lock();
00514            JobId jobid(jname, jid);
00515            TaskId taskid(jobid, tid);
00516            TaskEntry *task = srchTask(taskid);
00517            if (task != 0)
00518            {
00519              msgTaskCtrl_Stop(task);
00520              status_msg = "task killed";
00521            }
00522            else status_msg = "kill_task: Task not found";
00523          lock.unlock();
00524          state = main_menu;
00525          prompt = DEFAULT_PROMPT;
00526          inputPrompt = 0;
00527          monitor.post();
00528          break;
00529        }
00530        case kill_job:
00531        {
00532          sscanf(s, "%50s %d", jname, &jid);
00533          JobId jobid(jname, jid);
00534          JobEntry *job = srchJob(jobid);
00535          if (job != 0)
00536          {
00537            lock.lock();
00538              if (job->cl_stat == client_on)
00539                msgJobCtrl_Stop(job, &job->cl_addr);
00540              else
00541                msgJobCtrl_Stop(job, 0);
00542            lock.unlock();
00543            status_msg = "job killed";
00544          }
00545          else status_msg = "kill_job: Job not found";
00546          state = main_menu;
00547          prompt = DEFAULT_PROMPT;
00548          inputPrompt = 0;
00549          monitor.post();
00550          break;
00551        }
00552        case disable_slave:
00553        {
00554          char slaveip[30];
00555          sscanf(s, "%30s", slaveip);
00556          Address saddr(slaveip);
00557          SlaveEntry *slave = srchSlaveAddr(saddr);
00558          if (slave)
00559          {
00560            lock.lock();
00561              if (slave->stat() == slave_reserved)
00562                status_msg = "reserved slave cannot be disabled, try again later";
00563              else
00564              {
00565                if (slave->task)
00566                  msgTaskCtrl_Stop(slave->task);
00567                slaveSetState(slave, slave_disabled);
00568                sndSlaveCtrl(&slave->info->addr, p_Disable, CharStr::Error);
00569              }
00570              status_msg = "slave_disabled";
00571            lock.unlock();
00572          }
00573          else status_msg = "disable_slave: Slave not found";
00574          state = main_menu;
00575          prompt = DEFAULT_PROMPT;
00576          inputPrompt = 0;
00577          monitor.post();
00578          break;
00579        }
00580        case enable_slave:
00581        {
00582          char slaveip[30];
00583          sscanf(s, "%30s", slaveip);
00584          Address saddr(slaveip);
00585          SlaveEntry *slave = srchSlaveAddr(saddr);
00586          if (slave)
00587          {
00588            lock.lock();
00589              if (slave->stat() != slave_disabled)
00590                status_msg = "slave is not disabled";
00591              else
00592                sndSlaveCtrl(&slave->info->addr, p_Enable, CharStr::Error);
00593              status_msg = "slave_disabled";
00594            lock.unlock();
00595          }
00596          else status_msg = "disable_slave: Slave not found";
00597          state = main_menu;
00598          prompt = DEFAULT_PROMPT;
00599          inputPrompt = 0;
00600          monitor.post();
00601          break;
00602        }
00603        case show_help:
00604        {
00605          state = main_menu;
00606          prompt = DEFAULT_PROMPT;
00607          monitor.post();
00608          break;
00609        }
00610      }
00611    } while (master_running && (!shutdown));
00612    DBUG_PRINT("info", ("Master: consoleInThread terminated"));
00613 
00614    threads_running--;
00615 }
00616 
00617 //computes and prints the status to file f (should be called only when
00618 // lock is locked). If html is one, the output will be in html format,
00619 // otherwise it will be plain text. master_addr should contain a string
00620 void Master::print_status(FILE *f, int html)
00621 {
00622   int nJobs, nClientsOn, nTasksRunning, nTasksWaiting, nSlavesReady, nSlavesBusy, nSlavesDisabled, nSlavesReserved;
00623   ZTime now;
00624   JobEntry *j;
00625   SlaveEntry *s;
00626   ReserveEntry *r;
00627   TaskEntry *t;
00628   static char *newln[2] = { "\n", "<br>\n" };
00629   static char *hline[2] = { "------------------------------------------------------------------------------\n",
00630                             "<hr>\n" };
00631 
00632   //print html header
00633   if (html) fprintf(f, "<html><head>\n<META http-equiv=\"refresh\" content=\"%d\">\n<title>QADPZ Master status information</title>\n</head>\n<body bgcolor=white>\n", meta_refresh_period);
00634 
00635   //*** collect total summary info
00636   //
00637   nJobs = lJobs.Count();
00638   lJobs.Start_Get();
00639   nClientsOn = 0;
00640   while ((j = (JobEntry *)lJobs.Get_Next()))
00641      if (j->cl_stat == client_on) nClientsOn++; 
00642   nTasksRunning = lTasksRun.Count();
00643   nTasksWaiting = lTasksWait.Count();
00644   nSlavesReady = 0;
00645   nSlavesBusy = 0;
00646   nSlavesDisabled = 0;
00647   lSlaves.Start_Get();
00648   while ((s = (SlaveEntry *)lSlaves.Get_Next()))
00649     if (s->stat() == slave_ready) nSlavesReady++;
00650     else if ((s->stat() == slave_busy) || 
00651              (s->stat() == slave_contacted)) nSlavesBusy++;
00652     else if (s->stat() == slave_disabled) nSlavesDisabled++;
00653       lReservs.Start_Get();
00654   nSlavesReserved = 0;
00655   while ((r = (ReserveEntry *)lReservs.Get_Next()))
00656     nSlavesReserved += r->slavesRequired;
00657   
00658   now.Now();
00659   if (html) fprintf(f, "<h3>");
00660   fprintf(f, "QADPZ Master (%s), %s, on since: %s", master_addr, now.print_ctime(), running_since.print_ctime());
00661   if (html) fprintf(f, "</h3>\n");
00662   else fprintf(f, "\n");
00663   fprintf(f, hline[html]);
00664 
00665   //*** log slave states into "stats/total.txt"
00666   //
00667   if (html) log_slave_total (now, nSlavesReady, nSlavesDisabled, nSlavesBusy, nSlavesReserved);
00668 
00669   //*** print current summary info on master
00670   //
00671   int nClientMsgs;
00672   nClientMsgs = lClientMsg.Count ();
00673   fprintf(f, "%d slaves: %d ready, %d busy, %d disabled, %d(%d) reserved%s",
00674           nSlavesReady+nSlavesBusy+nSlavesDisabled+nSlavesReserved,
00675           nSlavesReady, nSlavesBusy, nSlavesDisabled, nSlavesReserved,
00676           totalReservations, newln[html]);
00677   fprintf(f, "%d(%d) tasks: %d run, %d wait%s",
00678           nTasksRunning+nTasksWaiting, totalTasks,
00679           nTasksRunning, nTasksWaiting, newln[html]);
00680   fprintf(f, "%d(%d) jobs, %d clients on, %d client msgs%s",
00681           nJobs, totalJobs, nClientsOn, nClientMsgs, newln[html]);
00682 
00683    //*** print total statistics on master
00684    /*
00685    if (html)
00686    {
00687      fprintf(f, "total jobs=%d&nbsp;&nbsp;&nbsp;", totalJobs);
00688      fprintf(f, "total tasks=%d&nbsp;&nbsp;&nbsp;", totalTasks);
00689      fprintf(f, "total reservations=%d%s", totalReservations, newln[html]);
00690    }
00691    */
00692    fprintf(f, hline[html]);
00693 
00694   
00695   //*** print info about each job (one line per job)
00696   //
00697   int lnCount = 3;
00698   lJobs.Start_Get();
00699   while ((j = (JobEntry *)lJobs.Get_Next()))
00700   {
00701      lnCount++;
00702      if (lnCount == 23) break;
00703      fprintf(f, " %8s(%d;%s) ",j->id.name(), j->id.id, j->user());
00704      if (j->cl_stat == client_on)
00705      {
00706        char ip[26];
00707        j->cl_addr.getIP(ip);
00708        fprintf(f, "(%s) ", ip);
00709      }
00710      else fprintf(f, "abandoned              ");
00711      //print running tasks
00712      j->tasks.Start_Get();
00713      int tprinted = 0;
00714      fprintf(f, "r: ");           
00715      while ((t = (TaskEntry *)j->tasks.Get_Next()))
00716      if ((t->stat == task_started) ||
00717          (t->stat == task_launch) ||
00718          (t->stat == task_launch_moved))
00719      {
00720        tprinted++;                
00721        if ((tprinted == 11) && !html) break;
00722        fprintf(f, "%3d ", t->id.id);                
00723      }
00724      if (tprinted == 11) fprintf(f, "... ");
00725      else
00726      {
00727        fprintf(f, "w: ");           
00728        j->tasks.Start_Get();
00729        while ((t = (TaskEntry *)j->tasks.Get_Next()))
00730        if ((t->stat == task_buffered) ||
00731            (t->stat == task_buffered_moved))
00732        {
00733          tprinted++;
00734          if ((tprinted == 11) && !html) break;
00735          fprintf(f, "%3d", t->id.id);                
00736        }
00737        if (tprinted == 11) fprintf(f, "...  ");
00738        else fprintf(f, "  ");
00739      }
00740      while (tprinted++ < 10) fprintf(f, "    ");
00741      //print reservations
00742      int rprinted = 0;
00743      lReservs.Start_Get();
00744      while ((r = (ReserveEntry *)lReservs.Get_Next()))
00745      if (r->job == j)
00746      {
00747        rprinted++;
00748        if ((rprinted == 6) & !html) break;
00749        if (r->parallel) fprintf(f, "%dp ", r->slavesRequired);
00750        else fprintf(f, "%d ", r->slavesRequired);
00751      }
00752      if (rprinted == 6) fprintf(f, "...");
00753      fprintf(f, newln[html]);
00754    }
00755 
00756    //*** print info about each slave (one line per slave)
00757    //
00758    now.Now();
00759    lSlaves.Start_Get();
00760    if (html) fprintf(f, "%s", "<table border=1 cellspacing=0 cellpadding=1><tr><th>IP<th>Platform<th>State<th>Task<th>% busy<th>% disabled<th>last change<th>last status<th>on since</tr>\n");
00761 
00762    while ((s = (SlaveEntry *)lSlaves.Get_Next()))
00763    {
00764      char _host[26], *_stat;
00765      s->info->addr.getIP (_host);
00766      char stask[50];
00767      if (html) sprintf(stask, "&nbsp;");
00768      else stask[0] = '\0';
00769      switch (s->stat()) {
00770        case slave_ready:     _stat = "Ready  "; break;
00771        case slave_busy:      _stat = "Busy   "; 
00772                              if (s->task)
00773                                sprintf(stask, "[(%s,%d)-%d]", 
00774                                  s->task->id.jId.name(),
00775                                  s->task->id.jId.id,
00776                                  s->task->id.id);
00777                              break;
00778        case slave_disabled:  _stat = "Disable"; break;
00779        case slave_reserved:  _stat = "Reserv "; break;
00780        case slave_contacted: _stat = "Contact"; break;
00781        case slave_off:       _stat = "Off    "; break;
00782        default:              _stat = "ERR    "; break;
00783      }
00784      double is_on = (double)now.sec - (double)s->statistics.on_since.sec 
00785                       + 0.000001 * ((double)now.usec - (double)s->statistics.on_since.usec);
00786      if (html)
00787      {
00788        char *fontcolor;
00789        if (s->stat() == slave_ready) fontcolor = "green";
00790        else if (s->stat() == slave_disabled) fontcolor = "orange";
00791        else fontcolor = "blue";
00792        fprintf (f, "<tr><td><font color=%s>%s</font></td>\n", fontcolor, _host);
00793        fprintf (f, "<td><font color=%s>%s,%s,%d MHz,%.0f MB,%.0f MB</font></td>\n", fontcolor, s->info->os(), s->info->cpu(), s->info->cpu_speed, s->info->mem, s->info->disk);
00794        fprintf (f, "<td><font color=%s>%s </font></td>\n", fontcolor, _stat);
00795        fprintf (f, "<td><font color=%s>%s</font></td>\n", fontcolor, stask);
00796        fprintf (f, "<td><font color=%s>%.3f%%</font></td>\n", fontcolor, 100.0 * s->statistics.sec_busy / is_on);
00797        fprintf (f, "<td><font color=%s>%.3f%%</font></td>\n", fontcolor, 100.0 * s->statistics.sec_disabled / is_on);
00798        fprintf (f, "<td><font color=%s>%ds</font></td>\n", fontcolor, now.sec - s->statistics.last_state_change_real.sec);
00799        fprintf (f, "<td><font color=%s>%ds</font></td>\n", fontcolor, now.sec - s->time_last_status.sec);
00800        fprintf (f, "<td><font color=%s>%s</font></td></tr>\n", fontcolor, s->statistics.on_since.print_ctime());
00801      }
00802      else
00803        fprintf (f, "%s(%s,%s,%d MHz,%.0f MB,%.0f MB): %s%s (%.3f%% busy %.3f%% disabled)\n", 
00804               _host, s->info->os(),
00805               s->info->cpu(), s->info->cpu_speed, s->info->mem, s->info->disk, _stat, stask, 
00806               100.0 * s->statistics.sec_busy / is_on,  
00807               100.0 * s->statistics.sec_disabled / is_on);
00808    }
00809    if (html) fprintf(f, "</table>\n");
00810    if (update_status) last_status_msg = status_msg;
00811    fprintf(f, "last status: %s%s", last_status_msg, newln[html]);
00812    status_msg = "ready";
00813    update_status = 1;
00814    
00815    if (!html) fprintf(f, prompt);
00816    else
00817    {
00818        fprintf(f, "</body></html>\n");
00819    }
00820    fflush(f);
00821 }
00822 
00823 // this thread displays status information at the console
00824 void Master::consoleOutThread()
00825 {
00826     DBUG_PRINT("info", ("Master: consoleOutThread started"));
00827     threads_running++;
00828 
00829     while (master_running)
00830     {
00831 
00832       lock.lock();
00833 
00834           // if semaphore was posted multiple times, remove redundant posts...
00835           while (monitor.value()) monitor.wait();
00836     
00837           cls(console);
00838           print_status(console, 0);
00839 
00840       lock.unlock();
00841    
00842       //wait for another event
00843       do {
00844         monitor.wait();
00845       } while (inputPrompt == 1);        // if inputPrompt is 1, don't refresh,
00846                                          // becauee user is entering data
00847       if (inputPrompt) inputPrompt--;    // refresh first time if in.P. is 2
00848 
00849     }
00850     fprintf(console, "\n");
00851 
00852     DBUG_PRINT("info", ("Master: consoleOutThread terminated"));
00853     threads_running--;
00854 }
00855 
00856 // saves the current status into status file in HTML format
00857 void Master::write_status()
00858 {
00859     if (!status_file) return;
00860     FILE *f = fopen(status_file, "w+");
00861     if (!f)
00862     {
00863       DBUG_PRINT("err", ("Master: error opening status file %s", status_file));
00864       return;
00865     }
00866     print_status(f, 1);
00867     fclose(f);
00868 }
00869 
00870 // this thread should do the job of a simple http server
00871 void Master::httpServerThread ()
00872 {
00873 
00874 }
00875 
00876 // this thread should check periodicaly the lists for timeouts
00877 void Master::checkingThread ()
00878 {
00879   static int _count = -1;
00880   int change; // modifications were performed
00881 
00882   ZTime           now;
00883   char            nows[12];
00884   TaskEntry      *task;
00885   SlaveEntry     *slave;
00886   ClientMsgEntry *msg;
00887 
00888   DBUG_PRINT("info", ("Master: checkingThread started"));
00889   threads_running++;
00890 
00891   while (master_running) {
00892     // to avoid busy waiting
00893     SLEEP_MSEC(1000);
00894     change = 0;
00895 
00896     lock.lock();
00897     {
00898       // check timeout for tasks which are in state task_launch(_moved)
00899       now.Now ();
00900       now.print (nows);
00901       lTasksRun.Start_Get ();
00902       while ((task = (TaskEntry *)lTasksRun.Get_Next ())) 
00903         if (now > task->timeend)
00904           if ((task->stat == task_launch) || 
00905               (task->stat == task_launch_moved))
00906             {
00907               char _host[26];
00908               task->slave->info->addr.getIP (_host);
00909               DBUG_PRINT("err", ("Master: no answer to TASK_INIT, Task(%s,%d,%d), slave %s deleted, resched.task!", 
00910                                  nows, task->job->id.name(), 
00911                                  task->job->id.id, task->id.id, _host));
00912 
00913               // slave didn't answer TASK_INIT -> disable it
00914               slave = task->slave;
00915         slaveSetState (slave, slave_off);  // force logging of state change
00916               lSlaves.Remove (slave);
00917               task->slave = 0;
00918               delete slave;
00919 
00920               // ...delete the task from the list
00921               lTasksRun.Remove (task);
00922               // ...search for another slave
00923               msgTaskInit_Normal (task);
00924               change = 1;
00925             }
00926     }
00927     lock.unlock();
00928 
00929     SLEEP_MSEC(100);
00930 
00931     lock.lock();
00932     {
00933       // check slaves for periodical SLAVE_STATUS msgs and reservation timeout
00934       now.Now ();
00935       now.print (nows);
00936       lSlaves.Start_Get ();
00937       while ((slave = (SlaveEntry *)lSlaves.Get_Next ())) 
00938         if (now > (slave->time_last_status + slave_status_timeout))
00939           {
00940             char _host[26];
00941             slave->info->addr.getIP (_host);
00942             DBUG_PRINT("info", ("Master: no SLAVE_STATUS, "
00943                                "slave %s deleted", _host));
00944 
00945             // slave has a task assigned
00946             if (slave->task)
00947               {
00948                 // notify client and delete the task
00949                 sndTaskStatus (&slave->task->job->cl_addr, slave->task->job, slave->task->id, 
00950                                task_crashed, "err no SLAVE_STATUS received");
00951                 deleteTask (slave->task);
00952                 slave->task = 0;
00953               }
00954 
00955             // slave didn't send SLAVE_STATUS -> disable it
00956       slaveSetState (slave, slave_off);  // force logging of state change
00957             lSlaves.Remove (slave);
00958             delete slave;
00959             change = 1;
00960           }
00961         else if ((slave->stat() == slave_reserved) && (now > (slave->time_reserved + slave_reserve_timeout)))
00962         {
00963            char _host[26];
00964            slave->info->addr.getIP(_host);
00965            DBUG_PRINT("info", ("Master*%s: reserve_timeout for slave %s", nows, _host));
00966            slaveSetState(slave, slave_ready);
00967            checkBufferedTasks();
00968         }
00969 
00970       if (_count != lClientMsg.Count()) {
00971         _count = lClientMsg.Count();
00972         DBUG_PRINT("info", ("Master*%s: client msg queue has %d msgs", nows, _count));
00973       }
00974     }
00975     lock.unlock();
00976     
00977     SLEEP_MSEC(100);
00978 
00979     lock.lock();
00980     {
00981       // send again unconfirmed messages to clients
00982       now.Now ();
00983       now.print (nows);
00984       lClientMsg.Start_Get ();
00985       while ((msg = (ClientMsgEntry *)lClientMsg.Get_Next ())) 
00986         {
00987           // message is invalid (shouldn't happen at all!)
00988           if (po->checkN (msg->mId) == send_invalid)
00989             {
00990               DBUG_PRINT("err", ("Master*%s: invalid msgId!!", nows));
00991               po->deleteN (msg->mId);
00992               delete msg->msg;
00993               lClientMsg.Remove (msg);
00994               delete msg;
00995               change = 1;
00996               continue;
00997             }
00998 
00999           // message was confirmed
01000           if (po->checkN (msg->mId) == send_confirm)
01001             {
01002               po->deleteN (msg->mId);
01003               delete msg->msg;
01004               lClientMsg.Remove (msg);
01005               delete msg;
01006               change = 1;
01007               continue;
01008             }
01009           
01010           // message needs to be resent
01011           if ((now > (msg->last_sent + client_resend_timeout)) &&
01012               (msg->count_sent > 0) &&
01013         // don't send anymore to lost clients
01014         (msg->job->cl_addr.getPort()))
01015             {
01016               msg->count_sent--;
01017               msg->last_sent.Now ();
01018 #ifdef HAVE_OPENSSL
01019               po->sendN (msg->msg, &msg->job->cl_addr, sm_sign);
01020 #else
01021               po->sendN (msg->msg, &msg->job->cl_addr);
01022 #endif
01023               change = 1;
01024             }
01025 
01026           if ((now > (msg->last_sent + client_resend_timeout)) &&
01027               (msg->count_sent == 0))
01028             {
01029               msg->count_sent = -1;
01030               char _host[26];
01031               msg->job->cl_addr.getIP (_host);
01032               DBUG_PRINT("info", ("Master*%s: last send to client %s", nows, _host));
01033             }
01034         }  // while
01035     }
01036     lock.unlock();
01037 
01038     SLEEP_MSEC(100);
01039 
01040     lock.lock();
01041     {
01042       // check timeout for tasks which are running for too long
01043       now.Now ();
01044       now.print (nows);
01045       lTasksRun.Start_Get ();
01046       while ((task = (TaskEntry *)lTasksRun.Get_Next ())) 
01047         if ((task->stat == task_started) && (now > task->timeend))
01048           {
01049             // send a task stop to the slave
01050             sndTaskCtrl (&task->slave->info->addr, task->id, p_Stop);
01051 
01052             // notify client and delete the task
01053             sndTaskStatus (&task->job->cl_addr, task->job, task->id, 
01054                            task_crashed, "err timeout kill");
01055 
01056             DBUG_PRINT("info", ("Master: task running timeout, Task(%s,%d,%d) killed", 
01057                                 task->job->id.name (), 
01058                                 task->job->id.id, 
01059                                 task->id.id, nows));
01060 
01061             deleteTask (task);
01062             change = 1;
01063           }
01064     }
01065     lock.unlock();
01066 
01067     SLEEP_MSEC(100);
01068 
01069     lock.lock();
01070       // check timeout for reloading user database from file
01071       now.Now();
01072       if (now > next_userfile_reload)
01073       {
01074         delete userdb;
01075 #ifdef HAVE_OPENSSL
01076         // read the userdb file
01077         userdb = new qpzuserdb(userfile, master_crypter);
01078 #else
01079         // read the userdb file
01080         userdb = new qpzuserdb(userfile);
01081 #endif
01082         userdb->save_on_delete = 0;
01083         next_userfile_reload = now + userfile_reload_period;
01084       }      
01085     lock.unlock();
01086 
01087     SLEEP_MSEC(100);
01088 
01089     lock.lock();
01090       //write the current status to status file
01091       write_status();
01092     lock.unlock();
01093 
01094     SLEEP_MSEC(100);
01095 
01096     static int update_slave_statistics_counter = 10;
01097     lock.lock();
01098       //every once in a while update the statistics info for slaves 
01099       //and compute their rank
01100       if (!(update_slave_statistics_counter--))
01101       {
01102         update_slave_statistics_counter = 10;
01103         now.Now();
01104         SlaveEntry *s;
01105         lSlaves.Start_Get();
01106         while ((s = (SlaveEntry *)lSlaves.Get_Next()))
01107         {
01108           if (s->stat() == slave_disabled)
01109           {
01110              s->statistics.sec_disabled += ((double)now.sec - (double)s->statistics.last_state_change.sec)
01111                              + 0.000001 * ((double)now.usec - (double)s->statistics.last_state_change.usec);
01112              s->statistics.last_state_change = now;
01113           }
01114           else if (!(s->stat() == slave_ready))
01115           {
01116              s->statistics.sec_busy += (double)now.sec - (double)s->statistics.last_state_change.sec
01117                         + 0.000001 * ((double)now.usec - (double)s->statistics.last_state_change.usec);
01118              s->statistics.last_state_change = now;
01119           }
01120           //right now rank is 0.001 * cpu_speed - disabled_ratio
01121           s->statistics.rank = 0.001 * ((double)s->info->cpu_speed - s->statistics.sec_disabled / 
01122                           ((double)now.sec + 0.000001 * (double)now.usec - 
01123                            running_since_as_double));
01124         }
01125       }
01126     lock.unlock();
01127 
01128     // if any changes
01129     if (change) 
01130       monitor.post();
01131 
01132   }  // while
01133 
01134   DBUG_PRINT("info", ("Master: checkingThread terminated"));
01135   threads_running--;
01136 }
01137 
01138 
// Entry point for every decoded incoming XML message from `remote`; the
// whole dispatch runs under the master lock.
// If the message is wrapped in a <Data> element, the <UserInfo><User>/<Pswd>
// credentials are verified (refused with sndStatusError on failure) and
// the inner <Message> is unwrapped; otherwise `data` is the message itself.
// The message is then routed by its type to the matching msg* handler;
// unknown types are logged and deleted here.
// Ownership: `data` is consumed. The extracted message is handed to the
// handler (presumably every handler deletes it - msgJobCtrl and
// msgTaskInit in this file do) or deleted here for unknown types.
void Master::dispatchMsg (XMLData *data, Address *remote)
{
  ZTime now;
  char  nows[12], _host[26];
  XMLData *msg;
  CharStr *user, *passwd;

  //debug
  //char debugbuf[8004];

  // value used for messages that carry no <UserInfo>
  user = CharStr::Error;

  lock.lock();

  now.Now ();
  now.print (nows);
  remote->getIP (_host);

  // extract user and password if present
  if (strcmp(data->tag()->str, p_Data->str) == 0)
  {
     user = data->sub(p_UserInfo)->sub(p_User)->getString();
     // keep `user` alive until the matching user->released() on exit
     user->stored();
     // presumably rewinds data's child cursor so <UserInfo> can be found again
     data->reset();
     passwd = data->sub(p_UserInfo)->sub(p_Pswd)->getString();
     //note: passwd->stored() is NOT called, so passwd must not be kept
     //      after this call; `user` is stored above and released on
     //      every exit path below
     if (!verifyUserPswd(user, passwd, remote))
     {
       DBUG_PRINT("err", ("Master: incorrect user/pswd (%s) from client %s",
                          user->str, _host));
       // NOTE(review): meaning of reset(0, 1) not visible here - confirm
       sndStatusError (remote, "err invalid user/pswd", 
                       data->reset(0, 1)->sub(p_JobID));
       delete data;
       lock.unlock();
       user->released();
       return;
     }
     else  // username and password correct - extract the message and 
           // delete the wrapping xml stuff
     {
       msg = data->sub(p_Message);
       if (msg == XMLData::Nil) 
       {
         DBUG_PRINT("err", ("Master: received <Data> element without <Message> subelement"));
         delete data;
         lock.unlock();
         user->released();
         return;
       }

       // this will remove without deallocating
       // (msg survives `delete data` below)
       data->remove(p_Message->str, 1);

       delete data;
     }
  }
  else msg = data;

  // SLAVE_STATUS is frequent; keep it out of the dispatch log
  if (! msgType (msg, p_M_SLAVE_STATUS))
  {
    DBUG_PRINT("info", ("Master*%s: dispatch %s from %s", 
               nows, msgType (msg)->str, _host));
  }

  // keep messages sorted based on their frequencies

  // M_SLAVE_STATUS message
  if (msgType (msg, p_M_SLAVE_STATUS))
    msgSlaveStatus (msg, remote);

  // M_TASK_STATUS message
  else if (msgType (msg, p_M_TASK_STATUS))
    msgTaskStatus (msg, remote);

  // M_TASK_INIT message
  else if (msgType (msg, p_M_TASK_INIT))
    msgTaskInit (msg, remote, user);

  // M_TASK_FINISH message
  else if (msgType (msg, p_M_TASK_FINISH))
    msgTaskFinish (msg, remote);

  // M_SLAVE_RESERVE message
  else if (msgType (msg, p_M_SLAVE_RESERVE))
    msgSlaveReserve (msg, remote, user);

  // M_JOB_CTRL message
  else if (msgType (msg, p_M_JOB_CTRL))
    msgJobCtrl (msg, remote, user);

  // M_CLIENT_STATUS message
  else if (msgType (msg, p_M_CLIENT_STATUS))
    msgClientStatus (msg, remote, user);

  // M_TASK_CTRL message
  else if (msgType (msg, p_M_TASK_CTRL))
    msgTaskCtrl (msg, remote, user);

  // M_SLAVE_CTRL message
  else if (msgType (msg, p_M_SLAVE_CTRL))
    msgSlaveCtrl (msg, remote, user);

  // M_TASK_MOVE message
  else if (msgType (msg, p_M_TASK_MOVE))
    msgTaskMove (msg, remote);

  else {
    DBUG_PRINT("err", ("Master: error invalid message type=%s", 
                       msg->getAttrib ("Type")->str));
    delete msg; 
  }

  lock.unlock();
  // NOTE(review): for unwrapped messages `user` is still CharStr::Error and
  // is released here without a matching stored() - confirm this is harmless
  user->released();
  return;
}
01256 
01257 
01258 //***********************************************//
01259 // M_JOB_CTRL
01260 void Master::msgJobCtrl (XMLData *msg, Address *remote, CharStr *user)
01261 {
01262   JobEntry *job;
01263   int queue_msg;
01264 
01265   do {
01266     // check if the message has a valid jobId
01267     job = srchJob(msg);
01268 
01269     // log the message
01270     log_job_msg(job, msg, mdir_c2m);
01271     msg->reset();
01272 
01273     if (! job) {
01274       JobId jid;
01275       x2o(msg->sub(p_JobID), jid);
01276       DBUG_PRINT("err", ("Master: error M_JOB_CTRL, Job(%s,%d) not found",
01277                  jid.name (), jid.id));
01278       sndJobStatus (remote, job, jid, job_refused, "err invalid jobid", 
01279                     XMLData::Nil, DONT_QUEUE_MSG);
01280       break;
01281     }
01282 
01283     //if msg comes from other client, don't queue the response
01284     queue_msg = (job->cl_addr == *remote);
01285 
01286     // extract action
01287     job_ctrl_action action;
01288     x2o(msg->getString(p_Action), action);
01289 
01290     // msg must come from the same client as before!
01291     // (don't do it for GetStatus message)
01292     if (action != get_job_status)
01293     {
01294       //only the old user or "admin" can send a JobCtrl
01295       if ((! checkUser (user, job->_cs_user ())) &&
01296           (! checkUser (user, "admin")))
01297       {
01298         DBUG_PRINT("err", ("Master: error M_JOB_CTRL, Job(%s,%d) owned by other user",
01299                            job->id.name(), job->id.id));
01300         sndJobStatus (remote, job, job->id, job_refused, "err invalid user for jobid",
01301                       XMLData::Nil, queue_msg);
01302         break;
01303       } 
01304    
01305       if ((job->cl_addr != *remote) ||
01306           (job->cl_stat != client_on))
01307       {
01308         char _h1[26], _h2[26];
01309         job->cl_addr.getIP (_h1);
01310         remote->getIP (_h2);
01311         DBUG_PRINT("info", ("Master: error M_JOB_CTRL, Job(%s, %d), "
01312                             "wrong client address %s (should be %s)",
01313                             job->id.name(), job->id.id, _h2, _h1));
01314         status_msg = "M_JOB_CTRL from wrong client address";
01315         sndJobStatus (remote, job, job->id, job_refused, "err invalid client", 
01316                       XMLData::Nil, queue_msg);
01317         break;
01318       }
01319     }
01320     
01321     if (action == stop_job)  
01322       msgJobCtrl_Stop (job, remote);
01323     else if (action == get_job_status)  
01324       msgJobCtrl_GetStatus (job, remote);
01325     else if (action == stop_all_jobs_user)
01326       msgJobCtrl_StopAllUser (job, remote, user);
01327     else if (action == stop_all_jobs_name)
01328       msgJobCtrl_StopAllName (job, remote);
01329     else 
01330     {
01331       DBUG_PRINT("info", ("Master: error M_JOB_CTRL, Job(%s,%d), "
01332                           "invalid Action=%s", 
01333                           job->id.name(), job->id.id,
01334                           msg->getString(p_Action)->str));
01335       status_msg = "M_JOB_CTRL invalid action";
01336       sndJobStatus (remote, job, job->id, job_refused, "err invalid action", XMLData::Nil, queue_msg);
01337       break;
01338     }
01339   } while (0);
01340 
01341   delete msg;
01342 }
01343 
01344 
01345 // M_JOB_CTRL(Stop)
01346 void Master::msgJobCtrl_Stop (JobEntry *job, Address *remote)
01347 {
01348   TaskEntry *task;
01349   SlaveEntry *slave;
01350   ReserveEntry *reserve;
01351 
01352   // put all slaves that are reserved for this job into ready mode
01353 
01354   // remove all reservations for this job 
01355 
01356   // for all tasks in the list 'tasks': 
01357   //    task state launch/running:
01358   //         1. put slave on disabled mode
01359   //         2. send TASK_CTRL_STOP to the SLAVE
01360   //         3. delete the task from LTasksRun
01361   //    task state is buffered
01362   //         1. delete the task from lTasksWait
01363   //    
01364   // all pending messages for this job from lClientMsg are removed
01365   //
01366   // send M_JOB_STATUS(stopped) back to the client withouth queing in lClientMsg
01367   //
01368   // job is removed from lJobs and deleted
01369   //
01370   // if there were any reservations for this job (i.e. some slaves
01371   //    are becaming available, check for new tasks to start)
01372 
01373     int need_check = 0;
01374     lSlaves.Start_Get();
01375     while ((slave = (SlaveEntry *) lSlaves.Get_Next()))
01376         if ((slave->stat() == slave_reserved) &&
01377             (slave->jobReserved == job))
01378         {
01379             slaveSetState(slave, slave_ready);
01380             slave->jobReserved = 0;
01381             need_check = 1;
01382         }
01383 
01384     lReservs.Start_Get();
01385     while ((reserve = (ReserveEntry *) lReservs.Get_Next()))
01386         if (reserve->job == job)
01387         {
01388             lReservs.Remove(reserve);
01389             delete reserve;
01390         }
01391 
01392     job->tasks.Start_Get ();
01393     while ((task = (TaskEntry *) job->tasks.Get_Next ()))
01394         if ((task->stat == task_launch) ||
01395             (task->stat == task_launch_moved) ||
01396             (task->stat == task_started))
01397         {
01398             slaveSetState(task->slave, slave_disabled);
01399             task->slave->task = 0;
01400             // send Stop to that task
01401             sndTaskCtrl (&task->slave->info->addr, task->id, p_Stop);
01402             lTasksRun.Remove(task);
01403         }
01404         else if ((task->stat == task_buffered) || (task->stat == task_buffered_moved))
01405             lTasksWait.Remove(task);
01406 
01407     lClientMsg.Start_Get();
01408     ClientMsgEntry *qmsg;
01409     while ((qmsg = (ClientMsgEntry *)lClientMsg.Get_Next()))
01410       if (qmsg->job == job) lClientMsg.Remove(qmsg);
01411 
01412     if (remote)
01413       sndJobStatus (remote, job, job->id, job_stopped, 0, XMLData::Nil, DONT_QUEUE_MSG);
01414 
01415     lJobs.Remove(job);
01416     delete job;
01417 
01418     // we purge all incomming msg ids for this client
01419     if (remote)
01420       po->purgeIn (*remote);
01421 
01422     if (need_check) 
01423       checkBufferedTasks();
01424     status_msg = "job deleted";
01425 }
01426 
01427 // M_JOB_CTRL(StopAllUser)
01428 void Master::msgJobCtrl_StopAllUser (JobEntry *job, Address *remote, CharStr *user)
01429 {
01430   // look for all jobs of this user and call msgJobCtrl_Stop for each
01431   JobEntry *j;
01432 
01433   lJobs.Start_Get();
01434   while ((j = (JobEntry *)lJobs.Get_Next()))
01435     if (!strcmp(user->str, j->user()))
01436       msgJobCtrl_Stop(j, remote);
01437 }
01438 
01439 // M_JOB_CTRL(StopAllName)
01440 void Master::msgJobCtrl_StopAllName (JobEntry *job, Address *remote)
01441 {
01442   //look for all jobs with the same name and call msgJobCtrl_Stop for each
01443   CharStr *name = job->id._cs_name();
01444   name->stored();
01445   JobEntry *j;
01446 
01447   lJobs.Start_Get();
01448   while ((j = (JobEntry *)lJobs.Get_Next()))
01449     if (!strcmp(j->id.name(), name->str))
01450       msgJobCtrl_Stop(j, remote);
01451   name->released();
01452 }
01453 
01454 // M_JOB_CTRL(GetStatus)
01455 // return a job status with all info about the job
01456 void Master::msgJobCtrl_GetStatus (JobEntry *job, Address *remote)
01457 {
01458     //collect all reservations for this job
01459     ReserveEntry *reserve;
01460     XMLData *reservations = XMLData::Nil;
01461 
01462     lReservs.Start_Get();
01463     while ((reserve = (ReserveEntry *)lReservs.Get_Next()))
01464         if (reserve->job == job)
01465         {
01466             reservations = new XMLData(p_Reservation, o2x(reserve->id), reservations);
01467             if (reserve->parallel) reservations->add(new XMLData(p_Parallel, XMLData::Nil));
01468             reservations->add(new XMLData(p_SlavesRequired, reserve->slavesRequired));
01469         }
01470 
01471     // and send status back to client
01472     if (job->cl_stat == client_on)
01473         sndJobStatus(remote, job, job->id, job_running, 0, reservations);
01474     else
01475         sndJobStatus(remote, job, job->id, job_abandoned, 0, reservations);
01476 
01477     if (reservations != XMLData::Nil) delete reservations;
01478     status_msg = "job status sent";
01479 }
01480 
01481 
01482 //***********************************************//
01483 // M_TASK_INIT
01484 void Master::msgTaskInit (XMLData *msg, Address *remote, CharStr *user)
01485 {
01486   JobEntry *job;
01487   TaskId taskId;
01488   TaskEntry *task;
01489 
01490   do {
01491     // search for the specified job
01492     XMLData *xTaskId = msg->sub(p_TaskID);
01493     job = srchJob (xTaskId);
01494     x2o(xTaskId, taskId);   
01495 
01496     DBUG_PRINT("info", ("Master: new M_TASK_INIT, Task(%s,%d,%d) arrived",
01497                         taskId.jId.name(), taskId.jId.id, taskId.id));
01498 
01499     // inexistent job
01500     if (! job)
01501     {
01502       DBUG_PRINT("err", ("Master: error M_TASK_INIT, inexistent job id for Task(%s,%d,%d)", 
01503                          taskId.jId.name(), taskId.jId.id, taskId.id)); 
01504       sndTaskStatus (remote, job, taskId, task_refused, "err inexistent job", DONT_QUEUE_MSG);
01505       break;
01506     }
01507     
01508     // invalid task
01509     if (taskId.id <= 0)
01510     {
01511       DBUG_PRINT("err", ("Master: error M_TASK_INIT, invalid task id for Task(%s,%d,%d)", 
01512                          taskId.jId.name(), taskId.jId.id, taskId.id)); 
01513       sndTaskStatus (remote, job, taskId, task_refused, "err invalid taskid", DONT_QUEUE_MSG);
01514       break;
01515     }
01516 
01517     // existent task with this id
01518     task = srchTask (taskId);
01519     if (task) {
01520       DBUG_PRINT("err", ("Master: error M_TASK_STATUS, already existent Task(%s,%d,%d)", 
01521                          taskId.jId.name(), taskId.jId.id, taskId.id)); 
01522       status_msg = "task status with wrong taskid";
01523       break;
01524     }
01525 
01526     // only job owner user can send a TaskInit
01527     if (! checkUser (user, job->_cs_user ()))
01528     {
01529       DBUG_PRINT("err", ("Master: error M_TASK_INIT, "
01530                          "job owned by other user (%s), Task(%s,%d,%d)", 
01531                          job->user(), job->id.name(), job->id.id, taskId.id));
01532       sndTaskStatus (remote, job, taskId, task_refused, "err invalid user for jobid", DONT_QUEUE_MSG);
01533       break;
01534     } 
01535 
01536     // msg must come from the same client as before!
01537     if (job->cl_addr != *remote) {
01538       char _h1[26], _h2[26];
01539       job->cl_addr.getIP (_h1);
01540       remote->getIP (_h2);
01541       DBUG_PRINT("err", ("Master: error M_TASK_INIT, "
01542                          "wrong client address %s (should be %s)",
01543                          job->id.name(), job->id.id, _h2, _h1));
01544       sndTaskStatus (remote, job, taskId, task_refused, "err invalid client", DONT_QUEUE_MSG);
01545       break;
01546     }
01547 
01548     // create new TaskEntry for this task
01549     TaskEntry *task = new TaskEntry(taskId, job, 0, 
01550                                     task_internal_error, 
01551                                     msg->sub(XMLData::CDATA)->getString());
01552     XMLData *x;
01553     while ((x = msg->sub(p_TaskInfo)) != XMLData::Nil)
01554     {        
01555       TaskInfo *taskInfo = new TaskInfo();
01556       if (!x2o(x, *taskInfo))
01557       {
01558         DBUG_PRINT("err", ("Master: error M_TASK_INIT with malformed TaskInfo"));
01559         sndTaskStatus(remote, job, taskId, task_refused, "err malformed TaskInfo");
01560         status_msg = "malformed taskInfo in task init";
01561         delete task; 
01562         delete taskInfo;
01563         delete msg;
01564         return;
01565       }
01566       task->taskInfos.Add(taskInfo);
01567     }
01568     
01569     // no TaskInfo is present in the message
01570     if (task->taskInfos.Count () == 0) {
01571       delete task;
01572       DBUG_PRINT("err", ("Master: error M_TASK_INIT no TaskInfo specified"));
01573       sndTaskStatus (remote, job, taskId, task_refused, "err no TaskInfo");
01574       status_msg = "no taskinfo in task init";
01575       break;
01576     }
01577 
01578     DBUG_PRINT("info", ("Master: new M_TASK_INIT, Task(%s,%d,%d) accepted",
01579                         taskId.jId.name(), taskId.jId.id, taskId.id));
01580 
01581     //add task on list of tasks for this job
01582     job->tasks.Add(task);
01583 
01584     //create new log file for this task
01585     if (log_on)
01586     {
01587       char logname[300];
01588       sprintf(logname, "%s/%s.%d/%d", 
01589               log_root, job->id.name(), job->id.id, task->id.id);
01590       ofstream logf(logname);
01591       logf.close();
01592       log_task_msg(task, msg, mdir_c2m);
01593     }
01594 
01595     // see what kind of TaskInit it is
01596     if (XMLData::Nil != msg->sub(p_Address))
01597     {
01598       // it is part of a reservation (Address is in the M_TASK_INIT)
01599       Address slaveAddr(msg->sub()->getString()->str);
01600       SlaveEntry *slave = srchSlaveAddr(slaveAddr);
01601       if (slave)
01602         msgTaskInit_Reserve(task, slave);
01603       else
01604       {
01605         char _host[26];
01606         slaveAddr.getIP(_host);
01607         DBUG_PRINT("err", ("Master: error M_TASK_INIT, Task(%s,%d,%d), "
01608                            "wrong slave address %s", 
01609                            taskId.jId.name(), taskId.jId.id, taskId.id,
01610                            _host));
01611         sndTaskStatus (&task->job->cl_addr, task->job, task->id, 
01612                        task_refused, "err invalid slave address");
01613         status_msg = "invalid slave in task init reserved";
01614         task->job->tasks.Remove(task);
01615         delete task;
01616         return;
01617       }
01618     }
01619     else
01620     {
01621       // no reserveID: either find a slave and start or buffer
01622       msgTaskInit_Normal(task);
01623     }
01624 
01625   } while (0);
01626 
01627   delete msg;
01628 }
01629 
01630 
01631 void Master::msgTaskInit_Reserve(TaskEntry *task, SlaveEntry *slave)
01632 {
01633   CharStr *taskURL = CharStr::Error;
01634   CharStr *userData;
01635   long taskTimeout;
01636 
01637   
01638   //get the URL for this slave
01639   TaskInfo *matched = slaveMatchTasks(slave->info, &task->taskInfos);
01640   if (!matched) 
01641     {
01642       DBUG_PRINT("err", ("Master: error M_TASK_INIT with reserve - no matching taskinfo for this slave"));
01643       sndTaskStatus (&task->job->cl_addr, task->job, task->id, task_refused, "no matching taskinfo for this slave");
01644       status_msg = "no matching taskinfo in taskinit reserved";
01645       task->job->tasks.Remove(task);
01646       delete task;
01647       return;
01648     }
01649 
01650   //slave must be reserved
01651   if ((slave->stat() != slave_reserved) || (slave->jobReserved != task->job))
01652   {
01653     DBUG_PRINT("err", ("Master: error M_TASK_INIT with reserve for a slave that is not reserved"));
01654     sndTaskStatus (&task->job->cl_addr, task->job, task->id, task_refused, "the requested slave is not reserved");
01655     status_msg = "task init for not reserved slave";
01656     task->job->tasks.Remove(task);
01657     delete task;
01658     return;
01659   }
01660   taskURL = matched->_cs_url();
01661   taskTimeout = matched->timeout;
01662   userData = matched->_cs_userData();
01663 
01664   startTask(task, slave, taskURL, taskTimeout, userData);
01665   
01666   //update statistical info
01667   totalTasks++;
01668 }
01669 
01670     
01671 void Master::msgTaskInit_Normal(TaskEntry *task)
01672 {
01673   CharStr *taskURL = CharStr::Error;
01674   CharStr *userData;
01675   long taskTimeout;
01676 
01677   SlaveEntry *slave = srchSlaveTaskInfos(slave_ready, &task->taskInfos, taskURL, taskTimeout, userData);
01678   if (!slave) {
01679     // change status of task to buffered
01680     task->stat = task_buffered;
01681     lTasksWait.Add(task);
01682     
01683     DBUG_PRINT("info", ("Master: task buffered"))
01684     
01685     sndTaskStatus (&task->job->cl_addr, task->job, task->id,
01686                    task_buffered, "task buffered");
01687     status_msg = "new task buffered";
01688     return;
01689   }
01690   
01691   startTask(task, slave, taskURL, taskTimeout, userData);
01692   //update statistical info
01693   totalTasks++;
01694 }
01695 
01696 
01697 void Master::startTask(TaskEntry *task, SlaveEntry *slave, CharStr *taskURL, 
01698                        long taskTimeout, CharStr *userData)
01699 {
01700   task_state old_task_stat = task->stat;
01701 
01702   //set the timeout for launch state
01703   ZTime now; now.Now ();
01704   task->timeend = now + task_launch_timeout;
01705   
01706   //save the timeout 
01707   task->timeout = taskTimeout;
01708  
01709   // save the task reference to the slave entry
01710   slave->task = task;
01711  
01712   // put the slave on contacted state
01713   slaveSetState(slave, slave_contacted);
01714   
01715   // change status of task to launch
01716   if (task->stat == task_buffered_moved)
01717     task->stat = task_launch_moved;
01718   else
01719     task->stat = task_launch;
01720 
01721   //remember the slave for this task
01722   task->slave = slave;
01723   
01724   //add task to running list
01725   lTasksRun.Add(task);
01726 
01727   //if the task is not in buffered state,
01728   //notify client that the task is buffered (this is for the
01729   //reason that the contacted slave might not respond immediatelly and
01730   //we don't want the client to be blocked until we are able
01731   //to send M_TASK_STATUS(Started) or other response when
01732   //the slave replies)
01733   if (old_task_stat != task_buffered)
01734     sndTaskStatus (&task->job->cl_addr, task->job, task->id, 
01735                    task_buffered, "slave contacted");
01736 
01737   // send the task to the slave (spare 4 seconds for communication)
01738   sndTaskInit (&slave->info->addr, task->id, taskURL, taskTimeout - 4, task->_cs_data(), userData);
01739 
01740   status_msg = "new task started";
01741   
01742   return;
01743 }
01744 
01745 
01746 void Master::checkBufferedTasks ()
01747 {
01748   TaskEntry  *task;
01749   CharStr    *taskURL = CharStr::Error;
01750   CharStr    *userData;
01751   SlaveEntry *slave;
01752   long       taskTimeout;
01753 
01754   // get 
01755   lTasksWait.Start_Get ();
01756   while ((task = (TaskEntry *)lTasksWait.Get_Next ())) 
01757     if ((slave = srchSlaveTaskInfos(slave_ready, &task->taskInfos, taskURL, taskTimeout, userData)))
01758       {
01759         lTasksWait.Remove (task);
01760         startTask(task, slave, taskURL, taskTimeout, userData);
01761       }
01762 
01763   //also check the reservations
01764   checkReserves();
01765 }
01766 
01767 //go through the lReservs and see if we can assign a new slave
01768 // so far only very simple check is used
01769 void Master::checkReserves()
01770 {
01771     ReserveEntry *reserve;
01772     SlaveEntry *slave;
01773     int found_reserve;
01774 
01775     //DBUG_PRINT("info", ("Master: execute checkReserves"));
01776 
01777     do {
01778       found_reserve = 0;
01779       lReservs.Start_Get();
01780       while ((reserve = (ReserveEntry *)lReservs.Get_Next()))
01781         if (reserve->parallel)
01782         {
01783             List selected;
01784             lSlaves.Start_Get();
01785             while ((slave = (SlaveEntry *)lSlaves.Get_Next()))
01786             {
01787                 if ((slave->stat() == slave_ready) &&
01788                     (slaveMatchSlaves(slave->info, &reserve->slave_infos)))                        
01789                 {
01790                    selected.Add(slave);
01791                    if (selected.Count() == reserve->slavesRequired) break;
01792                 }                   
01793             }
01794             
01795             if (selected.Count() != reserve->slavesRequired)
01796                 selected.Release();
01797             else
01798             {   
01799                 //prepare the XML with all SlaveInfos
01800                 XMLData *slaveInfos = new XMLData(p_SlaveInfo, XMLData::Nil);
01801                 SlaveEntry *slave;
01802 
01803                 selected.Start_Get();                
01804                 while ((slave = (SlaveEntry *)selected.Get_Next()))
01805                 {
01806                     slaveInfos->add(o2x(*slave->info));
01807                     slaveSetState(slave, slave_reserved);
01808                     slave->jobReserved = reserve->job;
01809                     slave->time_reserved.Now ();
01810                 }
01811                 //reply to client
01812                 sndSlaveAvail(&reserve->job->cl_addr, reserve->job, reserve->slavesRequired, 
01813                               slaveInfos, reserve->id);
01814                 //and remove ReserveEntry
01815                 lReservs.Remove(reserve);
01816                 delete reserve;                  
01817                 //avoid deleting slaves on the selected list
01818                 selected.Release();
01819             }
01820         }
01821         else  // serial case
01822           if ((slave = srchSlaveSlaveInfos(slave_ready, &reserve->slave_infos)))
01823           {
01824               DBUG_PRINT("info", ("Master: found matching slave for reserve"));
01825               //put slave to reserved state
01826               slaveSetState(slave, slave_reserved);
01827               slave->jobReserved = reserve->job;
01828               slave->time_reserved.Now ();
01829               //decrement number of required slaves
01830               reserve->slavesRequired--;
01831               //and reply to client
01832               sndSlaveAvail(&reserve->job->cl_addr, reserve->job, 1, o2x(*slave->info), reserve->id);
01833               if (!reserve->slavesRequired) // no more -> destroy this entry
01834               {
01835                   lReservs.Remove(reserve);
01836                   DBUG_PRINT("info", ("Master: Reserve(%s,%d) deleted", 
01837                              reserve->id.name(), reserve->id.id));
01838                   delete reserve;                  
01839               }
01840               else found_reserve = 1;
01841           }
01842     } while (found_reserve);
01843 }
01844 
01845 
01846 void Master::slaveSetState (SlaveEntry *slave, slave_state state)
01847 {
01848   slave_state state0;
01849 
01850   state0 = slave->setState(state);        
01851 
01852   // log state change of slave into "stats/status.txt"
01853   log_slave_state_change (slave, state0, state);
01854 }
01855 
01856 
01857 //***********************************************//
01858 // M_TASK_CTRL
01859 void Master::msgTaskCtrl (XMLData *msg, Address *remote, CharStr *user)
01860 {
01861   JobEntry *job;
01862   TaskEntry *task;
01863   TaskId taskId;
01864 
01865   do {
01866     // search for the specified job
01867     XMLData *xTaskId = msg->sub(p_TaskID);
01868     job = srchJob (xTaskId);
01869     x2o(xTaskId, taskId);   
01870 
01871     // inexistent job
01872     if (! job)
01873     {
01874       XMLData *jid = msg->sub(p_TaskID)->sub(p_JobID);
01875       DBUG_PRINT("err", ("Master: error M_TASK_CTRL, inexistent Job(%s,%d)", 
01876                          jid->sub(p_Name)->getString()->str, 
01877                          jid->sub(p_ID)->getLong()));
01878       sndTaskStatus (remote, job, taskId, task_refused, "err invalid jobid");
01879       status_msg = "task ctrl invalid jobid";
01880       break;
01881     }
01882 
01883     // invalid task
01884     if (taskId.id <= 0)
01885     {
01886       DBUG_PRINT("err", ("Master: error M_TASK_CTRL, "
01887                          "invalid task id=%d for Job(%s,%d)", 
01888                          taskId.id, taskId.jId.name(), taskId.jId.id)); 
01889       sndTaskStatus (remote, job, taskId, task_refused, "err invalid taskid", DONT_QUEUE_MSG);
01890       break;
01891     }
01892     
01893     // only job owner user or "admin" can send a TaskCtrl
01894     if ((! checkUser (user, job->_cs_user ())) &&
01895         (! checkUser (user, "admin")))
01896     {
01897       DBUG_PRINT("err", ("Master: error M_TASK_CTRL, job owned by other user Job(%s,%d)", 
01898                          job->id.name(), job->id.id));
01899       sndTaskStatus (remote, job, taskId, task_refused, "err invalid user for jobid");
01900       break;
01901     } 
01902 
01903     //get task entry
01904     task = srchTask(taskId);
01905 
01906     // no such task
01907     if (!task)
01908     {        
01909       DBUG_PRINT("err", ("Master: error M_TASK_CTRL unknown TaskID"));
01910       sndTaskStatus (remote, job, taskId, task_refused, "unknown TaskID");
01911       status_msg = "task ctrl unknown taskid";
01912       break;
01913     }
01914 
01915     // log message
01916     log_task_msg(task, msg, mdir_c2m);
01917 
01918     //extract action
01919     task_ctrl_action action;
01920     if (!x2o(msg->getString(p_Action), action))
01921     {
01922       DBUG_PRINT("err", ("Master: error M_TASK_CTRL with unknown action"));
01923       sndTaskStatus(remote, job, taskId, task_refused, "unknown action");
01924       status_msg = "task ctrl unknown action";
01925       break;
01926     }
01927 
01928     // msg must come from the same client as before!
01929     if ((job->cl_addr != *remote) && (action != stop_task))
01930     {
01931       DBUG_PRINT("err", ("Master: error M_TASK_CTRL wrong client address"));
01932       sndTaskStatus (remote, job, taskId, task_refused, "err invalid client");
01933       status_msg = "task ctrl invalid client";
01934       break;
01935     }
01936 
01937     if (action == stop_task)
01938         msgTaskCtrl_Stop(task);
01939     else if (action == control_task)
01940         msgTaskCtrl_Ctrl(task, msg->getString(p_Argument));
01941     
01942   } while (0);
01943 
01944   delete msg;
01945 }
01946 
01947 void Master::msgTaskCtrl_Stop(TaskEntry *task)
01948 {
01949     //save the taskID and client address for reply
01950     TaskId taskId = task->id;
01951     Address client = task->job->cl_addr;
01952     JobEntry *job = task->job;
01953 
01954     //send reply to client
01955     sndTaskStatus(&client, job, taskId, task_stopped, 0);
01956 
01957     // if task is running, 
01958     if ((task->stat == task_launch) ||
01959         (task->stat == task_launch_moved) ||
01960         (task->stat == task_started))
01961     {
01962         //send M_TASK_STOP to slave
01963         sndTaskCtrl(&task->job->cl_addr, task->id, p_Stop);
01964         //put slave on disabled mode
01965         task->slave->task = 0;
01966         slaveSetState(task->slave, slave_disabled);        
01967         //delete task from lTasksRun
01968         lTasksRun.Remove(task);
01969     }
01970     else if ((task->stat == task_buffered) ||
01971              (task->stat == task_buffered_moved))
01972         //delete task from lTasksWait
01973         lTasksWait.Remove(task);
01974     else
01975         DBUG_PRINT("err", ("Master: error M_TASK_CTRL(Stop) stopping task that is in unknown state")); 
01976     //remove task from list of job's tasks
01977     job->tasks.Remove(task);    
01978     //delete task entry
01979     delete task;   
01980     status_msg = "task stopped";
01981 }
01982 
01983 void Master::msgTaskCtrl_Ctrl(TaskEntry *task, CharStr *arg)
01984 {
01985     //task must be in task_started state (i.e. running)
01986     if (task->stat != task_started)
01987     {
01988         sndTaskStatus(&task->job->cl_addr, task->job, task->id, task_refused, 
01989             "M_TASK_CTRL(Control): task is not running");
01990         status_msg = "task ctrl for task that is not running";
01991     }        
01992     else
01993     {
01994         //just forward the message to slave
01995         sndTaskCtrl(&task->slave->info->addr, task->id, p_Control, arg);
01996         status_msg = "task ctrl forwarded to slave";
01997     }
01998 }
01999 
02000 //***********************************************//
02001 // M_SLAVE_CTRL
02002 void Master::msgSlaveCtrl (XMLData *msg, Address *remote, CharStr *user)
02003 {
02004 
02005   do {
02006     // check if the message is from 'admin' user
02007     if (! checkUser (user, "admin"))
02008     {
02009       DBUG_PRINT("err", ("Master: error M_SLAVE_CTRL attempt by not an admin"));
02010       sndSlaveStatus(remote, 0, "not an admin");
02011       status_msg = "slave ctrl by not an admin";
02012       break;
02013     }
02014 
02015     // log message
02016     log_other_msg(msg, mdir_c2m);
02017 
02018     //extract action
02019     slave_ctrl_action action;
02020     if (!x2o(msg->getString(p_Action), action))
02021     {
02022       DBUG_PRINT("err", ("Master: error M_SLAVE_CTRL with unknown action"));
02023       sndSlaveStatus(remote, 0, "unknown action");
02024       status_msg = "slave ctrl unknown action";
02025       break;
02026     }
02027 
02028     if (action == upgrade_slave)
02029         msgSlaveCtrl_Upgrade(msg, remote);
02030     
02031   } while (0);
02032 
02033   delete msg;
02034 }
02035 
02036 
02037 void Master::msgSlaveCtrl_Upgrade(XMLData *msg, Address *remote)
02038 {
02039     int immediate;
02040     enum { dont_change, start_pu, stop_pu } pu;  
02041     CharStr *newVersion;
02042 
02043     //extract NewVersion (note: newVersion is not stored)
02044     newVersion = msg->getString(p_NewVersion);
02045 
02046     //extract immediate
02047     if (msg->sub(p_Immediate) != XMLData::Nil) immediate = 1;
02048     else immediate = 0;
02049 
02050     //extract permanent upgrade
02051     CharStr *cs_pu = msg->getString(p_PermanentUpgrade);
02052     if (cs_pu == CharStr::Error) pu = dont_change;
02053     else if (strcmp(cs_pu->str, p_Start->str) == 0) pu = start_pu;
02054     else if (strcmp(cs_pu->str, p_Stop->str) == 0) pu = stop_pu;
02055     else {
02056         DBUG_PRINT("err", ("Master: message SLAVE_CTRL(Upgrade) unknown string in PermanentUpgrade"));
02057         return;
02058     }
02059 
02060     //change the permanent upgrade settings
02061     if ((pu == start_pu) || ((pu == dont_change) && permanent_upgrade))
02062     {
02063         if (pu_config != XMLData::Nil) delete pu_config;
02064         pu_config = new XMLData(p_PermanentUpgrade, msg->subAll(p_URL));
02065         pu_config->add(new XMLData(msg->sub(p_NewVersion), 0));
02066         FILE_UNLINK(PU_FILENAME);
02067         ofstream puf(PU_FILENAME);
02068         puf << *pu_config;
02069         puf.close();                
02070         if (!permanent_upgrade)
02071         {
02072             write_keyword(DEFAULT_MASTER_CONFIG_FILE, kws[K_PERMANENT_UPGRADE].key, "on");
02073             permanent_upgrade = 1;
02074         }
02075     }
02076     else if ((pu == stop_pu) && permanent_upgrade)
02077     {
02078         permanent_upgrade = 0;
02079         if (pu_config != XMLData::Nil) delete pu_config;
02080         pu_config = XMLData::Nil;
02081         write_keyword(DEFAULT_MASTER_CONFIG_FILE, kws[K_PERMANENT_UPGRADE].key, "off");
02082     }
02083             
02084     //send status message back to client
02085     sndSlaveStatus(remote, 1);
02086 
02087     //upgrade all slaves
02088 
02089     if (!immediate)
02090     {
02091       //forget old list of slaves to upgrade
02092       toUpgradeLater.Release();           
02093       //remember version for later upgrades
02094       upgradeLaterVersion->released();    
02095       upgradeLaterVersion = newVersion;   
02096       upgradeLaterVersion->stored();
02097       //remember the URLs
02098       if (upgradeLaterURLs != XMLData::Nil)
02099       {
02100         delete upgradeLaterURLs;
02101         upgradeLaterURLs = new XMLData(p_Ok, msg->subAll(p_URL));
02102       }
02103     }
02104 
02105     if (msg->sub(p_URL) != XMLData::Nil)
02106     {
02107         SlaveEntry *s;
02108         lSlaves.Start_Get();
02109         CharStr *url;
02110         while ((s = (SlaveEntry *)lSlaves.Get_Next())) 
02111         {
02112             url = matchURL(s->info, msg);
02113             upgradeSlave(s, immediate, newVersion, url);
02114         }
02115     }
02116 
02117     status_msg = "slaves upgraded";
02118 }
02119 
02120 void Master::upgradeSlave(SlaveEntry *s, int immediate, CharStr *newVersion, CharStr *url)
02121 {
02122     char addr[26];
02123     s->info->addr.getIP(addr);
02124 
02125     //check if the url was found
02126     if (url == CharStr::Error)
02127     {
02128       DBUG_PRINT("info", ("Master: upgradeSlave, skipping slave %s, no URL provided", addr));
02129     } 
02130     //upgrade only if it's immediate upgrade or slave is not busy
02131     else if (immediate || 
02132         ((s->stat() != slave_busy) &&
02133          (s->stat() != slave_contacted) &&
02134          (s->stat() != slave_reserved)))
02135     {
02136         //if newVersion specified, check if the slave is already upgraded
02137         if (newVersion != CharStr::Error)
02138             if (strcmp(s->info->version(), newVersion->str) == 0) {
02139                 DBUG_PRINT("info", ("Master: upgradeSlave, skipping already upgraded slave %s", addr));
02140                 return;
02141             }
02142         DBUG_PRINT("info", ("Master: upgrading slave %s", addr));
02143         sndSlaveCtrl(&s->info->addr, p_Upgrade, url);
02144     }
02145     else 
02146     {
02147        DBUG_PRINT("info", ("Master: upgradeSlave, skipping busy slave %s", addr));
02148        //will be upgraded when it becomes available
02149        toUpgradeLater.Add(s);
02150     }
02151 }
02152         
02153 
02154 //***********************************************//
02155 // M_CLIENT_STATUS
02156 void Master::msgClientStatus (XMLData *msg, Address *remote, CharStr *user)
02157 {
02158     client_state newClientState;
02159     JobId jid;
02160 
02161     do {
02162     
02163       //extract JobID and ClientStatus from the message
02164       x2o(msg->sub(p_JobID), jid);
02165       x2o(msg->getString(p_ClientStatus), newClientState);
02166 
02167       if (newClientState == client_on)
02168       {
02169           //Address clientAddress(msg->getString(p_Address)->str);
02170           Address clientAddress(*remote);
02171           msgClientStatus_On(jid, clientAddress, msg, user);
02172       }
02173       else if (newClientState == client_off)
02174       {
02175           msgClientStatus_Off(remote, jid, msg);
02176       }
02177       else  //unknown client status?
02178       {
02179           sndJobStatus(remote, (JobEntry *)0, jid, job_refused, "M_CLIENT_STATUS: unknown state", XMLData::Nil, DONT_QUEUE_MSG);
02180           DBUG_PRINT("err", ("Master: error M_CLIENT_STATUS: unknown client state %s", msg->getString(p_ClientStatus)->str)); 
02181           status_msg = "unknown client status";
02182           break;
02183       }
02184 
02185     } while (0);
02186 
02187     delete msg;
02188 }
02189 
02190 void Master::msgClientStatus_On(JobId &jid, Address &clientAddress, 
02191                                 XMLData *msg, CharStr *user)
02192 {
02193     // 3 possibilities:
02194     //     1. new job is being created -> new id for this name has to be generated
02195     //     2. old job that is abandoned is taken over by some client
02196     //     3. old job that is running is stolen by other client (this is also legal)
02197 
02198     if (jid.id == -1)  // new job
02199     {
02200 
02201         //find the first unused id
02202         jid.id = 1;
02203         lJobIDTable.Start_Get();
02204         JobIDTableEntry *jidte;
02205         while ((jidte = (JobIDTableEntry *)lJobIDTable.Get_Next()))
02206         {
02207           if (strcmp(jidte->name(), jid.name()) == 0)
02208           {
02209             jid.id = ++jidte->id;
02210             break;
02211           }
02212         }
02213         if (jid.id == 1) 
02214         {
02215           jidte = new JobIDTableEntry;
02216           jidte->_cs_name(jid._cs_name());
02217           jidte->id = 1;
02218           lJobIDTable.Add(jidte);
02219         }
02220 
02221         // create a log directory and file
02222         if (log_on)
02223         {
02224            char dir_name[310];
02225            sprintf(dir_name, "%s/jobs/%s.%d", log_root, jid.name(), jid.id);
02226            make_dir(dir_name);
02227            sprintf(dir_name, "%s/jobs/%s.%d/other", log_root, jid.name(), jid.id);
02228            ofstream otherlog(dir_name);
02229            otherlog.close();
02230         }
02231 
02232   // scan for existing clients from same addr:port
02233   JobEntry *job;
02234   lJobs.Start_Get ();
02235   while ((job = (JobEntry *) lJobs.Get_Next ()))
02236     if (job->cl_addr == clientAddress)
02237       // don't sent anymore messages to this client
02238       // because address was taken away by new job
02239       job->cl_addr.setPort (0);
02240   
02241         //create new JobEntry
02242         job = new JobEntry();
02243         job->id = jid;
02244         job->cl_addr = clientAddress;
02245         job->cl_stat = client_on;
02246         job->_cs_user(user);
02247         //and insert it into lJobs
02248         lJobs.Add(job);
02249 
02250         //log msg
02251         log_job_msg(job, msg, mdir_c2m);
02252 
02253         //respond with new JobID
02254         sndJobStatus(&clientAddress, job, job->id, job_running, 0);
02255         status_msg = "client registered with new job";
02256 
02257         //update statistical information
02258         totalJobs++;
02259 
02260     }
02261     else // old job
02262     {
02263 
02264         JobEntry *job = srchJob(jid);
02265         if (!job)
02266         {
02267             //log message
02268             log_job_msg (job, msg, mdir_c2m);
02269 
02270             DBUG_PRINT("err", ("Master: error M_CLIENT_STATUS(On): unknown JobID %s,%d", 
02271                               jid.name(), jid.id));
02272             sndJobStatus (&clientAddress, job, jid, job_refused, 
02273                           "M_CLIENT_STATUS(On): unknown JobID");
02274             status_msg = "client tried to take unknown job";
02275         }
02276         else // job really exists
02277         {
02278             //log message
02279             log_job_msg(job, msg, mdir_c2m);
02280 
02281             //only the old user or 'admin' can steel a job :)
02282             if ((! checkUser (user, job->_cs_user ())) &&
02283                 (! checkUser (user, "admin")))
02284             {
02285               DBUG_PRINT("err", ("Master: error M_CLIENT_STATUS(On): owned by other user JobID %s,%d", 
02286                                 jid.name(), jid.id));
02287               sndJobStatus (&clientAddress, job, jid, job_refused, 
02288                             "M_CLIENT_STATUS(On): invalid user for JobID");
02289               status_msg = "client tried to take other user's job";
02290             } 
02291             else // user ok
02292             {
02293               //first remove all pending messages for this job
02294               lClientMsg.Start_Get();
02295               ClientMsgEntry *qmsg;
02296               while ((qmsg = (ClientMsgEntry *)lClientMsg.Get_Next()))
02297                 if (qmsg->job == job) lClientMsg.Remove(qmsg);
02298 
02299               if (job->cl_stat == client_off)   // job is abandoned
02300               {
02301                 job->cl_addr = clientAddress;
02302                 job->cl_stat = client_on;
02303                 sndJobStatus(&clientAddress, job, jid, job_running, 0);
02304                 status_msg = "client took abandoned job";
02305               }
02306               else  // job is being stolen
02307               {
02308                 //first notify the previous client (if the address is different)
02309                 if (!(job->cl_addr == clientAddress))
02310                   sndJobStatus(&job->cl_addr, job, jid, job_stopped, "M_CLIENT_STATUS(On): job stolen by other client, sorry.");
02311                 job->cl_addr = clientAddress;
02312                 sndJobStatus(&clientAddress, job, jid, job_running, 0);
02313                 status_msg = "client stolen job from other client";
02314               }
02315             }
02316         }  // (!job)
02317 
02318     }  // new/old job
02319 }
02320 
02321 void Master::msgClientStatus_Off(Address *remote, JobId &jid, XMLData *msg)
02322 {
02323     JobEntry *job;
02324 
02325     job = srchJob(jid);
02326 
02327     if (!job)
02328     {
02329         sndJobStatus(remote, job, jid, job_refused, "M_CLIENT_STATUS(Off): unknown JobID");
02330         DBUG_PRINT("err", ("Master: error M_CLIENT_STATUS(Off): Unknown JobID")); 
02331         status_msg = "client off for unknown jobid";
02332     }
02333     else
02334     {
02335         // msg must come from the same client as before!
02336         if (job->cl_addr != *remote) {
02337           DBUG_PRINT("err", ("Master: error M_CLIENT_STATUS(Off) wrong client address"));
02338           sndJobStatus (remote, job, job->id, job_refused, "M_CLIENT_STATUS(Off): err invalid client");        
02339           status_msg = "client wanted to stop job of other client";
02340         }
02341         else
02342         {
02343           //log msg
02344           log_job_msg(job, msg, mdir_c2m);
02345 
02346           job->cl_stat = client_off;
02347           sndJobStatus(remote, job, jid, job_abandoned, 0);
02348           status_msg = "client went off, job abandoned";
02349         }
02350     }
02351 
02352   // we purge all incomming msg ids for this client
02353   po->purgeIn (*remote);
02354 }
02355 
02356 //***********************************************//
02357 // M_SLAVE_RESERVE
02358 void Master::msgSlaveReserve (XMLData *msg, Address *remote, CharStr *user)
02359 {
02360   JobEntry *job;
02361   ReserveId resId;
02362 
02363   do {
02364     // check if the message has a valid jobId
02365     job = srchJob(msg);
02366     if (! job) {
02367       DBUG_PRINT("err", ("Master: error M_SLAVE_RESERVE invalid jobID"));
02368       sndJobStatus (remote, job, job->id, job_refused, "M_SLAVE_RESERVE: err invalid jobid");
02369       status_msg = "slave reserve with wrong jobid";
02370       break;
02371     }
02372 
02373     // only the job's owner can reserve a slave
02374     if (! checkUser (user, job->_cs_user ())) {
02375       DBUG_PRINT("err", ("Master: error M_SLAVE_RESERVE wrong user"));
02376       sndJobStatus (remote, job, job->id, job_refused, "M_SLAVE_RESERVE: err invalid user");
02377       status_msg = "slave reserve from wrong user";
02378       break;
02379     }
02380 
02381     // msg must come from the same client as before!
02382     if (job->cl_addr != *remote) {
02383       DBUG_PRINT("err", ("Master: error M_SLAVE_RESERVE wrong client address"));
02384       sndJobStatus (remote, job, job->id, job_refused, "M_SLAVE_RESERVE: err invalid client");
02385       status_msg = "slave reserve from wrong client";
02386       break;
02387     }
02388 
02389     //log msg
02390     log_job_msg(job, msg, mdir_c2m);
02391 
02392     //extract ReserveID
02393     if (!x2o(msg->sub(p_ReserveID), resId))
02394     {
02395       DBUG_PRINT("err", ("Master: error M_SLAVE_RESERVE malformed ReserveID"));
02396       sndJobStatus(remote, job, job->id, job_refused, "M_SLAVE_RESERVE: err malformed ReserveID");
02397       status_msg = "slave reserve with malformed reserveID";
02398       break;
02399     }
02400 
02401     //create new ReserveEntry
02402     ReserveEntry *res = new ReserveEntry();
02403 
02404     //generate new id
02405     resId.id = 1;
02406     while (srchReserve(resId)) ++resId.id;
02407 
02408     res->id = resId;
02409     res->job = job;
02410     res->parallel = msg->sub(p_Parallel) != XMLData::Nil;
02411     res->slavesRequired = msg->sub(p_SlavesRequired)->getLong();
02412     if (res->slavesRequired < 1)
02413     {
02414       DBUG_PRINT("err", ("Master: error M_SLAVE_RESERVE wrong number of required slaves"));
02415       sndJobStatus (remote, job, job->id, job_refused, 
02416                     "M_SLAVE_RESERVE: wrong number of required slaves");
02417       status_msg = "slave reserve wrong number of slaves";
02418       break;
02419     }
02420     if (msg->sub(p_SlaveInfo) == XMLData::Nil)
02421     {
02422       DBUG_PRINT("err", ("Master: error M_SLAVE_RESERVE doesn't include SlaveInfo"));
02423       sndJobStatus(remote, job, job->id, job_refused, 
02424                    "M_SLAVE_RESERVE: missing SlaveInfo");
02425       status_msg = "slave reserve missing SlaveInfo";
02426       break;
02427     }
02428     msg->reset();
02429 
02430     XMLData *sinfo;
02431     while ((sinfo = msg->sub(p_SlaveInfo)) != XMLData::Nil)
02432     {
02433         SlaveInfo *s = new SlaveInfo();
02434         if (!x2o(sinfo, *s))
02435         {
02436           DBUG_PRINT("err", ("Master: error M_SLAVE_RESERVE with malformed SlaveInfo"));
02437           sndJobStatus(remote, job, job->id, job_refused, "M_SLAVE_RESERVE: malformed SlaveInfo");
02438           status_msg = "slave reserve with malformed SlaveInfo";
02439           delete msg;
02440           return;
02441         }
02442         res->slave_infos.Add(s);
02443     }
02444 
02445     //add reserveEntry to the list 
02446     lReservs.Add(res);
02447     
02448     //and reply back to client
02449     sndJobStatus (&job->cl_addr, job, job->id, job_running, 0, o2x(resId));
02450 
02451     //if slaves are available now, assign them immediatelly
02452     checkReserves();
02453  
02454     status_msg = "slave(s) reserved";
02455 
02456     //update statistical info
02457     totalReservations++;
02458 
02459   } while (0);
02460 
02461   delete msg;
02462 }
02463 
02464 
02465 
02466 //***********************************************//
02467 // M_TASK_FINISH
02468 void Master::msgTaskFinish (XMLData *msg, Address *remote)
02469 {
02470   TaskId      taskId;
02471   TaskEntry  *task;
02472 
02473   do {
02474     // extract taskId from the message and find the task entry
02475 
02476     x2o(msg->sub(p_TaskID), taskId);   
02477     task = srchTask (taskId);
02478 
02479     // no such task, message ignored
02480     if (!task) {
02481       DBUG_PRINT("err", ("Master: error M_TASK_FINISH invalid TaskId: id=%ld", 
02482                          msg->sub()->sub(p_ID)->getLong()));
02483       status_msg = "invalid task id in task finish";
02484       break;
02485     }
02486 
02487     // message must come from the slave associated with this task!
02488     if (task->slave->info->addr != *remote) {
02489       DBUG_PRINT("err", ("Master: error M_TASK_FINISH wrong slave address for task"));
02490       status_msg = "task finish from wrong slave";
02491       break;
02492     }
02493 
02494     if (task->stat != task_started) {
02495       DBUG_PRINT("err", ("Master: error M_TASK_FINISH slave is not running"));
02496       status_msg = "task finish from not started task";
02497       break;
02498     }
02499 
02500     //log msg
02501     log_slave_msg(task->slave, msg, mdir_s2m);
02502     log_task_msg(task, msg, mdir_m2c);
02503 
02504     // we have a correct finish for a task
02505     // forward message to client
02506     sndMessageToClient (&task->job->cl_addr, task->job, msg);
02507     
02508     // replace data in TaskEntry with the results from message    
02509     //task->_cs_data (msg->sub(XMLData::CDATA)->getString());
02510 
02511     // move the task to lTasksFinish
02512     //moveTask (task, lTasksRun, lTasksFinish);
02513 
02514     //detach the task from its slave and forget it
02515     task->slave->task = 0;
02516     deleteTask (task);
02517 
02518     status_msg = "task finish forwarded to client";
02519 
02520     return;            //don't delete this message (it's been forwarded)
02521 
02522   } while (0);
02523 
02524   delete msg;
02525   return;
02526 }
02527 
02528 
02529 //***********************************************//
02530 // M_TASK_MOVE
02531 void Master::msgTaskMove (XMLData *msg, Address *remote)
02532 {
02533   TaskId      taskId;
02534   TaskEntry  *task;
02535 
02536   do {
02537     // extract taskId from the message and find the task entry
02538     x2o(msg->sub(p_TaskID), taskId);
02539     task = srchTask (taskId);
02540 
02541     // no such task, message ignored
02542     if (!task) {
02543       DBUG_PRINT("err", ("Master: error M_TASK_MOVE invalid TaskId: id=%ld", 
02544                          msg->sub(p_TaskID)->sub(p_ID)->getLong()));
02545       status_msg = "task move with wrong taskid";
02546       break;
02547     }
02548 
02549     // message must come from the slave associated with this task!
02550     if (task->slave->info->addr != *remote) {
02551       DBUG_PRINT("err", ("Master: error M_TASK_MOVE wrong slave address for task"));
02552       status_msg = "task move from wrong slave";
02553       break;
02554     }
02555 
02556     //log msg
02557     log_slave_msg(task->slave, msg, mdir_s2m);
02558 
02559     if (task->stat != task_started) {
02560       DBUG_PRINT("err", ("Master: error M_TASK_MOVE slave is not running"));
02561       status_msg = "task move for not running task";
02562       break;
02563     }
02564 
02565     // we have a correct move for a task
02566     // replace data in TaskEntry with data from message
02567     if (msg->sub(XMLData::CDATA) != XMLData::Nil)
02568       task->_cs_data (msg->sub(XMLData::CDATA)->getString());
02569 
02570     //stop the watchdog for this task 
02571     ZTime now; 
02572     now.GetTimeOfDay();
02573     task->timeout = task->timeout - task->timeend - now;
02574 
02575     // set slave to disabled 
02576     // * we'll probably get anyway a M_SLAVE_STATUS(Disabled) shortly
02577     slaveSetState(task->slave, slave_disabled);
02578     task->slave->task = 0; 
02579 
02580     // task must be restarted on another slave!
02581     // * mark it buffered, so when it is started succesfuly, we'll be
02582     //   able to identify it was moved and notify the client
02583     task->stat = task_buffered_moved;
02584 
02585     // try to send the task to another slave
02586     moveTask (task, lTasksRun, lTasksWait);
02587     checkBufferedTasks ();
02588  
02589     status_msg = "task moved";
02590   } while (0);
02591 
02592   delete msg;
02593   return;
02594 }
02595 
02596 
02597 //***********************************************//
02598 // M_TASK_STATUS
02599 void Master::msgTaskStatus (XMLData *msg, Address *remote)
02600 {
02601   TaskId      taskId;
02602   TaskEntry  *task;
02603   int         forwarded = 0; // set to 1 if message is forwarded to client
02604 
02605   do {
02606     // extract taskId from the message and find the task entry
02607     x2o(msg->sub(p_TaskID), taskId);   
02608     task = srchTask (taskId);
02609 
02610     // no such task, message ignored
02611     if (!task) {
02612       DBUG_PRINT("err", ("Master: error M_TASK_STATUS invalid TaskId: id=%ld", 
02613                          msg->sub(p_TaskID)->sub(p_ID)->getLong()));
02614       status_msg = "task status with wrong taskid";
02615       break;
02616     }
02617 
02618    //we have to verify that the located task has some slave associated
02619    //before using it's value
02620     if (!task->slave) {
02621       DBUG_PRINT("err", ("Master: error M_TASK_STATUS invalid TaskId"));
02622       status_msg = "task status for task without slave";
02623       break;
02624     }
02625 
02626     // message must come from the slave associated with this task!
02627     if ((task->slave) &&
02628         (task->slave->info->addr != *remote))
02629     {
02630       DBUG_PRINT("err", ("Master: error M_TASK_STATUS wrong slave address for task"));
02631       status_msg = "task status from wrong slave";
02632       break;
02633     }
02634 
02635     //log msg
02636     log_slave_msg(task->slave, msg, mdir_s2m);
02637 
02638     // extract task status
02639     task_state stat;
02640     x2o(msg->getString(p_Status), stat);
02641 
02642     if (stat == task_refused) 
02643       msgTaskStatus_Refused (task, msg, forwarded);
02644     else if (stat == task_started)
02645       msgTaskStatus_Started (task, msg, forwarded);
02646     else if (stat == task_ok)
02647       msgTaskStatus_Ok (task, msg, forwarded);
02648     else if (stat == task_crashed)
02649       msgTaskStatus_Crashed (task, msg, forwarded);
02650     else 
02651       {
02652         DBUG_PRINT("err", ("Master: error M_TASK_STATUS invalid status=%s", 
02653                            msg->getString(p_Status)->str));
02654         status_msg = "invalid task status";
02655         break;
02656       }
02657   } while (0);
02658 
02659   if (!forwarded) delete msg;
02660   return;
02661 }
02662 
02663 
02664 void Master::msgTaskStatus_Refused (TaskEntry *task, XMLData *msg, int &forwarded)
02665 {
02666   status_msg = "task status (refused) forwarded to client";
02667   // task is in launch mode, this means task cannot start!
02668   if (task->stat == task_launch) {
02669     //log msg
02670     log_task_msg(task, msg, mdir_m2c);
02671     // notify client about the problem
02672     sndMessageToClient (&task->job->cl_addr, task->job, msg);
02673     forwarded = 1;
02674 
02675     // change slave status to slave_ready
02676     slaveSetState(task->slave, slave_ready);
02677 
02678     // slave forget this task
02679     task->slave->task = 0;
02680 
02681     // delete the task from the master!
02682     deleteTask (task);
02683   }
02684 
02685   // task is in started mode, this means task ctrl was refused
02686   else if (task->stat == task_started) {
02687     //log msg
02688     log_task_msg(task, msg, mdir_m2c);
02689     // notify client about the problem
02690     sndMessageToClient (&task->job->cl_addr, task->job, msg);
02691     forwarded = 1;
02692   } 
02693 
02694   // otherwise ignore the message
02695   else {
02696     DBUG_PRINT("err", ("Master: error M_TASK_STATUS(Refused) invalid for taskstat=%s", 
02697                        o2x(task->stat)->str));
02698     status_msg = "received invalid task status(refused)";
02699   }
02700 
02701   // try starting other tasks
02702   checkBufferedTasks ();
02703 
02704   return;
02705 }
02706 
02707 
02708 void Master::msgTaskStatus_Started (TaskEntry *task, XMLData *msg, int &forwarded)
02709 {   
02710   ZTime now;
02711 
02712   status_msg = "task status (started) forwarded to client";
02713   // task is in launch mode, this means task was started successfuly
02714   if (task->stat == task_launch) {
02715     // set the task status to started in the taskEntry
02716     // (taskEntry is in lTaskRun and will remain there!)
02717     task->stat = task_started;
02718     // put the corresponding slave to busy state
02719     slaveSetState(task->slave, slave_busy);
02720     // set the timeout for the task
02721     now.Now();
02722     task->timeend = now + task->timeout;
02723 
02724     //log msg
02725     log_task_msg(task, msg, mdir_m2c);
02726     // notify client about the task starting
02727     sndMessageToClient (&task->job->cl_addr, task->job, msg);
02728     forwarded = 1;
02729   }
02730 
02731   else if (task->stat == task_launch_moved) {
02732     // set the task status to started in the taskEntry
02733     // (taskEntry is in lTaskRun and will remain there!)
02734     task->stat = task_started;
02735 
02736     // change status to task_moved in the client notification
02737     msg->sub (p_Status)->set (p_Moved);
02738 
02739     //log msg
02740     log_task_msg(task, msg, mdir_m2c);
02741     // notify client about the task starting
02742     sndMessageToClient (&task->job->cl_addr, task->job, msg);
02743     forwarded = 1;
02744   }
02745 
02746   // otherwise ignore the message
02747   else {
02748     DBUG_PRINT("err", ("Master: error M_TASK_STATUS(Started) invalid for taskstat=%s", 
02749                        o2x(task->stat)->str));
02750     status_msg = "received invalid task status (started)";
02751   }
02752 
02753   return;
02754 }
02755 
02756 void Master::msgTaskStatus_Ok (TaskEntry *task, XMLData *msg, int &forwarded)
02757 {
02758   status_msg = "task status (ok) forwarded to client";
02759   // task is in started mode, which is good
02760   if (task->stat == task_started) {
02761     //log msg
02762     log_task_msg(task, msg, mdir_m2c);
02763     // notify client about this
02764     sndMessageToClient (&task->job->cl_addr, task->job, msg);
02765     forwarded = 1;
02766   }
02767 
02768   // otherwise ignore the message 
02769   // (can't get a task_ok for a TaskCtrl if task is not started!)
02770   else {
02771     DBUG_PRINT("err", ("Master: error M_TASK_STATUS(Ok) invalid for taskstat=%s", 
02772                        o2x(task->stat)->str));
02773     status_msg = "received invalid task_status (ok)";
02774   }
02775 
02776   return;
02777 }
02778 
02779 
02780 void Master::msgTaskStatus_Crashed (TaskEntry *task, XMLData *msg, int &forwarded)
02781 {
02782   status_msg = "task status (crashed) forwarded to client";
02783   // task is in started mode, which is good
02784   if (task->stat == task_started) {
02785     //log msg
02786     log_task_msg(task, msg, mdir_m2c);
02787     // notify client about this
02788     sndMessageToClient (&task->job->cl_addr, task->job, msg);
02789     forwarded = 1;
02790     //detach the task from slave
02791     task->slave->task = 0;
02792  
02793     // change slave status to slave_ready
02794     // !!don't do this - we'll wait for a SlaveStatus(Ready)!!
02795     //task->slave->stat = slave_ready;
02796     
02797     // delete the task from the master!
02798     deleteTask (task);
02799   }                                          
02800   
02801   // otherwise ignore the message 
02802   // (can't get a task_crashed if the task is not in task_started)
02803   else {
02804      DBUG_PRINT("err", ("Master: error M_TASK_STATUS(Crashed) invalid for taskstat=%s",
02805                         o2x(task->stat)->str));
02806      status_msg = "received invalid task status (ok)";
02807   }
02808 
02809   //checkBufferedTasks();
02810 
02811   return;
02812 }
02813                                              
02814  
02815 
02816 
02817 //***********************************************//
02818 // M_SLAVE_STATUS
02819 void Master::msgSlaveStatus (XMLData *msg, Address *remote)
02820 {
02821   SlaveEntry  *slave = 0;
02822 
02823   do {
02824     // extract slave status
02825     slave_state stat;
02826     x2o(msg->getString(p_Status), stat);
02827     
02828     slave = srchSlaveAddr(*remote);
02829 
02830     // test if we have a new slave (maybe also msgSlaveStatus_Disabled)
02831     if ((stat == slave_ready) || ((stat == slave_disabled) && (!slave)))
02832       {
02833         // log msg only if new slave
02834         if (msgSlaveStatus_Ready (slave, remote, msg->sub (p_SlaveInfo), stat))
02835           log_slave_msg(slave, msg, mdir_s2m);
02836         slave->prev_stat = stat;
02837         msg->reset();
02838         if (slave->setInfo (msg->sub (p_SlaveInfo), remote))
02839         {
02840           DBUG_PRINT("err", ("Master: error M_SLAVE_STATUS(ready) with malformed SlaveInfo")); 
02841           status_msg = "malformed SlaveInfo in slave status";   
02842           break;
02843         }
02844         break;
02845       }
02846 
02847     if (! slave)
02848       {
02849   char _host[26];
02850   remote->getIP (_host);
02851         msg->reset();
02852         DBUG_PRINT("err", ("Master: error M_SLAVE_STATUS invalid status=%s for inexistent slave %s - reEnable", 
02853                            msg->getString(p_Status)->str, _host));
02854   sndSlaveCtrl(remote, p_Disable);
02855   sndSlaveCtrl(remote, p_Enable);
02856         break;
02857       }
02858 
02859     //log msg
02860     if (stat != slave->prev_stat) 
02861       {
02862         slave->prev_stat = stat;
02863         log_slave_msg(slave, msg, mdir_s2m);
02864       }
02865 
02866     if (stat == slave_off) 
02867       msgSlaveStatus_Off (slave);
02868     else if (stat == slave_busy)
02869       msgSlaveStatus_Busy (slave);
02870     else if (stat == slave_disabled)
02871       msgSlaveStatus_Disabled (slave);
02872     else if (stat == slave_upgrade)
02873       break; // no action yet
02874     else 
02875       {
02876         DBUG_PRINT("err", ("Master: error M_SLAVE_STATUS invalid status=%s", 
02877                            msg->getString(p_Status)->str));
02878         status_msg = "received invalid slave status";
02879         break;
02880       }  
02881 
02882     // slave info might change (for example fast OS restart, then
02883     // we get a new info from the same machine!)
02884     if (slave->setInfo (msg->sub (p_SlaveInfo), remote))
02885     {
02886       DBUG_PRINT("err", ("Master: error M_SLAVE_STATUS(ready) with malformed SlaveInfo")); 
02887       status_msg = "malformed SlaveInfo in slave status";   
02888         break;
02889     }
02890   } while (0);
02891 
02892   delete msg;
02893   return;
02894 }
02895 
02896 
02897 // slave is going off (shutdown)   
02898 void Master::msgSlaveStatus_Off (SlaveEntry *slave)   
02899 {
02900  
02901    // see if there is a task associated with the slave   
02902    if (slave->task) {   
02903      // if it's not a task which wants to be moved, notify client   
02904      if ((slave->task->stat != task_launch_moved) &&   
02905          (slave->task->stat != task_buffered_moved))   
02906        sndTaskStatus (&slave->task->job->cl_addr, slave->task->job, slave->task->id, task_stopped, "");   
02907         
02908      deleteTask (slave->task);   
02909      slave->task = 0;   
02910    }   
02911     
02912    // delete the SlaveEntry   
02913    delete slave;   
02914     
02915    status_msg = "slave went off";
02916 
02917    return; 
02918 }
02919 
02920 // slave is ready to receive tasks
02921 // note: this method is called also when a new slave starts with 
02922 //       M_SLAVE_STATUS(slave_disabled)
02923 int Master::msgSlaveStatus_Ready (SlaveEntry *&slave, Address *remote, XMLData *xinfo, slave_state stat)   
02924 {
02925    int will_log = 1;
02926    char _host[26], _h1[26], _h2[26]; 
02927 
02928    // is this a new slave?
02929    if (! slave) {   
02930      // create new SlaveEntry   
02931      slave = new SlaveEntry;   
02932     
02933      // get the slave info from message   
02934      SlaveInfo *info = new SlaveInfo;   
02935      if (!x2o (xinfo, *info))
02936      {
02937        DBUG_PRINT("err", ("Master: error M_SLAVE_STATUS(ready) with malformed SlaveInfo"));
02938        status_msg = "malformed SlaveInfo in slave status";
02939        return will_log;
02940      }
02941 
02942      if (info->addr != *remote) {
02943        info->addr.getIP (_h1);
02944        remote->getIP (_h2);
02945        DBUG_PRINT("warn", ("Master: new slave with wrong XML address "
02946                            "%s (not %s)", _h1, _h2));
02947        // use the address from the packet, not from the XML msg
02948        info->addr = *remote;
02949      }
02950 
02951      slave->info = info;   
02952      slave->task = 0;   
02953      slave->jobReserved = 0;   
02954      slave->time_last_status.Now ();
02955      slaveSetState(slave, stat);   
02956     
02957      // add the new slave to the list of slaves   
02958      lSlaves.Add (slave);   
02959 
02960      remote->getIP (_host);
02961      DBUG_PRINT("info", ("Master: new slave "
02962                          "%s added", _host));
02963 
02964      // update timeout on slave   
02965      slave->time_last_status.Now ();
02966 
02967      // if permanent upgrade is on, check the version, and upgrade if not yet
02968      if (permanent_upgrade)
02969        if (strcmp(pu_config->sub(p_NewVersion)->getString()->str,
02970                   slave->info->version()) == 0)
02971          upgradeSlave(slave, 1,  
02972                       pu_config->sub()->getString(), 
02973                       matchURL(slave->info, pu_config));
02974 
02975      // try starting some tasks   
02976      if (stat == slave_ready) checkBufferedTasks ();   
02977 
02978      status_msg = "new slave went on";
02979    }   
02980     
02981    // slave became available   
02982    else if (slave->stat() != slave_ready) {   
02983 
02984      // if slave has a task or is reserved, 
02985      // it may not become ready. (slave should first send TASK_INFO or TASK_FINISH
02986      //  or client respond to reservation, or timeouts should apply)
02987      if ((slave->task) || (slave->stat() == slave_reserved)) {
02988          slave->time_last_status.Now ();
02989          DBUG_PRINT("info", ("Master: slave ready, but task is task_started or reserved!"));
02990      } else {
02991        slaveSetState(slave, slave_ready);   
02992     
02993        // update timeout on slave   
02994        slave->time_last_status.Now ();
02995 
02996        // check if the slave needs to be upgraded
02997        if (toUpgradeLater.Index(slave) != -1) 
02998        {
02999          upgradeSlave(slave, 1, upgradeLaterVersion, 
03000                       matchURL(slave->info, upgradeLaterURLs));
03001          toUpgradeLater.Remove(slave);
03002        }
03003        
03004        // try starting some tasks   
03005        checkBufferedTasks ();   
03006        status_msg = "slave became available";
03007      }
03008    }    
03009 
03010    // slave was ready, update timeout on slave
03011    else
03012    {
03013      slave->time_last_status.Now ();
03014      will_log = 0;
03015      update_status = 0;
03016    }
03017     
03018    return will_log;
03019 }
03020 
03021 
03022 void Master::msgSlaveStatus_Busy (SlaveEntry *slave)
03023 {
03024    // slave is running a task   
03025     
03026    if (slave->stat() != slave_busy) {   
03027      slaveSetState(slave, slave_busy);   
03028      DBUG_PRINT("info", ("Master: info msgSlaveStatus status changed to slave_busy"));   
03029      status_msg = "slave went busy";
03030    }   
03031     
03032    // update timeout on slave   
03033    slave->time_last_status.Now ();
03034     
03035    // if no task assigned force the slave to enabled with SLAVE_CTRL(Enable)
03036    if (! slave->task)
03037      sndSlaveCtrl(&slave->info->addr, p_Enable);
03038 
03039    return; 
03040 }
03041 
03042 
03043 void Master::msgSlaveStatus_Disabled (SlaveEntry *slave)   
03044 {
03045  
03046    // slave cannot run any task anymore (user login)   
03047     
03048    // see if there is a task associated with the slave   
03049    if (slave->task) {   
03050      // if it's not a task which wants to be moved, notify client   
03051      if ((slave->task->stat != task_launch_moved) &&   
03052          (slave->task->stat != task_buffered_moved))   
03053        sndTaskStatus (&slave->task->job->cl_addr, slave->task->job, slave->task->id, task_stopped, "");   
03054             
03055      deleteTask (slave->task);   
03056      slave->task = 0;   
03057      status_msg = "slave with task disabled";
03058      slaveSetState(slave, slave_disabled);   
03059    }   
03060    else if (slave->stat() != slave_disabled) {
03061      // if isn't already a disabled slave, change state
03062      status_msg = "slave without task disabled";
03063      slaveSetState(slave, slave_disabled);   
03064    }
03065 
03066    slave->time_last_status.Now ();
03067     
03068    return; 
03069 }
03070 
03071 
03072 //***********************************************//
03073 // search functions
03074 
03075 // finds a JobEntry based on the jobId
03076 JobEntry* Master::srchJob (JobId &jobId)
03077 {
03078   JobEntry *job;
03079 
03080   lJobs.Start_Get ();
03081   while ((job = (JobEntry *) lJobs.Get_Next ()))
03082     if (job->id == jobId)
03083       break;
03084 
03085   return job;
03086 }
03087 
03088 
03089 // finds a JobEntry based on the <TaskId> xmldata
03090 JobEntry* Master::srchJob (XMLData *msg)
03091 {
03092   JobId jobId;
03093 
03094   // extract jobId
03095   x2o (msg->sub (p_JobID), jobId);
03096 
03097   // look for coresponding JobEntry
03098   return srchJob (jobId);
03099 }
03100 
03101 
03102 // finds a TaskEntry based on the taskId
03103 // - will look in all three lists (lTasksWait/Run/Finished)
03104 // - a task cannot be in more than one of these lists!
03105 TaskEntry* Master::srchTask (TaskId taskId)
03106 {
03107   TaskEntry *task;
03108 
03109   // check in lTasksRun
03110   lTasksRun.Start_Get ();
03111   while ((task = (TaskEntry *) lTasksRun.Get_Next ()))
03112     if (task->id == taskId)
03113       break;
03114 
03115   // check in lTasksWait
03116   if (!task) {
03117     lTasksWait.Start_Get ();
03118     while ((task = (TaskEntry *) lTasksWait.Get_Next ()))
03119       if (task->id == taskId)
03120         break;
03121   }
03122 
03123   return task;
03124 }
03125 
03126 
03127 //locates ReserveEntry for a given resID in lReserv
03128 ReserveEntry *Master::srchReserve(ReserveId &resID)
03129 {
03130     ReserveEntry *res;
03131 
03132     lReservs.Start_Get();
03133     while ((res = (ReserveEntry *) lReservs.Get_Next ()))
03134         if (res->id == resID)
03135             break;
03136 
03137     return res;
03138 }
03139 
03140 // returns matching TaskInfo if slaveInfo satisfies requirements of 
03141 // one of the taskInfos, otherwise returns 0
03142 TaskInfo *Master::slaveMatchTasks(SlaveInfo *slaveInfo, List *taskInfos)
03143 {
03144     TaskInfo *info;
03145     int softfound, i, j;
03146     char _host[26];
03147 
03148     if (!taskInfos) return 0;
03149     taskInfos->Start_Get();
03150     slaveInfo->addr.getIP (_host);
03151     while ((info = (TaskInfo *)taskInfos->Get_Next()))
03152     {
03153 #ifdef DBUG_MASTER
03154     DBUG_PRINT("info", ("Master: checking for TaskInfo - SlaveInfo match\n"
03155                   "              (%s;%s;%.f;%.f) (%s;%s;%.0f;%.0f)@%s",
03156                          info->os(), 
03157                          info->cpu(), 
03158                          info->mem * info->mem_unit, 
03159                          info->disk * info->disk_unit, 
03160                          slaveInfo->os(), 
03161                          slaveInfo->cpu(), 
03162                          slaveInfo->mem * slaveInfo->mem_unit, 
03163                          slaveInfo->disk * slaveInfo->disk_unit,
03164                          _host
03165                ));
03166 #endif
03167 
03168         // all software required in info must be in slaveInfo
03169         softfound = 1;
03170         for (i = 0; i < info->nSoftInfos; i++)
03171         {
03172           softfound = 0;
03173           for (j = 0; j < slaveInfo->nSoftInfos; j++)
03174             if ((strcmp(info->softwareInfos[i].name(), 
03175                         slaveInfo->softwareInfos[j].name()) == 0) &&
03176                 ((strcmp(info->softwareInfos[i].version(), 
03177                          slaveInfo->softwareInfos[j].version()) == 0) ||
03178                  (strcmp(info->softwareInfos[i].version(), "*") == 0))) 
03179             {
03180               softfound = 1;
03181               break;
03182             }
03183           if (!softfound) 
03184             break;
03185         }
03186 
03187         if ((strcmp(info->os(), slaveInfo->os()) == 0) &&
03188             (strcmp(info->cpu(), slaveInfo->cpu()) == 0) &&
03189             (info->mem * info->mem_unit <= slaveInfo->mem * slaveInfo->mem_unit) &&
03190             (info->disk * info->disk_unit <= slaveInfo->disk *slaveInfo->disk_unit) &&
03191             softfound) 
03192         {
03193            DBUG_PRINT("info", 
03194                         ("Master: match ok for TaskInfo - SlaveInfo\n"
03195                   "              (%s;%s;%.f;%.f) (%s;%s;%.0f;%.0f)@%s",
03196                          info->os(), 
03197                          info->cpu(), 
03198                          info->mem * info->mem_unit, 
03199                          info->disk * info->disk_unit, 
03200                          slaveInfo->os(), 
03201                          slaveInfo->cpu(), 
03202                          slaveInfo->mem * slaveInfo->mem_unit, 
03203                          slaveInfo->disk * slaveInfo->disk_unit,
03204                          _host
03205                ));
03206             return info;
03207         }
03208     }
03209     //DBUG_PRINT("info",("Master: no match"));
03210     return 0;
03211 }
03212 
03213 //returns 1 if info1 satisfies info2
03214 int Master::slaveMatchSlave(SlaveInfo *info1, SlaveInfo *info2)
03215 {
03216     int i, j, softfound;
03217     char _host[26];
03218     info2->addr.getIP (_host);
03219 
03220 #ifdef DBUG_MASTER
03221     DBUG_PRINT("info", ("Master: checking for SlaveInfo - SlaveInfo match\n"
03222                   "              (%s;%s;%.f;%.f) (%s;%s;%.0f;%.0f)@%s",
03223                          info1->os(), 
03224                          info1->cpu(), 
03225                          info1->mem * info1->mem_unit, 
03226                          info1->disk * info1->disk_unit, 
03227                          info2->os(), 
03228                          info2->cpu(), 
03229                          info2->mem * info2->mem_unit, 
03230                          info2->disk * info2->disk_unit,
03231                          _host
03232                ));
03233 #endif
03234 
03235     // all software required in info2 must be in info1
03236     softfound = 1;
03237     for (i = 0; i < info2->nSoftInfos; i++)
03238     {
03239       softfound = 0;
03240       for (j = 0; j < info1->nSoftInfos; j++)
03241         if ((strcmp(info2->softwareInfos[i].name(), info1->softwareInfos[j].name()) == 0) &&
03242            ((strcmp(info2->softwareInfos[i].version(), info1->softwareInfos[j].version()) == 0) ||
03243              (strcmp(info2->softwareInfos[i].version(), "*") == 0))) 
03244         {
03245           softfound = 1;
03246           break;
03247         }
03248       if (!softfound) break;
03249     }
03250 
03251   if ((strcmp(info1->os(), info2->os()) == 0) &&
03252       (strcmp(info1->cpu(), info2->cpu()) == 0) &&
03253       (info1->mem * info1->mem_unit >= info2->mem * info2->mem_unit) &&
03254       (info1->disk * info1->disk_unit >= info2->disk *info2->disk_unit) &&
03255       softfound)
03256   {
03257     DBUG_PRINT("info", ("Master: match ok for SlaveInfo - SlaveInfo\n"
03258                   "              (%s;%s;%.f;%.f) (%s;%s;%.0f;%.0f)@%s",
03259                          info1->os(), 
03260                          info1->cpu(), 
03261                          info1->mem * info1->mem_unit, 
03262                          info1->disk * info1->disk_unit, 
03263                          info2->os(), 
03264                          info2->cpu(), 
03265                          info2->mem * info2->mem_unit, 
03266                          info2->disk * info2->disk_unit,
03267                          _host
03268                ));
03269     return 1;
03270   } 
03271   else
03272     return 0;
03273 }
03274 
03275 //returns 1 if slaveInfo satisfies at least one of slaveInfos
03276 int Master::slaveMatchSlaves (SlaveInfo *slaveInfo, List *slaveInfos)
03277 {
03278     SlaveInfo *info;
03279     if (!slaveInfos) return 0;
03280     slaveInfos->Start_Get();
03281     while ((info = (SlaveInfo *)slaveInfos->Get_Next()))
03282         if (slaveMatchSlave(slaveInfo, info))
03283                 return 1;
03284     return 0;
03285 }
03286 
03287 
03288 //returns slave from lSlaves that has a specified address
03289 SlaveEntry *Master::srchSlaveAddr(Address &addr)
03290 {
03291   SlaveEntry *slv;
03292   char _h1[26], _h2[26];
03293   addr.getIP (_h2);
03294 
03295   lSlaves.Start_Get();
03296   while ((slv = (SlaveEntry *) lSlaves.Get_Next ())) {
03297     slv->info->addr.getIP (_h1);
03298     //DBUG_PRINT("info", ("Master: search (compare %s %s)", _h1, _h2));
03299     if (/*(slv->stat == slave_reserved) &&*/
03300         (slv->info->addr == addr))
03301       return slv;
03302   }
03303 
03304   //DBUG_PRINT("warn", ("Master: slave %s not found", _h2));
03305 
03306   return 0;
03307 }
03308 
03309 //returns a best-rank slave on 'slaves' list that is in a given state
03310 // and satisfies taskInfos (if non-0)
03311 SlaveEntry *Master::srchSlaveTaskInfos(slave_state state, List *taskInfos, 
03312                               CharStr *&matchedURL, long &timeout, CharStr *&userData)
03313 {
03314     SlaveEntry *s, *bests = 0;
03315     TaskInfo *match, *bestm = 0;
03316     double best_rank = -1.0;
03317 
03318     lSlaves.Start_Get();
03319     while ((s = (SlaveEntry *) lSlaves.Get_Next ()))
03320         if ((s->stat() == state) &&
03321             (match = slaveMatchTasks(s->info, taskInfos)))
03322         {
03323             if (s->statistics.rank > best_rank)
03324             {
03325               best_rank = s->statistics.rank;
03326               bests = s;
03327               bestm = match;
03328             }
03329         }
03330      if (bests)
03331      {
03332        match = bestm;  //slaveMatchTasks(bests->info, taskInfos);
03333        matchedURL = match->_cs_url();
03334        timeout = match->timeout;
03335        userData = match->_cs_userData();
03336      }
03337 
03338      return bests;
03339 }
03340 
03341 //returns the first slave from lSlaves that
03342 // satisfies at least one slaveInfo from slaveInfos list and has specified
03343 // state, if no slave is found, 0 is returned
03344 SlaveEntry*   Master::srchSlaveSlaveInfos(slave_state state, List *slaveInfos)
03345 {
03346     SlaveEntry *s = 0, *bests = 0;    
03347     double best_rank = -1.0;
03348 
03349     lSlaves.Start_Get();
03350     while ((s = (SlaveEntry *) lSlaves.Get_Next ()))
03351         if ((s->stat() == state) &&
03352             slaveMatchSlaves(s->info, slaveInfos))
03353         {
03354           if (s->statistics.rank > best_rank)
03355           {
03356             best_rank = s->statistics.rank;
03357             bests = s;
03358           }
03359         }
03360     
03361     return bests;
03362 }
03363 
03364 //finds the URL with matching OS-CPU combination in a message for a given SlaveInfo
03365 CharStr *Master::matchURL(SlaveInfo *sInfo, XMLData *msg)
03366 {
03367     XMLData *u;
03368     char *url_os, *url_cpu;
03369 
03370     msg->reset();
03371     while ((u = msg->sub(p_URL)) != XMLData::Nil)
03372     {
03373         url_os  = (char*)u->getAttrib(p_OS)->str;
03374         url_cpu = (char*)u->getAttrib(p_CPU)->str;
03375         DBUG_PRINT("info", ("Master: matchURL() OS=%s/%s CPU=%s/%s",
03376                              url_os, sInfo->os(),
03377                              url_cpu, sInfo->cpu()));
03378         if ((strcmp(url_os, sInfo->os()) == 0) &&
03379             (strcmp(url_cpu, sInfo->cpu()) == 0))
03380           return u->getString();
03381     }
03382     return CharStr::Error;
03383 }
03384 
03385 //***********************************************//
03386 // send message functions
03387 
03388 int Master::sndMessageToClient (Address *remote, JobEntry *job, XMLData *msg, int dont_queue)
03389 {
03390   if (!job) dont_queue = 1;   //just in case...
03391 
03392   // don't send anymore to lost clients
03393   if (! remote->getPort ()) {
03394     if (dont_queue)
03395       delete msg;
03396     return 0;
03397   }
03398 
03399   if (!dont_queue)
03400   {
03401     ClientMsgEntry *qmsg = new ClientMsgEntry();
03402     qmsg->job = job;
03403     qmsg->msg = msg;
03404     qmsg->last_sent.GetTimeOfDay();
03405     qmsg->count_sent = max_send_retry;
03406 #ifdef HAVE_OPENSSL
03407     qmsg->mId = po->sendN (msg, remote, sm_sign);
03408 #else
03409     qmsg->mId = po->sendN (msg, remote);
03410 #endif
03411     lClientMsg.Add(qmsg);
03412   }
03413   else 
03414   {
03415 #ifdef HAVE_OPENSSL
03416     po->sendN (msg, remote, sm_sign);
03417 #else
03418     po->sendN (msg, remote);
03419 #endif
03420     delete msg;
03421   }
03422 
03423   return 0;
03424 }
03425 
03426 
03427 int Master::sndStatusError (Address *remote, char *error, XMLData *jobId)
03428 {
03429   XMLData *msg;
03430 
03431   msg = xmlmsg (p_M_JOB_STATUS, XMLData::Nil);
03432   msg->add (new XMLData(jobId, 0));
03433   JobStatus jstat(0, 0, job_refused);
03434   msg->add (o2x( jstat));
03435   if (error)
03436     msg->add (new XMLData (p_Error, new CharStr (error)));
03437 
03438   log_job_msg(0, msg, mdir_m2c);
03439   
03440   sndMessageToClient (remote, 0, msg, DONT_QUEUE_MSG);
03441 
03442   return 0;
03443 }
03444 
03445 
03446 // send a M_JOB_STATUS message to the client, reserveInfo is optional and contain
03447 //   either ReserveID or list of Reservations already converted to XMLData
03448 int Master::sndJobStatus (Address *remote, JobEntry *job, JobId &jobId, job_state stat, char *error, XMLData *reserveInfo, int dont_queue_msg)
03449 {
03450   XMLData *msg;
03451 
03452   msg = xmlmsg (p_M_JOB_STATUS, XMLData::Nil);
03453   msg->add (o2x (jobId));
03454   //count running and waiting tasks
03455   int nRunning = 0;
03456   int nWaiting = 0;
03457   if (job)
03458   {
03459     TaskEntry *tofj;
03460     job->tasks.Start_Get();
03461     while ((tofj = (TaskEntry *)job->tasks.Get_Next()))
03462       if (tofj->stat == task_started)
03463         ++nRunning;
03464       else
03465         ++nWaiting;
03466   }
03467   JobStatus jstat(nRunning, nWaiting, stat);
03468   XMLData *jstatus = o2x (jstat);
03469   if (reserveInfo != XMLData::Nil)
03470       jstatus->add(reserveInfo);
03471   msg->add (jstatus);
03472   if (error)
03473     msg->add (new XMLData (p_Error, new CharStr (error)));
03474 
03475   //log msg
03476   log_job_msg(job, msg, mdir_m2c);
03477   
03478   sndMessageToClient (remote, job, msg, dont_queue_msg);
03479 
03480   return 0;
03481 }
03482 
03483 
03484 // send a M_TASK_STATUS message to the client
03485 int Master::sndTaskStatus (Address *remote, JobEntry *job, TaskId &taskId, task_state stat, const char *error, int dont_queue_msg)
03486 {
03487   TaskStatus tstat(taskId, stat, error);
03488 
03489   XMLData *msg = o2x(tstat);
03490   TaskEntry *task = srchTask(taskId);
03491 
03492   //log msg
03493   if (task) log_task_msg(task, msg, mdir_m2c);
03494   else log_job_msg(job, msg, mdir_m2c);
03495 
03496   sndMessageToClient (remote, job, msg, dont_queue_msg);
03497 
03498   return 0;
03499 }
03500 
03501 // send a M_SLAVE_STATUS message to the client (ok=1 -> OK, ok=0 -> Refused+Error)
03502 int Master::sndSlaveStatus (Address *remote, int ok, const char *error)
03503 {
03504 
03505   XMLData *msg = xmlmsg(p_M_SLAVE_STATUS, 
03506                    new XMLData(p_Status, ok?p_Ok:p_Refused));
03507   if (!ok) msg->add(new XMLData(p_Error, CharStr::create(error)));
03508 
03509   //log msg
03510   log_other_msg(msg, mdir_m2c);
03511 
03512   sndMessageToClient (remote, 0, msg, DONT_QUEUE_MSG);
03513 
03514   return 0;
03515 }
03516 
03517 
03518 int Master::sndSlaveAvail(Address *remote, JobEntry *job, int slavesRequired, XMLData *slaveInfos, ReserveId resID)
03519 {
03520    XMLData *x = xmlmsg(p_M_SLAVE_AVAIL, o2x(job->id));
03521    x->add(o2x(resID));
03522    x->add(new XMLData(p_SlavesRequired, slavesRequired));
03523    x->add(slaveInfos);
03524 
03525    //log msg
03526    log_job_msg(job, x, mdir_m2c);
03527 
03528    sndMessageToClient(remote, job, x);
03529    return 0;
03530 }
03531 
03532 int Master::sndMessageToSlave (Address *remote, XMLData *msg)
03533 {
03534   //log msg
03535   SlaveEntry *slave = srchSlaveAddr(*remote);
03536 
03537   if (slave) log_slave_msg(slave, msg, mdir_m2s);
03538 
03539   // send the message to the slave
03540 #ifdef HAVE_OPENSSL
03541     // change it when slave will accept crypter
03542     //po->sendN (msg, remote, sm_sign);
03543     po->sendN (msg, remote);
03544 #else
03545     po->sendN (msg, remote);
03546 #endif
03547 
03548   return 0;
03549 }
03550 
03551 
03552 // send a M_TASK_CTRL message to the slave
03553 int Master::sndTaskCtrl (Address *remote, TaskId &taskId, CharStr *action, CharStr *arg)
03554 {
03555   XMLData *msg;
03556 
03557   msg = xmlmsg (p_M_TASK_CTRL, XMLData::Nil);
03558   msg->add (o2x (taskId));
03559   msg->add (new XMLData (p_Action, action));
03560   if (arg != CharStr::Error) msg->add(new XMLData(p_Argument, arg));
03561 
03562   sndMessageToSlave (remote, msg);
03563 
03564   delete msg;
03565 
03566   return 0;
03567 }
03568 
03569 
03570 // send a M_TASK_INIT message to the slave
03571 int Master::sndTaskInit (Address *remote, TaskId &taskId, CharStr *taskURL, long taskTimeout, CharStr *data, CharStr *userData)
03572 {
03573   XMLData *msg;
03574 
03575   msg = xmlmsg(p_M_TASK_INIT, o2x(taskId));
03576   msg->add(new XMLData(p_URL, taskURL));
03577   msg->add(new XMLData(p_TimeOut, taskTimeout));
03578   if (data != CharStr::Error) 
03579     msg->add(new XMLData(XMLData::CDATA, data));
03580   if (userData != CharStr::Error)
03581     msg->add(new XMLData(p_UserData, userData));
03582 
03583   sndMessageToSlave (remote, msg);
03584 
03585   delete msg;
03586 
03587   return 0;
03588 }
03589 
03590 // send a M_SLAVE_CTRL message to the slave
03591 int Master::sndSlaveCtrl (Address *remote, CharStr *action, CharStr *URL)
03592 {
03593   XMLData *msg;
03594 
03595   msg = xmlmsg (p_M_SLAVE_CTRL, new XMLData(p_Action, action));
03596   if (URL != CharStr::Error) msg->add(new XMLData(p_URL, URL));
03597 
03598   sndMessageToSlave (remote, msg);
03599 
03600   delete msg;
03601 
03602   return 0;
03603 }
03604 
03605 
03606 //***********************************************//
03607 // list management functions
03608 
03609 
03610 void Master::deleteTask (TaskEntry *task)
03611 {
03612   if (! task)   
03613      return; 
03614 
03615     // remove task from SlaveEntry
03616   task->slave->task = 0;
03617 
03618   // remove task from JobEntry's list of tasks
03619   task->job->tasks.Remove (task);
03620 
03621   // remove the TaskEntry
03622   if (!lTasksRun.Remove (task))
03623     if (!lTasksWait.Remove (task))
03624       DBUG_PRINT("err", ("Master: error deleteTask: task in none of the lists"));
03625 
03626   // destroy the TaskEntry
03627   delete task;
03628     
03629   return;
03630 }
03631 
03632 
03633 void Master::moveTask (TaskEntry *task, List &lSrc, List &lDst)
03634 {
03635   if (!lSrc.Remove (task))
03636     DBUG_PRINT("err", ("Master: error moveTask: task missing in source list"));
03637 
03638   lDst.Add (task);
03639 }
03640 
03641 void Master::make_dir(char *dir_name)
03642 {
03643 #ifdef _WIN32
03644   if (_mkdir(dir_name))
03645   {
03646 #else
03647   if (mkdir(dir_name, S_IRWXU | S_IRGRP | S_IROTH | S_IXGRP | S_IXOTH))
03648   {
03649 #endif
03650     DBUG_PRINT("err", ("Master: error creating directory %s", dir_name));
03651     fflush(stderr);
03652     fflush(stdout);
03653     exit(1);
03654   }
03655 }
03656 
03657 // prints the message to appropriate log file for the given task
03658 void Master::log_task_msg(TaskEntry *task, XMLData *msg, msg_direction dir)
03659 {
03660   if (!log_on) return;
03661   char logfilename[300];
03662   sprintf(logfilename, "%s/jobs/%s.%d/%d", log_root, task->id.jId.name(), task->id.jId.id, task->id.id);
03663   ofstream logf(logfilename, ios::app | ios::out );
03664   ZTime now; now.GetTimeOfDay();
03665   XMLData *stamp;
03666   if (dir == mdir_c2m)
03667     stamp = new XMLData("Time_C2M", now.print_ctime());
03668   else
03669     stamp = new XMLData("Time_M2C", now.print_ctime()); 
03670   logf << *stamp;
03671   logf << *msg << '\n';
03672   logf.close();
03673   delete stamp;
03674 }
03675  
03676 // prints the message to appropriate log file for the given job
03677 void Master::log_job_msg(JobEntry *job, XMLData *msg, msg_direction dir)
03678 {
03679   if (!log_on) return;
03680   char logfilename[300];
03681   if (!job) {
03682     log_other_msg(msg, dir);
03683     return;
03684   }
03685 
03686   sprintf(logfilename, "%s/jobs/%s.%d/other", log_root, job->id.name(), job->id.id);
03687   ofstream logf(logfilename, ios::app | ios::out );
03688   ZTime now; now.GetTimeOfDay();
03689   XMLData *stamp;
03690   if (dir == mdir_c2m)
03691     stamp = new XMLData("Time_C2M", now.print_ctime());
03692   else
03693     stamp = new XMLData("Time_M2C", now.print_ctime());
03694   logf << *stamp;
03695   logf << *msg << '\n';
03696   logf.close();
03697   delete stamp;
03698 }
03699 
03700 // prints the message to appropriate log file for the given job
03701 void Master::log_other_msg(XMLData *msg, msg_direction dir)
03702 {
03703   if (!log_on) return;
03704   char logfilename[300];
03705   sprintf(logfilename, "%s/other", log_root);
03706   ofstream logf(logfilename, ios::app | ios::out );
03707   ZTime now; now.GetTimeOfDay();
03708   XMLData *stamp;
03709   if (dir == mdir_c2m)
03710     stamp = new XMLData("Time_C2M", now.print_ctime());
03711   else if (dir == mdir_m2c)
03712     stamp = new XMLData("Time_M2C", now.print_ctime());
03713   else if (dir == mdir_s2m)
03714     stamp = new XMLData("Time_S2M", now.print_ctime());
03715   else if (dir == mdir_m2s)
03716     stamp = new XMLData("Time_M2S", now.print_ctime());
03717   logf << *stamp;
03718   logf << *msg << '\n';
03719   logf.close();
03720   delete stamp;
03721 }
03722    
03723 // prints the message to appropriate log file for the given slave
03724 void Master::log_slave_msg(SlaveEntry *slave, XMLData *msg, msg_direction dir)
03725 {
03726   if (!log_on) return;
03727   char logfilename[300];
03728   char sladr[26];
03729   slave->info->addr.getIP(sladr);
03730   char *col = strchr(sladr, ':');
03731   if (!col) return;
03732   *col = '.';
03733   sprintf(logfilename, "%s/slaves/%s", log_root, sladr);
03734   ofstream logf(logfilename, ios::app | ios::out);
03735   ZTime now; now.GetTimeOfDay();
03736   XMLData *stamp;
03737   if (dir == mdir_s2m)
03738     stamp = new XMLData("Time_S2M", now.print_ctime());
03739   else
03740     stamp = new XMLData("Time_M2S", now.print_ctime());
03741   logf << *stamp; 
03742   delete stamp;
03743   logf << *msg << '\n';
03744 }
03745 
03746 void Master::log_slave_state_change(SlaveEntry *slave, slave_state state1, slave_state state2)
03747 {
03748     if (!log_on) return;
03749     char logfilename[300];
03750     char _host[26], *_stat1, *_stat2, _onsince[26];
03751     slave->info->addr.getIP(_host);
03752     sprintf(logfilename, "%s/slaves/stats/status.txt", log_root);
03753     ofstream logf(logfilename, ios::app | ios::out);
03754     ZTime now; now.Now();
03755     double is_on = (double)now.sec - (double)running_since.sec 
03756       + 0.000001 * ((double)now.usec - (double)running_since.usec);
03757     sprintf (_onsince, "%.2f", is_on);
03758     switch (state1) 
03759       {
03760   case slave_off:       _stat1 = "00 Off    "; break;
03761   case slave_disabled:  _stat1 = "10 Disable"; break;
03762   case slave_busy:      _stat1 = "20 Busy   "; break;
03763   case slave_contacted: _stat1 = "30 Contact"; break;
03764   case slave_reserved:  _stat1 = "40 Reserv "; break;
03765   case slave_ready:     _stat1 = "50 Ready  "; break;
03766         default:              _stat1 = "25 Unknown";
03767       }
03768     switch (state2) 
03769       {
03770   case slave_off:       _stat2 = "00 Off    "; break;
03771   case slave_disabled:  _stat2 = "10 Disable"; break;
03772   case slave_busy:      _stat2 = "20 Busy   "; break;
03773   case slave_contacted: _stat2 = "30 Contact"; break;
03774   case slave_reserved:  _stat2 = "40 Reserv "; break;
03775         case slave_ready:     _stat2 = "50 Ready  "; break;
03776         default:              _stat2 = "25 Unknown";
03777       }
03778     logf << _onsince << " "
03779    << now.print_hour() << " "
03780    << _host << " " 
03781    << _stat1 << " "
03782    << _stat2 << "\n";
03783 }
03784 
03785 void Master::log_slave_total(ZTime &now, int sReady, int sDisabled, int sBusy, int sReserved)
03786 {
03787     if (!log_on) return;
03788     char logfilename[300], buff[300], _onsince[26];
03789     sprintf(logfilename, "%s/slaves/stats/total.txt", log_root);
03790     ofstream logf(logfilename, ios::app | ios::out);
03791     double is_on = (double)now.sec - (double)running_since.sec 
03792       + 0.000001 * ((double)now.usec - (double)running_since.usec);
03793     sprintf (_onsince, "%.2f", is_on);
03794     sprintf (buff, "%d %d %d %d P %d %d %d %d", 
03795        sReady, sDisabled, sBusy, sReserved,
03796        sReady, sReady+sDisabled, sReady+sDisabled+sBusy, sReady+sDisabled+sBusy+sReserved);
03797     logf << _onsince << " "
03798    << now.print_hour() << " "
03799    << buff << "\n";
03800 }
03801 
03802 
03803 //verifies he username and password
03804 int Master::verifyUserPswd (CharStr *user, CharStr *pswd, Address *client_addr)
03805 {
03806 //uncomment this if userdb is not working
03807 //return 1;
03808 
03809 #ifdef HAVE_OPENSSL
03810   // incorrect password?
03811   if (userdb->comparepass(user->str, pswd->str))
03812     return 0;
03813   else 
03814     return 1;
03815 #else
03816   return 1;
03817 #endif
03818 }
03819 
03820 // compare 2 users (return 1 if the same user)
03821 int Master::checkUser (CharStr *user1, CharStr *user2)
03822 {
03823   return (! strcmp (user1->str, user2->str));
03824 }
03825 
03826 // compare 2 users (return 1 if the same user)
03827 int Master::checkUser (CharStr *user1, char *user2)
03828 {
03829   return (! strcmp (user1->str, user2));
03830 }
03831 
03832 

Generated on Mon Nov 25 12:46:30 2002 for qadpz by doxygen1.2.18