Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members  

Slave.cpp

Go to the documentation of this file.
00001 /*
00002   Slave - the slave class
00003  
00004   Module:    $RCSfile: Slave.cpp,v $
00005   Date:      $Date: 2002/11/20 20:48:25 $
00006   Version:   $Revision: 1.138 $
00007   ID:        $Id: Slave.cpp,v 1.138 2002/11/20 20:48:25 zoran Exp $
00008   Authors:   Zoran Constantinescu <zoranc@acm.org>
00009 
00010 */
00011 
00012 #include "defs.h"
00013 #include "Slave.h"
00014 #include "SlaveServ.h"
00015 #if defined(_WIN32) || defined(HAVE_OS_Darwin)
00016 #include "SlaveServDB.h"
00017 #endif
00018 #include "Storage.h"
00019 #include "xmlstorage.h"
00020 #include "keywords.h"
00021 #include "other.h"
00022 
00023 #include <fcntl.h>      // open()
00024 
00025 #ifndef _WIN32
00026 
00027 #include <signal.h>       // SIGALRM
00028 #include <sys/types.h>    // chmod()
00029 #include <sys/stat.h>     // chmod()
00030 #include <errno.h>        // errno
00031 #include <unistd.h>       // exit(), unlink()
00032 #include <sys/types.h>
00033 #include <sys/stat.h>
00034 #include <sys/utsname.h>  // uname()
00035 #include <sys/ioctl.h>    // ioctl()
00036 #include <sys/wait.h>
00037 #if defined(HAVE_OS_SunOS)
00038 #include <stropts.h>      // ioctl()
00039 #include <termio.h>
00040 #endif
00041 
00042 #include <setjmp.h>
00043 sigjmp_buf env;
00044 
00045 #include "SlaveInfoUnix.h" // TestProcess(), TestCPU()
00046 
00047 #else
00048 
00049 #include <io.h>            // _mktemp()
00050 #include <direct.h>        // _rmdir()
00051 #include <winbase.h>    // GlobalMemoryStatus()
00052 #include "SlaveInfoWin.h"  // TestProcess(), TestCPU()
00053 
00054 #endif
00055 
// the instance of the slave (needed when crash in task execution)
// It is only declared here (defined in another translation unit, presumably
// the slave's main). startTask() uses it instead of `this` on its crash path,
// where the original comments say the object may "not exist anymore".
extern Slave *slv;
// Build identification (RCS keywords); logged by Slave::Slave() at startup.
char slave_version[] = "$Date: 2002/11/20 20:48:25 $ $Revision: 1.138 $";
00059 
00060 
00061 THRD_RETURN task_entry (THRD_ARG arg)
00062 {
00063   ((Slave *)arg)->startTask (); return 0;
00064 }
00065 
00066 THRD_RETURN timer_entry (THRD_ARG arg)
00067 {
00068   ((Slave *)arg)->startTimer (); return 0;
00069 }
00070 
00071 THRD_RETURN cbtaskstop_entry (THRD_ARG arg)
00072 {
00073   ((Slave *)arg)->startcbTaskStop (); return 0;
00074 }
00075 
00076 THRD_RETURN cbtaskctrl_entry (THRD_ARG arg)
00077 {
00078   ((Slave *)arg)->startcbTaskCtrl (); return 0;
00079 }
00080 
// config file settings
// Indices into the keyword table `kws` built in Slave::Slave(); each index
// must match the position of the corresponding entry in that table.
#define K_MASTER_HOST           0
#define K_MASTER_PORT           1
#define K_MASTER_TIMEOUT        2
#define K_SLAVESTATUS_INTERVAL  3
#define K_OS                    4
#define K_CPU                   5
#define K_CBTASKSTOP_TIMEOUT    6
#define K_SLAVE_DBUG_FILE       7
#define K_TMP_DIR               8
// the five software entries are parallel multi-string lists (one item per
// software package); Slave::Slave() ignores them all unless their counts agree
#define K_SOFTWARE              9
#define K_SOFTVERSION           10
#define K_SOFTDETECT            11
#define K_SOFTDETROW            12
#define K_SOFTDETWORD           13
#define K_SLAVE_HTTP_PORT       14
#define K_SLAVE_HTTP_ROOT       15
// number of entries in the keyword table
#define N_SLAVE_CFG_KWS         16

