00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00036 #include "OW_config.h"
00037 #include "OW_PosixUnnamedPipe.hpp"
00038 #include "OW_AutoPtr.hpp"
00039 #include "OW_IOException.hpp"
00040 #include "OW_Format.hpp"
00041 #include "OW_SocketUtils.hpp"
00042 #include "OW_Assertion.hpp"
00043
00044 extern "C"
00045 {
00046 #ifdef OW_WIN32
00047 #define _CLOSE ::_close
00048 #define _WRITE ::_write
00049 #define _READ ::_read
00050 #define _OPEN ::_open
00051 #include <io.h>
00052 #else
00053 #ifdef OW_HAVE_UNISTD_H
00054 #include <unistd.h>
00055 #endif
00056 #define _CLOSE ::close
00057 #define _WRITE ::write
00058 #define _READ ::read
00059 #define _OPEN ::open
00060 #endif
00061
00062 #include <fcntl.h>
00063 #include <errno.h>
00064 }
00065 #include <cstring>
00066
00067 namespace OW_NAMESPACE
00068 {
00069
00070 #ifdef OW_NETWARE
00071 namespace
00072 {
/**
 * Small helper that accepts exactly one connection on an already listening
 * socket. acceptConnection() is meant to be run on a separate thread (see
 * runConnClass) while the creating thread connects to the same socket.
 */
class AcceptThread
{
public:
	/**
	 * @param serversock Descriptor of the listening socket to accept on.
	 *        The object does not close it.
	 */
	explicit AcceptThread(int serversock)
		: m_serversock(serversock)
		, m_serverconn(-1)
	{
	}

	/** Accept one connection and remember its descriptor. */
	void acceptConnection();

	/**
	 * @return The descriptor stored by acceptConnection(), or -1 if no
	 *         connection has been accepted.
	 */
	int getConnectFD() const { return m_serverconn; }
private:
	int m_serversock;	// listening socket handed to the constructor
	int m_serverconn;	// accepted connection, -1 until accept() succeeds
};
00088
00089 void
00090 AcceptThread::acceptConnection()
00091 {
00092 struct sockaddr_in sin;
00093 size_t val;
00094 int tmp = 1;
00095
00096 tmp = 1;
00097 ::setsockopt(m_serversock, IPPROTO_TCP, 1,
00098 (char*) &tmp, sizeof(int));
00099
00100 val = sizeof(struct sockaddr_in);
00101 if ((m_serverconn = ::accept(m_serversock, (struct sockaddr*)&sin, &val))
00102 == -1)
00103 {
00104 return;
00105 }
00106 tmp = 1;
00107 ::setsockopt(m_serverconn, IPPROTO_TCP, 1,
00108 (char *) &tmp, sizeof(int));
00109 tmp = 0;
00110 ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
00111 (char*) &tmp, sizeof(int));
00112 }
00113
00114 void*
00115 runConnClass(void* arg)
00116 {
00117 AcceptThread* acceptThread = (AcceptThread*)(arg);
00118 acceptThread->acceptConnection();
00119 ::pthread_exit(NULL);
00120 return 0;
00121 }
00122
00123 int
00124 _pipe(int *fds)
00125 {
00126 int svrfd, lerrno, connectfd;
00127 size_t val;
00128 struct sockaddr_in sin;
00129
00130 svrfd = socket( AF_INET, SOCK_STREAM, 0 );
00131 sin.sin_family = AF_INET;
00132 sin.sin_addr.s_addr = htonl( 0x7f000001 );
00133 sin.sin_port = 0;
00134 memset(sin.sin_zero, 0, 8 );
00135 if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1)
00136 {
00137 int lerrno = errno;
00138 ::close(svrfd);
00139 fprintf(stderr, "CreateSocket(): Failed to bind on socket" );
00140 return -1;
00141 }
00142 if (listen(svrfd, 1) == -1)
00143 {
00144 int lerrno = errno;
00145 ::close(svrfd);
00146 return -1;
00147 }
00148 val = sizeof(struct sockaddr_in);
00149 if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1)
00150 {
00151 int lerrno = errno;
00152 fprintf(stderr, "CreateSocket(): Failed to obtain socket name" );
00153 ::close(svrfd);
00154 return -1;
00155 }
00156
00157 AcceptThread* pat = new AcceptThread(svrfd);
00158 pthread_t athread;
00159
00160
00161 pthread_create(&athread, NULL, runConnClass, pat);
00162
00163 int clientfd = socket(AF_INET, SOCK_STREAM, 0);
00164 if (clientfd == -1)
00165 {
00166 delete pat;
00167 return -1;
00168 }
00169
00170
00171 struct sockaddr_in csin;
00172 csin.sin_family = AF_INET;
00173 csin.sin_addr.s_addr = htonl(0x7f000001);
00174 csin.sin_port = sin.sin_port;
00175 if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1)
00176 {
00177 delete pat;
00178 return -1;
00179 }
00180
00181 #define TCP_NODELAY 1
00182 int tmp = 1;
00183
00184
00185
00186 ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int));
00187 tmp = 0;
00188 ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int));
00189
00190 void* threadResult;
00191
00192 ::pthread_join(athread, &threadResult);
00193
00194 ::close(svrfd);
00195 fds[0] = pat->getConnectFD();
00196 fds[1] = clientfd;
00197 delete pat;
00198 return 0;
00199 }
00200 }
00201 #endif // OW_NETWARE
00202
00204
00205 UnnamedPipeRef
00206 UnnamedPipe::createUnnamedPipe(EOpen doOpen)
00207 {
00208 return UnnamedPipeRef(new PosixUnnamedPipe(doOpen));
00209 }
00211 PosixUnnamedPipe::PosixUnnamedPipe(EOpen doOpen)
00212 #ifndef OW_WIN32
00213 : m_blocking(E_BLOCKING)
00214 #endif
00215 {
00216 #ifdef OW_WIN32
00217 m_blocking[0] = m_blocking[1] = E_BLOCKING;
00218 #endif
00219 m_fds[0] = m_fds[1] = -1;
00220 if (doOpen)
00221 {
00222 open();
00223 }
00224 setTimeouts(60 * 10);
00225 setBlocking(E_BLOCKING);
00226 }
00227
00229 PosixUnnamedPipe::~PosixUnnamedPipe()
00230 {
00231 close();
00232 }
00234 void
00235 PosixUnnamedPipe::setBlocking(EBlockingMode outputIsBlocking)
00236 {
00237 #ifdef OW_WIN32
00238
00239 OW_ASSERT(m_fds[0] != -1 && m_fds[1] != -1);
00240
00241 m_blocking[0] = outputIsBlocking;
00242 m_blocking[1] = outputIsBlocking;
00243
00244
00245
00246
00247
00248
00249 return;
00250 #else
00251
00252 OW_ASSERT(m_fds[0] != -1 && m_fds[1] != -1);
00253
00254 m_blocking = outputIsBlocking;
00255
00256 for (size_t i = 0; i <= 1; ++i)
00257 {
00258 int fdflags = fcntl(m_fds[i], F_GETFL, 0);
00259 if (fdflags == -1)
00260 {
00261 OW_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00262 }
00263 if (outputIsBlocking == E_BLOCKING)
00264 {
00265 fdflags &= !O_NONBLOCK;
00266 }
00267 else
00268 {
00269 fdflags |= O_NONBLOCK;
00270 }
00271 if (fcntl(m_fds[i], F_SETFL, fdflags) == -1)
00272 {
00273 OW_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00274 }
00275 }
00276
00277 #endif
00278 }
00280 void
00281 PosixUnnamedPipe::setOutputBlocking(bool outputIsBlocking)
00282 {
00283 #ifdef OW_WIN32
00284
00285 OW_ASSERT(m_fds[1] != -1);
00286
00287 m_blocking[1] = outputIsBlocking ? E_BLOCKING : E_NONBLOCKING ;
00288
00289
00290
00291
00292
00293
00294 return;
00295 #else
00296
00297 OW_ASSERT(m_fds[1] != -1);
00298
00299 m_blocking = outputIsBlocking ? E_BLOCKING : E_NONBLOCKING ;
00300 int fdflags = fcntl(m_fds[1], F_GETFL, 0);
00301 if (fdflags == -1)
00302 {
00303 OW_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00304 }
00305 if (outputIsBlocking)
00306 {
00307 fdflags ^= O_NONBLOCK;
00308 }
00309 else
00310 {
00311 fdflags |= O_NONBLOCK;
00312 }
00313 if (fcntl(m_fds[1], F_SETFL, fdflags) == -1)
00314 {
00315 OW_THROW_ERRNO_MSG(IOException, "Failed to set pipe to non-blocking");
00316 }
00317 #endif
00318 }
00320 void
00321 PosixUnnamedPipe::open()
00322 {
00323 if (m_fds[0] != -1)
00324 {
00325 close();
00326 }
00327 #if defined(OW_WIN32)
00328 HANDLE pipe = CreateNamedPipe( "\\\\.\\pipe\\TestPipe",
00329 PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED,
00330 PIPE_TYPE_MESSAGE,
00331 PIPE_UNLIMITED_INSTANCES,
00332 2560,
00333 2560,
00334 NMPWAIT_USE_DEFAULT_WAIT,
00335 NULL );
00336
00337 HANDLE client = CreateFile( "\\\\.\\pipe\\TestPipe",
00338 GENERIC_READ,
00339 FILE_SHARE_READ,
00340 NULL,
00341 OPEN_EXISTING,
00342 FILE_FLAG_OVERLAPPED,
00343 NULL );
00344
00345 HANDLE event1 = CreateEvent(NULL, TRUE, FALSE, NULL);
00346 HANDLE event2 = CreateEvent(NULL, TRUE, FALSE, NULL);
00347
00348
00349 BOOL bConnected = ConnectNamedPipe( pipe, NULL );
00350 if( !bConnected && GetLastError() == ERROR_PIPE_CONNECTED )
00351 bConnected = TRUE;
00352
00353 BOOL bSuccess =
00354 pipe != INVALID_HANDLE_VALUE &&
00355 client != INVALID_HANDLE_VALUE &&
00356 event1 != INVALID_HANDLE_VALUE &&
00357 event2 != INVALID_HANDLE_VALUE &&
00358 bConnected;
00359
00360 if( !bSuccess )
00361 {
00362 CloseHandle(pipe);
00363 CloseHandle(client);
00364 CloseHandle(event1);
00365 CloseHandle(event2);
00366 }
00367 else
00368 {
00369 m_fds[0] = (int)client;
00370 m_fds[1] = (int)pipe;
00371 m_events[0] = (int)event1;
00372 m_events[1] = (int)event2;
00373 }
00374
00375 if( !bSuccess )
00376
00377 #elif defined(OW_NETWARE)
00378 if (_pipe(m_fds) == -1)
00379 #else
00380 if (::pipe(m_fds) == -1)
00381 #endif
00382 {
00383 m_fds[0] = m_fds[1] = -1;
00384 OW_THROW(UnnamedPipeException, ::strerror(errno));
00385 }
00386 }
00388 int
00389 PosixUnnamedPipe::close()
00390 {
00391 int rc = -1;
00392 if (m_fds[0] != -1)
00393 {
00394 #ifdef OW_WIN32
00395 HANDLE h = (HANDLE)m_fds[0];
00396 HANDLE e = (HANDLE)m_events[0];
00397 if( CloseHandle(h) && CloseHandle(e) )
00398 rc = 0;
00399 #else
00400 rc = _CLOSE(m_fds[0]);
00401 #endif
00402 m_fds[0] = -1;
00403 }
00404 if (m_fds[1] != -1)
00405 {
00406 #ifdef OW_WIN32
00407 HANDLE h = (HANDLE)m_fds[1];
00408 HANDLE e = (HANDLE)m_events[1];
00409 if( CloseHandle(h) && CloseHandle(e) )
00410 rc = 0;
00411 #else
00412 rc = _CLOSE(m_fds[1]);
00413 #endif
00414 m_fds[1] = -1;
00415 }
00416 return rc;
00417 }
00419 bool
00420 PosixUnnamedPipe::isOpen() const
00421 {
00422 return (m_fds[0] != -1) || (m_fds[1] != -1);
00423 }
00424
00426 int
00427 PosixUnnamedPipe::closeInputHandle()
00428 {
00429 int rc = -1;
00430 if (m_fds[0] != -1)
00431 {
00432 #ifdef OW_WIN32
00433 HANDLE h = (HANDLE)m_fds[0];
00434 HANDLE e = (HANDLE)m_events[0];
00435 if( CloseHandle(h) && CloseHandle(e) )
00436 rc = 0;
00437 #else
00438 rc = _CLOSE(m_fds[0]);
00439 #endif
00440 m_fds[0] = -1;
00441 }
00442 return rc;
00443 }
00445 int
00446 PosixUnnamedPipe::closeOutputHandle()
00447 {
00448 int rc = -1;
00449 if (m_fds[1] != -1)
00450 {
00451 #ifdef OW_WIN32
00452 HANDLE h = (HANDLE)m_fds[1];
00453 HANDLE e = (HANDLE)m_events[1];
00454 if( CloseHandle(h) && CloseHandle(e) )
00455 rc = 0;
00456 #else
00457 rc = _CLOSE(m_fds[1]);
00458 #endif
00459 m_fds[1] = -1;
00460 }
00461 return rc;
00462 }
00464 int
00465 PosixUnnamedPipe::write(const void* data, int dataLen, bool errorAsException)
00466 {
00467 int rc = -1;
00468 if (m_fds[1] != -1)
00469 {
00470 #ifndef OW_WIN32
00471 if (m_blocking == E_BLOCKING)
00472 {
00473 rc = SocketUtils::waitForIO(m_fds[1], m_writeTimeout, SocketFlags::E_WAIT_FOR_OUTPUT);
00474 if (rc != 0)
00475 {
00476 if (errorAsException)
00477 {
00478 OW_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00479 }
00480 else
00481 {
00482 return rc;
00483 }
00484 }
00485 }
00486 rc = _WRITE(m_fds[1], data, dataLen);
00487 #else
00488 BOOL bSuccess = FALSE;
00489
00490 OVERLAPPED ovl;
00491
00492 ovl.hEvent = (HANDLE)m_events[1];
00493 ovl.Offset = 0;
00494 ovl.OffsetHigh = 0;
00495
00496 bSuccess = WriteFile(
00497 (HANDLE)m_fds[1],
00498 data,
00499 dataLen,
00500 NULL,
00501 &ovl);
00502
00503 if( bSuccess && m_blocking[1] == E_BLOCKING )
00504 {
00505 bSuccess = WaitForSingleObject( (HANDLE)m_events[1], INFINITE ) == WAIT_OBJECT_0;
00506 }
00507
00508 if( bSuccess )
00509 rc = 0;
00510 #endif
00511 }
00512 if (errorAsException && rc == -1)
00513 {
00514 OW_THROW_ERRNO_MSG(IOException, "pipe write failed.");
00515 }
00516 return rc;
00517 }
00519 int
00520 PosixUnnamedPipe::read(void* buffer, int bufferLen, bool errorAsException)
00521 {
00522 int rc = -1;
00523 if (m_fds[0] != -1)
00524 {
00525 #ifndef OW_WIN32
00526 if (m_blocking == E_BLOCKING)
00527 {
00528 rc = SocketUtils::waitForIO(m_fds[0], m_readTimeout, SocketFlags::E_WAIT_FOR_INPUT);
00529 if (rc != 0)
00530 {
00531 if (errorAsException)
00532 {
00533 OW_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00534 }
00535 else
00536 {
00537 return rc;
00538 }
00539 }
00540 }
00541 rc = _READ(m_fds[0], buffer, bufferLen);
00542 #else
00543 BOOL bSuccess = FALSE;
00544
00545 OVERLAPPED ovl;
00546
00547 ovl.hEvent = (HANDLE)m_events[0];
00548 ovl.Offset = 0;
00549 ovl.OffsetHigh = 0;
00550
00551 bSuccess = ReadFile(
00552 (HANDLE)m_fds[0],
00553 buffer,
00554 bufferLen,
00555 NULL,
00556 &ovl);
00557
00558 if( bSuccess && m_blocking[0] == E_BLOCKING )
00559 {
00560 bSuccess = WaitForSingleObject( (HANDLE)m_events[0], INFINITE ) == WAIT_OBJECT_0;
00561 }
00562
00563 if( bSuccess )
00564 rc = 0;
00565 #endif
00566 }
00567 if (errorAsException && rc == -1)
00568 {
00569 OW_THROW_ERRNO_MSG(IOException, "pipe read failed.");
00570 }
00571 return rc;
00572 }
00574 Select_t
00575 PosixUnnamedPipe::getSelectObj() const
00576 {
00577 #ifdef OW_WIN32
00578 Select_t selectObj;
00579 selectObj.event = (HANDLE)m_events[0];
00580 selectObj.sockfd = INVALID_SOCKET;
00581 selectObj.networkevents = 0;
00582 selectObj.doreset = false;
00583
00584 return selectObj;
00585 #else
00586 return m_fds[0];
00587 #endif
00588 }
00589
00591 Select_t
00592 PosixUnnamedPipe::getWriteSelectObj() const
00593 {
00594 #ifdef OW_WIN32
00595 Select_t selectObj;
00596 selectObj.event = (HANDLE)m_events[1];
00597 selectObj.sockfd = INVALID_SOCKET;
00598 selectObj.networkevents = 0;
00599 selectObj.doreset = false;
00600
00601 return selectObj;
00602 #else
00603 return m_fds[1];
00604 #endif
00605 }
00606
00607 }
00608