OW_PosixUnnamedPipe.cpp

Go to the documentation of this file.
00001 /*******************************************************************************
00002 * Copyright (C) 2001-2004 Vintela, Inc. All rights reserved.
00003 *
00004 * Redistribution and use in source and binary forms, with or without
00005 * modification, are permitted provided that the following conditions are met:
00006 *
00007 *  - Redistributions of source code must retain the above copyright notice,
00008 *    this list of conditions and the following disclaimer.
00009 *
00010 *  - Redistributions in binary form must reproduce the above copyright notice,
00011 *    this list of conditions and the following disclaimer in the documentation
00012 *    and/or other materials provided with the distribution.
00013 *
00014 *  - Neither the name of Vintela, Inc. nor the names of its
00015 *    contributors may be used to endorse or promote products derived from this
00016 *    software without specific prior written permission.
00017 *
00018 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
00019 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00020 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00021 * ARE DISCLAIMED. IN NO EVENT SHALL Vintela, Inc. OR THE CONTRIBUTORS
00022 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00023 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00024 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00025 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00026 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00027 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00028 * POSSIBILITY OF SUCH DAMAGE.
00029 *******************************************************************************/
00030 
00036 #include "OW_config.h"
00037 #include "OW_PosixUnnamedPipe.hpp"
00038 #include "OW_AutoPtr.hpp"
00039 #include "OW_IOException.hpp"
00040 #include "OW_Format.hpp"
00041 #include "OW_SocketUtils.hpp"
00042 #include "OW_Assertion.hpp"
00043 
00044 extern "C"
00045 {
00046 #ifdef OW_WIN32
00047    #define _CLOSE ::_close
00048    #define _WRITE ::_write
00049    #define _READ ::_read
00050    #define _OPEN ::_open
00051    #include <io.h>
00052 #else
00053    #ifdef OW_HAVE_UNISTD_H
00054       #include <unistd.h>
00055    #endif
00056    #define _CLOSE ::close
00057    #define _WRITE ::write
00058    #define _READ ::read
00059    #define _OPEN ::open
00060 #endif
00061 
00062 #include <fcntl.h>
00063 #include <errno.h>
00064 }
00065 #include <cstring>
00066 
00067 namespace OW_NAMESPACE
00068 {
00069 
00070 #ifdef OW_NETWARE
00071 namespace
00072 {
/**
 * Helper that accepts a single connection on an already-listening socket.
 * _pipe() runs acceptConnection() on a separate thread and afterwards reads
 * the accepted descriptor back through getConnectFD().
 */
class AcceptThread
{
public:
   /**
    * @param serversock Listening socket descriptor to accept on.
    */
   explicit AcceptThread(int serversock)
      : m_serversock(serversock)
      , m_serverconn(-1)
   {
   }

   /// Accept one connection on the listening socket and remember its descriptor.
   void acceptConnection();

   /// @return The accepted connection's descriptor, or -1 if none was accepted.
   int getConnectFD() const { return m_serverconn; }
private:
   int m_serversock; // listening socket handed to the constructor
   int m_serverconn; // accepted connection; stays -1 until accept() succeeds
};
00088 
00089 void
00090 AcceptThread::acceptConnection()
00091 {
00092     struct sockaddr_in sin;
00093    size_t val;
00094     int tmp = 1;
00095 
00096    tmp = 1;
00097    ::setsockopt(m_serversock, IPPROTO_TCP, 1,      // #define TCP_NODELAY 1
00098       (char*) &tmp, sizeof(int));
00099    
00100    val = sizeof(struct sockaddr_in);
00101    if ((m_serverconn = ::accept(m_serversock, (struct sockaddr*)&sin, &val))
00102       == -1)
00103    {
00104       return;
00105    }
00106    tmp = 1;
00107    ::setsockopt(m_serverconn, IPPROTO_TCP, 1, // #define TCP_NODELAY 1
00108       (char *) &tmp, sizeof(int));
00109    tmp = 0;
00110    ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
00111              (char*) &tmp, sizeof(int));
00112 }
00113 
00114 void*
00115 runConnClass(void* arg)
00116 {
00117    AcceptThread* acceptThread = (AcceptThread*)(arg);
00118    acceptThread->acceptConnection();
00119    ::pthread_exit(NULL);
00120    return 0;
00121 }
00122 
00123 int
00124 _pipe(int *fds)
00125 {
00126    int svrfd, lerrno, connectfd;
00127    size_t val;
00128     struct sockaddr_in sin;
00129 
00130    svrfd = socket( AF_INET, SOCK_STREAM, 0 );
00131    sin.sin_family = AF_INET;
00132    sin.sin_addr.s_addr = htonl( 0x7f000001 ); // loopback
00133    sin.sin_port = 0;
00134    memset(sin.sin_zero, 0, 8 );
00135    if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1) 
00136    {
00137       int lerrno = errno;
00138 		::close(svrfd);
00139       fprintf(stderr, "CreateSocket(): Failed to bind on socket" );
00140       return -1;
00141    }
00142    if (listen(svrfd, 1) == -1) 
00143    {
00144       int lerrno = errno;
00145 		::close(svrfd);
00146       return -1;
00147    }
00148    val = sizeof(struct sockaddr_in);
00149    if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1) 
00150    {
00151       int lerrno = errno;
00152       fprintf(stderr, "CreateSocket(): Failed to obtain socket name" );
00153 		::close(svrfd);
00154       return -1;
00155    }
00156 
00157    AcceptThread* pat = new AcceptThread(svrfd);
00158    pthread_t athread;
00159    // Start thread that will accept connection on svrfd.
00160    // Once a connection is made the thread will exit.
00161    pthread_create(&athread, NULL, runConnClass, pat);
00162 
00163    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
00164    if (clientfd == -1)
00165    {
00166       delete pat;
00167       return -1;
00168    }
00169 
00170    // Connect to server 
00171    struct sockaddr_in csin;
00172    csin.sin_family = AF_INET;
00173    csin.sin_addr.s_addr = htonl(0x7f000001); // loopback
00174    csin.sin_port = sin.sin_port;
00175    if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1)
00176    {
00177       delete pat;
00178       return -1;
00179    }
00180 
00181 #define TCP_NODELAY 1
00182    int tmp = 1;
00183    //
00184    // Set for Non-blocking writes and disable keepalive
00185    //
00186    ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int));
00187    tmp = 0;
00188    ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int));
00189 
00190    void* threadResult;
00191    // Wait for accept thread to terminate
00192    ::pthread_join(athread, &threadResult);
00193 
00194 	::close(svrfd);
00195    fds[0] = pat->getConnectFD();
00196    fds[1] = clientfd;
00197    delete pat;
00198    return 0;
00199 }
00200 }
00201 #endif // OW_NETWARE
00202 
00204 // STATIC
00205 UnnamedPipeRef
00206 UnnamedPipe::createUnnamedPipe(EOpen doOpen)
00207 {
00208    return UnnamedPipeRef(new PosixUnnamedPipe(doOpen));
00209 }
00211 PosixUnnamedPipe::PosixUnnamedPipe(EOpen doOpen)
00212 #ifndef OW_WIN32
00213    : m_blocking(E_BLOCKING)
00214 #endif
00215 {
00216 #ifdef OW_WIN32
00217    m_blocking[0] = m_blocking[1] = E_BLOCKING;
00218 #endif
00219    m_fds[0] = m_fds[1] = -1;
00220    if (doOpen)
00221    {
00222       open();
00223    }
00224    setTimeouts(60 * 10); // 10 minutes. This helps break deadlocks when using safePopen()
00225    setBlocking(E_BLOCKING); // necessary to set the pipes up right.
00226 }
00227    
00229 PosixUnnamedPipe::~PosixUnnamedPipe()
00230 {
00231    close();
00232 }
00234 void
00235 PosixUnnamedPipe::setBlocking(EBlockingMode outputIsBlocking)
00236 {
00237 #ifdef OW_WIN32
00238    // precondition
00239    OW_ASSERT(m_fds[0] != -1 && m_fds[1] != -1);
00240 
00241    m_blocking[0] = outputIsBlocking;
00242    m_blocking[1] = outputIsBlocking;
00243    // Unnamed pipes on Win32 cannot do non-blocking i/o (aka async, overlapped)
00244    // Only named pipes can. If this becomes a problem in the future, then
00245    // PosixUnnamedPipe can be implemented with NamedPipes. I know this can be
00246    // a problem with the signal handling mechanism that is used in the daemon
00247    // code, but I plan on do that differently on Win32
00248 // OW_ASSERT(outputIsBlocking);
00249    return;
00250 #else
00251    // precondition
00252    OW_ASSERT(m_fds[0] != -1 && m_fds[1] != -1);
00253 
00254    m_blocking = outputIsBlocking;
00255 
00256    for (size_t i = 0; i <= 1; ++i)
00257    {
00258       int fdflags = fcntl(m_fds[i], F_GETFL, 0);
00259       if (fdflags == -1)
00260       {
00261          OW_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00262       }
00263       if (outputIsBlocking == E_BLOCKING)
00264       {
00265          fdflags &= !O_NONBLOCK;
00266       }
00267       else
00268       {
00269          fdflags |= O_NONBLOCK;
00270       }
00271       if (fcntl(m_fds[i], F_SETFL, fdflags) == -1)
00272       {
00273          OW_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00274       }
00275    }
00276 
00277 #endif
00278 }
00280 void
00281 PosixUnnamedPipe::setOutputBlocking(bool outputIsBlocking)
00282 {
00283 #ifdef OW_WIN32
00284    // precondition
00285    OW_ASSERT(m_fds[1] != -1);
00286    
00287    m_blocking[1] = outputIsBlocking ? E_BLOCKING : E_NONBLOCKING ;
00288    // Unnamed pipes on Win32 cannot do non-blocking i/o (aka async, overlapped)
00289    // Only named pipes can. If this becomes a problem in the future, then
00290    // PosixUnnamedPipe can be implemented with NamedPipes. I know this can be
00291    // a problem with the signal handling mechanism that is used in the daemon
00292    // code, but I plan on do that differently on Win32
00293 // OW_ASSERT(outputIsBlocking);
00294    return;
00295 #else
00296    // precondition
00297    OW_ASSERT(m_fds[1] != -1);
00298    
00299    m_blocking = outputIsBlocking ? E_BLOCKING : E_NONBLOCKING ;
00300    int fdflags = fcntl(m_fds[1], F_GETFL, 0);
00301    if (fdflags == -1)
00302    {
00303       OW_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00304    }
00305    if (outputIsBlocking)
00306    {
00307       fdflags ^= O_NONBLOCK;
00308    }
00309    else
00310    {
00311       fdflags |= O_NONBLOCK;
00312    }
00313    if (fcntl(m_fds[1], F_SETFL, fdflags) == -1)
00314    {
00315       OW_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00316    }
00317 #endif
00318 }
00320 void
00321 PosixUnnamedPipe::open()
00322 {
00323    if (m_fds[0] != -1)
00324    {
00325       close();
00326    }
00327 #if defined(OW_WIN32)
00328    HANDLE pipe = CreateNamedPipe( "\\\\.\\pipe\\TestPipe",
00329       PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED,
00330       PIPE_TYPE_MESSAGE,
00331       PIPE_UNLIMITED_INSTANCES,
00332       2560,
00333       2560,
00334       NMPWAIT_USE_DEFAULT_WAIT,
00335       NULL );
00336 
00337    HANDLE client = CreateFile( "\\\\.\\pipe\\TestPipe",
00338       GENERIC_READ,
00339       FILE_SHARE_READ,
00340       NULL,
00341       OPEN_EXISTING,
00342       FILE_FLAG_OVERLAPPED,
00343       NULL );
00344 
00345    HANDLE event1 = CreateEvent(NULL, TRUE, FALSE, NULL);
00346    HANDLE event2 = CreateEvent(NULL, TRUE, FALSE, NULL);
00347 
00348    // Should return immediately since the client connection is open.
00349    BOOL bConnected = ConnectNamedPipe( pipe, NULL );
00350    if( !bConnected && GetLastError() == ERROR_PIPE_CONNECTED )
00351       bConnected = TRUE;
00352 
00353    BOOL bSuccess = 
00354       pipe != INVALID_HANDLE_VALUE && 
00355       client != INVALID_HANDLE_VALUE && 
00356       event1 != INVALID_HANDLE_VALUE &&
00357       event2 != INVALID_HANDLE_VALUE &&
00358       bConnected;
00359 
00360    if( !bSuccess )
00361    {
00362       CloseHandle(pipe);
00363       CloseHandle(client);
00364       CloseHandle(event1);
00365       CloseHandle(event2);
00366    }
00367    else
00368    {
00369       m_fds[0] = (int)client;    // read descriptor
00370       m_fds[1] = (int)pipe;      // write descriptor
00371       m_events[0] = (int)event1;
00372       m_events[1] = (int)event2;
00373    }
00374 
00375    if( !bSuccess )
00376 // if (::_pipe(m_fds, 2560, _O_BINARY) == -1)
00377 #elif defined(OW_NETWARE)
00378    if (_pipe(m_fds) == -1)
00379 #else
00380    if (::pipe(m_fds) == -1)
00381 #endif
00382    {
00383       m_fds[0] = m_fds[1] = -1;
00384       OW_THROW(UnnamedPipeException, ::strerror(errno));
00385    }
00386 }
00388 int
00389 PosixUnnamedPipe::close()
00390 {
00391    int rc = -1;
00392    if (m_fds[0] != -1)
00393    {
00394 #ifdef OW_WIN32
00395       HANDLE h = (HANDLE)m_fds[0];
00396       HANDLE e = (HANDLE)m_events[0];
00397       if( CloseHandle(h) && CloseHandle(e) )
00398          rc = 0;
00399 #else
00400       rc = _CLOSE(m_fds[0]);
00401 #endif
00402       m_fds[0] = -1;
00403    }
00404    if (m_fds[1] != -1)
00405    {
00406 #ifdef OW_WIN32
00407       HANDLE h = (HANDLE)m_fds[1];
00408       HANDLE e = (HANDLE)m_events[1];
00409       if( CloseHandle(h) && CloseHandle(e) )
00410          rc = 0;
00411 #else
00412       rc = _CLOSE(m_fds[1]);
00413 #endif
00414       m_fds[1] = -1;
00415    }
00416    return rc;
00417 }
00419 bool
00420 PosixUnnamedPipe::isOpen() const
00421 {
00422    return (m_fds[0] != -1) || (m_fds[1] != -1);
00423 }
00424 
00426 int
00427 PosixUnnamedPipe::closeInputHandle()
00428 {
00429    int rc = -1;
00430    if (m_fds[0] != -1)
00431    {
00432 #ifdef OW_WIN32
00433       HANDLE h = (HANDLE)m_fds[0];
00434       HANDLE e = (HANDLE)m_events[0];
00435       if( CloseHandle(h) && CloseHandle(e) )
00436          rc = 0;
00437 #else
00438       rc = _CLOSE(m_fds[0]);
00439 #endif
00440       m_fds[0] = -1;
00441    }
00442    return rc;
00443 }
00445 int
00446 PosixUnnamedPipe::closeOutputHandle()
00447 {
00448    int rc = -1;
00449    if (m_fds[1] != -1)
00450    {
00451 #ifdef OW_WIN32
00452       HANDLE h = (HANDLE)m_fds[1];
00453       HANDLE e = (HANDLE)m_events[1];
00454       if( CloseHandle(h) && CloseHandle(e) )
00455          rc = 0;
00456 #else
00457       rc = _CLOSE(m_fds[1]);
00458 #endif
00459       m_fds[1] = -1;
00460    }
00461    return rc;
00462 }
00464 int
00465 PosixUnnamedPipe::write(const void* data, int dataLen, bool errorAsException)
00466 {
00467    int rc = -1;
00468    if (m_fds[1] != -1)
00469    {
00470 #ifndef OW_WIN32
00471       if (m_blocking == E_BLOCKING)
00472       {
00473          rc = SocketUtils::waitForIO(m_fds[1], m_writeTimeout, SocketFlags::E_WAIT_FOR_OUTPUT);
00474          if (rc != 0)
00475          {
00476             if (errorAsException)
00477             {
00478                OW_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00479             }
00480             else
00481             {
00482                return rc;
00483             }
00484          }
00485       }
00486       rc = _WRITE(m_fds[1], data, dataLen);
00487 #else
00488       BOOL bSuccess = FALSE;
00489 
00490       OVERLAPPED ovl;
00491 
00492       ovl.hEvent = (HANDLE)m_events[1];
00493       ovl.Offset = 0;
00494       ovl.OffsetHigh = 0;
00495 
00496       bSuccess = WriteFile(
00497          (HANDLE)m_fds[1],
00498          data,
00499          dataLen,
00500          NULL,
00501          &ovl);
00502 
00503       if( bSuccess && m_blocking[1] == E_BLOCKING )
00504       {
00505          bSuccess = WaitForSingleObject( (HANDLE)m_events[1], INFINITE ) == WAIT_OBJECT_0;
00506       }
00507 
00508       if( bSuccess )
00509          rc = 0;
00510 #endif
00511    }
00512    if (errorAsException && rc == -1)
00513    {
00514       OW_THROW_ERRNO_MSG(IOException, "pipe write failed.");
00515    }
00516    return rc;
00517 }
00519 int
00520 PosixUnnamedPipe::read(void* buffer, int bufferLen, bool errorAsException)
00521 {
00522    int rc = -1;
00523    if (m_fds[0] != -1)
00524    {
00525 #ifndef OW_WIN32
00526       if (m_blocking == E_BLOCKING)
00527       {
00528          rc = SocketUtils::waitForIO(m_fds[0], m_readTimeout, SocketFlags::E_WAIT_FOR_INPUT);
00529          if (rc != 0)
00530          {
00531             if (errorAsException)
00532             {
00533                OW_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00534             }
00535             else
00536             {
00537                return rc;
00538             }
00539          }
00540       }
00541       rc = _READ(m_fds[0], buffer, bufferLen);
00542 #else
00543       BOOL bSuccess = FALSE;
00544 
00545       OVERLAPPED ovl;
00546 
00547       ovl.hEvent = (HANDLE)m_events[0];
00548       ovl.Offset = 0;
00549       ovl.OffsetHigh = 0;
00550 
00551       bSuccess = ReadFile(
00552          (HANDLE)m_fds[0],
00553          buffer,
00554          bufferLen,
00555          NULL,
00556          &ovl);
00557 
00558       if( bSuccess && m_blocking[0] == E_BLOCKING )
00559       {
00560          bSuccess = WaitForSingleObject( (HANDLE)m_events[0], INFINITE ) == WAIT_OBJECT_0;
00561       }
00562 
00563       if( bSuccess )
00564          rc = 0;
00565 #endif
00566    }
00567    if (errorAsException && rc == -1)
00568    {
00569       OW_THROW_ERRNO_MSG(IOException, "pipe read failed.");
00570    }
00571    return rc;
00572 }
00574 Select_t
00575 PosixUnnamedPipe::getSelectObj() const
00576 {
00577 #ifdef OW_WIN32
00578    Select_t selectObj;
00579    selectObj.event = (HANDLE)m_events[0];
00580    selectObj.sockfd = INVALID_SOCKET;
00581    selectObj.networkevents = 0;
00582    selectObj.doreset = false;
00583 
00584    return selectObj;
00585 #else
00586    return m_fds[0];
00587 #endif
00588 }
00589 
00591 Select_t
00592 PosixUnnamedPipe::getWriteSelectObj() const
00593 {
00594 #ifdef OW_WIN32
00595    Select_t selectObj;
00596    selectObj.event = (HANDLE)m_events[1];
00597    selectObj.sockfd = INVALID_SOCKET;
00598    selectObj.networkevents = 0;
00599    selectObj.doreset = false;
00600 
00601    return selectObj;
00602 #else
00603    return m_fds[1];
00604 #endif
00605 }
00606 
00607 } // end namespace OW_NAMESPACE
00608 

Generated on Thu Feb 9 08:48:08 2006 for openwbem by  doxygen 1.4.6