// Built-in values used when the config file does not set a keyword.
#define DEFAULT_MASTER_HOST             "localhost"
// "autodetect" makes detectSlaveInfo() determine the OS / CPU itself
#define DEFAULT_OS                      "autodetect"
#define DEFAULT_CPU                     "autodetect"
// config file name (relative path) read by Slave::Slave()
#define DEFAULT_SLAVE_CONFIG_FILE       "slave.cfg"
// tmp_dir holds the downloaded task libraries (see getLibrary); the HTTP root
// is not used in the code shown here — presumably served by a slave HTTP server
#ifdef _WIN32
#define DEFAULT_TMP_DIR                 "c:\\qadpz"
#define DEFAULT_SLAVE_HTTP_ROOT         "c:\\qadpz\\data"
#else
#define DEFAULT_TMP_DIR                 "/tmp/qadpz"
#define DEFAULT_SLAVE_HTTP_ROOT         "/tmp/qadpz/data"
#endif
// passed to DBUG_SETFILE(); NOTE(review): presumably "" means no debug file — confirm
#define DEFAULT_SLAVE_DBUG_FILE         ""
#define DEFAULT_MASTER_TIMEOUT          5000    // in msecs
#define DEFAULT_SLAVESTATUS_INTERVAL    10      // in secs
#define DEFAULT_CBTASKSTOP_TIMEOUT      1000    // in msecs, wait before destroying the thread
00115 
00116 Slave::Slave (int catchSignal)
00117 {
00118   static keyword_str kws[N_SLAVE_CFG_KWS] = {
00119     { "master_host", STRING_KEYWORD, 0, {0}, {0}},
00120     { "master_port", LONG_KEYWORD, 0, {0}, {MASTER_PORT}},
00121     { "master_timeout", LONG_KEYWORD, 0, {0}, {DEFAULT_MASTER_TIMEOUT}},
00122     { "slave_status_interval", LONG_KEYWORD, 0, {0}, {DEFAULT_SLAVESTATUS_INTERVAL}},
00123     { "os", STRING_KEYWORD, 0, {0} },
00124     { "cpu", STRING_KEYWORD, 0, {0} },
00125     { "cbtaskstop_timeout", LONG_KEYWORD, 0, {0}, {DEFAULT_CBTASKSTOP_TIMEOUT}},
00126     { "dbug_file", STRING_KEYWORD, 0, {0}},
00127     { "tmp_dir", STRING_KEYWORD, 0, {0}},
00128     { "software", MULTISTRING_KEYWORD, 0, {0}},
00129     { "soft_version", MULTISTRING_KEYWORD, 0, {0}},
00130     { "soft_detect", MULTISTRING_KEYWORD, 0, {0}},
00131     { "soft_detrow", MULTISTRING_KEYWORD, 0, {0}},
00132     { "soft_detword", MULTISTRING_KEYWORD, 0, {0}},
00133     { "slave_http_port", LONG_KEYWORD, 0, {0}, {SLAVE_HTTP_PORT}},
00134     { "slave_http_root", STRING_KEYWORD, 0, {0}}
00135   };
00136 
00137   // read the slave config file
00138   kws[K_MASTER_HOST].default_value.as_string = DEFAULT_MASTER_HOST;
00139   kws[K_OS].default_value.as_string = DEFAULT_OS;
00140   kws[K_CPU].default_value.as_string = DEFAULT_CPU;
00141   kws[K_SLAVE_DBUG_FILE].default_value.as_string = DEFAULT_SLAVE_DBUG_FILE;
00142   kws[K_TMP_DIR].default_value.as_string = DEFAULT_TMP_DIR;
00143   kws[K_SLAVE_HTTP_ROOT].default_value.as_string = DEFAULT_SLAVE_HTTP_ROOT;
00144 
00145   // read config file
00146   int config_found = read_config(DEFAULT_SLAVE_CONFIG_FILE, N_SLAVE_CFG_KWS, kws);
00147   DBUG_SETFILE(kws[K_SLAVE_DBUG_FILE].value.as_string);
00148   if (config_found < 0) 
00149     DBUG_PRINT("err", ("slave config file was not found, using default values"));
00150   DBUG_PRINT("info", ("starting slave version %s %s", 
00151                       SLAVE_QADPZ_VERSION, 
00152                       slave_version));
00153   
00154   // set up temporary directory
00155   tmp_dir = kws[K_TMP_DIR].value.as_string;
00156   DIR_MKDIR(tmp_dir);
00157   //TODO if failed, should switch to OS dependent temporary dir
00158   DBUG_PRINT("info", ("tmp_dir is '%s'", tmp_dir));
00159 
00160   master_timeout = kws[K_MASTER_TIMEOUT].value.as_long;
00161   cbtaskstop_timeout = kws[K_CBTASKSTOP_TIMEOUT].value.as_long;
00162   slave_status_interval = kws[K_SLAVESTATUS_INTERVAL].value.as_long;
00163   def_os = kws[K_OS].value.as_string;
00164   def_cpu = kws[K_CPU].value.as_string;
00165 
00166   // detect software versions
00167   software = kws[K_SOFTWARE].value.as_multistring;
00168   softversion = kws[K_SOFTVERSION].value.as_multistring;
00169   softdetect = kws[K_SOFTDETECT].value.as_multistring;
00170   softdetrow = kws[K_SOFTDETROW].value.as_multistring;
00171   softdetword = kws[K_SOFTDETWORD].value.as_multistring;
00172   int a, b = 0;
00173   a = multi_str_count(software);
00174   if (a != multi_str_count(softversion)) b = 1;
00175   if (a != multi_str_count(softdetect)) b = 1;
00176   if (a != multi_str_count(softdetrow)) b = 1;
00177   if (a != multi_str_count(softdetword)) b = 1;
00178   if (b)
00179   {
00180      DBUG_PRINT("err", ("Config file error: All 5 entries specifying software have to be present in all software entries. I will ingore all software entries."));
00181      software = softversion = softdetect = softdetrow = softdetword = 0;
00182   }
00183 
00184   // resolve the master's address
00185   master_addr = new char[strlen(kws[K_MASTER_HOST].value.as_string) + 1];
00186   strcpy (master_addr, kws[K_MASTER_HOST].value.as_string);
00187   master_local = master.setHost (master_addr);
00188   master.setPort (kws[K_MASTER_PORT].value.as_long);
00189   remote = master;
00190 
00191   // other  
00192   firsttime = 1;
00193   this->catchSignal = catchSignal;
00194 
00195   // initialize the post-office
00196 #ifdef HAVE_OPENSSL
00197   // crypter=0 and noSecPlain=0 only temporarily until slave will use crypter!!
00198   po = (PostOffice *) new PostOffice (SLAVE_PORT, 1, UDP_PORT_ITERATE, 0, 0, catchSignal);
00199 #else
00200   po = (PostOffice *) new PostOffice (SLAVE_PORT, 1, UDP_PORT_ITERATE, 0, 0, catchSignal);
00201 #endif
00202   if (!po->initialized()) 
00203   {
00204     DBUG_PRINT("err", ("error initializing post office"));
00205     exit(1);
00206   }
00207   po->setRemote (master);
00208 
00209   char _host[26];
00210   master.getIP (_host);
00211   DBUG_PRINT("info", ("setting remote master to %s", _host));
00212 
00213   // start with STAT_READY
00214   this->status = slave_ready;
00215 
00216   // set all handlers to null
00217   taskHnd = 0;
00218   taskExec = 0;
00219   taskThread = 0;
00220 
00221   // find out everything about this slave
00222   detectSlaveInfo (&slaveInfo);
00223 
00224   // callbacks to null
00225   cbTaskStop = 0;
00226   cbTaskCtrl = 0;
00227 
00228   // start the timer thread
00229   // find a way to communicate with the thread (maybe pipes?!)
00230   if (::startThread (&timerThread, timer_entry, this)) {
00231     DBUG_PRINT("err", ("error creating timer thread"));
00232     exit(1);
00233   }
00234 }
00235 
00236 
00237 Slave::~Slave ()
00238 {
00239   // must kill the taskThread!
00240   THRD_KILL(taskThread);
00241   taskThread = 0;
00242 
00243   if (po)
00244     delete po;
00245 
00246   PostOffice::release_sockets();
00247   free_messages();
00248 }
00249 
00250 
00251 
00252 #ifndef _WIN32
00253 
// signal handler for faulty slave programs
//
// Installed by Slave::startTask() (when catchSignal is set) for SIGSEGV,
// SIGHUP, SIGFPE and SIGILL. It jumps back to the sigsetjmp() point in
// startTask(), whose else-branch then reports the task as crashed.
//
// NOTE(review): DBUG_PRINT is very likely not async-signal-safe. Also, `env`
// is one global buffer filled by the task thread, so the same signal raised in
// any other thread would also jump onto the task thread's stack.
void sigcatch(int sig)
{
  DBUG_PRINT("err", ("### SIG#%d caught in taskExec", sig));
  siglongjmp(env, 1);
}
00260 
00261 // this is the method called by the task thread
00262 void Slave::startTask ()
00263 {
00264   int err;
00265 
00266   if (! catchSignal) {
00267     // execute the user provided task function
00268     err = taskExec (data, datares, userdata
00269 #if defined(HAVE_OS_Darwin)
00270         ,
00271 //        q2adpz_slv_task_finish,
00272         q2adpz_slv_task_status,
00273         q2adpz_slv_setcb_task_stop,
00274         q2adpz_slv_setcb_task_ctrl,
00275         q2adpz_slv_put_data,
00276         q2adpz_slv_get_data,
00277         q2adpz_slv_get_tmpdir,
00278         q2adpz_slv_get_max_runtime
00279 #endif
00280         );
00281     this->endTask (0);
00282     ::exitThread();
00283     return;
00284   }
00285 
00286   struct sigaction sa;
00287   sa.sa_handler = sigcatch;
00288   //sa.sa_flags = SA_ONSTACK;
00289   err = sigaction(SIGSEGV, &sa, NULL);
00290   err = sigaction(SIGHUP,  &sa, NULL);
00291   err = sigaction(SIGFPE,  &sa, NULL);
00292   err = sigaction(SIGILL,  &sa, NULL);
00293   if (err < 0) {
00294     DBUG_PRINT("err", ("error setting signal action in startTask"));
00295   }
00296 
00297   if (sigsetjmp(env, 1) == 0) {
00298 
00299     // execute the user provided task function
00300     err = taskExec (data, datares, userdata
00301 #if defined(HAVE_OS_Darwin)
00302         ,
00303 //        q2adpz_slv_task_finish,
00304         q2adpz_slv_task_status,
00305         q2adpz_slv_setcb_task_stop,
00306         q2adpz_slv_setcb_task_ctrl,
00307         q2adpz_slv_put_data,
00308         q2adpz_slv_get_data,
00309         q2adpz_slv_get_tmpdir,
00310         q2adpz_slv_get_max_runtime
00311 #endif
00312         );
00313     this->endTask (0);
00314     ::exitThread();
00315 
00316   } else {
00317 
00318     DBUG_PRINT("err", ("### SEGfault detected!"));
00319     // this doesn't exist anymore!!
00320     slv->endTask (1);
00321     ::exitThread();
00322 
00323   }  // if
00324 }
00325 
00326 #else  // _WIN32
00327 
// Slave::startTask - body of the task thread (Win32 version).
//
// Runs the user's taskExec() from the loaded task DLL. Unlike the POSIX build,
// the slave API functions are passed to it explicitly (see the comment
// below). The outcome is reported through endTask(): 0 on a normal return,
// 1 if an exception escaped. The thread then terminates via ::exitThread().
//
// NOTE(review): catch (...) only intercepts access violations and other
// structured exceptions when compiled with /EHa. With /EHsc a crash inside the
// task still terminates the process.
void Slave::startTask ()
{
  int err;  // result of taskExec(); not used afterwards

  try {

    // execute the user provided task function
    // we need to pass these functions to the dll (only in Win32)
    err = taskExec (data, datares, userdata,
//        q2adpz_slv_task_finish,
        q2adpz_slv_task_status,
        q2adpz_slv_setcb_task_stop,
        q2adpz_slv_setcb_task_ctrl,
        q2adpz_slv_put_data,
        q2adpz_slv_get_data,
        q2adpz_slv_get_tmpdir,
        q2adpz_slv_get_max_runtime        
        );
    this->endTask (0);
    ::exitThread();

  } catch (...) {

    DBUG_PRINT("err", ("### SEGfault detected!"));
    // this doesn't exist anymore!!
    // (`this` is not trusted after a crash; the global instance is used)
    slv->endTask (1);
    ::exitThread();

  }  // try
}
00358 
00359 #endif  // _WIN32
00360 
00361 void Slave::endTask (int err)
00362 {
00363   taskThread = 0;
00364 
00365   DBUG_PRINT("dbug", ("endTask, datares: '%s'", datares));
00366 
00367   // don't need the library anymore
00368   if (taskHnd != 0) {
00369     DBUG_PRINT("info", ("######################## unloading library"));
00370     SLEEP_SEC (0);
00371 
00372     int err;
00373     err = LIB_UNLOAD(taskHnd);
00374     taskHnd = 0;
00375     FILE_UNLINK (taskLib);
00376 
00377     DBUG_PRINT("info", ("######################## unload ok %d", err));
00378   }
00379   
00380   // don't need the data anymore
00381   memset (data, 0, UDP_MAXSIZE);
00382   
00383   // notify master about the task
00384   if (err) {
00385     // task crashed
00386     sndTaskStatus (&taskId, task_crashed, "err CRASH on execTask");
00387 
00388     // slave becomes ready, send status
00389     this->status = slave_ready;
00390     sndSlaveStatus();
00391   }
00392   else {
00393     // task finished
00394     DBUG_PRINT("debug", ("to send TaskFinish (%s)", datares));
00395     sndTaskFinish (datares);
00396     DBUG_PRINT("debug", ("sent TaskFinis"));
00397   
00398     // dont'need datares anymore
00399     memset (datares, 0, UDP_MAXSIZE);
00400 
00401     // slave becomes ready, send status
00402     this->status = slave_ready;
00403     DBUG_PRINT("debug", ("to send slave status ready"));
00404     sndSlaveStatus();
00405   }
00406 }
00407 
00408 void Slave::startTimer ()
00409 {
00410   if (catchSignal)
00411     sigThread();
00412 
00413   while (1) {
00414     if (master_local) {
00415       // try resolving again master's address
00416       uint master_ip = master.host;
00417       master_local = master.setHost (master_addr);
00418       if (master_local) {
00419         if (master_ip != master.host) {
00420           char _host[26];
00421           master.getIP (_host);
00422           DBUG_PRINT("info", ("resetting remote master to %s", _host));
00423         }
00424       } else {
00425         remote = master;
00426         po->setRemote (master);
00427         firsttime = 1;
00428         char _host[26];
00429         master.getIP (_host);
00430         DBUG_PRINT("info", ("resetting remote master to %s", _host));
00431       }
00432     }
00433     
00434     if (! master_local) {
00435       // send usual status message
00436       lock.lock ();
00437       sndSlaveStatus ();
00438       lock.unlock ();
00439     }
00440 
00441     // wait some time :)
00442     SLEEP_SEC(slave_status_interval);
00443   }
00444 }
00445 
00446 void Slave::startcbTaskStop ()
00447 {
00448   if (catchSignal)
00449     sigThread();
00450 
00451   // call user call-back function
00452   cbTaskStop ();
00453 
00454   // unlock the semaphore waiting for user thread stop
00455   sem.post ();
00456 }
00457 
00458 void Slave::startcbTaskCtrl ()
00459 {
00460   if (catchSignal)
00461     sigThread();
00462 
00463   // call user call-back function
00464   cbTaskCtrl (taskctrl_arg);
00465 
00466 }
00467 
00468 
00469 // forced stop of the running task thread
00470 void Slave::stopTask ()
00471 {
00472   THRD_KILL(taskThread);
00473   taskThread = 0;
00474 }
00475 
00476 
00477 // the main thread which listens for messages on the post-office
00478 void Slave::startSlave ()
00479 {
00480   XMLData *xdata;
00481   Address  remote0;
00482   int      user;
00483 
00484   while (1) {
00485 
00486     //############# detect user logon/logoff 
00487     user = detectUserStat ();
00488 
00489     // somebody logs in
00490     if (user && (this->status != slave_disabled))
00491     {
00492       if (this->status == slave_busy)
00493         msgTaskCtrl_Stop ();
00494       this->status = slave_disabled;
00495       sndSlaveStatus();
00496     }
00497 
00498     // somebody logs out
00499     if (!user && (this->status == slave_disabled))
00500     {
00501       this->status = slave_ready;
00502       sndSlaveStatus();
00503     }
00504     //#############
00505 
00506     // see if new messages from master (timeout=500msec)
00507     xdata = po->receive_any (remote0, 500);
00508 
00509     if (xdata == XMLData::Nil)
00510       continue;
00511 
00512     if (xdata == PostOffice::Timeout)
00513       continue;
00514 
00515     // see if remote has the same IP as master unless it's the first msg from master
00516     if (firsttime)
00517     {
00518       firsttime = 0;
00519       master = remote0;
00520     }
00521     else if (remote0.getHost () != master.getHost ()) {
00522       char host1[16], host2[16];
00523       remote0.getHost (host1);
00524       master.getHost (host2);
00525       DBUG_PRINT("warn", ("data from wrong master %s, known %s", host1, host2));
00526       if (! xdata)
00527         { delete xdata; xdata = 0; }
00528       continue;
00529     }
00530 
00531     // maybe the master is using a different port
00532     remote = remote0;
00533 
00534     // dispatch the message
00535     dispatchMsg (xdata);
00536 
00537   }
00538 
00539   DBUG_PRINT("info", ("slave service terminated"));
00540 }
00541 
00542 
00543 // when a msg is received by the post-office, this
00544 // function must be called to handle the message
00545 // * the msg will be destroyed by this function
00546 // * shouldn't waste time in this function (return asap)
00547 void Slave::dispatchMsg (XMLData *msg)
00548 {
00549   lock.lock ();
00550   {
00551     // M_TASK_INIT message
00552     if (msgType(msg, p_M_TASK_INIT)) msgTaskInit (msg);
00553 
00554     // M_TASK_CTRL message
00555     else if (msgType(msg, p_M_TASK_CTRL)) msgTaskCtrl(msg);
00556 
00557     // M_SLAVE_CTRL message
00558     else if (msgType(msg, p_M_SLAVE_CTRL)) msgSlaveCtrl(msg);
00559 
00560     else {
00561       DBUG_PRINT("err", ("error invalid message Type=%s", 
00562                          msg->getAttrib ("Type")->str));
00563       delete msg;
00564     }
00565   }
00566   lock.unlock ();
00567 }
00568 
00569 
00570 // M_TASK_INIT message
00571 // * a new task is requested to start on the slave
00572 void Slave::msgTaskInit (XMLData *msg)
00573 {
00574   TaskId taskId2;
00575 
00576   do {
00577     // check if we can do this
00578     if (status == slave_disabled) {
00579       DBUG_PRINT("err", ("Slave: error task_ctrl slave is disabled"));
00580       sndTaskStatus (&taskId2, task_refused, "err slave disabled");
00581       break;
00582     }
00583 
00584     // extract taskId and data
00585     x2o(msg->sub (p_TaskID), taskId2);
00586 
00587     // see if we can start a new task
00588     if (this->status != slave_ready) {
00589       DBUG_PRINT("err", ("cannot start another new task"));
00590       sndTaskStatus (&taskId2, task_refused, "task already running");
00591       break;
00592     }
00593 
00594     max_runtime = msg->getLong(p_TimeOut);
00595 
00596 #ifdef _WIN32
00597     // don't want message box warning if running as a service
00598     SetErrorMode (SEM_FAILCRITICALERRORS);
00599 #endif
00600 
00601     // ... load the library
00602     char err_buff[MAXERR_URL];
00603     if (getLibrary (msg->getString (p_URL)->str, taskLib, err_buff) == 0) {
00604       DBUG_PRINT("err", ("error downloading slv_task library: %s", err_buff));
00605       sndTaskStatus (&taskId2, task_refused, "err download slv_task library");
00606       break;
00607     }
00608     taskHnd = LIB_LOAD(taskLib);
00609     if (! taskHnd) {
00610 #ifdef _WIN32
00611       DBUG_PRINT("err", ("error loading slv_task library '%s', error: %d", 
00612                  taskLib, LIB_ERROR()));
00613 #else
00614       DBUG_PRINT("err", ("error loading slv_task library '%s', error: %s", 
00615                  taskLib, LIB_ERROR()));
00616 #endif
00617       sndTaskStatus (&taskId2, task_refused, "err loading slv_task library");
00618       break;
00619     }
00620     DBUG_PRINT("info", ("initializing slv_task library...ok"));
00621 
00622     // ... read task startup function
00623     taskExec = (int (*) (const char *, char *, char *
00624 #if defined(_WIN32) || defined(HAVE_OS_Darwin)
00625                            , 
00626 //                           int  (*) (const char *),
00627                            int  (*) (task_state, char *),
00628                            void (*) (void (*) (void)),
00629                            void (*) (void (*) (const char *)),
00630                            int (*) (const char *, const char *, int),
00631                            int (*) (const char *, const char *, int),
00632                            char * (*) (void),
00633                            long (*) (void)
00634 #endif
00635                           )) LIB_SYMBOL(taskHnd, "taskExec");
00636     if (! taskExec) {
00637       DBUG_PRINT("err", ("error loading 'taskExec' symbol"));
00638       sndTaskStatus (&taskId2, task_refused, "err loading taskExec symbol");
00639       break;
00640     }
00641 
00642     DBUG_PRINT("info", ("reading symbol taskExec...ok"));
00643     taskId = taskId2;
00644 
00645     // get the data from the message
00646     CharStr *getData;
00647     if ((getData = msg->getString(XMLData::CDATA)) != CharStr::Error)
00648       strcpy(data, getData->str);
00649     else data[0] = '\0';
00650     if ((getData = msg->getString(p_UserData)) != CharStr::Error)
00651       strcpy(userdata, getData->str);
00652     else userdata[0] = '\0';
00653     DBUG_PRINT("info", ("setting data for task: '%s', and userdata: '%s'", data, userdata));
00654 
00655     // ... create a new thread for the task and execute it
00656     // find a way to communicate with the thread (maybe pipes?!)
00657     if (::startThread (&taskThread, task_entry, this)) {
00658       DBUG_PRINT("err", ("error creating new task thread"));
00659       sndTaskStatus (&taskId2, task_refused, "err create task thread");
00660       break;
00661     }
00662     // ... notify the master about this
00663     sndTaskStatus (&taskId, task_started);
00664 
00665     // ... now we are running the task
00666     this->status = slave_busy;
00667 
00668   } while (0);
00669 
00670   // delete the message
00671   delete msg;
00672 
00673   return;
00674 }
00675 
00676 
00677 // M_TASK_CTRL message
00678 // * message for controling the running task
00679 void Slave::msgTaskCtrl (XMLData *msg)
00680 {
00681   TaskId taskId2;
00682 
00683   do {
00684     // check if we can do this
00685     if (status == slave_disabled) {
00686       DBUG_PRINT("err", ("Slave: error task_ctrl slave is disabled"));
00687       sndTaskStatus (&taskId2, task_refused, "err slave disabled");
00688       break;
00689     }
00690 
00691     // check if there is any task running
00692     if (status == slave_ready) {
00693       DBUG_PRINT("err", ("Slave: error task_ctrl, no task is running"));
00694       sndTaskStatus (&taskId2, task_refused, "err no task running");
00695       break;
00696     }
00697 
00698     // check if the message is valid
00699     x2o (msg->sub (p_TaskID), taskId2);
00700 
00701     if (taskId != taskId2) {
00702       DBUG_PRINT("err", ("Slave: error task_ctrl, another task id=%d is running", taskId.id));
00703       sndTaskStatus (&taskId2, task_refused, "err another task is running");
00704       break;
00705     }
00706 
00707     // extract the "Action" part and execute it
00708     const char *action;
00709     action = msg->getString (p_Action)->str;
00710 
00711     // *** "Stop"= stops the running task (kill it!)
00712     if (! strcmp (action, "Stop")) {
00713       // notify slave user that the task is going to be terminated
00714       //TODO  -> next line generates recursive SIGSEGV for a faulty slave
00715       //msgTaskCtrl_Stop ();
00716 
00717       sndSlaveStatus ();
00718       break;
00719     }
00720 
00721     // *** "Control"= send a control message to the task
00722     else if (! strcmp (action, "Control")) {
00723       // notify slave user that the task is going to be terminated
00724       //TODO
00725       // !! maybe better to call this from a newly created thread,
00726       // just in case the user blocks this function!
00727 
00728       // ... make a new thread for the function
00729       if (cbTaskCtrlThread) {
00730         taskctrl_arg = msg->getString (p_Argument)->str;
00731         ::startThread (&cbTaskCtrlThread, cbtaskstop_entry, this);
00732       } else {
00733         sndTaskStatus (&taskId2, task_refused, "no callback defined for this task"); 
00734       }
00735 
00736       // don't send TASK_STATUS to the master, will be sent by user
00737       //sndTaskStatus (&taskId2, task_ok, "task ctrl"); 
00738       break;
00739     }
00740 
00741     // invalid action
00742     else {
00743       DBUG_PRINT("err", ("err M_TASK_CTRL invalid Action=%s", action));
00744       sndTaskStatus (&taskId2, task_refused, "err invalid action");
00745       break;
00746     }
00747   } while (0);
00748 
00749   // delete the message
00750   delete msg;
00751 
00752   return;
00753 }
00754 
00755 
00756 void Slave::msgTaskCtrl_Stop ()
00757 {
00758   // if no task is running, no need to stop it
00759   if (status == slave_ready)
00760     return;
00761 
00762   // !! maybe better to call this from a newly created thread,
00763   // just in case the user blocks this function ?!
00764   // and put a timeout for waiting the return of the function
00765 
00766   // ... make a new thread for the function
00767   ::startThread (&cbTaskStopThread, cbtaskstop_entry, this);
00768 
00769   // wait on the semaphore with timeout
00770   // stop the task thread if returned on timeout
00771   if (sem.wait (cbtaskstop_timeout) == SEM_TIMEOUT) {
00772     THRD_KILL(taskThread);
00773     taskThread = 0;
00774     DBUG_PRINT("err", ("Slave: timeout in TaskStop, task killed"));
00775   }
00776 
00777   //sndTaskStatus (&taskId2, task_stopped, "task stop");
00778   status = slave_ready;
00779   LIB_UNLOAD(taskHnd);
00780   FILE_UNLINK (taskLib);
00781 
00782   return;
00783 }
00784 
00785 
// Executable names used by the "Upgrade" action in msgSlaveCtrl(): the new
// binary is downloaded to new_file, and slaveUpgrade(run_file, new_file)
// replaces the running one. old_file is not referenced in this part of the
// file (presumably the backup name used during the swap — confirm).
// NOTE(review): binding string literals to non-const char* is deprecated in
// C++98 and ill-formed since C++11. Changing the type also requires changing
// any extern declaration of these names.
#ifndef _WIN32
char *run_file = "slave";
char *old_file = "slave_old";
char *new_file = "slave_new";
#else
char *run_file = "slave.exe";
char *old_file = "slave_old.exe";
char *new_file = "slave_new.exe";
#endif
00795 
00796 
00797 void Slave::msgSlaveCtrl (XMLData *msg)
00798 {
00799   do {
00800 
00801     // extract the "Action" part and execute it
00802     const char *action;
00803     action = msg->getString (p_Action)->str;
00804 
00805     // *** "Disable" = slave will not accept any more tasks
00806     if (! strcmp (action, p_Disable->str)) {
00807       // notify slave user that the task is going to be terminated
00808       if (this->status == slave_busy)
00809         msgTaskCtrl_Stop ();
00810 
00811       this->status = slave_disabled;
00812       sndSlaveStatus();
00813       break;
00814     }
00815 
00816     // *** "Enable" = slave will start accepting tasks
00817     if (! strcmp (action, p_Enable->str)) {
00818       if (this->status == slave_disabled)  
00819         this->status = slave_ready;
00820 
00821       sndSlaveStatus();
00822       break;
00823     }
00824 
00825     // *** "Shutdown" = slave will be stopped completely
00826     else if (! strcmp (action, p_Shutdown->str)) {
00827       // notify slave user that the task is going to be terminated
00828       if (this->status == slave_busy)
00829         msgTaskCtrl_Stop ();
00830 
00831       this->status = slave_off;
00832       sndSlaveStatus();
00833 
00834 /* don't delete directory
00835       // delete the temp directory!?
00836       DIR_RMDIR (tmp_dir);
00837 */
00838       break;
00839     }
00840      
00841     // *** "Upgrade" = slave will be upgraded to new version
00842     else if (! strcmp (action, p_Upgrade->str)) {
00843 
00844       DBUG_PRINT("info", ("! slave upgrade procedure started"));
00845 
00846       // donwload the new executable
00847       char err_buff[MAXERR_URL];
00848       if (getURL (msg->getString (p_URL)->str, new_file, err_buff) == 0) {
00849         sndSlaveStatus ();
00850         break;
00851       }
00852 #ifndef _WIN32
00853       chmod (new_file, S_IRUSR | S_IWUSR | S_IXUSR |
00854                        S_IRGRP | S_IXGRP |
00855                        S_IROTH | S_IXOTH);
00856 #endif
00857 
00858       // notify slave user that the task is going to be terminated
00859       // and terminate it
00860       msgTaskCtrl_Stop();
00861 
00862       // change the executable
00863       if (slaveUpgrade (run_file, new_file) == 1) {
00864         // upgrade unsuccessful
00865         sndSlaveStatus ();
00866         break;
00867       }
00868       else
00869         // old slave succesfuly exits!
00870         // the new slave should be replacing the old exe now
00871         exit (0);
00872 
00873     }
00874 
00875     // invalid action
00876     else {
00877       DBUG_PRINT("err", ("err M_SLAVE_CTRL invalid Action=%s", action));
00878       sndSlaveStatus ();
00879       break;
00880     }
00881   } while (0);
00882 
00883   // delete the message
00884   delete msg;
00885 
00886   return;
00887 }
00888 
00889 
00890 int Slave::sndTaskFinish (const char *datares)
00891 {
00892   XMLData *msg;
00893 
00894   // check if valid running task
00895   if (taskId.id == 0)
00896     return 1;
00897 
00898   // wait for main thread to send the TASK_STATUS(Started) 
00899   // (ie. slave_ready --> slave_busy)
00900   while (this->status == slave_ready)
00901     SLEEP_MSEC(10);
00902 
00903   // create the message
00904   msg = xmlmsg(p_M_TASK_FINISH, o2x(taskId));
00905 
00906   // here comes the result data (a copy will be created!!)
00907   if (datares[0])
00908     msg->add (new XMLData (XMLData::CDATA, datares));
00909 
00910   // send the message
00911   po->send (msg, &remote);
00912   delete msg;
00913 
00914   return 0;
00915 }
00916 
00917 
00918 int Slave::sndTaskStatus (TaskId *taskId, task_state stat, char *error)
00919 {
00920   XMLData *msg;
00921 
00922   if (taskId == 0)
00923     taskId = &(this->taskId);
00924 
00925   // create the message
00926   msg = xmlmsg(p_M_TASK_STATUS, o2x(*taskId));
00927   msg->add (new XMLData (p_Status, o2x (stat)));
00928   if (error)
00929     msg->add (new XMLData (p_Error, new CharStr (error)));
00930 
00931   // send the message
00932   po->send (msg, &remote);
00933   delete msg;
00934 
00935   return 0;
00936 }
00937 
00938  
00939 // will use the default master port for this!
00940 int Slave::sndSlaveStatus ()
00941 {
00942   XMLData *msg; 
00943 
00944   // create the message
00945   msg = xmlmsg(p_M_SLAVE_STATUS, o2x(status));
00946   msg->add (o2x (slaveInfo));
00947 
00948   // send the message
00949   po->send (msg, &remote);
00950   delete msg;
00951 
00952   return 0;
00953 }
00954 
00955 
00956 // callback functions for TASK_CTRL messages
00957 void Slave::setTaskStop (void (*cb) (void))
00958 {
00959   if (cbTaskStop)
00960     cbTaskStop = cb;  
00961 }
00962 
00963 
00964 void Slave::setTaskCtrl (void (*cb) (const char *arg))
00965 {
00966   if (cbTaskCtrl)
00967     cbTaskCtrl = cb;
00968 }
00969 
00970 
00971 // finds out the url of the correct library (for this OS)
00972 // and downloads it localy, so it can be used
00973 // returns the location of the local library
00974 //  os is obsolete
00975 int Slave::getLibrary (const char *taskURL, char *taskLib, char *err_buff)
00976 {
00977   int   res;
00978 
00979   // the library file
00980 #ifndef _WIN32
00981 
00982   sprintf (taskLib, "%s/q2_XXXXXX", tmp_dir);
00983   int id = mkstemp (taskLib);
00984   if (id == -1) {
00985     DBUG_PRINT("info", ("could not create temporary file %s for %s", 
00986                         taskLib, taskURL));
00987     return 0;
00988   }
00989   chmod (taskLib, S_IRUSR|S_IWUSR|S_IXUSR);
00990 
00991 #else
00992 
00993   sprintf (taskLib, "%s\\q2_XXXXXX.dll", tmp_dir);
00994   taskLib[strlen(taskLib) - 4] = '\0';
00995   if (! _mktemp (taskLib)) {
00996     DBUG_PRINT("info", ("could not create temporary file %s for %s", 
00997                         taskLib, taskURL));
00998     return 0;
00999   }
01000   taskLib[strlen(taskLib)] = '.';
01001 
01002 #endif
01003 
01004   // downloads the library to local file
01005   DBUG_PRINT("info", ("downloading %s to %s", taskURL, taskLib));
01006   res = getURL (taskURL, taskLib, err_buff);
01007 
01008   if (res == 0)
01009     { DBUG_PRINT("info", ("download failed: %s", err_buff)); }
01010   else
01011     { DBUG_PRINT("info", ("download ok")); }
01012 
01013   return res;
01014 }
01015 
01016 
01017 // finds out the type of the slave machine:
01018 //  * cpu, os, version
01019 void Slave::detectSlaveInfo (SlaveInfo *slave)
01020 {
01021   int i;
01022 
01023 #ifndef _WIN32
01024   //TODO test with uname instead of THIS_OS and THIS_CPU
01025   struct utsname sys;
01026   uname (&sys);
01027 #endif
01028 
01029   if (strcmp(def_os, "autodetect"))
01030      slave->os (def_os);
01031   else
01032 #ifdef _WIN32
01033     slave->_cs_os(p_Win32);
01034 #else
01035     //slave->os(sys.sysname);
01036     slave->os(THIS_OS);
01037 #endif
01038 
01039   if (strcmp(def_cpu, "autodetect"))
01040     slave->cpu(def_cpu);
01041   else
01042 #ifdef _WIN32
01043     slave->_cs_cpu(p_i386);
01044 #else
01045     //slave->os(sys.machine);
01046     if ((strcmp(THIS_CPU + 2, "86") == 0) && (*((char *)THIS_CPU) == 'i'))
01047       slave->_cs_cpu(p_i386);
01048     else 
01049       slave->cpu(THIS_CPU);
01050 #endif
01051 
01052   // autodetect slave info (CPU, memory, disk)
01053   detectCPU (slave->cpu_speed);
01054   slave->mem_unit  = 1024 * 1024;  // MB
01055   detectMemory (slave->mem);
01056   slave->disk_unit = 1024 * 1024;  // MB
01057   detectDisk (slave->disk);
01058 
01059   slave->version(SLAVE_QADPZ_VERSION);
01060   p_multi_str p = software;
01061   p_multi_str q = softversion;
01062   p_multi_str pd = softdetect;
01063   p_multi_str pdr = softdetrow;
01064   p_multi_str pdw = softdetword;
01065 
01066   slave->nSoftInfos = multi_str_count(p);
01067   if (slave->nSoftInfos)
01068   {
01069     //we allocate full array, but some of them will not be used
01070     //if not detected, the name and version are CharStr::Error
01071     slave->softwareInfos = new SoftwareInfo[slave->nSoftInfos];
01072     char nm[100];
01073 #ifdef _WIN32
01074     sprintf(nm, "%s\\qadpz.soft.detect", tmp_dir);
01075 #else
01076     sprintf(nm, "%s/qadpz.soft.detect", tmp_dir);
01077 #endif
01078     for (i = 0; i < slave->nSoftInfos; i++)
01079     {
01080      // try to autodetect the software 
01081       if (strcmp(pd->str, "0"))   // need to detect?
01082       {
01083 #ifndef _WIN32
01084         int detpid;
01085 
01086         if ((detpid = fork()) == 0)
01087         {
01088            char **args;
01089            split_arguments(args, pd->str, 0);
01090            //redirect stdout and stderr
01091            int fd = open(nm, O_WRONLY | O_CREAT | O_TRUNC, 0700);
01092            close(1);
01093            close(2);
01094            dup2(fd, 1);
01095            dup2(fd, 2);
01096            DBUG_CLOSEFILE();  //to avoid chaotic output in logfile
01097            execvp(args[0], args);
01098            printf("QADPZ_SOFT_DETECT_UNSUCCESSFUL_START\nerrno=%d\n", errno);
01099            close(1); close(2); close(fd);
01100            exit(0);
01101         }
01102 
01103         int status, counter = 0, exited;
01104         do { 
01105           exited = (detpid == waitpid(detpid, &status, WNOHANG));
01106           SLEEP_MSEC(100);
01107           counter++;
01108         } while (!exited && (counter < 100)); //give 10 seconds for detect
01109 
01110         //check if the exec was successful
01111         FILE *f = fopen(nm, "r");
01112         //read the first line from detect output
01113         char s[256];
01114         if (!f) s[0] = '\0';
01115         else if (!fgets(s, 255, f)) s[0] = '\0';
01116         else if (s[strlen(s) - 1] == '\n') s[strlen(s) - 1] = '\0';
01117         //was exec error?
01118         if (strcmp(s, "QADPZ_SOFT_DETECT_UNSUCCESSFUL_START") == 0)
01119         {
01120           //read errno
01121           if (fgets(s, 255, f))
01122             s[strlen(s) - 1] = '\0';
01123           else s[0] = '\0';
01124           fclose(f);
01125           DBUG_PRINT("info", ("software detection negative for software %s, (%s)", p->str, s));
01126           p = p->next; 
01127           q = q->next;
01128           pd = pd->next;
01129           pdr = pdr->next;
01130           pdw = pdw->next;
01131           continue;   //not found -> skip to next software
01132         }
01133         //is the version detection used?
01134         if (!(strcmp(pdr->str, "0") || strcmp(pdw->str, "0")))
01135         {
01136           //no version detection
01137           slave->softwareInfos[i].version(q->str);
01138           DBUG_PRINT("info", ("detected software '%s', no version detection", p->str));
01139         }
01140         else
01141         {
01142           //skip pdr->str - 1 lines from the detect output
01143           char *ver2 = s;
01144           for (counter = 2; counter < atoi(pdr->str); counter++)
01145             if (! (ver2 = fgets(s, 255, f))) break;
01146            //to be continued... (see below)
01147 #else  //WIN32 version
01148         FILE *f;
01149         int failed = 0;
01150         
01151         STARTUPINFO startupInfo;
01152         GetStartupInfo(&startupInfo);
01153         PROCESS_INFORMATION procInfo;
01154         //redirect output so that the user is not bothered with messages;
01155         _unlink(nm);
01156         int redirect_out = _open(nm, _O_WRONLY | _O_CREAT | _O_TRUNC, 0777);
01157         if (redirect_out < 0) failed = redirect_out;
01158         else
01159         {
01160           startupInfo.dwFlags += STARTF_USESTDHANDLES;
01161           startupInfo.hStdInput = GetStdHandle(STD_INPUT_HANDLE);
01162           startupInfo.hStdOutput = (HANDLE) _get_osfhandle(redirect_out);
01163           startupInfo.hStdError = (HANDLE) _get_osfhandle(redirect_out);
01164 
01165           if (!CreateProcess(0, 
01166                pd->str,  // pointer to command line string
01167                0,  // process security attributes 0=handle cannot be inheritted
01168                0,   // thread security attributes 0=handle cannot be inheritted
01169                1,  // handle inheritance flag false=do inherit handles (because of log file)
01170                0, // creation flags, no special flags
01171                0,  // pointer to new environment block, use parent environment
01172                tmp_dir,   // pointer to current directory name
01173                &startupInfo,  // pointer to STARTUPINFO
01174                &procInfo))
01175             failed = 1;        
01176           close(redirect_out);
01177         }
01178         if (failed)
01179         { 
01180           // if detect program not started, skip and process next
01181           DBUG_PRINT("info", ("software detection negative (%d) for software %s, (err=%d)", failed, p->str, GetLastError()));
01182           p = p->next; 
01183           q = q->next;
01184           pd = pd->next;
01185           pdr = pdr->next;
01186           pdw = pdw->next;
01187           close(redirect_out);
01188           continue;   //not found -> skip to next software
01189         }
01190 
01191         //wait for detect program to finish
01192         int terminated = 0, counter = 0;
01193         char s[256];
01194         do {
01195           DWORD exitCode;
01196           GetExitCodeProcess(procInfo.hProcess, &exitCode);
01197           if (exitCode != STILL_ACTIVE) terminated = 1;
01198           else 
01199           {
01200               SLEEP_MSEC(100);
01201               counter++;
01202           }
01203         } while ((!terminated) && (counter < 100));   // wait max. 10 sec.
01204 
01205         //is the version detection used?
01206         if (!(strcmp(pdr->str, "0") || strcmp(pdw->str, "0")))
01207         {
01208           //no version detection
01209           slave->softwareInfos[i].version(q->str);
01210           DBUG_PRINT("info", ("detected software '%s', no version detection", p->str));
01211         }
01212         else
01213         {
01214           f = fopen(nm, "r");
01215           char *ver2;
01216           ver2 = fgets(s, 255, f);
01217           //skip pdr->str - 1 lines from the detect output
01218           if (ver2)
01219             for (counter = 2; counter < atoi(pdr->str); counter++)
01220               if (! (ver2 = fgets(s, 255, f))) break;
01221 #endif
01222    // here continue all platforms...
01223 
01224           // if ver2 is 0, there were not enough lines, a standard
01225           // version will be used
01226           if (!ver2) 
01227           {
01228             DBUG_PRINT("warn", ("not enough lines produced by the detect program '%s' (needed %s lines)", pd->str, pdr->str));
01229             slave->softwareInfos[i].version(q->str);
01230           }
01231           else // line was read in 
01232           {
01233             // now string s contains the proper line
01234             char *ver = s;
01235             if (s[strlen(s) - 1] == '\n') s[strlen(s) - 1] = '\0';
01236             // skip pdw->str - 1 words in this line
01237             for (counter = 1; (*ver) && (counter < atoi(pdw->str)); counter++)
01238             {
01239               ver = strchr(ver, ' ');
01240               if (!ver) break;
01241               while (*ver == ' ') ver++;
01242             }
01243             //see if there were enough words
01244             int wordfound = 1;
01245             if (!ver) wordfound = 0;
01246             if ((wordfound) && (*ver == 0)) wordfound = 0;
01247             if (!wordfound)
01248             {
01249               DBUG_PRINT("warn", ("not enough words on the required line of detect software output for command '%s', line %s and word %s", pd->str, pdr->str, pdw->str));
01250               slave->softwareInfos[i].version(q->str);
01251             }
01252             else
01253             {
01254               //if it is not the last word, terminate it with zero
01255               if ((ver2 = strchr(ver, ' ')) != 0) *ver2 = '\0';
01256               //ver now points to the correct string, remove optional quotes
01257               if (ver[0] == '\"') ver++;
01258               if (ver[strlen(ver) - 1] == '\"') ver[strlen(ver) - 1] = '\0';
01259               //and remember the version
01260               slave->softwareInfos[i].version(ver);
01261               DBUG_PRINT("info", ("detected software '%s' version '%s'", p->str, ver));
01262             }
01263           }
01264         }
01265         if (f) fclose(f);
01266       }
01267       else 
01268       {
01269         DBUG_PRINT("info", ("added software '%s' version '%s', detection disabled", p->str, q->str));
01270         slave->softwareInfos[i].version(q->str);
01271       }
01272 
01273       slave->softwareInfos[i].name(p->str);
01274       p = p->next;
01275       q = q->next;
01276       pd = pd->next;
01277       pdr = pdr->next;
01278       pdw = pdw->next;
01279     }
01280   }
01281 
01282   // we should find out the slave's address
01283   char _local[121];
01284   gethostname (_local, 120);
01285 
01286   Address local (_local, po->getLocalPort ());
01287   slave->addr = local;
01288 }
01289 
01290 
01291 #ifdef _WIN32
01292 extern BOOL bDebug;
01293 #endif
01294 
01295 
01296 // return 1 if user is logon, 0 if logoff
01297 int Slave::detectUserStat ()
01298 {
01299 #ifdef _WIN32
01300   if (bDebug)
01301     return 0;
01302   return TestProcess ("EXPLORER.EXE");
01303 #else
01304   //TODO do it on Unix :(
01305   return TestProcess ("");
01306 #endif
01307 }
01308 
01309 
01310 // return the CPU frequency, 0 is unable to determine
01311 void Slave::detectCPU (int &freq)
01312 {
01313   TestCPU (freq);
01314 
01315   return;
01316 }
01317 
01318 
01319 // return the available memory, 0 is unable to determine
01320 void Slave::detectMemory (double &mem)
01321 {
01322   TestMemory (mem);
01323 
01324   return;
01325 }
01326 
01327 
01328 // return the available local disk space, 0 is unable to determine
01329 void Slave::detectDisk (double &disk)
01330 {
01331   TestDisk (tmp_dir, disk);
01332 
01333   return;
01334 }
01335 
01336 
01337 //uploads given file to a QADPZ data server at given address, if NULL, then default
01338 //data server is used; if taskId0 is non-zero, taskId = 0 is used - for common job files
01339 //returns 0 if error, otherwise non-zero
01340 int Slave::put_data(const char *fileName, const char *url_upload, int taskId0)
01341 {
01342   char str_job_name[25];    // 10+strlen(jobName)
01343   char str_job_id[20];      //  8+
01344   char str_task_id[20];     //  9+
01345   char str_task_file[200];  // 12+strlen(fileName)
01346  
01347   sprintf (str_job_name, "job_name=%s", taskId.jId.name());
01348   sprintf (str_job_id, "job_id=%d", taskId.jId.id);
01349   sprintf (str_task_id, "task_id=%d", taskId0?0:taskId.id);
01350   sprintf (str_task_file, "task_file=@%s", fileName);
01351 
01352   DIR_CHDIR(tmp_dir);
01353 
01354   int rv = postURL (url_upload, "upload.txt",
01355              str_job_name,
01356              str_job_id,
01357              str_task_id,
01358              str_task_file,
01359              "submit=send", 0);
01360 
01361   FILE_UNLINK("upload.txt");
01362 
01363   return rv;
01364 }
01365  
01366 //dowwnloads given file from a QADPZ data server at given address, if NULL, default
01367 //data server is used; if taskId0 is non-zero, taskId = 0 is used - for common job files
01368 //returns 0 if error, non-zero otherwise
01369 int Slave::get_data(const char *fileName, const char *url_download, int taskId0)
01370 {
01371   char str_job_name[25];    // 10+strlen(jobName)
01372   char str_job_id[20];      //  8+
01373   char str_task_id[20];     //  9+
01374   char str_task_file[200];  // 12+strlen(fileName)
01375   char str_filename[400];   // strlen(fileName) + strlen(tmpdir)
01376  
01377   sprintf (str_job_name, "job_name=%s", taskId.jId.name());
01378   sprintf (str_job_id, "job_id=%d", taskId.jId.id);
01379   sprintf (str_task_id, "task_id=%d", taskId0?0:taskId.id);
01380   sprintf (str_task_file, "task_file=@%s", fileName);
01381   sprintf (str_filename, "%s/%s", tmp_dir, fileName);
01382 
01383   DBUG_PRINT("info", ("Slave::get_data(%s,%s)", fileName, url_download));
01384   DBUG_PRINT("dbug", ("str_job_name:%s, str_job_id:%s, str_task_id:%s, str_task_file:%s",
01385                        str_job_name, str_job_id, str_task_id, str_task_file));
01386 
01387   return postURL (url_download, str_filename,
01388            str_job_name,
01389            str_job_id,
01390            str_task_id,
01391            str_task_file,
01392            "submit=send", 0);
01393 }
01394 
01395 
// Hand-shake parameters of the slave self-upgrade
// (used by Slave::slaveUpgrade() and slaveNewUpgrade()).
static const int try_max = 5;    // how many times to try
static const int try_sleep = 1;  // secs between the tries
// marker file names; a string literal must not bind to a non-const char *
static const char *const try_file_new = "slave_new.ok";
static const char *const try_file_old = "slave_old.ok";
01400 
01401 // will replace the old executable with the new one
01402 // (at this stage, the old one is running and the new one is d/l-ed)
01403 // return 0 if ok, otherwise 1
01404 int Slave::slaveUpgrade(char *oldSlave, char *newSlave)
01405 {
01406       // start the new slave
01407       DBUG_PRINT("info", ("starting new slave: %s", newSlave));
01408 #ifdef _WIN32
01409       STARTUPINFO startupInfo;
01410       GetStartupInfo(&startupInfo);
01411       PROCESS_INFORMATION procInfo;
01412       char argSlave[1024];
01413       sprintf (argSlave, "%s -debug", newSlave);
01414       if (!CreateProcess(newSlave,
01415         argSlave,        // pointer to command line string
01416         0,               // process security attributes 0=handle cannot be inheritted
01417         0,               // thread security attributes 0=handle cannot be inheritted
01418         false,           // handle inheritance flag false=don't inherit handles
01419         0,               // creation flags, no special flags
01420         0,               // pointer to new environment block, use parent environment
01421         0, //?tmpdir,    // pointer to current directory name
01422         &startupInfo,    // pointer to STARTUPINFO
01423         &procInfo))
01424       {
01425         DBUG_PRINT("err", ("cannot execute downloaded executable, errno=%d", errno));
01426         return 1;
01427       }
01428 #else
01429       if ((fork()) == 0)
01430       {
01431         // this is the new slave
01432         DBUG_CLOSEFILE();
01433         execl (newSlave, newSlave, 0);
01434         DBUG_PRINT("err", ("cannot execute new slave, errno=%d", errno));
01435         return 1;
01436       }
01437 #endif
01438 
01439       // this is the old slave
01440       // ...wait msg from new slave that it started ok
01441       // (the new slave is supposed to create a temp file "slave_new.ok")
01442       // (the old slave will try to open that file exclusively a few times)
01443       // (if successful, it will remove that file and create another "slave_old.ok")
01444       // (the new slave will try to open that file exclusively a few times)
01445       // (if successful, ie. old slave finished, it will copy the new executable)
01446       int fid;
01447 
01448       // try opening the file
01449 printf("try opening %s\n", try_file_new);
01450       int i = 0;
01451       while (i < try_max) {
01452         SLEEP_SEC(try_sleep);
01453 printf("try %d\n", i);
01454         if ((fid = FILE_OPEN (try_file_new, O_EXCL)) != -1) {
01455           FILE_CLOSE (fid);
01456           FILE_UNLINK(try_file_new);
01457 printf("try ok. unlink %s\n", try_file_new);
01458           break;
01459         }
01460         i++;
01461       }
01462       if (i == try_max) {
01463 printf("cannot open %s, errno=%d", try_file_new, errno);
01464         DBUG_PRINT("err", ("cannot open %s, errno=%d", try_file_new, errno));
01465         return 1;        
01466       }
01467 
01468 printf("creating %s\n", try_file_old);
01469       // create the temp file for slave old finish
01470       if ((fid = FILE_OPEN (try_file_old, O_CREAT|O_EXCL,
01471 #ifdef _WIN32
01472                                           _S_IREAD | _S_IWRITE
01473 #else
01474                                           S_IRUSR | S_IWUSR | S_IXUSR |
01475                                           S_IRGRP | S_IXGRP |
01476                                           S_IROTH | S_IXOTH
01477 #endif
01478                             )) == -1) {
01479 printf ("cannot create %s, errno=%d", try_file_old, errno);
01480         return 1;        
01481       }
01482       FILE_CLOSE (fid);
01483 printf("created ok\n");
01484 printf("terminating\n");
01485 
01486       return 0;
01487 }
01488 
01489 
01490 // this is the code executed by the new slave
01491 // return 0 = upgrade succesful, new slave is started
01492 //        1 = upgrade failed, but old slave is still running
01493 //        2 = upgrade failed, old slave is restarted
01494 int slaveNewUpgrade()
01495 {
01496   int i;
01497   int fid;
01498 
01499 #ifndef _WIN32
01500   // first we have to detach from the parent...
01501   // ...close connections to stdin, stdout, stderr
01502   close(0);close(1);close(2); 
01503   // ...bind /dev/null to stdin
01504   open("/dev/null",O_RDONLY,0);
01505   // ...bind a log file to stdout and stderr 
01506   dup(open("Process.log",O_WRONLY|O_CREAT,0777));
01507   // ...detach ourselves from our controlling tty
01508   i = open("/dev/tty",O_RDWR);
01509   if (i >= 0) {
01510     ioctl(i,TIOCNOTTY,0);
01511     close(i);
01512   }
01513 #endif
01514 
01515 printf("creating %s\n", try_file_new);
01516   // create the temp file for slave old finish
01517   if ((fid = FILE_OPEN (try_file_new, O_CREAT|O_EXCL,
01518 #ifdef _WIN32
01519                                       _S_IREAD | _S_IWRITE
01520 #else
01521                                       S_IRUSR | S_IWUSR | S_IXUSR |
01522                                       S_IRGRP | S_IXGRP |
01523                                       S_IROTH | S_IXOTH
01524 #endif
01525                             )) == -1) {
01526     DBUG_PRINT("err", ("cannot create %s, errno=%d", try_file_new, errno));
01527     return 1;
01528   }
01529   FILE_CLOSE (fid);
01530 printf("created ok\n");
01531 
01532 printf("try opening %s\n", try_file_new);
01533   i = 0;
01534   while (i < try_max) {
01535     SLEEP_SEC(try_sleep);
01536 printf("try %d\n", i);
01537     if ((fid = FILE_OPEN (try_file_old, O_EXCL)) != -1) {
01538       FILE_CLOSE (fid);
01539       FILE_UNLINK(try_file_old);
01540 printf("try ok. unlink %s\n", try_file_old);
01541       break;
01542     }
01543     i++;
01544   }
01545 
01546   if (i == try_max) {
01547     DBUG_PRINT("err", ("cannot open %s, errno=%d", try_file_old, errno));
01548 printf("cannot open %s, errno=%d\n", try_file_old, errno);
01549     return 1;
01550   }
01551 
01552   // wait a little bit... though the old slave should be down already
01553   SLEEP_SEC(1);
01554 
01555   // now rename the old executable...
01556 printf("renaming %s to %s\n", run_file, old_file);
01557   FILE_RENAME (run_file, old_file);
01558 
01559 printf("copying %s to %s\n", new_file, run_file);
01560   // now copy the new executable (can't rename because it is running)
01561   FILE *fnew = fopen (new_file, "rb");
01562   FILE *frun = fopen (run_file, "wb");
01563 
01564   if (fnew == 0)
01565   {
01566 printf ("error opening %s", new_file);
01567     rename (old_file, run_file);
01568     return 1;
01569   }
01570   if (frun == 0)
01571   {
01572 printf ("error opening %s", run_file);
01573     rename (old_file, run_file);
01574     return 1;
01575   }
01576 
01577   int size, max_size = 16384;
01578   uchar buff[16384];    //msvc requires constant size
01579   while (! feof (fnew)) {
01580     clearerr (fnew);
01581     size = fread (buff, 1, max_size, fnew);
01582 /*
01583     if (ferror (fnew)) {
01584       // problem, must recover old executable...
01585       rename (old_file, run_file);
01586       DBUG_PRINT("err", ("error reading %s", new_file));
01587       return 2;
01588     }
01589 */
01590     size = fwrite (buff, 1, size, frun);
01591 //printf("read %d\n", size);
01592 /*
01593     if (ferror (fnew)) {
01594       // problem, must recover old executable...
01595       rename (old_file, run_file);
01596       DBUG_PRINT("err", ("error writing %s", run_file));
01597       return 2;
01598     }
01599 */
01600   }
01601 
01602   // everything seems ok...
01603   fclose (frun);
01604   fclose (fnew);
01605 
01606 #ifndef _WIN32
01607   chmod (run_file, S_IRUSR | S_IWUSR | S_IXUSR |
01608                    S_IRGRP | S_IXGRP |
01609                    S_IROTH | S_IXOTH);
01610 #endif
01611 
01612   return 0;
01613 }
01614 
01615 //just to have this function callable from slave library (otherwise it wasn't :-((()
01616 void useless()
01617 {
01618   char **x = 0, *y = 0;
01619   split_arguments(x, y, 0);
01620 }

Generated on Mon Nov 25 12:46:31 2002 for qadpz by doxygen1.2.18