00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00036 #include "OW_config.h"
00037 #include "OW_Select.hpp"
00038 #include "OW_AutoPtr.hpp"
00039 #include "OW_Assertion.hpp"
00040 #include "OW_Thread.hpp"
00041
00042 #if defined(OW_WIN32)
00043 #include <cassert>
00044 #endif
00045
00046 extern "C"
00047 {
00048
00049 #ifndef OW_WIN32
00050 #ifdef OW_HAVE_SYS_EPOLL_H
00051 #include <sys/epoll.h>
00052 #endif
00053 #if defined (OW_HAVE_SYS_POLL_H)
00054 #include <sys/poll.h>
00055 #endif
00056 #if defined (OW_HAVE_SYS_SELECT_H)
00057 #include <sys/select.h>
00058 #endif
00059 #endif
00060
00061 #ifdef OW_HAVE_SYS_TIME_H
00062 #include <sys/time.h>
00063 #endif
00064
00065 #include <sys/types.h>
00066
00067 #ifdef OW_HAVE_UNISTD_H
00068 #include <unistd.h>
00069 #endif
00070
00071 #include <errno.h>
00072 }
00073
00074 namespace OW_NAMESPACE
00075 {
00076
00077 namespace Select
00078 {
00079 #if defined(OW_WIN32)
00080
00081 int
00082 selectRW(SelectObjectArray& selarray, UInt32 ms)
00083 {
00084 int rc;
00085 size_t hcount = static_cast<DWORD>(selarray.size());
00086 AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
00087
00088 size_t handleidx = 0;
00089 for (size_t i = 0; i < selarray.size(); i++, handleidx++)
00090 {
00091 if(selarray[i].s.sockfd != INVALID_SOCKET
00092 && selarray[i].s.networkevents)
00093 {
00094 ::WSAEventSelect(selarray[i].s.sockfd,
00095 selarray[i].s.event, selarray[i].s.networkevents);
00096 }
00097
00098 hdls[handleidx] = selarray[i].s.event;
00099 }
00100
00101 DWORD timeout = (ms != ~0U) ? ms : INFINITE;
00102 DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timeout);
00103
00104 assert(cc != WAIT_ABANDONED);
00105
00106 switch (cc)
00107 {
00108 case WAIT_FAILED:
00109 rc = Select::SELECT_ERROR;
00110 break;
00111 case WAIT_TIMEOUT:
00112 rc = Select::SELECT_TIMEOUT;
00113 break;
00114 default:
00115 rc = cc - WAIT_OBJECT_0;
00116
00117
00118
00119 if(selarray[rc].s.sockfd != INVALID_SOCKET)
00120 {
00121 if(selarray[rc].s.networkevents
00122 && selarray[rc].s.doreset == false)
00123 {
00124 ::WSAEventSelect(selarray[rc].s.sockfd,
00125 selarray[rc].s.event, selarray[rc].s.networkevents);
00126 }
00127 else
00128 {
00129
00130 ::WSAEventSelect(selarray[rc].s.sockfd,
00131 selarray[rc].s.event, 0);
00132 u_long ioctlarg = 0;
00133 ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
00134 }
00135 }
00136 break;
00137 }
00138
00139 if( rc < 0 )
00140 return rc;
00141
00142 int availableCount = 0;
00143 for (size_t i = 0; i < selarray.size(); i++)
00144 {
00145 if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
00146 {
00147 if( selarray[i].waitForRead )
00148 selarray[i].readAvailable = true;
00149 if( selarray[i].waitForWrite )
00150 selarray[i].writeAvailable = true;
00151 ++availableCount;
00152 }
00153 else
00154 {
00155 selarray[i].readAvailable = false;
00156 selarray[i].writeAvailable = false;
00157 }
00158 }
00159 return availableCount;
00160 }
00161
00162
00163 #else
00164
00166
00167 int
00168 selectRWEpoll(SelectObjectArray& selarray, UInt32 ms)
00169 {
00170 #ifdef OW_HAVE_SYS_EPOLL_H
00171 int lerrno, ecc = 0;
00172 int timeout;
00173 AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
00174 int epfd = epoll_create(selarray.size());
00175 if(epfd == -1)
00176 {
00177 if (errno == ENOSYS)
00178 {
00179 return SELECT_NOT_IMPLEMENTED;
00180 }
00181
00182 return Select::SELECT_ERROR;
00183 }
00184
00185 UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
00186 UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00187 for (size_t i = 0; i < selarray.size(); i++)
00188 {
00189 OW_ASSERT(selarray[i].s >= 0);
00190 selarray[i].readAvailable = false;
00191 selarray[i].writeAvailable = false;
00192 selarray[i].wasError = false;
00193 events[i].data.u32 = i;
00194 events[i].events = 0;
00195 if(selarray[i].waitForRead)
00196 {
00197 events[i].events |= read_events;
00198 }
00199 if(selarray[i].waitForWrite)
00200 {
00201 events[i].events |= write_events;
00202 }
00203
00204 if(epoll_ctl(epfd, EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
00205 {
00206 int errnum = errno;
00207 ::close(epfd);
00208
00209 return errnum == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR;
00210 }
00211 }
00212
00213
00214 const Int32 loopMicroSeconds = 100 * 1000;
00215 timeval now, end;
00216 gettimeofday(&now, NULL);
00217 end = now;
00218 end.tv_sec += ms / 1000;
00219 end.tv_usec += (ms % 1000) * 1000;
00220
00221 while ((ecc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00222 || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00223 {
00224 timeval tv;
00225 tv.tv_sec = end.tv_sec - now.tv_sec;
00226 if (end.tv_usec >= now.tv_usec)
00227 {
00228 tv.tv_usec = end.tv_usec - now.tv_usec;
00229 }
00230 else
00231 {
00232 tv.tv_sec--;
00233 tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00234 }
00235
00236 if ((tv.tv_sec != 0)
00237 || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00238 {
00239 tv.tv_sec = 0;
00240 tv.tv_usec = loopMicroSeconds;
00241 }
00242
00243 timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00244 Thread::testCancel();
00245 ecc = epoll_wait(epfd, events.get(), selarray.size(), timeout);
00246 lerrno = errno;
00247 Thread::testCancel();
00248 gettimeofday(&now, NULL);
00249 }
00250
00251 ::close(epfd);
00252 if (ecc < 0)
00253 {
00254 return (lerrno == EINTR) ? Select::SELECT_INTERRUPTED : Select::SELECT_ERROR;
00255 }
00256 if (ecc == 0)
00257 {
00258 return Select::SELECT_TIMEOUT;
00259 }
00260
00261 for(int i = 0; i < ecc; i++)
00262 {
00263 SelectObject & so = selarray[events[i].data.u32];
00264 so.readAvailable = so.waitForRead && (events[i].events & read_events);
00265 so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
00266 }
00267
00268 return ecc;
00269 #else
00270 return SELECT_NOT_IMPLEMENTED;
00271 #endif
00272 }
00273
00275
00276 int
00277 selectRWPoll(SelectObjectArray& selarray, UInt32 ms)
00278 {
00279 #if defined (OW_HAVE_SYS_POLL_H)
00280 int lerrno, rc = 0;
00281
00282 AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
00283
00284
00285 timeval now, end;
00286 const Int32 loopMicroSeconds = 100 * 1000;
00287 gettimeofday(&now, NULL);
00288 end = now;
00289 end.tv_sec += ms / 1000;
00290 end.tv_usec += (ms % 1000) * 1000;
00291 while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00292 || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00293 {
00294 for (size_t i = 0; i < selarray.size(); i++)
00295 {
00296 OW_ASSERT(selarray[i].s >= 0);
00297 selarray[i].readAvailable = false;
00298 selarray[i].writeAvailable = false;
00299 selarray[i].wasError = false;
00300 pfds[i].revents = 0;
00301 pfds[i].fd = selarray[i].s;
00302 pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
00303 if(selarray[i].waitForWrite)
00304 pfds[i].events |= POLLOUT;
00305 }
00306
00307 timeval tv;
00308 tv.tv_sec = end.tv_sec - now.tv_sec;
00309 if (end.tv_usec >= now.tv_usec)
00310 {
00311 tv.tv_usec = end.tv_usec - now.tv_usec;
00312 }
00313 else
00314 {
00315 tv.tv_sec--;
00316 tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00317 }
00318
00319 if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00320 {
00321 tv.tv_sec = 0;
00322 tv.tv_usec = loopMicroSeconds;
00323 }
00324
00325
00326 int loopMSecs = tv.tv_sec * 1000 + tv.tv_usec / 1000;
00327
00328 Thread::testCancel();
00329 rc = ::poll(pfds.get(), selarray.size(), loopMSecs);
00330 lerrno = errno;
00331 Thread::testCancel();
00332
00333 gettimeofday(&now, NULL);
00334 }
00335
00336 if (rc < 0)
00337 {
00338 if (lerrno == EINTR)
00339 {
00340 #ifdef OW_NETWARE
00341
00342
00343
00344
00345 pthread_yield();
00346 #endif
00347 return Select::SELECT_INTERRUPTED;
00348 }
00349 else
00350 {
00351 return Select::SELECT_ERROR;
00352 }
00353 }
00354 if (rc == 0)
00355 {
00356 return Select::SELECT_TIMEOUT;
00357 }
00358 for (size_t i = 0; i < selarray.size(); i++)
00359 {
00360 if (pfds[i].revents & (POLLERR | POLLNVAL))
00361 {
00362 selarray[i].wasError = true;
00363 }
00364 else
00365 {
00366 if(selarray[i].waitForRead)
00367 {
00368 selarray[i].readAvailable = (pfds[i].revents &
00369 (POLLIN | POLLPRI | POLLHUP));
00370 }
00371
00372 if(selarray[i].waitForWrite)
00373 {
00374 selarray[i].writeAvailable = (pfds[i].revents &
00375 (POLLOUT | POLLHUP));
00376 }
00377 }
00378 }
00379
00380 return rc;
00381 #else
00382 return SELECT_NOT_IMPLEMENTED;
00383 #endif
00384 }
00386
00387 int
00388 selectRWSelect(SelectObjectArray& selarray, UInt32 ms)
00389 {
00390 #if defined (OW_HAVE_SYS_SELECT_H)
00391 int lerrno, rc = 0;
00392 fd_set ifds;
00393 fd_set ofds;
00394
00395
00396 timeval now, end;
00397 const Int32 loopMicroSeconds = 100 * 1000;
00398 gettimeofday(&now, NULL);
00399 end = now;
00400 end.tv_sec += ms / 1000;
00401 end.tv_usec += (ms % 1000) * 1000;
00402 while ((rc == 0) && ((ms == INFINITE_TIMEOUT) || (now.tv_sec < end.tv_sec)
00403 || ((now.tv_sec == end.tv_sec) && (now.tv_usec <= end.tv_usec))))
00404 {
00405 int maxfd = 0;
00406 FD_ZERO(&ifds);
00407 FD_ZERO(&ofds);
00408 for (size_t i = 0; i < selarray.size(); ++i)
00409 {
00410 int fd = selarray[i].s;
00411 OW_ASSERT(fd >= 0);
00412 if (maxfd < fd)
00413 {
00414 maxfd = fd;
00415 }
00416 if (fd < 0 || fd >= FD_SETSIZE)
00417 {
00418 return Select::SELECT_ERROR;
00419 }
00420 if (selarray[i].waitForRead)
00421 {
00422 FD_SET(fd, &ifds);
00423 }
00424 if (selarray[i].waitForWrite)
00425 {
00426 FD_SET(fd, &ofds);
00427 }
00428 }
00429
00430 timeval tv;
00431 tv.tv_sec = end.tv_sec - now.tv_sec;
00432 if (end.tv_usec >= now.tv_usec)
00433 {
00434 tv.tv_usec = end.tv_usec - now.tv_usec;
00435 }
00436 else
00437 {
00438 tv.tv_sec--;
00439 tv.tv_usec = 1000000 + end.tv_usec - now.tv_usec;
00440 }
00441
00442 if ((tv.tv_sec != 0) || (tv.tv_usec > loopMicroSeconds) || (ms == INFINITE_TIMEOUT))
00443 {
00444 tv.tv_sec = 0;
00445 tv.tv_usec = loopMicroSeconds;
00446 }
00447
00448 Thread::testCancel();
00449 rc = ::select(maxfd+1, &ifds, &ofds, NULL, &tv);
00450 lerrno = errno;
00451 Thread::testCancel();
00452
00453 gettimeofday(&now, NULL);
00454 }
00455
00456 if (rc < 0)
00457 {
00458 if (lerrno == EINTR)
00459 {
00460 #ifdef OW_NETWARE
00461
00462
00463
00464
00465 pthread_yield();
00466 #endif
00467 return Select::SELECT_INTERRUPTED;
00468 }
00469 else
00470 {
00471 return Select::SELECT_ERROR;
00472 }
00473 }
00474 if (rc == 0)
00475 {
00476 return Select::SELECT_TIMEOUT;
00477 }
00478 int availableCount = 0;
00479 int cval;
00480 for (size_t i = 0; i < selarray.size(); i++)
00481 {
00482 selarray[i].wasError = false;
00483 cval = 0;
00484 if (FD_ISSET(selarray[i].s, &ifds))
00485 {
00486 selarray[i].readAvailable = true;
00487 cval = 1;
00488 }
00489 else
00490 {
00491 selarray[i].readAvailable = false;
00492 }
00493
00494 if (FD_ISSET(selarray[i].s, &ofds))
00495 {
00496 selarray[i].writeAvailable = true;
00497 cval = 1;
00498 }
00499 else
00500 {
00501 selarray[i].writeAvailable = false;
00502 }
00503
00504 availableCount += cval;
00505
00506 }
00507
00508 return availableCount;
00509 #else
00510 return SELECT_NOT_IMPLEMENTED;
00511 #endif
00512 }
00513
00514 int
00515 selectRW(SelectObjectArray& selarray, UInt32 ms)
00516 {
00517 int rv = selectRWEpoll(selarray, ms);
00518 if (rv != SELECT_NOT_IMPLEMENTED)
00519 {
00520 return rv;
00521 }
00522
00523 rv = selectRWPoll(selarray, ms);
00524 if (rv != SELECT_NOT_IMPLEMENTED)
00525 {
00526 return rv;
00527 }
00528
00529 rv = selectRWSelect(selarray, ms);
00530 OW_ASSERT(rv != SELECT_NOT_IMPLEMENTED);
00531 return rv;
00532 }
00533
00535 #endif // #else OW_WIN32
00536
00537 int
00538 select(const SelectTypeArray& selarray, UInt32 ms)
00539 {
00540 SelectObjectArray soa;
00541 soa.reserve(selarray.size());
00542 for (size_t i = 0; i < selarray.size(); ++i)
00543 {
00544 SelectObject curObj(selarray[i]);
00545 curObj.waitForRead = true;
00546 soa.push_back(curObj);
00547 }
00548 int rv = selectRW(soa, ms);
00549 if (rv < 0)
00550 {
00551 return rv;
00552 }
00553
00554
00555 for (size_t i = 0; i < soa.size(); ++i)
00556 {
00557 if (soa[i].readAvailable)
00558 {
00559 return i;
00560 }
00561 }
00562 return SELECT_ERROR;
00563 }
00564
00565 }
00566
00567 }
00568