00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00036 #include "OW_config.h"
00037 #include "OW_Exec.hpp"
00038 #include "OW_Format.hpp"
00039 #include "OW_Assertion.hpp"
00040 #include "OW_PosixUnnamedPipe.hpp"
00041 #include "OW_Array.hpp"
00042 #include "OW_IOException.hpp"
00043 #include "OW_Thread.hpp"
00044 #include "OW_Select.hpp"
00045 #include "OW_ExceptionIds.hpp"
00046 #include "OW_IntrusiveCountableBase.hpp"
00047 #include "OW_DateTime.hpp"
00048 #include "OW_AutoPtr.hpp"
00049
00050 #include <map>
00051
00052 extern "C"
00053 {
00054 #ifdef OW_HAVE_SYS_RESOURCE_H
00055 #include <sys/resource.h>
00056 #endif
00057 #ifndef OW_WIN32
00058 #include <unistd.h>
00059 #include <sys/wait.h>
00060 #include <fcntl.h>
00061 #endif
00062 #include <errno.h>
00063 #include <stdio.h>
00064 #include <signal.h>
00065 }
00066
00067 #include <cerrno>
00068 #include <iostream>
00069
00070
00071 #ifndef NSIG
00072 #define NSIG 64
00073 #endif
00074
00075 namespace OW_NAMESPACE
00076 {
00077
00078 using std::cerr;
00079 using std::endl;
00080 OW_DEFINE_EXCEPTION_WITH_ID(ExecTimeout);
00081 OW_DEFINE_EXCEPTION_WITH_ID(ExecBufferFull);
00082 OW_DEFINE_EXCEPTION_WITH_ID(ExecError);
00083
00084 #ifndef OW_WIN32
00085 class PopenStreamsImpl : public IntrusiveCountableBase
00086 {
00087 public:
00088 PopenStreamsImpl();
00089 ~PopenStreamsImpl();
00090 UnnamedPipeRef in() const;
00091 void in(const UnnamedPipeRef& pipe);
00092 UnnamedPipeRef out() const;
00093 void out(const UnnamedPipeRef& pipe);
00094 UnnamedPipeRef err() const;
00095 void err(const UnnamedPipeRef& pipe);
00096 Array<UnnamedPipeRef> extraPipes() const;
00097 void setExtraPipes(const Array<UnnamedPipeRef>& pipes);
00098
00099 pid_t pid();
00100 void pid(pid_t newPid);
00101 int getExitStatus();
00102 int getExitStatus(UInt32 wait_initial, UInt32 wait_close, UInt32 wait_term);
00103 void setProcessStatus(int ps)
00104 {
00105 m_processstatus = ps;
00106 }
00107 private:
00108 UnnamedPipeRef m_in;
00109 UnnamedPipeRef m_out;
00110 UnnamedPipeRef m_err;
00111 Array<UnnamedPipeRef> m_extraPipes;
00112 pid_t m_pid;
00113 int m_processstatus;
00114 };
00116 PopenStreamsImpl::PopenStreamsImpl()
00117 : m_pid(-1)
00118 , m_processstatus(-1)
00119 {
00120 }
00122 UnnamedPipeRef PopenStreamsImpl::in() const
00123 {
00124 return m_in;
00125 }
00127 void PopenStreamsImpl::in(const UnnamedPipeRef& pipe)
00128 {
00129 m_in = pipe;
00130 }
00132 UnnamedPipeRef PopenStreamsImpl::out() const
00133 {
00134 return m_out;
00135 }
00137 void PopenStreamsImpl::out(const UnnamedPipeRef& pipe)
00138 {
00139 m_out = pipe;
00140 }
00142 UnnamedPipeRef PopenStreamsImpl::err() const
00143 {
00144 return m_err;
00145 }
00147 void PopenStreamsImpl::err(const UnnamedPipeRef& pipe)
00148 {
00149 m_err = pipe;
00150 }
00152 Array<UnnamedPipeRef> PopenStreamsImpl::extraPipes() const
00153 {
00154 return m_extraPipes;
00155 }
00157 void PopenStreamsImpl::setExtraPipes(const Array<UnnamedPipeRef>& pipes)
00158 {
00159 m_extraPipes = pipes;
00160 }
00162 pid_t PopenStreamsImpl::pid()
00163 {
00164 return m_pid;
00165 }
00167 void PopenStreamsImpl::pid(pid_t newPid)
00168 {
00169 m_pid = newPid;
00170 }
00172 static inline ProcId safeWaitPid(ProcId pid, int* status, int options)
00173 {
00174
00175
00176 int localReturnValue = -1;
00177 pid_t returnedPID = ::waitpid(pid, &localReturnValue, options);
00178 if( returnedPID > 0 )
00179 {
00180 *status = localReturnValue;
00181 }
00182 return returnedPID;
00183 }
00184
00186 static ProcId noIntrWaitPid(ProcId pid, int* status, int options)
00187 {
00188 pid_t waitpidrv;
00189 do
00190 {
00191 Thread::testCancel();
00192 waitpidrv = safeWaitPid(pid, status, options);
00193 } while (waitpidrv == -1 && errno == EINTR);
00194 return waitpidrv;
00195 }
00196
00198 static inline void
00199 milliSleep(UInt32 milliSeconds)
00200 {
00201 Thread::sleep(milliSeconds);
00202 }
00204 static inline void
00205 secSleep(UInt32 seconds)
00206 {
00207 Thread::sleep(seconds * 1000);
00208 }
00210 static bool
00211 timedWaitPid(ProcId pid, int * pstatus, UInt32 wait_time)
00212 {
00213 UInt32 const N = 154;
00214 UInt32 const M = 128;
00215 UInt32 const MAXPERIOD = 5000;
00216 UInt32 period = 100;
00217 UInt32 t = 0;
00218 ProcId waitpidrv = noIntrWaitPid(pid, pstatus, WNOHANG);
00219 while (t < wait_time && waitpidrv == 0) {
00220 milliSleep(period);
00221 t += period;
00222 period *= N;
00223 period /= M;
00224 if (period > MAXPERIOD)
00225 {
00226 period = MAXPERIOD;
00227 }
00228 waitpidrv = noIntrWaitPid(pid, pstatus, WNOHANG);
00229 }
00230 if (waitpidrv < 0) {
00231 OW_THROW_ERRNO_MSG(ExecErrorException, "waitpid() failed.");
00232 }
00233 return waitpidrv != 0;
00234 }
00235
00237
00238
00239
00240 static bool killWait(
00241 ProcId pid, int * pstatus, UInt32 wait_time, int sig, char const * signame
00242 )
00243 {
00244 if (::kill(pid, sig) == -1) {
00245
00246 int errnum = errno;
00247
00248 if (noIntrWaitPid(pid, pstatus, WNOHANG) > 0) {
00249 return true;
00250 }
00251 else {
00252 Format fmt("Failed sending %1 to process %2.", signame, pid);
00253 char const * msg = fmt.c_str();
00254 errno = errnum;
00255 OW_THROW_ERRNO_MSG(ExecErrorException, msg);
00256 }
00257 }
00258 return timedWaitPid(pid, pstatus, wait_time);
00259 }
00260
00262 int PopenStreamsImpl::getExitStatus()
00263 {
00264 return this->getExitStatus(0, 10 *1000, 10 * 1000);
00265 }
00266
00268 int PopenStreamsImpl::getExitStatus(
00269 UInt32 wait_initial, UInt32 wait_close, UInt32 wait_term)
00270 {
00271 if (m_pid < 0)
00272 {
00273 return m_processstatus;
00274 }
00275 if (m_pid == ::getpid())
00276 {
00277 OW_THROW(ExecErrorException, "PopenStreamsImpl::getExitStatus: m_pid == getpid()");
00278 }
00279
00280 ProcId pid = m_pid;
00281 m_pid = -1;
00282 int * pstatus = &m_processstatus;
00283
00284
00285 wait_initial *= 1000;
00286 wait_close *= 1000;
00287 wait_term *= 1000;
00288
00289 if (wait_initial > 0 && timedWaitPid(pid, pstatus, wait_initial))
00290 {
00291 return m_processstatus;
00292 }
00293
00294 if (wait_close > 0)
00295 {
00296
00297
00298
00299
00300 UnnamedPipeRef upr;
00301 if (upr = in())
00302 {
00303 upr->close();
00304 }
00305 if (upr = out())
00306 {
00307 upr->close();
00308 }
00309 if (upr = err())
00310 {
00311 upr->close();
00312 }
00313 if (timedWaitPid(pid, pstatus, wait_close))
00314 {
00315 return m_processstatus;
00316 }
00317 }
00318
00319 if (wait_term > 0 && killWait(pid, pstatus, wait_term, SIGTERM, "SIGTERM"))
00320 {
00321 return m_processstatus;
00322 }
00323 if (!killWait(pid, pstatus, 5000, SIGKILL, "SIGKILL")) {
00324 OW_THROW(
00325 ExecErrorException, "PopenStreamsImpl::getExitStatus: Child process has not exited after sending it a SIGKILL."
00326 );
00327 }
00328 return m_processstatus;
00329 }
00331 PopenStreamsImpl::~PopenStreamsImpl()
00332 {
00333 try
00334 {
00335
00336 getExitStatus();
00337 }
00338 catch (...)
00339 {
00340 }
00341 }
00342
00344 PopenStreams::PopenStreams()
00345 : m_impl(new PopenStreamsImpl)
00346 {
00347 }
00349 PopenStreams::~PopenStreams()
00350 {
00351 }
00353 UnnamedPipeRef PopenStreams::in() const
00354 {
00355 return m_impl->in();
00356 }
00358 void PopenStreams::in(const UnnamedPipeRef& pipe)
00359 {
00360 m_impl->in(pipe);
00361 }
00363 UnnamedPipeRef PopenStreams::out() const
00364 {
00365 return m_impl->out();
00366 }
00368 void PopenStreams::out(const UnnamedPipeRef& pipe)
00369 {
00370 m_impl->out(pipe);
00371 }
00373 UnnamedPipeRef PopenStreams::err() const
00374 {
00375 return m_impl->err();
00376 }
00378 void PopenStreams::err(const UnnamedPipeRef& pipe)
00379 {
00380 m_impl->err(pipe);
00381 }
00383 Array<UnnamedPipeRef> PopenStreams::extraPipes() const
00384 {
00385 return m_impl->extraPipes();
00386 }
00388 void PopenStreams::setExtraPipes(const Array<UnnamedPipeRef>& pipes)
00389 {
00390 m_impl->setExtraPipes(pipes);
00391 }
00393 pid_t PopenStreams::pid() const
00394 {
00395 return m_impl->pid();
00396 }
00398 void PopenStreams::pid(pid_t newPid)
00399 {
00400 m_impl->pid(newPid);
00401 }
00403 int PopenStreams::getExitStatus()
00404 {
00405 return m_impl->getExitStatus();
00406 }
00408 int PopenStreams::getExitStatus(UInt32 wait0, UInt32 wait1, UInt32 wait2)
00409 {
00410 return m_impl->getExitStatus(wait0, wait1, wait2);
00411 }
00413 void PopenStreams::setProcessStatus(int ps)
00414 {
00415 m_impl->setProcessStatus(ps);
00416 }
00418 PopenStreams::PopenStreams(const PopenStreams& src)
00419 : m_impl(src.m_impl)
00420 {
00421 }
00423 PopenStreams& PopenStreams::operator=(const PopenStreams& src)
00424 {
00425 m_impl = src.m_impl;
00426 return *this;
00427 }
00428
00430 bool operator==(const PopenStreams& x, const PopenStreams& y)
00431 {
00432 return x.m_impl == y.m_impl;
00433 }
00434
00436 namespace Exec
00437 {
00438
00440 int
00441 safeSystem(const Array<String>& command, const EnvVars& envVars)
00442 {
00443 const char* const* envp = (envVars.size() > 0) ? envVars.getenvp() : 0;
00444 return safeSystem(command, envp);
00445 }
00446
00448 int
00449 safeSystem(const Array<String>& command, const char* const envp[])
00450 {
00451 int status;
00452 pid_t pid;
00453 if (command.size() == 0)
00454 {
00455 return 1;
00456 }
00457
00458
00459 AutoPtrVec<const char*> argv(new const char*[command.size() + 1]);
00460 for (size_t i = 0; i < command.size(); i++)
00461 {
00462 argv[i] = command[i].c_str();
00463 }
00464 argv[command.size()] = 0;
00465
00466 pid = ::fork();
00467 if (pid == -1)
00468 {
00469 return -1;
00470 }
00471 if (pid == 0)
00472 {
00473 try
00474 {
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493 sigset_t emptymask;
00494 sigemptyset(&emptymask);
00495 ::sigprocmask(SIG_SETMASK, &emptymask, 0);
00496
00497 for (size_t sig = 1; sig <= NSIG; ++sig)
00498 {
00499 struct sigaction temp;
00500 sigaction(sig, 0, &temp);
00501 temp.sa_handler = SIG_DFL;
00502 sigaction(sig, &temp, NULL);
00503 }
00504
00505
00506 rlimit rl;
00507 int i = sysconf(_SC_OPEN_MAX);
00508 if (getrlimit(RLIMIT_NOFILE, &rl) != -1)
00509 {
00510 if ( i < 0 )
00511 {
00512 i = rl.rlim_max;
00513 }
00514 else
00515 {
00516 i = std::min<int>(rl.rlim_max, i);
00517 }
00518 }
00519 while (i > 2)
00520 {
00521
00522 ::fcntl(i, F_SETFD, FD_CLOEXEC);
00523 i--;
00524 }
00525
00526 int rval;
00527 if (envp)
00528 {
00529 rval = execve(argv[0], const_cast<char* const*>(argv.get()), const_cast<char* const*>(envp));
00530 }
00531 else
00532 {
00533 rval = execv(argv[0], const_cast<char* const*>(argv.get()));
00534 }
00535 cerr << Format( "Exec::safeSystem: execv failed for program "
00536 "%1, rval is %2", argv[0], rval);
00537 }
00538 catch (...)
00539 {
00540 cerr << "something threw an exception after fork()!";
00541 }
00542 _exit(127);
00543 }
00544 do
00545 {
00546 Thread::testCancel();
00547 if (waitpid(pid, &status, 0) == -1)
00548 {
00549 if (errno != EINTR)
00550 {
00551 return -1;
00552 }
00553 }
00554 else
00555 {
00556 return WEXITSTATUS(status);
00557 }
00558 } while (1);
00559 }
00560
00562 PopenStreams
00563 safePopen(const Array<String>& command,
00564 const String& initialInput)
00565 {
00566 PopenStreams retval = safePopen(command);
00567
00568 if (initialInput != "")
00569 {
00570 if (retval.in()->write(initialInput.c_str(), initialInput.length()) == -1)
00571 {
00572 OW_THROW_ERRNO_MSG(IOException, "Exec::safePopen: Failed writing input to process");
00573 }
00574 }
00575
00576 return retval;
00577 }
00578
00580 PopenStreams
00581 safePopen(const Array<String>& command, const EnvVars& envVars)
00582 {
00583 const char* const* envp = (envVars.size() > 0) ? envVars.getenvp() : 0;
00584 return safePopen(command, envp);
00585 }
00586
00588 PopenStreams
00589 safePopen(const Array<String>& command, const char* const envp[])
00590 {
00591
00592
00593
00594 const int UNKNOWN_EXCEPTION = -2000;
00595
00596 if (command.size() == 0)
00597 {
00598 OW_THROW(ExecErrorException, "Exec::safePopen: command is empty");
00599 }
00600
00601 PopenStreams retval;
00602 retval.in( UnnamedPipe::createUnnamedPipe() );
00603 UnnamedPipeRef upipeOut = UnnamedPipe::createUnnamedPipe();
00604 retval.out( upipeOut );
00605 UnnamedPipeRef upipeErr = UnnamedPipe::createUnnamedPipe();
00606 retval.err( upipeErr );
00607
00608 UnnamedPipeRef execErrorPipe = UnnamedPipe::createUnnamedPipe();
00609
00610
00611 AutoPtrVec<const char*> argv(new const char*[command.size() + 1]);
00612 for (size_t i = 0; i < command.size(); i++)
00613 {
00614 argv[i] = command[i].c_str();
00615 }
00616 argv[command.size()] = 0;
00617
00618 pid_t forkrv = ::fork();
00619 if (forkrv == -1)
00620 {
00621 OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::safePopen: fork() failed");
00622 }
00623 if (forkrv == 0)
00624 {
00625 int execErrorFd = -1;
00626 try
00627 {
00628
00629
00630
00631
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
00642
00643
00644
00645
00646
00647 sigset_t emptymask;
00648 sigemptyset(&emptymask);
00649 ::sigprocmask(SIG_SETMASK, &emptymask, 0);
00650
00651 for (size_t sig = 1; sig <= NSIG; ++sig)
00652 {
00653 struct sigaction temp;
00654 sigaction(sig, 0, &temp);
00655 temp.sa_handler = SIG_DFL;
00656 sigaction(sig, &temp, NULL);
00657 }
00658
00659
00660 close(0);
00661 close(1);
00662 close(2);
00663
00664
00665 UnnamedPipeRef foo1 = retval.in();
00666 PosixUnnamedPipeRef in = foo1.cast_to<PosixUnnamedPipe>();
00667
00668 UnnamedPipeRef foo2 = retval.out();
00669 PosixUnnamedPipeRef out = foo2.cast_to<PosixUnnamedPipe>();
00670
00671 UnnamedPipeRef foo3 = retval.err();
00672 PosixUnnamedPipeRef err = foo3.cast_to<PosixUnnamedPipe>();
00673
00674
00675 OW_ASSERT(in);
00676 OW_ASSERT(out);
00677 OW_ASSERT(err);
00678
00679 int rv = dup2(in->getInputHandle(), 0);
00680 OW_ASSERT(rv != -1);
00681 rv = dup2(out->getOutputHandle(), 1);
00682 OW_ASSERT(rv != -1);
00683 rv = dup2(err->getOutputHandle(), 2);
00684 OW_ASSERT(rv != -1);
00685
00686
00687 PosixUnnamedPipeRef execError = execErrorPipe.cast_to<PosixUnnamedPipe>();
00688 OW_ASSERT(execError);
00689 execErrorFd = execError->getOutputHandle();
00690
00691
00692
00693 rlimit rl;
00694 int i = sysconf(_SC_OPEN_MAX);
00695 if (getrlimit(RLIMIT_NOFILE, &rl) != -1)
00696 {
00697 if ( i < 0 )
00698 {
00699 i = rl.rlim_max;
00700 }
00701 else
00702 {
00703 i = std::min<int>(rl.rlim_max, i);
00704 }
00705 }
00706 while (i > 2)
00707 {
00708
00709 ::fcntl(i, F_SETFD, FD_CLOEXEC);
00710 i--;
00711 }
00712
00713 int rval = 0;
00714 if (envp)
00715 {
00716 rval = execve(argv[0], const_cast<char* const*>(argv.get()), const_cast<char* const*>(envp));
00717 }
00718 else
00719 {
00720 rval = execv(argv[0], const_cast<char* const*>(argv.get()));
00721 }
00722
00723 int lerrno = errno;
00724 write(execErrorFd, &lerrno, sizeof(lerrno));
00725 }
00726 catch (...)
00727 {
00728 int errorVal = UNKNOWN_EXCEPTION;
00729 write(execErrorFd, &errorVal, sizeof(errorVal));
00730 }
00731 _exit(127);
00732 }
00733
00734
00735 retval.pid (forkrv);
00736
00737
00738 UnnamedPipeRef foo1 = retval.in();
00739 PosixUnnamedPipeRef in = foo1.cast_to<PosixUnnamedPipe>();
00740 UnnamedPipeRef foo2 = retval.out();
00741 PosixUnnamedPipeRef out = foo2.cast_to<PosixUnnamedPipe>();
00742 UnnamedPipeRef foo3 = retval.err();
00743 PosixUnnamedPipeRef err = foo3.cast_to<PosixUnnamedPipe>();
00744 OW_ASSERT(in);
00745 OW_ASSERT(out);
00746 OW_ASSERT(err);
00747
00748 in->closeInputHandle();
00749 out->closeOutputHandle();
00750 err->closeOutputHandle();
00751
00752 PosixUnnamedPipeRef execErrorPosixPipe = execErrorPipe.cast_to<PosixUnnamedPipe>();
00753 OW_ASSERT(execErrorPosixPipe);
00754
00755 execErrorPosixPipe->closeOutputHandle();
00756
00757 const int SECONDS_TO_WAIT_FOR_CHILD_TO_EXEC = 10;
00758 execErrorPipe->setReadTimeout(SECONDS_TO_WAIT_FOR_CHILD_TO_EXEC);
00759
00760 int childErrorCode = 0;
00761 int bytesRead = execErrorPipe->read(&childErrorCode, sizeof(childErrorCode));
00762
00763 if (bytesRead == ETIMEDOUT)
00764 {
00765
00766
00767 kill(forkrv, SIGKILL);
00768 OW_THROW(ExecErrorException, "Exec::safePopen: timed out waiting for child process to exec()");
00769 }
00770 if (bytesRead > 0)
00771 {
00772
00773 if (childErrorCode == UNKNOWN_EXCEPTION)
00774 {
00775 OW_THROW(ExecErrorException, "Exec::safePopen: child process caught an exception before reaching exec()");
00776 }
00777 else
00778 {
00779 errno = childErrorCode;
00780 OW_THROW_ERRNO_MSG(ExecErrorException, Format("Exec::safePopen: child process failed running exec() process = %1", command[0]));
00781 }
00782 }
00783
00784 return retval;
00785 }
00786
00787 namespace
00788 {
00789
00790 #ifndef OW_MIN
00791 #define OW_MIN(x, y) (x) < (y) ? (x) : (y)
00792 #endif
00793
00795 class StringOutputGatherer : public OutputCallback
00796 {
00797 public:
00798 StringOutputGatherer(String& output, int outputLimit)
00799 : m_output(output)
00800 , m_outputLimit(outputLimit)
00801 {
00802 }
00803 private:
00804 virtual void doHandleData(const char* data, size_t dataLen, EOutputSource outputSource, PopenStreams& theStream, size_t streamIndex, Array<char>& inputBuffer)
00805 {
00806 if (m_outputLimit >= 0 && m_output.length() + dataLen > static_cast<size_t>(m_outputLimit))
00807 {
00808
00809 int lentocopy = OW_MIN(m_outputLimit - m_output.length(), dataLen);
00810 if (lentocopy >= 0)
00811 {
00812 m_output += String(data, lentocopy);
00813 }
00814 OW_THROW(ExecBufferFullException, "Exec::StringOutputGatherer::doHandleData(): buffer full");
00815 }
00816
00817 m_output += data;
00818 }
00819 String& m_output;
00820 int m_outputLimit;
00821 };
00822
00824 class SingleStringInputCallback : public InputCallback
00825 {
00826 public:
00827 SingleStringInputCallback(const String& s)
00828 : m_s(s)
00829 {
00830 }
00831 private:
00832 virtual void doGetData(Array<char>& inputBuffer, PopenStreams& theStream, size_t streamIndex)
00833 {
00834 if (m_s.length() > 0)
00835 {
00836 inputBuffer.insert(inputBuffer.end(), m_s.c_str(), m_s.c_str() + m_s.length());
00837 m_s.erase();
00838 }
00839 else if (theStream.in()->isOpen())
00840 {
00841 theStream.in()->close();
00842 }
00843 }
00844 String m_s;
00845 };
00846
00847 }
00848
00850 void
00851 executeProcessAndGatherOutput(const Array<String>& command,
00852 String& output, int& processStatus,
00853 int timeoutSecs, int outputLimit, const String& input)
00854 {
00855 executeProcessAndGatherOutput(command, output, processStatus, EnvVars(),
00856 timeoutSecs, outputLimit, input);
00857 }
00858
00860 void executeProcessAndGatherOutput(
00861 const Array<String>& command,
00862 String& output,
00863 int& processStatus,
00864 const EnvVars& envVars,
00865 int timeoutSecs,
00866 int outputLimit,
00867 const String& input)
00868 {
00869 processStatus = -1;
00870 Array<PopenStreams> streams;
00871 streams.push_back(safePopen(command, envVars));
00872 Array<ProcessStatus> processStatuses(1);
00873 SingleStringInputCallback singleStringInputCallback(input);
00874
00875 StringOutputGatherer gatherer(output, outputLimit);
00876 processInputOutput(gatherer, streams, processStatuses,
00877 singleStringInputCallback, timeoutSecs);
00878
00879 if (processStatuses[0].hasExited())
00880 {
00881 processStatus = processStatuses[0].getStatus();
00882 }
00883 else
00884 {
00885 processStatus = streams[0].getExitStatus();
00886 }
00887 }
00888
00890 void
00891 gatherOutput(String& output, PopenStreams& stream, int& processStatus, int timeoutSecs, int outputLimit)
00892 {
00893 Array<PopenStreams> streams;
00894 streams.push_back(stream);
00895 Array<ProcessStatus> processStatuses(1);
00896
00897 StringOutputGatherer gatherer(output, outputLimit);
00898 SingleStringInputCallback singleStringInputCallback = SingleStringInputCallback(String());
00899 processInputOutput(gatherer, streams, processStatuses, singleStringInputCallback, timeoutSecs);
00900 if (processStatuses[0].hasExited())
00901 {
00902 processStatus = processStatuses[0].getStatus();
00903 }
00904 }
00905
00907 OutputCallback::~OutputCallback()
00908 {
00909
00910 }
00911
00913 void
00914 OutputCallback::handleData(const char* data, size_t dataLen, EOutputSource outputSource, PopenStreams& theStream, size_t streamIndex, Array<char>& inputBuffer)
00915 {
00916 doHandleData(data, dataLen, outputSource, theStream, streamIndex, inputBuffer);
00917 }
00918
00920 InputCallback::~InputCallback()
00921 {
00922 }
00923
00925 void
00926 InputCallback::getData(Array<char>& inputBuffer, PopenStreams& theStream, size_t streamIndex)
00927 {
00928 doGetData(inputBuffer, theStream, streamIndex);
00929 }
00930
namespace
{

/**
 * Per-process bookkeeping used by processInputOutput(): which of the three
 * pipes are still considered open, and how many bytes of input are waiting
 * to be written to the process.
 */
struct ProcessOutputState
{
	bool inIsOpen;              // "in" pipe still usable for writing
	bool outIsOpen;             // "out" pipe still being read
	bool errIsOpen;             // "err" pipe still being read
	size_t availableDataLen;    // bytes of pending input

	/// All pipes start out as open, with no pending input.
	ProcessOutputState()
		: inIsOpen(true), outIsOpen(true), errIsOpen(true), availableDataLen(0)
	{
	}
};

} // end anonymous namespace
00951 void
00952 processInputOutput(OutputCallback& output, Array<PopenStreams>& streams, Array<ProcessStatus>& processStatuses, InputCallback& input, int timeoutsecs)
00953 {
00954 processStatuses.clear();
00955 processStatuses.resize(streams.size());
00956
00957 Array<ProcessOutputState> processStates(streams.size());
00958 int numOpenPipes(streams.size() * 2);
00959
00960 DateTime curTime;
00961 curTime.setToCurrent();
00962 DateTime timeoutEnd(curTime);
00963 timeoutEnd += timeoutsecs;
00964
00965 Array<Array<char> > inputs(processStates.size());
00966 for (size_t i = 0; i < processStates.size(); ++i)
00967 {
00968 input.getData(inputs[i], streams[i], i);
00969 processStates[i].availableDataLen = inputs[i].size();
00970 if (!streams[i].out()->isOpen())
00971 {
00972 processStates[i].outIsOpen = false;
00973 }
00974 if (!streams[i].err()->isOpen())
00975 {
00976 processStates[i].errIsOpen = false;
00977 }
00978 if (!streams[i].in()->isOpen())
00979 {
00980 processStates[i].inIsOpen = false;
00981 }
00982
00983 }
00984
00985 while (numOpenPipes > 0)
00986 {
00987 Select::SelectObjectArray selObjs;
00988 std::map<int, int> inputIndexProcessIndex;
00989 std::map<int, int> outputIndexProcessIndex;
00990 for (size_t i = 0; i < streams.size(); ++i)
00991 {
00992 if (processStates[i].outIsOpen)
00993 {
00994 Select::SelectObject selObj(streams[i].out()->getSelectObj());
00995 selObj.waitForRead = true;
00996 selObjs.push_back(selObj);
00997 inputIndexProcessIndex[selObjs.size() - 1] = i;
00998 }
00999 if (processStates[i].errIsOpen)
01000 {
01001 Select::SelectObject selObj(streams[i].err()->getSelectObj());
01002 selObj.waitForRead = true;
01003 selObjs.push_back(selObj);
01004 inputIndexProcessIndex[selObjs.size() - 1] = i;
01005 }
01006 if (processStates[i].inIsOpen && processStates[i].availableDataLen > 0)
01007 {
01008 Select::SelectObject selObj(streams[i].in()->getWriteSelectObj());
01009 selObj.waitForWrite = true;
01010 selObjs.push_back(selObj);
01011 outputIndexProcessIndex[selObjs.size() - 1] = i;
01012 }
01013
01014
01015 if (streams[i].pid() != -1)
01016 {
01017 pid_t waitpidrv;
01018 int processStatus(-1);
01019 waitpidrv = noIntrWaitPid(streams[i].pid(), &processStatus, WNOHANG);
01020 if (waitpidrv == -1)
01021 {
01022 streams[i].pid(-1);
01023 OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: waitpid() failed");
01024 }
01025 else if (waitpidrv != 0)
01026 {
01027 streams[i].pid(-1);
01028 streams[i].setProcessStatus(processStatus);
01029 processStatuses[i] = ProcessStatus(processStatus);
01030 }
01031 }
01032 }
01033
01034 const int mstimeout = 100;
01035 int selectrval = Select::selectRW(selObjs, mstimeout);
01036 switch (selectrval)
01037 {
01038 case Select::SELECT_INTERRUPTED:
01039
01040 break;
01041 case Select::SELECT_ERROR:
01042 {
01043 OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: error selecting on stdout and stderr");
01044 }
01045 break;
01046 case Select::SELECT_TIMEOUT:
01047 {
01048
01049
01050 for (size_t i = 0; i < streams.size(); ++i)
01051 {
01052 if (streams[i].pid() == -1)
01053 {
01054 if (processStates[i].inIsOpen)
01055 {
01056 processStates[i].inIsOpen = false;
01057 streams[i].in()->close();
01058 }
01059 if (processStates[i].outIsOpen)
01060 {
01061 processStates[i].outIsOpen = false;
01062 streams[i].out()->close();
01063 --numOpenPipes;
01064 }
01065 if (processStates[i].errIsOpen)
01066 {
01067 processStates[i].errIsOpen = false;
01068 streams[i].err()->close();
01069 --numOpenPipes;
01070 }
01071 }
01072 }
01073
01074 curTime.setToCurrent();
01075 if (timeoutsecs >= 0 && curTime > timeoutEnd)
01076 {
01077 OW_THROW(ExecTimeoutException, "Exec::gatherOutput: timedout");
01078 }
01079 }
01080 break;
01081 default:
01082 {
01083 int availableToFind = selectrval;
01084
01085 curTime.setToCurrent();
01086 timeoutEnd = curTime;
01087 timeoutEnd += timeoutsecs;
01088
01089 for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
01090 {
01091 if (!selObjs[i].readAvailable)
01092 {
01093 continue;
01094 }
01095 else
01096 {
01097 --availableToFind;
01098 }
01099 int streamIndex = inputIndexProcessIndex[i];
01100 UnnamedPipeRef readstream;
01101 if (processStates[streamIndex].outIsOpen)
01102 {
01103 if (streams[streamIndex].out()->getSelectObj() == selObjs[i].s)
01104 {
01105 readstream = streams[streamIndex].out();
01106 }
01107 }
01108
01109 if (!readstream && processStates[streamIndex].errIsOpen)
01110 {
01111 if (streams[streamIndex].err()->getSelectObj() == selObjs[i].s)
01112 {
01113 readstream = streams[streamIndex].err();
01114 }
01115 }
01116
01117 if (!readstream)
01118 {
01119 continue;
01120 }
01121
01122 char buff[1024];
01123 int readrc = readstream->read(buff, sizeof(buff) - 1);
01124 if (readrc == 0)
01125 {
01126 if (readstream == streams[streamIndex].out())
01127 {
01128 processStates[streamIndex].outIsOpen = false;
01129 streams[streamIndex].out()->close();
01130 }
01131 else
01132 {
01133 processStates[streamIndex].errIsOpen = false;
01134 streams[streamIndex].err()->close();
01135 }
01136 --numOpenPipes;
01137 }
01138 else if (readrc == -1)
01139 {
01140 OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: read error");
01141 }
01142 else
01143 {
01144 buff[readrc] = '\0';
01145 output.handleData(buff, readrc, readstream == streams[streamIndex].out() ? E_STDOUT : E_STDERR, streams[streamIndex],
01146 streamIndex, inputs[streamIndex]);
01147 }
01148 }
01149
01150
01151 for (size_t i = 0; i < selObjs.size() && availableToFind > 0; ++i)
01152 {
01153 if (!selObjs[i].writeAvailable)
01154 {
01155 continue;
01156 }
01157 else
01158 {
01159 --availableToFind;
01160 }
01161 int streamIndex = outputIndexProcessIndex[i];
01162 UnnamedPipeRef writestream;
01163 if (processStates[streamIndex].inIsOpen)
01164 {
01165 writestream = streams[streamIndex].in();
01166 }
01167
01168 if (!writestream)
01169 {
01170 continue;
01171 }
01172
01173 size_t offset = inputs[streamIndex].size() - processStates[streamIndex].availableDataLen;
01174 int writerc = writestream->write(&inputs[streamIndex][offset], processStates[streamIndex].availableDataLen);
01175 if (writerc == 0)
01176 {
01177 processStates[streamIndex].inIsOpen = false;
01178 streams[streamIndex].in()->close();
01179 }
01180 else if (writerc == -1)
01181 {
01182 OW_THROW_ERRNO_MSG(ExecErrorException, "Exec::gatherOutput: write error");
01183 }
01184 else
01185 {
01186 inputs[streamIndex].erase(inputs[streamIndex].begin(), inputs[streamIndex].begin() + writerc);
01187 input.getData(inputs[streamIndex], streams[streamIndex], streamIndex);
01188 processStates[streamIndex].availableDataLen = inputs[streamIndex].size();
01189 }
01190 }
01191 }
01192 break;
01193 }
01194 }
01195 }
01196
01197 }
01198 #endif
01199 }
01200