00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00035 #include "OW_config.h"
00036
00037 #if !defined(OW_WIN32)
00038
00039 #include "OW_SocketBaseImpl.hpp"
00040 #include "OW_SocketUtils.hpp"
00041 #include "OW_Format.hpp"
00042 #include "OW_Assertion.hpp"
00043 #include "OW_IOException.hpp"
00044 #include "OW_Mutex.hpp"
00045 #include "OW_MutexLock.hpp"
00046 #include "OW_PosixUnnamedPipe.hpp"
00047 #include "OW_Socket.hpp"
00048 #include "OW_Thread.hpp"
00049 #include "OW_DateTime.hpp"
00050
00051 extern "C"
00052 {
00053 #ifdef OW_HAVE_SYS_SELECT_H
00054 #include <sys/select.h>
00055 #endif
00056
00057 #include <sys/types.h>
00058 #include <sys/time.h>
00059 #include <sys/socket.h>
00060 #include <sys/stat.h>
00061 #include <netdb.h>
00062 #include <arpa/inet.h>
00063 #include <unistd.h>
00064 #include <fcntl.h>
00065 #include <netinet/in.h>
00066 }
00067
00068 #include <fstream>
00069 #include <cerrno>
00070 #include <cstdio>
00071
00072 namespace OW_NAMESPACE
00073 {
00074
00075 using std::istream;
00076 using std::ostream;
00077 using std::iostream;
00078 using std::ifstream;
00079 using std::ofstream;
00080 using std::fstream;
00081 using std::ios;
// Paths of the optional socket dump files, assigned by setDumpFiles().
// While m_traceFileOut is non-empty, write() appends every successfully
// written buffer to that file; while m_traceFileIn is non-empty, read() does
// the same for received data. Both functions additionally append to the file
// named m_traceFileOut + "Combo".
00082 String SocketBaseImpl::m_traceFileOut;
00083 String SocketBaseImpl::m_traceFileIn;
00084
/**
 * Default constructor: creates an unconnected socket object.
 * No descriptor is opened (m_sockfd is -1) and all three timeouts start out
 * as Socket::INFINITE_TIMEOUT. The three stream members all share
 * m_streamBuf.
 */
00086 SocketBaseImpl::SocketBaseImpl()
00087 : SelectableIFC()
00088 , IOIFC()
00089 , m_isConnected(false)
00090 , m_sockfd(-1)
00091 , m_localAddress()
00092 , m_peerAddress()
00093 , m_recvTimeoutExprd(false)
00094 , m_streamBuf(this)
00095 , m_in(&m_streamBuf)
00096 , m_out(&m_streamBuf)
00097 , m_inout(&m_streamBuf)
00098 , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00099 , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00100 , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00101 {
// Make the output-capable streams throw when badbit gets set; m_in is left
// with its default exception mask.
00102 m_out.exceptions(std::ios::badbit);
00103 m_inout.exceptions(std::ios::badbit);
00104 }
00106 SocketBaseImpl::SocketBaseImpl(SocketHandle_t fd,
00107 SocketAddress::AddressType addrType)
00108 : SelectableIFC()
00109 , IOIFC()
00110 , m_isConnected(true)
00111 , m_sockfd(fd)
00112 , m_localAddress(SocketAddress::getAnyLocalHost())
00113 , m_peerAddress(SocketAddress::allocEmptyAddress(addrType))
00114 , m_recvTimeoutExprd(false)
00115 , m_streamBuf(this)
00116 , m_in(&m_streamBuf)
00117 , m_out(&m_streamBuf)
00118 , m_inout(&m_streamBuf)
00119 , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00120 , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00121 , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00122 {
00123 m_out.exceptions(std::ios::badbit);
00124 m_inout.exceptions(std::ios::badbit);
00125 if (addrType == SocketAddress::INET)
00126 {
00127 fillInetAddrParms();
00128 }
00129 else if (addrType == SocketAddress::UDS)
00130 {
00131 fillUnixAddrParms();
00132 }
00133 else
00134 {
00135 OW_ASSERT(0);
00136 }
00137 }
/**
 * Construct a socket object and immediately connect it to addr.
 * The object starts out unconnected (m_sockfd == -1) and then calls
 * connect(); any exception thrown by connect() propagates out of this
 * constructor.
 *
 * @param addr The peer address to connect to.
 */
00139 SocketBaseImpl::SocketBaseImpl(const SocketAddress& addr)
00140 : SelectableIFC()
00141 , IOIFC()
00142 , m_isConnected(false)
00143 , m_sockfd(-1)
00144 , m_localAddress(SocketAddress::getAnyLocalHost())
00145 , m_peerAddress(addr)
00146 , m_recvTimeoutExprd(false)
00147 , m_streamBuf(this)
00148 , m_in(&m_streamBuf)
00149 , m_out(&m_streamBuf)
00150 , m_inout(&m_streamBuf)
00151 , m_recvTimeout(Socket::INFINITE_TIMEOUT)
00152 , m_sendTimeout(Socket::INFINITE_TIMEOUT)
00153 , m_connectTimeout(Socket::INFINITE_TIMEOUT)
00154 {
// Make the output-capable streams throw when badbit gets set.
00155 m_out.exceptions(std::ios::badbit);
00156 m_inout.exceptions(std::ios::badbit);
00157 connect(m_peerAddress);
00158 }
/**
 * Destructor: disconnects the socket. Whatever disconnect() throws is
 * swallowed so that no exception can escape from the destructor.
 */
00160 SocketBaseImpl::~SocketBaseImpl()
00161 {
00162 try
00163 {
00164 disconnect();
00165 }
00166 catch (...)
00167 {
// Intentionally ignored: a destructor must not throw.
00168
00169 }
00170 }
/**
 * @return The underlying socket descriptor (m_sockfd) as a Select_t. This
 *         is -1 while no descriptor is held.
 */
00172 Select_t
00173 SocketBaseImpl::getSelectObj() const
00174 {
00175 return m_sockfd;
00176 }
00178 void
00179 SocketBaseImpl::connect(const SocketAddress& addr)
00180 {
00181 if (m_isConnected)
00182 {
00183 disconnect();
00184 }
00185 m_streamBuf.reset();
00186 m_in.clear();
00187 m_out.clear();
00188 m_inout.clear();
00189 OW_ASSERT(addr.getType() == SocketAddress::INET
00190 || addr.getType() == SocketAddress::UDS);
00191 if ((m_sockfd = ::socket(addr.getType() == SocketAddress::INET ?
00192 AF_INET : PF_UNIX, SOCK_STREAM, 0)) == -1)
00193 {
00194 OW_THROW_ERRNO_MSG(SocketException,
00195 "Failed to create a socket");
00196 }
00197
00198 if (::fcntl(m_sockfd, F_SETFD, FD_CLOEXEC) == -1)
00199 {
00200 ::close(m_sockfd);
00201 OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() failed to set close-on-exec flag on socket");
00202 }
00203 int n;
00204 int flags = ::fcntl(m_sockfd, F_GETFL, 0);
00205 ::fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK);
00206 if ((n = ::connect(m_sockfd, addr.getNativeForm(),
00207 addr.getNativeFormSize())) < 0)
00208 {
00209 if (errno != EINPROGRESS)
00210 {
00211 ::close(m_sockfd);
00212 OW_THROW_ERRNO_MSG(SocketException,
00213 Format("Failed to connect to: %1", addr.toString()).c_str());
00214 }
00215 }
00216 if (n == -1)
00217 {
00218
00219
00220 PosixUnnamedPipeRef lUPipe;
00221 int pipefd = -1;
00222 if (Socket::getShutDownMechanism())
00223 {
00224 UnnamedPipeRef foo = Socket::getShutDownMechanism();
00225 lUPipe = foo.cast_to<PosixUnnamedPipe>();
00226 OW_ASSERT(lUPipe);
00227 pipefd = lUPipe->getInputHandle();
00228 }
00229 fd_set rset, wset;
00230
00231 UInt32 remainingMsWait = m_connectTimeout != Socket::INFINITE_TIMEOUT ? m_connectTimeout * 1000 : ~0U;
00232 do
00233 {
00234 FD_ZERO(&rset);
00235 if (m_sockfd < 0 || m_sockfd >= FD_SETSIZE)
00236 {
00237 OW_THROW(SocketException, "Invalid fd (< 0 || >= FD_SETSIZE)");
00238 }
00239 FD_SET(m_sockfd, &rset);
00240 if (pipefd != -1 && pipefd < FD_SETSIZE)
00241 {
00242 FD_SET(pipefd, &rset);
00243 }
00244 FD_ZERO(&wset);
00245 FD_SET(m_sockfd, &wset);
00246 int maxfd = m_sockfd > pipefd ? m_sockfd : pipefd;
00247
00248 const UInt32 waitMs = 100;
00249 struct timeval tv;
00250 tv.tv_sec = 0;
00251 tv.tv_usec = std::min((waitMs % 1000), remainingMsWait) * 1000;
00252
00253 Thread::testCancel();
00254 n = ::select(maxfd+1, &rset, &wset, NULL, &tv);
00255
00256 if (m_connectTimeout != Socket::INFINITE_TIMEOUT)
00257 {
00258 remainingMsWait -= std::min(waitMs, remainingMsWait);
00259 }
00260 } while (n == 0 && remainingMsWait > 0);
00261
00262 if (n == 0)
00263 {
00264 ::close(m_sockfd);
00265 OW_THROW(SocketException, "SocketBaseImpl::connect() select timedout");
00266 }
00267 else if (n == -1)
00268 {
00269 ::close(m_sockfd);
00270 if (errno == EINTR)
00271 {
00272 Thread::testCancel();
00273 }
00274 OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() select failed");
00275 }
00276 if (pipefd != -1 && FD_ISSET(pipefd, &rset))
00277 {
00278 ::close(m_sockfd);
00279 OW_THROW(SocketException, "Sockets have been shutdown");
00280 }
00281 else if (FD_ISSET(m_sockfd, &rset) || FD_ISSET(m_sockfd, &wset))
00282 {
00283 int error = 0;
00284 socklen_t len = sizeof(error);
00285 if (::getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &error,
00286 &len) < 0)
00287 {
00288 ::close(m_sockfd);
00289 OW_THROW_ERRNO_MSG(SocketException,
00290 "SocketBaseImpl::connect() getsockopt() failed");
00291 }
00292 if (error != 0)
00293 {
00294 ::close(m_sockfd);
00295 errno = error;
00296 OW_THROW_ERRNO_MSG(SocketException,
00297 "SocketBaseImpl::connect() failed");
00298 }
00299 }
00300 else
00301 {
00302 ::close(m_sockfd);
00303 OW_THROW(SocketException, "SocketBaseImpl::connect(). Logic error, m_sockfd not in FD set.");
00304 }
00305 }
00306 ::fcntl(m_sockfd, F_SETFL, flags);
00307 m_isConnected = true;
00308 m_peerAddress = addr;
00309 if (addr.getType() == SocketAddress::INET)
00310 {
00311 fillInetAddrParms();
00312 }
00313 else if (addr.getType() == SocketAddress::UDS)
00314 {
00315 fillUnixAddrParms();
00316 }
00317 else
00318 {
00319 OW_ASSERT(0);
00320 }
00321 }
00323 void
00324 SocketBaseImpl::disconnect()
00325 {
00326 if (m_in)
00327 {
00328 m_in.clear(ios::eofbit);
00329 }
00330 if (m_out)
00331 {
00332 m_out.clear(ios::eofbit);
00333 }
00334 if (m_inout)
00335 {
00336 m_inout.clear(ios::eofbit);
00337 }
00338 if (m_sockfd != -1 && m_isConnected)
00339 {
00340 ::close(m_sockfd);
00341 m_isConnected = false;
00342 m_sockfd = -1;
00343 }
00344 }
00346
00347 void
00348 SocketBaseImpl::fillInetAddrParms()
00349 {
00350 socklen_t len;
00351 InetSocketAddress_t addr;
00352 memset(&addr, 0, sizeof(addr));
00353 len = sizeof(addr);
00354 if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00355 {
00356
00357
00358
00359 }
00360 else
00361 {
00362 m_localAddress.assignFromNativeForm(&addr, len);
00363 }
00364 len = sizeof(addr);
00365 if (getpeername(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00366 {
00367
00368
00369
00370 }
00371 else
00372 {
00373 m_peerAddress.assignFromNativeForm(&addr, len);
00374 }
00375 }
00377 void
00378 SocketBaseImpl::fillUnixAddrParms()
00379 {
00380 socklen_t len;
00381 UnixSocketAddress_t addr;
00382 memset(&addr, 0, sizeof(addr));
00383 len = sizeof(addr);
00384 if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00385 {
00386 OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::fillUnixAddrParms: getsockname");
00387 }
00388 m_localAddress.assignFromNativeForm(&addr, len);
00389 m_peerAddress.assignFromNativeForm(&addr, len);
00390 }
// File-local mutex held by read() and write() while they append to the
// socket dump (trace) files.
00391 static Mutex guard;
00393 int
00394 SocketBaseImpl::write(const void* dataOut, int dataOutLen, bool errorAsException)
00395 {
00396 int rc = 0;
00397 bool isError = false;
00398 if (m_isConnected)
00399 {
00400 isError = waitForOutput(m_sendTimeout);
00401 if (isError)
00402 {
00403 rc = -1;
00404 }
00405 else
00406 {
00407 rc = writeAux(dataOut, dataOutLen);
00408 if (!m_traceFileOut.empty() && rc > 0)
00409 {
00410 MutexLock ml(guard);
00411 ofstream traceFile(m_traceFileOut.c_str(), std::ios::app);
00412 if (!traceFile)
00413 {
00414 OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00415 }
00416 if (!traceFile.write(static_cast<const char*>(dataOut), rc))
00417 {
00418 OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00419 }
00420
00421 ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00422 if (!comboTraceFile)
00423 {
00424 OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00425 }
00426 DateTime curDateTime;
00427 curDateTime.setToCurrent();
00428 comboTraceFile << "\n--->Out " << rc << " bytes at " << curDateTime.toString("%X") <<
00429 '.' << curDateTime.getMicrosecond() << "<---\n";
00430 if (!comboTraceFile.write(static_cast<const char*>(dataOut), rc))
00431 {
00432 OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00433 }
00434 }
00435 }
00436 }
00437 else
00438 {
00439 rc = -1;
00440 }
00441 if (rc < 0 && errorAsException)
00442 {
00443 OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::write");
00444 }
00445 return rc;
00446 }
00448 int
00449 SocketBaseImpl::read(void* dataIn, int dataInLen, bool errorAsException)
00450 {
00451 int rc = 0;
00452 bool isError = false;
00453 if (m_isConnected)
00454 {
00455 isError = waitForInput(m_recvTimeout);
00456 if (isError)
00457 {
00458 rc = -1;
00459 }
00460 else
00461 {
00462 rc = readAux(dataIn, dataInLen);
00463 if (!m_traceFileIn.empty() && rc > 0)
00464 {
00465 MutexLock ml(guard);
00466 ofstream traceFile(m_traceFileIn.c_str(), std::ios::app);
00467 if (!traceFile)
00468 {
00469 OW_THROW_ERRNO_MSG(IOException, "Failed opening tracefile");
00470 }
00471 if (!traceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00472 {
00473 OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00474 }
00475
00476 ofstream comboTraceFile(String(m_traceFileOut + "Combo").c_str(), std::ios::app);
00477 if (!comboTraceFile)
00478 {
00479 OW_THROW_ERRNO_MSG(IOException, "Failed opening socket dump file");
00480 }
00481 DateTime curDateTime;
00482 curDateTime.setToCurrent();
00483 comboTraceFile << "\n--->In " << rc << " bytes at " << curDateTime.toString("%X") <<
00484 '.' << curDateTime.getMicrosecond() << "<---\n";
00485 if (!comboTraceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00486 {
00487 OW_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00488 }
00489 }
00490 }
00491 }
00492 else
00493 {
00494 rc = -1;
00495 }
00496 if (rc < 0)
00497 {
00498 if (errorAsException)
00499 {
00500 OW_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::read");
00501 }
00502 }
00503 return rc;
00504 }
00506 bool
00507 SocketBaseImpl::waitForInput(int timeOutSecs)
00508 {
00509 int rval = SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_INPUT);
00510 if (rval == ETIMEDOUT)
00511 {
00512 m_recvTimeoutExprd = true;
00513 }
00514 else
00515 {
00516 m_recvTimeoutExprd = false;
00517 }
00518 return (rval != 0);
00519 }
00521 bool
00522 SocketBaseImpl::waitForOutput(int timeOutSecs)
00523 {
00524 return SocketUtils::waitForIO(m_sockfd, timeOutSecs, SocketFlags::E_WAIT_FOR_OUTPUT) != 0;
00525 }
/**
 * @return A reference to the input stream member (m_in), which was
 *         constructed on top of m_streamBuf.
 */
00527 istream&
00528 SocketBaseImpl::getInputStream()
00529 {
00530 return m_in;
00531 }
/**
 * @return A reference to the output stream member (m_out), which was
 *         constructed on top of m_streamBuf.
 */
00533 ostream&
00534 SocketBaseImpl::getOutputStream()
00535 {
00536 return m_out;
00537 }
/**
 * @return A reference to the bidirectional stream member (m_inout), which
 *         was constructed on top of m_streamBuf.
 */
00539 iostream&
00540 SocketBaseImpl::getIOStream()
00541 {
00542 return m_inout;
00543 }
00545
/**
 * Set the names of the socket dump (trace) files. The names are stored in
 * static members and therefore apply to every socket. read() and write()
 * skip the dumping while the respective name is empty.
 *
 * @param in The file that read() appends received data to.
 * @param out The file that write() appends sent data to; the "Combo" dump
 *        file name is derived from it as well.
 */
00546 void
00547 SocketBaseImpl::setDumpFiles(const String& in, const String& out)
00548 {
00549 m_traceFileOut = out;
00550 m_traceFileIn = in;
00551 }
00552
00553 }
00554
00555 #endif // #if !defined(OW_WIN32)
00556