[4] | 1 | /****************************************************************************** |
---|
| 2 | * QUANTA - A toolkit for High Performance Data Sharing |
---|
| 3 | * Copyright (C) 2003 Electronic Visualization Laboratory, |
---|
| 4 | * University of Illinois at Chicago |
---|
| 5 | * |
---|
| 6 | * All rights reserved. |
---|
| 7 | * |
---|
| 8 | * Redistribution and use in source and binary forms, with or without |
---|
| 9 | * modification, are permitted provided that the following conditions are met: |
---|
| 10 | * |
---|
| 11 | * * Redistributions of source code must retain the above copyright |
---|
| 12 | * notice, this list of conditions and the following disclaimer. |
---|
| 13 | * * Redistributions in binary form must reproduce the above |
---|
| 14 | * copyright notice, this list of conditions and the following disclaimer |
---|
| 15 | * in the documentation and/or other materials provided with the distribution. |
---|
| 16 | * * Neither the name of the University of Illinois at Chicago nor |
---|
| 17 | * the names of its contributors may be used to endorse or promote |
---|
| 18 | * products derived from this software without specific prior written permission. |
---|
| 19 | * |
---|
| 20 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
---|
| 21 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
---|
| 22 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
---|
| 23 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR |
---|
| 24 | * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
---|
| 25 | * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
---|
| 26 | * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
---|
| 27 | * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
---|
| 28 | * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
---|
| 29 | * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
---|
| 30 | * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
| 31 | * |
---|
| 32 | * Direct questions, comments etc about Quanta to cavern@evl.uic.edu |
---|
| 33 | *****************************************************************************/ |
---|
| 34 | |
---|
#include "QUANTAnet_tcp_c.hxx"
#include <string.h>

#ifndef WIN32
#include <fcntl.h>
#include <netinet/tcp.h>
#include <signal.h>
#include <sys/errno.h>
#include <sys/socket.h>
#include <sys/time.h>
#ifndef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifndef errno
extern int errno;
#endif /* errno */
#endif /* !WIN32 */

#ifdef _WIN32_WCE
#include <errno.h>
#endif
---|
| 55 | |
---|
| 56 | const int QUANTAnet_tcpServer_c::OK = 1; |
---|
| 57 | const int QUANTAnet_tcpServer_c::FAILED = 0; |
---|
| 58 | const int QUANTAnet_tcpServer_c::READ_BUFFER_SIZE = 2; |
---|
| 59 | const int QUANTAnet_tcpServer_c::WRITE_BUFFER_SIZE = 3; |
---|
| 60 | |
---|
| 61 | #ifdef sun |
---|
| 62 | long gethostid(void); |
---|
| 63 | #endif |
---|
| 64 | |
---|
| 65 | #ifdef WIN32 |
---|
| 66 | #define MAXHOSTNAME 128 /* Maximum length of a DNS hostname */ |
---|
| 67 | #define DEFAULT_BUFFER_SIZE 32766 /* Default socket bufffer size */ |
---|
| 68 | #else |
---|
| 69 | #define DEFAULT_BUFFER_SIZE 1024 |
---|
| 70 | #endif /* WIN32 */ |
---|
| 71 | |
---|
| 72 | #ifdef WIN32 |
---|
| 73 | // This function is excerpted from "Windows Sockets Network Programming" |
---|
| 74 | // by Bob Quinn and Dave Shute published from Addison-Wesley. |
---|
| 75 | static long gethostid(void) |
---|
| 76 | { |
---|
| 77 | char szLclHost [MAXHOSTNAME]; |
---|
| 78 | LPHOSTENT lpstHostent; |
---|
| 79 | SOCKADDR_IN stLclAddr; |
---|
| 80 | SOCKADDR_IN stRmtAddr; |
---|
| 81 | int nAddrSize = sizeof(SOCKADDR); |
---|
| 82 | SOCKET hSock; |
---|
| 83 | int nRet; |
---|
| 84 | |
---|
| 85 | /* Init local address (to zero) */ |
---|
| 86 | stLclAddr.sin_addr.s_addr = INADDR_ANY; |
---|
| 87 | |
---|
| 88 | /* Get the local hostname */ |
---|
| 89 | nRet = gethostname(szLclHost, MAXHOSTNAME); |
---|
| 90 | if (nRet != SOCKET_ERROR) { |
---|
| 91 | /* Resolve hostname for local address */ |
---|
| 92 | lpstHostent = gethostbyname((LPSTR)szLclHost); |
---|
| 93 | if (lpstHostent) |
---|
| 94 | stLclAddr.sin_addr.s_addr = *((u_long FAR*) (lpstHostent->h_addr)); |
---|
| 95 | } |
---|
| 96 | |
---|
| 97 | /* If still not resolved, then try second strategy */ |
---|
| 98 | if (stLclAddr.sin_addr.s_addr == INADDR_ANY) { |
---|
| 99 | /* Get a UDP socket */ |
---|
| 100 | hSock = socket(AF_INET, SOCK_DGRAM, 0); |
---|
| 101 | if (hSock != INVALID_SOCKET) { |
---|
| 102 | /* Connect to arbitrary port and address (NOT loopback) */ |
---|
| 103 | stRmtAddr.sin_family = AF_INET; |
---|
| 104 | stRmtAddr.sin_port = htons(IPPORT_ECHO); |
---|
| 105 | stRmtAddr.sin_addr.s_addr = inet_addr("128.127.50.1"); |
---|
| 106 | nRet = connect(hSock, (LPSOCKADDR)&stRmtAddr, |
---|
| 107 | sizeof(SOCKADDR)); |
---|
| 108 | if (nRet != SOCKET_ERROR) { |
---|
| 109 | /* Get local address */ |
---|
| 110 | getsockname(hSock, (LPSOCKADDR)&stLclAddr, |
---|
| 111 | (int FAR*)&nAddrSize); |
---|
| 112 | } |
---|
| 113 | closesocket(hSock); /* we're done with the socket */ |
---|
| 114 | } |
---|
| 115 | } |
---|
| 116 | return (stLclAddr.sin_addr.s_addr); |
---|
| 117 | } |
---|
| 118 | #endif |
---|
| 119 | |
---|
| 120 | QUANTAnet_tcpServer_c::QUANTAnet_tcpServer_c() |
---|
| 121 | { |
---|
| 122 | /***** Open a TCP socket (an Internet stream socket). */ |
---|
| 123 | /* Notice we want Internet, stream socket and not datagram */ |
---|
| 124 | #ifdef WIN32 |
---|
| 125 | if ((sockfd = socket(AF_INET,SOCK_STREAM,0)) == INVALID_SOCKET) { |
---|
| 126 | printf("QUANTAnet_tcpServer_c::FAILED. Cannot create socket\n"); |
---|
| 127 | } |
---|
| 128 | #else |
---|
| 129 | if ((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) { |
---|
| 130 | printf("QUANTAnet_tcpServer_c::FAILED. Cannot create socket\n"); |
---|
| 131 | } |
---|
| 132 | #endif |
---|
| 133 | |
---|
| 134 | timeOutPeriod = QUANTAnet_tcpClient_c::DEFAULT_TIME_OUT; |
---|
| 135 | } |
---|
| 136 | |
---|
| 137 | int QUANTAnet_tcpServer_c::close() |
---|
| 138 | { |
---|
| 139 | int closeResult; |
---|
| 140 | #ifdef WIN32 |
---|
| 141 | if (sockfd >= 0) closeResult = ::closesocket(sockfd); |
---|
| 142 | #else |
---|
| 143 | if (sockfd >= 0) closeResult = ::close(sockfd); |
---|
| 144 | #endif |
---|
| 145 | sockfd = 0; |
---|
| 146 | return closeResult; |
---|
| 147 | } |
---|
| 148 | |
---|
| 149 | void QUANTAnet_tcpServer_c::setSockOptions(int option, int buffersize) |
---|
| 150 | { |
---|
| 151 | /*Change the socket buffer size if default is not to be used*/ |
---|
| 152 | int optlen = sizeof(buffersize); |
---|
| 153 | |
---|
| 154 | if (option == QUANTAnet_tcpServer_c::READ_BUFFER_SIZE) |
---|
| 155 | if (setsockopt(sockfd,SOL_SOCKET, SO_RCVBUF, (char *) &buffersize, optlen) < 0) |
---|
| 156 | printf("QUANTAnet_tcpClient_c::setSockOptions: Cannot create a large enough socket buffer for Read.\n"); |
---|
| 157 | |
---|
| 158 | if (option == QUANTAnet_tcpServer_c::WRITE_BUFFER_SIZE) |
---|
| 159 | if (setsockopt(sockfd,SOL_SOCKET, SO_SNDBUF, (char *) &buffersize, optlen) < 0) |
---|
| 160 | printf("QUANTAnet_tcpClient_c::setSockOptions: Cannot create a large enough socket buffer for Write.\n"); |
---|
| 161 | } |
---|
| 162 | |
---|
| 163 | int QUANTAnet_tcpServer_c::init(int port) |
---|
| 164 | { |
---|
| 165 | int optlen; |
---|
| 166 | |
---|
| 167 | /* Set no delay to stop TCP/IP from buffering */ |
---|
| 168 | int noDelay = 0;// Set to 0 for no buffering. Set to 1 for buffering. |
---|
| 169 | optlen = sizeof(noDelay); |
---|
| 170 | |
---|
| 171 | if (setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY, (char *) &noDelay, optlen) < 0) |
---|
| 172 | printf("QUANTAnet_tcpServer_c::init: Cannot set no delay, you may experience sluggish performance.\n"); |
---|
| 173 | |
---|
| 174 | /* Use this to try to eliminate the problem of being unable to rebind to a static port */ |
---|
| 175 | int reuseFlag = 1; |
---|
| 176 | optlen = sizeof(reuseFlag); |
---|
| 177 | if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR, (char *) &reuseFlag, optlen) < 0) |
---|
| 178 | printf("QUANTAnet_tcpServer_c::init: Cannot reuse port address.\n"); |
---|
| 179 | /***** Bind our local address so that the client can send to us. */ |
---|
| 180 | |
---|
| 181 | struct linger lingerData; |
---|
| 182 | lingerData.l_onoff = 1; |
---|
| 183 | lingerData.l_linger = 1000; |
---|
| 184 | |
---|
| 185 | optlen = sizeof(lingerData); |
---|
| 186 | if (setsockopt(sockfd,SOL_SOCKET,SO_LINGER, (char *) &lingerData, optlen) < 0) |
---|
| 187 | printf("QUANTAnet_tcpServer_c::init: setsockopt: SO_LINGER failed.\n"); |
---|
| 188 | |
---|
| 189 | /* Clear out the structure */ |
---|
| 190 | memset((char *) &serverInfo, 0, sizeof(serverInfo)); |
---|
| 191 | |
---|
| 192 | serverInfo.sin_family = AF_INET; |
---|
| 193 | serverInfo.sin_addr.s_addr = htonl(INADDR_ANY); |
---|
| 194 | serverInfo.sin_port = htons(port); |
---|
| 195 | |
---|
| 196 | if (bind(sockfd, (struct sockaddr *) &serverInfo, sizeof(serverInfo)) < 0) { |
---|
| 197 | printf("QUANTAnet_tcpServer_c::init: Can't bind local address.\n"); |
---|
| 198 | return QUANTAnet_tcpServer_c::FAILED; |
---|
| 199 | } |
---|
| 200 | |
---|
| 201 | /* Listen for a connection. Allow queuing for 5 connection requests. |
---|
| 202 | */ |
---|
| 203 | listen(sockfd,5); |
---|
| 204 | |
---|
| 205 | /* Non blocking from now on. This is necessary in order for checkForNewConnections() to work.But this seems to be the main thing that is screwing up |
---|
| 206 | with threads. |
---|
| 207 | */ |
---|
| 208 | // fcntl(sockfd,F_SETFL,FNDELAY); |
---|
| 209 | |
---|
| 210 | return QUANTAnet_tcpServer_c::OK; |
---|
| 211 | } |
---|
| 212 | |
---|
| 213 | const int QUANTAnet_tcpClient_c::BLOCKING = 1; |
---|
| 214 | const int QUANTAnet_tcpClient_c::NON_BLOCKING= 2; |
---|
| 215 | const int QUANTAnet_tcpClient_c::NO_TIME_OUT = -1; |
---|
| 216 | const int QUANTAnet_tcpClient_c::DEFAULT_TIME_OUT = NO_TIME_OUT; |
---|
| 217 | const int QUANTAnet_tcpClient_c::SOCKET_NOT_OPEN = -1; |
---|
| 218 | const int QUANTAnet_tcpClient_c::CONNECTION_TERMINATED = -2; |
---|
| 219 | const int QUANTAnet_tcpClient_c::NON_BLOCKING_HAS_NO_DATA =-3; |
---|
| 220 | const int QUANTAnet_tcpClient_c::TIMED_OUT = -4; |
---|
| 221 | const int QUANTAnet_tcpClient_c::OK = 1; |
---|
| 222 | const int QUANTAnet_tcpClient_c::NON_BLOCKING_NOT_READY_TO_WRITE = -6; |
---|
| 223 | const int QUANTAnet_tcpClient_c::NOT_READY = -7; |
---|
| 224 | const int QUANTAnet_tcpClient_c::READY_TO_READ = -8; |
---|
| 225 | const int QUANTAnet_tcpClient_c::READY_TO_WRITE = -9; |
---|
| 226 | const int QUANTAnet_tcpClient_c::READY_TO_READ_AND_WRITE = -10; |
---|
| 227 | const int QUANTAnet_tcpClient_c::READ_BUFFER_SIZE = -11; |
---|
| 228 | const int QUANTAnet_tcpClient_c::WRITE_BUFFER_SIZE = -12; |
---|
| 229 | |
---|
| 230 | QUANTAnet_tcpClient_c* |
---|
| 231 | QUANTAnet_tcpServer_c::checkForNewConnections(const int& blockingTime) |
---|
| 232 | { |
---|
| 233 | struct sockaddr_in clientInfo; |
---|
| 234 | #ifdef HAVE_SOCKLEN_T |
---|
| 235 | socklen_t clilen; |
---|
| 236 | #else |
---|
| 237 | int clilen; |
---|
| 238 | #endif |
---|
| 239 | // Perform a non-blocking check to see if socket is ready for |
---|
| 240 | // an incoming connection. |
---|
| 241 | fd_set read_set; |
---|
| 242 | struct timeval timeout; |
---|
| 243 | timeout.tv_sec = blockingTime; |
---|
| 244 | timeout.tv_usec = 0; |
---|
| 245 | int ready_fd; |
---|
| 246 | |
---|
| 247 | FD_ZERO(&read_set); |
---|
| 248 | FD_SET(sockfd, &read_set); |
---|
| 249 | ready_fd = select( sockfd+1, &read_set, NULL, NULL, &timeout); |
---|
| 250 | |
---|
| 251 | // If it is not ready then return. |
---|
| 252 | if ((ready_fd <= 0) || (!FD_ISSET( sockfd, &read_set))) return NULL; |
---|
| 253 | |
---|
| 254 | clilen = sizeof(clientInfo); |
---|
| 255 | int newsockfd = accept(sockfd, |
---|
| 256 | (struct sockaddr *) &clientInfo, |
---|
| 257 | (socklen_t*)&clilen); |
---|
| 258 | if (newsockfd > 0) { |
---|
| 259 | struct linger lingerData; |
---|
| 260 | lingerData.l_onoff = 1; |
---|
| 261 | lingerData.l_linger = 1000; |
---|
| 262 | |
---|
| 263 | int optlen = sizeof(lingerData); |
---|
| 264 | |
---|
| 265 | if (setsockopt(newsockfd,SOL_SOCKET,SO_LINGER, (char *) &lingerData, optlen) < 0) |
---|
| 266 | printf("QUANTAnet_tcpServer_c::checkForNewConnections: setsockopt: SO_LINGER failed.\n"); |
---|
| 267 | |
---|
| 268 | QUANTAnet_tcpClient_c *clientObj; |
---|
| 269 | |
---|
| 270 | clientObj = new QUANTAnet_tcpClient_c; |
---|
| 271 | clientObj->timeOutPeriod = timeOutPeriod; |
---|
| 272 | clientObj->clientInfo = clientInfo; |
---|
| 273 | clientObj->clientSockFd = newsockfd; |
---|
| 274 | // clientObj->makeNonBlocking(); |
---|
| 275 | return clientObj; |
---|
| 276 | } else return NULL; |
---|
| 277 | } |
---|
| 278 | |
---|
| 279 | |
---|
| 280 | QUANTAnet_tcpClient_c* |
---|
| 281 | QUANTAnet_tcpServer_c::waitForNewConnection() |
---|
| 282 | { |
---|
| 283 | struct sockaddr_in clientInfo; |
---|
| 284 | #ifdef HAVE_SOCKLEN_T |
---|
| 285 | socklen_t clilen; |
---|
| 286 | #else |
---|
| 287 | int clilen; |
---|
| 288 | #endif |
---|
| 289 | |
---|
| 290 | // Perform a blocking check to see if socket is ready for an |
---|
| 291 | // incoming connection. |
---|
| 292 | fd_set read_set; |
---|
| 293 | int ready_fd; |
---|
| 294 | |
---|
| 295 | FD_ZERO(&read_set); |
---|
| 296 | FD_SET(sockfd, &read_set); |
---|
| 297 | ready_fd = select( sockfd+1, &read_set, NULL, NULL, NULL); |
---|
| 298 | |
---|
| 299 | // If it is not ready then return. |
---|
| 300 | if ((ready_fd <= 0) || (!FD_ISSET( sockfd, &read_set))) return NULL; |
---|
| 301 | |
---|
| 302 | clilen = sizeof(clientInfo); |
---|
| 303 | int newsockfd = accept(sockfd, |
---|
| 304 | (struct sockaddr *) &clientInfo, |
---|
| 305 | (socklen_t*)&clilen); |
---|
| 306 | if (newsockfd > 0) { |
---|
| 307 | struct linger lingerData; |
---|
| 308 | lingerData.l_onoff = 1; |
---|
| 309 | lingerData.l_linger = 1000; |
---|
| 310 | |
---|
| 311 | int optlen = sizeof(lingerData); |
---|
| 312 | |
---|
| 313 | if (setsockopt(newsockfd, SOL_SOCKET, SO_LINGER, |
---|
| 314 | reinterpret_cast<char*>(&lingerData), optlen) < 0) { |
---|
| 315 | printf("QUANTAnet_tcpServer_c::waitForNewConnection: setsockopt: SO_LINGER failed.\n"); |
---|
| 316 | } |
---|
| 317 | |
---|
| 318 | QUANTAnet_tcpClient_c *clientObj = new QUANTAnet_tcpClient_c; |
---|
| 319 | clientObj->timeOutPeriod = timeOutPeriod; |
---|
| 320 | clientObj->clientInfo = clientInfo; |
---|
| 321 | clientObj->clientSockFd = newsockfd; |
---|
| 322 | return clientObj; |
---|
| 323 | } else { |
---|
| 324 | return NULL; |
---|
| 325 | } |
---|
| 326 | } |
---|
| 327 | |
---|
| 328 | /* |
---|
| 329 | * For the blockingtype parameter: |
---|
| 330 | * NEVER_TIMEOUT_BLOCKING means the timeout period is bypassed so that it will |
---|
| 331 | * continue to read forever unless there is a break in the connection. |
---|
| 332 | * |
---|
| 333 | * NON_BLOCKING means that if there is no data to be read this call will |
---|
| 334 | * return immediately. |
---|
| 335 | * |
---|
| 336 | * BLOCKING means that it will block waiting for data or until the timeout |
---|
| 337 | * period expires. |
---|
| 338 | * Change the timeout period (in seconds) by changing the timeoutPeriod |
---|
| 339 | * member variable. |
---|
| 340 | * |
---|
| 341 | */ |
---|
| 342 | |
---|
| 343 | void QUANTAnet_tcpClient_c::setSockOptions(int option, int buffersize) |
---|
| 344 | { |
---|
| 345 | /*Change the socket buffer size if default is not to be used*/ |
---|
| 346 | int optlen = sizeof(buffersize); |
---|
| 347 | |
---|
| 348 | if (option == QUANTAnet_tcpClient_c::READ_BUFFER_SIZE) |
---|
| 349 | if (setsockopt(clientSockFd,SOL_SOCKET, SO_RCVBUF, (char *) &buffersize, optlen) < 0) |
---|
| 350 | printf("QUANTAnet_tcpClient_c::setSockOptions: Cannot create a large enough socket buffer for Read.\n"); |
---|
| 351 | |
---|
| 352 | if (option == QUANTAnet_tcpClient_c::WRITE_BUFFER_SIZE) |
---|
| 353 | if (setsockopt(clientSockFd,SOL_SOCKET, SO_SNDBUF, (char *) &buffersize, optlen) < 0) |
---|
| 354 | printf("QUANTAnet_tcpClient_c::setSockOptions: Cannot create a large enough socket buffer for Write.\n"); |
---|
| 355 | |
---|
| 356 | } |
---|
| 357 | |
---|
| 358 | int QUANTAnet_tcpClient_c::isReadyToRead() |
---|
| 359 | { |
---|
| 360 | struct timeval timeoutDelay; |
---|
| 361 | fd_set readfds; |
---|
| 362 | |
---|
| 363 | timeoutDelay.tv_sec = 0; |
---|
| 364 | timeoutDelay.tv_usec = 0; |
---|
| 365 | |
---|
| 366 | FD_ZERO(&readfds); |
---|
| 367 | FD_SET(clientSockFd, &readfds); |
---|
| 368 | select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay); |
---|
| 369 | |
---|
| 370 | if (!(FD_ISSET(clientSockFd, &readfds))) |
---|
| 371 | return NOT_READY; |
---|
| 372 | else |
---|
| 373 | return READY_TO_READ; |
---|
| 374 | } |
---|
| 375 | |
---|
| 376 | int QUANTAnet_tcpClient_c::isReadyToWrite() |
---|
| 377 | { |
---|
| 378 | struct timeval timeoutDelay; |
---|
| 379 | fd_set writefds; |
---|
| 380 | |
---|
| 381 | timeoutDelay.tv_sec = 0; |
---|
| 382 | timeoutDelay.tv_usec = 0; |
---|
| 383 | |
---|
| 384 | FD_ZERO(&writefds); |
---|
| 385 | FD_SET(clientSockFd, &writefds); |
---|
| 386 | select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay); |
---|
| 387 | |
---|
| 388 | if (!(FD_ISSET(clientSockFd, &writefds))) |
---|
| 389 | return NOT_READY; |
---|
| 390 | else |
---|
| 391 | return READY_TO_WRITE; |
---|
| 392 | } |
---|
| 393 | |
---|
| 394 | int QUANTAnet_tcpClient_c::isReady() |
---|
| 395 | { |
---|
| 396 | struct timeval timeoutDelay; |
---|
| 397 | fd_set writefds, readfds; |
---|
| 398 | |
---|
| 399 | timeoutDelay.tv_sec = 0; |
---|
| 400 | timeoutDelay.tv_usec = 0; |
---|
| 401 | |
---|
| 402 | FD_ZERO(&writefds); |
---|
| 403 | FD_ZERO(&readfds); |
---|
| 404 | FD_SET(clientSockFd, &writefds); |
---|
| 405 | FD_SET(clientSockFd, &readfds); |
---|
| 406 | select(clientSockFd+1,&readfds,&writefds,NULL, &timeoutDelay); |
---|
| 407 | |
---|
| 408 | if (FD_ISSET(clientSockFd, &writefds) |
---|
| 409 | && |
---|
| 410 | FD_ISSET(clientSockFd, &readfds)) return READY_TO_READ_AND_WRITE; |
---|
| 411 | if (FD_ISSET(clientSockFd, &writefds)) return READY_TO_WRITE; |
---|
| 412 | if (FD_ISSET(clientSockFd, &readfds)) return READY_TO_READ; |
---|
| 413 | |
---|
| 414 | return NOT_READY; |
---|
| 415 | } |
---|
| 416 | |
---|
| 417 | int QUANTAnet_tcpClient_c::read(char *ptr, int *nbytes, int blockingType) |
---|
| 418 | { |
---|
| 419 | |
---|
| 420 | //used for bandwidth calculations |
---|
| 421 | // int totalDataRead =0; |
---|
| 422 | |
---|
| 423 | struct timeval timeoutDelay; |
---|
| 424 | fd_set readfds; |
---|
| 425 | |
---|
| 426 | if (clientSockFd < 0) { |
---|
| 427 | printf("QUANTAnet_tcpClient_c::Read: This connection is not open.\n"); |
---|
| 428 | return SOCKET_NOT_OPEN; |
---|
| 429 | } |
---|
| 430 | |
---|
| 431 | int nleft, nread; |
---|
| 432 | nleft = *nbytes; |
---|
| 433 | timeOutStatus = 0; |
---|
| 434 | |
---|
| 435 | while (nleft > 0) { |
---|
| 436 | |
---|
| 437 | /*The totalDataRead is incremented each time something is read - in case of an error |
---|
| 438 | this is passed on to the performance monitoring instance using the incrementDataRead() function |
---|
| 439 | for bandwidth calculations - to calculate the bandwidth for the data read till the break in loop*/ |
---|
| 440 | |
---|
| 441 | FD_ZERO(&readfds); |
---|
| 442 | FD_SET(clientSockFd, &readfds); |
---|
| 443 | |
---|
| 444 | if (blockingType == NON_BLOCKING) { |
---|
| 445 | timeoutDelay.tv_sec = 0; |
---|
| 446 | timeoutDelay.tv_usec = 0; |
---|
| 447 | } |
---|
| 448 | if (blockingType == BLOCKING) { |
---|
| 449 | timeoutDelay.tv_sec = timeOutPeriod; |
---|
| 450 | timeoutDelay.tv_usec = 0; |
---|
| 451 | } |
---|
| 452 | |
---|
| 453 | errno = 0; |
---|
| 454 | |
---|
| 455 | // Wait for some file descriptor to be ready |
---|
| 456 | // If no timeout then wait indefinitely until data arrives |
---|
| 457 | #ifdef WIN32 |
---|
| 458 | int error; |
---|
| 459 | |
---|
| 460 | if ((blockingType == BLOCKING) && |
---|
| 461 | (timeOutPeriod == NO_TIME_OUT)) |
---|
| 462 | error = select(clientSockFd+1,&readfds,NULL,NULL, NULL); |
---|
| 463 | else |
---|
| 464 | error = select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay); |
---|
| 465 | if (error == SOCKET_ERROR) { |
---|
| 466 | errno = WSAGetLastError(); |
---|
| 467 | return errno; |
---|
| 468 | } |
---|
| 469 | #else |
---|
| 470 | // This one may not work as it should be.. |
---|
| 471 | // Unix socket's man page doesn't say select will alter the |
---|
| 472 | // value of errno... |
---|
| 473 | if ((blockingType == BLOCKING) && |
---|
| 474 | (timeOutPeriod == NO_TIME_OUT)) |
---|
| 475 | select(clientSockFd+1,&readfds,NULL,NULL, NULL); |
---|
| 476 | else |
---|
| 477 | select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay); |
---|
| 478 | if (errno) { |
---|
| 479 | return errno; |
---|
| 480 | } |
---|
| 481 | #endif |
---|
| 482 | |
---|
| 483 | |
---|
| 484 | // If fd is not set |
---|
| 485 | if (!(FD_ISSET(clientSockFd, &readfds))) { |
---|
| 486 | // If non blocking then it simply means there is no data. |
---|
| 487 | if (blockingType == NON_BLOCKING) |
---|
| 488 | return NON_BLOCKING_HAS_NO_DATA; |
---|
| 489 | |
---|
| 490 | // If blocking then timed out waiting for data. |
---|
| 491 | if (blockingType == BLOCKING) { |
---|
| 492 | timeOutStatus = 1; |
---|
| 493 | return TIMED_OUT; |
---|
| 494 | } |
---|
| 495 | |
---|
| 496 | } |
---|
| 497 | |
---|
| 498 | // If it passes this stage then things are set as blocking. |
---|
| 499 | blockingType = BLOCKING; |
---|
| 500 | |
---|
| 501 | #ifdef WIN32 |
---|
| 502 | nread = ::recv(clientSockFd, ptr, nleft, 0); |
---|
| 503 | |
---|
| 504 | if (nread == SOCKET_ERROR) { |
---|
| 505 | *nbytes = nread; |
---|
| 506 | errno = WSAGetLastError(); |
---|
| 507 | if (errno == WSAECONNRESET) |
---|
| 508 | return CONNECTION_TERMINATED; |
---|
| 509 | return errno; |
---|
| 510 | } |
---|
| 511 | #else |
---|
| 512 | errno = 0; |
---|
| 513 | nread = ::read(clientSockFd,ptr,nleft); |
---|
| 514 | |
---|
| 515 | if (errno) { |
---|
| 516 | *nbytes = nread; |
---|
| 517 | return errno; |
---|
| 518 | } |
---|
| 519 | #endif |
---|
| 520 | |
---|
| 521 | if (nread == 0) { |
---|
| 522 | *nbytes = (*nbytes - nleft); |
---|
| 523 | return CONNECTION_TERMINATED; |
---|
| 524 | } |
---|
| 525 | |
---|
| 526 | if (nread > 0) { |
---|
| 527 | nleft -= nread; |
---|
| 528 | ptr += nread; |
---|
| 529 | pmonitor.incrementDataRead(nread); |
---|
| 530 | } |
---|
| 531 | |
---|
| 532 | } |
---|
| 533 | *nbytes = *nbytes - nleft; |
---|
| 534 | return OK; |
---|
| 535 | } |
---|
| 536 | |
---|
| 537 | int QUANTAnet_tcpClient_c::write(const char *ptr, int *nbytes, int blockingType) |
---|
| 538 | { |
---|
| 539 | //used for bandwidth calculations |
---|
| 540 | // int totalDataSent =0; |
---|
| 541 | |
---|
| 542 | // Place here to prevent crashes when write occurs to a socket |
---|
| 543 | // closed by an external source. |
---|
| 544 | #ifndef WIN32 |
---|
| 545 | signal(SIGPIPE,SIG_IGN); |
---|
| 546 | #endif |
---|
| 547 | |
---|
| 548 | struct timeval timeoutDelay; |
---|
| 549 | fd_set writefds; |
---|
| 550 | |
---|
| 551 | if (clientSockFd < 0) { |
---|
| 552 | printf("QUANTAnet_tcpClient_c::Write: This connection is not open.\n"); |
---|
| 553 | return SOCKET_NOT_OPEN; |
---|
| 554 | } |
---|
| 555 | |
---|
| 556 | int nleft, nwrite; |
---|
| 557 | nleft = *nbytes; |
---|
| 558 | timeOutStatus = 0; |
---|
| 559 | |
---|
| 560 | while (nleft > 0) { |
---|
| 561 | |
---|
| 562 | /*The totalDataSent is incremented each time something is written - in case of an error this is passed on to the performance monitoring instance using the incrementDataSent() function for bandwidth calculations - to calculate the bandwidth for the data written till the break in the loop*/ |
---|
| 563 | |
---|
| 564 | FD_ZERO(&writefds); |
---|
| 565 | FD_SET(clientSockFd, &writefds); |
---|
| 566 | |
---|
| 567 | if (blockingType == NON_BLOCKING) { |
---|
| 568 | timeoutDelay.tv_sec = 0; |
---|
| 569 | timeoutDelay.tv_usec = 0; |
---|
| 570 | } |
---|
| 571 | if (blockingType == BLOCKING) { |
---|
| 572 | timeoutDelay.tv_sec = timeOutPeriod; |
---|
| 573 | timeoutDelay.tv_usec = 0; |
---|
| 574 | } |
---|
| 575 | |
---|
| 576 | errno = 0; |
---|
| 577 | |
---|
| 578 | // Wait for some file descriptor to be ready |
---|
| 579 | #ifdef WIN32 |
---|
| 580 | int error; |
---|
| 581 | |
---|
| 582 | if ((blockingType == BLOCKING) && |
---|
| 583 | (timeOutPeriod == NO_TIME_OUT)) |
---|
| 584 | error = select(clientSockFd+1,NULL,&writefds,NULL, NULL); |
---|
| 585 | else |
---|
| 586 | error = select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay); |
---|
| 587 | if (error == SOCKET_ERROR) { |
---|
| 588 | errno = WSAGetLastError(); |
---|
| 589 | return errno; |
---|
| 590 | } |
---|
| 591 | #else |
---|
| 592 | // This one may not work as it should be.. |
---|
| 593 | // Unix socket's man page doesn't say select will alter the |
---|
| 594 | // value of errno... |
---|
| 595 | if ((blockingType == BLOCKING) && |
---|
| 596 | (timeOutPeriod == NO_TIME_OUT)) |
---|
| 597 | select(clientSockFd+1,NULL,&writefds,NULL, NULL); |
---|
| 598 | else |
---|
| 599 | select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay); |
---|
| 600 | if (errno) { |
---|
| 601 | return errno; |
---|
| 602 | } |
---|
| 603 | #endif |
---|
| 604 | |
---|
| 605 | // If fd is not set |
---|
| 606 | if (!(FD_ISSET(clientSockFd, &writefds))) { |
---|
| 607 | |
---|
| 608 | // If non blocking then it simply means there is no data. |
---|
| 609 | if (blockingType == NON_BLOCKING) |
---|
| 610 | return NON_BLOCKING_NOT_READY_TO_WRITE; |
---|
| 611 | |
---|
| 612 | // If blocking then timed out waiting for data. |
---|
| 613 | if (blockingType == BLOCKING) { |
---|
| 614 | timeOutStatus = 1; |
---|
| 615 | return TIMED_OUT; |
---|
| 616 | } |
---|
| 617 | } |
---|
| 618 | |
---|
| 619 | |
---|
| 620 | // If it passes this stage then things are set as blocking. |
---|
| 621 | blockingType = BLOCKING; |
---|
| 622 | |
---|
| 623 | #ifdef WIN32 |
---|
| 624 | nwrite = ::send(clientSockFd, ptr, nleft, 0); |
---|
| 625 | if (nwrite == SOCKET_ERROR) { |
---|
| 626 | *nbytes = nwrite; |
---|
| 627 | errno = WSAGetLastError(); |
---|
| 628 | return errno; |
---|
| 629 | } |
---|
| 630 | #else |
---|
| 631 | errno = 0; |
---|
| 632 | nwrite = ::write(clientSockFd,ptr,nleft); |
---|
| 633 | |
---|
| 634 | if (errno) { |
---|
| 635 | *nbytes = nwrite; |
---|
| 636 | return errno; |
---|
| 637 | } |
---|
| 638 | #endif |
---|
| 639 | |
---|
| 640 | if (nwrite == 0) { |
---|
| 641 | *nbytes = (*nbytes - nleft); |
---|
| 642 | return CONNECTION_TERMINATED; |
---|
| 643 | } |
---|
| 644 | |
---|
| 645 | if (nwrite > 0) { |
---|
| 646 | nleft -= nwrite; |
---|
| 647 | ptr += nwrite; |
---|
| 648 | pmonitor.incrementDataSent(nwrite); |
---|
| 649 | } |
---|
| 650 | |
---|
| 651 | } |
---|
| 652 | *nbytes = *nbytes - nleft; |
---|
| 653 | return OK; |
---|
| 654 | |
---|
| 655 | } |
---|
| 656 | |
---|
| 657 | int QUANTAnet_tcpClient_c::connectToServer(const char* ip, int port) |
---|
| 658 | { |
---|
| 659 | |
---|
| 660 | char serverAddr[128]; |
---|
| 661 | |
---|
| 662 | int status = hostnameToIP(ip, serverAddr); |
---|
| 663 | |
---|
| 664 | |
---|
| 665 | if (status == 0) { |
---|
| 666 | printf("QUANTAnet_tcpClient_c::Connect: Host IP address %s is invalid.\n", ip); |
---|
| 667 | return -1; |
---|
| 668 | } |
---|
| 669 | |
---|
| 670 | /* Fill in the structure "clientInfo" with the address of the |
---|
| 671 | * server that we want to connect with. |
---|
| 672 | */ |
---|
| 673 | |
---|
| 674 | |
---|
| 675 | |
---|
| 676 | memset((char *) &clientInfo,0, sizeof(clientInfo)); |
---|
| 677 | |
---|
| 678 | clientInfo.sin_family = AF_INET; |
---|
| 679 | clientInfo.sin_addr.s_addr = inet_addr(serverAddr); |
---|
| 680 | clientInfo.sin_port = htons(port); |
---|
| 681 | |
---|
| 682 | int optlen; |
---|
| 683 | |
---|
| 684 | /* Set no delay to stop TCP/IP from buffering */ |
---|
| 685 | int noDelay = 1; |
---|
| 686 | optlen = sizeof(noDelay); |
---|
| 687 | |
---|
| 688 | if (setsockopt(clientSockFd,IPPROTO_TCP,TCP_NODELAY, (char *) &noDelay, optlen) < 0) |
---|
| 689 | printf("QUANTAnet_tcpClient_c::Open: Cannot set no delay, you may experience sluggish performance.\n"); |
---|
| 690 | |
---|
| 691 | /* Use this to try to eliminate the problem of being unable to rebind to a static port */ |
---|
| 692 | int reuseFlag = 1; |
---|
| 693 | optlen = sizeof(reuseFlag); |
---|
| 694 | if (setsockopt(clientSockFd,SOL_SOCKET,SO_REUSEADDR, (char *) &reuseFlag, optlen) < 0) |
---|
| 695 | printf("QUANTAnet_tcpClient_c::Open: Cannot reuse port address.\n"); |
---|
| 696 | /***** Bind our local address so that the client can send to us. */ |
---|
| 697 | |
---|
| 698 | struct linger lingerData; |
---|
| 699 | lingerData.l_onoff = 1; |
---|
| 700 | lingerData.l_linger = 1000; |
---|
| 701 | |
---|
| 702 | optlen = sizeof(lingerData); |
---|
| 703 | if (setsockopt(clientSockFd,SOL_SOCKET,SO_LINGER, (char *) &lingerData, optlen) < 0) |
---|
| 704 | printf("QUANTAnet_tcpClient_c::Open: setsockopt: SO_LINGER failed.\n"); |
---|
| 705 | |
---|
| 706 | /* Connect to server. */ |
---|
| 707 | int connectResult; |
---|
| 708 | if ((connectResult = connect(clientSockFd, (struct sockaddr *) &clientInfo, sizeof(clientInfo))) < 0) { |
---|
| 709 | printf("QUANTAnet_tcpClient_c::Connect: Can't connect to server.\n"); |
---|
| 710 | return connectResult; |
---|
| 711 | } |
---|
| 712 | return clientSockFd; |
---|
| 713 | } |
---|
| 714 | |
---|
| 715 | void QUANTAnet_tcpClient_c::makeNonBlocking() |
---|
| 716 | { |
---|
| 717 | #ifdef __sgi |
---|
| 718 | fcntl(clientSockFd, F_SETFL, FNDELAY); |
---|
| 719 | #endif |
---|
| 720 | #ifdef linux |
---|
| 721 | fcntl(clientSockFd, F_SETFL, O_NONBLOCK); |
---|
| 722 | #endif |
---|
| 723 | #ifdef sun |
---|
| 724 | fcntl(clientSockFd, F_SETFL, O_NDELAY); |
---|
| 725 | #endif |
---|
| 726 | #ifdef WIN32 |
---|
| 727 | u_long nonBlockingMode = 1L; |
---|
| 728 | |
---|
| 729 | ioctlsocket(clientSockFd, FIONBIO, &nonBlockingMode); |
---|
| 730 | #endif |
---|
| 731 | } |
---|
| 732 | |
---|
| 733 | int QUANTAnet_tcpClient_c::close() |
---|
| 734 | { |
---|
| 735 | int closeResult; |
---|
| 736 | #ifdef WIN32 |
---|
| 737 | if (clientSockFd > 0) closeResult = ::closesocket(clientSockFd); |
---|
| 738 | #else |
---|
| 739 | if (clientSockFd > 0) closeResult = ::close(clientSockFd); |
---|
| 740 | #endif |
---|
| 741 | clientSockFd = 0; |
---|
| 742 | return closeResult; |
---|
| 743 | } |
---|
| 744 | |
---|
| 745 | QUANTAnet_tcpClient_c::QUANTAnet_tcpClient_c() { |
---|
| 746 | timeOutPeriod = QUANTAnet_tcpClient_c::DEFAULT_TIME_OUT; |
---|
| 747 | clientSockFd = 0; |
---|
| 748 | #ifdef WIN32_WCE |
---|
| 749 | clientSockFd = -1; |
---|
| 750 | #endif |
---|
| 751 | |
---|
| 752 | /* Open a TCP socket (an Internet stream socket). */ |
---|
| 753 | #ifdef WIN32 |
---|
| 754 | if ((clientSockFd = socket(AF_INET,SOCK_STREAM,0)) == INVALID_SOCKET) { |
---|
| 755 | printf("QUANTAnet_tcpClient_c: Can't open stream socket.\n"); |
---|
| 756 | } |
---|
| 757 | #else |
---|
| 758 | if ((clientSockFd = socket(AF_INET,SOCK_STREAM,0)) < 0) { |
---|
| 759 | printf("QUANTAnet_tcpClient_c: Can't open stream socket.\n"); |
---|
| 760 | } |
---|
| 761 | #endif |
---|
| 762 | |
---|
| 763 | } |
---|
| 764 | |
---|
| 765 | void QUANTAnet_tcpClient_c::showStatus(int status, int nbytes) |
---|
| 766 | { |
---|
| 767 | switch (status) { |
---|
| 768 | case SOCKET_NOT_OPEN: |
---|
| 769 | printf("QUANTAnet_tcpClient_c::showStatus: Socket not open.\n"); |
---|
| 770 | break; |
---|
| 771 | case CONNECTION_TERMINATED: |
---|
| 772 | printf("QUANTAnet_tcpClient_c::showStatus: Connection terminated with %d bytes transacted.\n", nbytes); |
---|
| 773 | break; |
---|
| 774 | case NON_BLOCKING_HAS_NO_DATA: |
---|
| 775 | printf("QUANTAnet_tcpClient_c::showStatus: Non-blocking read found no data on stream.\n"); |
---|
| 776 | break; |
---|
| 777 | case NON_BLOCKING_NOT_READY_TO_WRITE: |
---|
| 778 | printf("QUANTAnet_tcpClient_c::showStatus: Non-blocking connection is not ready to write.\n"); |
---|
| 779 | break; |
---|
| 780 | case TIMED_OUT: |
---|
| 781 | printf("QUANTAnet_tcpClient_c::showStatus: Transaction timed out after %d seconds.\n", timeOutPeriod); |
---|
| 782 | break; |
---|
| 783 | // case NON_BLOCKING_WROTE_NO_DATA: |
---|
| 784 | // printf("Non-blocking write wrote no data.\n"); |
---|
| 785 | // break; |
---|
| 786 | case OK: |
---|
| 787 | printf("QUANTAnet_tcpClient_c::showStatus: Transaction ok.\n"); |
---|
| 788 | break; |
---|
| 789 | default: |
---|
| 790 | printf("QUANTAnet_tcpClient_c::showStatus: UNIX Socket error no: %d\n ",errno); |
---|
| 791 | //perror(""); |
---|
| 792 | break; |
---|
| 793 | } |
---|
| 794 | fflush(stdout); |
---|
| 795 | } |
---|
| 796 | |
---|
| 797 | |
---|
| 798 | unsigned int QUANTAnet_tcpClient_c :: getRemoteIP() { |
---|
| 799 | return clientInfo.sin_addr.s_addr; |
---|
| 800 | } |
---|
| 801 | |
---|
| 802 | void QUANTAnet_tcpClient_c :: getRemoteIP(char* name) |
---|
| 803 | { |
---|
| 804 | ipNumToString(getRemoteIP(),name); |
---|
| 805 | |
---|
| 806 | } |
---|
| 807 | |
---|
| 808 | int QUANTAnet_tcpClient_c :: getSelfPort() { |
---|
| 809 | struct sockaddr_in name; |
---|
| 810 | #ifdef HAVE_SOCKLEN_T |
---|
| 811 | socklen_t size; |
---|
| 812 | #else |
---|
| 813 | int size; |
---|
| 814 | #endif |
---|
| 815 | size = sizeof(name); |
---|
| 816 | getsockname (clientSockFd, (sockaddr *)&name, (socklen_t*)&size); |
---|
| 817 | return ntohs(name.sin_port); |
---|
| 818 | } |
---|
| 819 | |
---|
| 820 | /* Get the port # of the client connection */ |
---|
| 821 | int QUANTAnet_tcpClient_c :: getRemotePort() { |
---|
| 822 | return ntohs(clientInfo.sin_port); |
---|
| 823 | } |
---|
| 824 | |
---|
| 825 | //Functions added for performance monitoring interface |
---|
| 826 | |
---|
| 827 | void QUANTAnet_tcpClient_c ::showStats(char* streamInfo, char* comment) |
---|
| 828 | { |
---|
| 829 | char *selfIp = new char[256]; |
---|
| 830 | char *remoteIp = new char[256]; |
---|
| 831 | QUANTAnet_socketbase_c::getSelfIP(selfIp); |
---|
| 832 | getRemoteIP(remoteIp); |
---|
| 833 | pmonitor.setIPs(selfIp, remoteIp); |
---|
| 834 | pmonitor.setPorts(getSelfPort(), getRemotePort()); |
---|
| 835 | pmonitor.showStats(streamInfo, comment); |
---|
| 836 | delete[] selfIp; |
---|
| 837 | delete[] remoteIp; |
---|
| 838 | } |
---|
| 839 | |
---|
| 840 | int QUANTAnet_tcpClient_c ::logStats(char* streamInfo, char* comment, FILE* filePtr) |
---|
| 841 | { |
---|
| 842 | char *selfIp = new char[256]; |
---|
| 843 | char *remoteIp = new char[256]; |
---|
| 844 | QUANTAnet_socketbase_c::getSelfIP(selfIp); |
---|
| 845 | getRemoteIP(remoteIp); |
---|
| 846 | pmonitor.setIPs(selfIp, remoteIp); |
---|
| 847 | pmonitor.setPorts(getSelfPort(), getRemotePort()); |
---|
| 848 | int result = pmonitor.logStats(streamInfo, comment, filePtr); |
---|
| 849 | delete[] selfIp; |
---|
| 850 | delete[] remoteIp; |
---|
| 851 | return result; |
---|
| 852 | } |
---|
| 853 | |
---|
| 854 | |
---|
| 855 | int QUANTAnet_tcpClient_c::sendStats(char* streamInfo, char* comment) |
---|
| 856 | { |
---|
| 857 | char *selfIp = new char[256]; |
---|
| 858 | char *remoteIp = new char[256]; |
---|
| 859 | QUANTAnet_socketbase_c::getSelfIP(selfIp); |
---|
| 860 | getRemoteIP(remoteIp); |
---|
| 861 | pmonitor.setIPs(selfIp, remoteIp); |
---|
| 862 | pmonitor.setPorts(getSelfPort(), getRemotePort()); |
---|
| 863 | int result = pmonitor.sendStats(streamInfo, comment); |
---|
| 864 | delete[] selfIp; |
---|
| 865 | delete[] remoteIp; |
---|
| 866 | return result; |
---|
| 867 | } |
---|
| 868 | |
---|
| 869 | int QUANTAnet_tcpClient_c::initSendStats(const char* monitorClientIP, int port) |
---|
| 870 | { |
---|
| 871 | return pmonitor.initSendStats(monitorClientIP, port); |
---|
| 872 | } |
---|
| 873 | |
---|
| 874 | void QUANTAnet_tcpClient_c::exitSendStats() |
---|
| 875 | { |
---|
| 876 | pmonitor.exitSendStats(); |
---|
| 877 | } |
---|
| 878 | |
---|
| 879 | void QUANTAnet_tcpClient_c::setInstantaneousLatency(double latency) |
---|
| 880 | { |
---|
| 881 | pmonitor.setInstantLatency(latency); |
---|
| 882 | } |
---|
| 883 | |
---|
| 884 | int QUANTAnet_tcpClient_c::getSocketId() |
---|
| 885 | { |
---|
| 886 | return clientSockFd; |
---|
| 887 | } |
---|
| 888 | |
---|
| 889 | |
---|
| 890 | //Readv() method to read iovec |
---|
| 891 | |
---|
| 892 | int QUANTAnet_tcpClient_c::readv(QUANTAnet_iovec_c recv_iovec, int blockingType) |
---|
| 893 | { |
---|
| 894 | struct timeval timeoutDelay; |
---|
| 895 | fd_set readfds; |
---|
| 896 | |
---|
| 897 | if (clientSockFd < 0) { |
---|
| 898 | printf("QUANTAnet_tcpClient_c::Read: This connection is not open.\n"); |
---|
| 899 | return SOCKET_NOT_OPEN; |
---|
| 900 | } |
---|
| 901 | |
---|
| 902 | int nread; |
---|
| 903 | timeOutStatus = 0; |
---|
| 904 | |
---|
| 905 | |
---|
| 906 | FD_ZERO(&readfds); |
---|
| 907 | FD_SET(clientSockFd, &readfds); |
---|
| 908 | |
---|
| 909 | if (blockingType == NON_BLOCKING) { |
---|
| 910 | timeoutDelay.tv_sec = 0; |
---|
| 911 | timeoutDelay.tv_usec = 0; |
---|
| 912 | } |
---|
| 913 | if (blockingType == BLOCKING) { |
---|
| 914 | timeoutDelay.tv_sec = timeOutPeriod; |
---|
| 915 | timeoutDelay.tv_usec = 0; |
---|
| 916 | } |
---|
| 917 | |
---|
| 918 | errno = 0; |
---|
| 919 | |
---|
| 920 | // Wait for some file descriptor to be ready |
---|
| 921 | // If no timeout then wait indefinitely until data arrives |
---|
| 922 | #ifdef WIN32 |
---|
| 923 | int error; |
---|
| 924 | |
---|
| 925 | if ((blockingType == BLOCKING) && |
---|
| 926 | (timeOutPeriod == NO_TIME_OUT)) |
---|
| 927 | error = select(clientSockFd+1,&readfds,NULL,NULL, NULL); |
---|
| 928 | else |
---|
| 929 | error = select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay); |
---|
| 930 | if (error == SOCKET_ERROR) { |
---|
| 931 | errno = WSAGetLastError(); |
---|
| 932 | return errno; |
---|
| 933 | } |
---|
| 934 | #else |
---|
| 935 | // This one may not work as it should be.. |
---|
| 936 | // Unix socket's man page doesn't say select will alter the |
---|
| 937 | // value of errno... |
---|
| 938 | if ((blockingType == BLOCKING) && |
---|
| 939 | (timeOutPeriod == NO_TIME_OUT)) |
---|
| 940 | select(clientSockFd+1,&readfds,NULL,NULL, NULL); |
---|
| 941 | else |
---|
| 942 | select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay); |
---|
| 943 | if (errno) { |
---|
| 944 | return errno; |
---|
| 945 | } |
---|
| 946 | #endif |
---|
| 947 | |
---|
| 948 | |
---|
| 949 | // If fd is not set |
---|
| 950 | if (!(FD_ISSET(clientSockFd, &readfds))) { |
---|
| 951 | // If non blocking then it simply means there is no data. |
---|
| 952 | if (blockingType == NON_BLOCKING) |
---|
| 953 | return NON_BLOCKING_HAS_NO_DATA; |
---|
| 954 | |
---|
| 955 | // If blocking then timed out waiting for data. |
---|
| 956 | if (blockingType == BLOCKING) { |
---|
| 957 | timeOutStatus = 1; |
---|
| 958 | return TIMED_OUT; |
---|
| 959 | } |
---|
| 960 | |
---|
| 961 | } |
---|
| 962 | |
---|
| 963 | // If it passes this stage then things are set as blocking. |
---|
| 964 | blockingType = BLOCKING; |
---|
| 965 | |
---|
| 966 | #ifdef WIN32 |
---|
| 967 | DWORD noOfBytesRecvd; |
---|
| 968 | DWORD WSAFlags = 0; |
---|
| 969 | nread = ::WSARecv(clientSockFd, recv_iovec.getBuffer(),recv_iovec.size(),&noOfBytesRecvd,&WSAFlags,NULL,NULL); |
---|
| 970 | |
---|
| 971 | if (nread == SOCKET_ERROR) { |
---|
| 972 | //*nbytes = nread; |
---|
| 973 | errno = WSAGetLastError(); |
---|
| 974 | if (errno == WSAECONNRESET) |
---|
| 975 | return CONNECTION_TERMINATED; |
---|
| 976 | return errno; |
---|
| 977 | } |
---|
| 978 | else |
---|
| 979 | { |
---|
| 980 | nread = noOfBytesRecvd; |
---|
| 981 | } |
---|
| 982 | #else |
---|
| 983 | errno = 0; |
---|
| 984 | |
---|
| 985 | nread = ::readv(clientSockFd, recv_iovec.getBuffer(), recv_iovec.size()); |
---|
| 986 | |
---|
| 987 | if(errno != 0) |
---|
| 988 | { |
---|
| 989 | perror("QUANTAnet_tcp_c :: readv - could not read"); |
---|
| 990 | return errno; |
---|
| 991 | } |
---|
| 992 | |
---|
| 993 | #endif |
---|
| 994 | |
---|
| 995 | if (nread == 0) |
---|
| 996 | { |
---|
| 997 | //*nbytes = (*nbytes - nleft); |
---|
| 998 | return CONNECTION_TERMINATED; |
---|
| 999 | } |
---|
| 1000 | |
---|
| 1001 | if (nread > 0) |
---|
| 1002 | { |
---|
| 1003 | pmonitor.incrementDataRead(nread); |
---|
| 1004 | } |
---|
| 1005 | |
---|
| 1006 | return OK; |
---|
| 1007 | } |
---|
| 1008 | |
---|
| 1009 | //writev() method to write iovec |
---|
| 1010 | |
---|
| 1011 | int QUANTAnet_tcpClient_c::writev(QUANTAnet_iovec_c send_iovec, int blockingType) |
---|
| 1012 | { |
---|
| 1013 | |
---|
| 1014 | #ifndef WIN32 |
---|
| 1015 | signal(SIGPIPE,SIG_IGN); |
---|
| 1016 | #endif |
---|
| 1017 | |
---|
| 1018 | struct timeval timeoutDelay; |
---|
| 1019 | fd_set writefds; |
---|
| 1020 | |
---|
| 1021 | if (clientSockFd < 0) { |
---|
| 1022 | printf("QUANTAnet_tcpClient_c::Writev: This connection is not open.\n"); |
---|
| 1023 | return SOCKET_NOT_OPEN; |
---|
| 1024 | } |
---|
| 1025 | |
---|
| 1026 | int nwrite; |
---|
| 1027 | timeOutStatus = 0; |
---|
| 1028 | |
---|
| 1029 | /*The totalDataSent is incremented each time something is written - in case of an error this is passed on to the |
---|
| 1030 | performance monitoring instance using the incrementDataSent() function for bandwidth calculations - |
---|
| 1031 | to calculate the bandwidth for the data written till the break in the loop*/ |
---|
| 1032 | |
---|
| 1033 | FD_ZERO(&writefds); |
---|
| 1034 | FD_SET(clientSockFd, &writefds); |
---|
| 1035 | |
---|
| 1036 | if (blockingType == NON_BLOCKING) { |
---|
| 1037 | timeoutDelay.tv_sec = 0; |
---|
| 1038 | timeoutDelay.tv_usec = 0; |
---|
| 1039 | } |
---|
| 1040 | if (blockingType == BLOCKING) { |
---|
| 1041 | timeoutDelay.tv_sec = timeOutPeriod; |
---|
| 1042 | timeoutDelay.tv_usec = 0; |
---|
| 1043 | } |
---|
| 1044 | |
---|
| 1045 | errno = 0; |
---|
| 1046 | |
---|
| 1047 | // Wait for some file descriptor to be ready |
---|
| 1048 | #ifdef WIN32 |
---|
| 1049 | int error; |
---|
| 1050 | |
---|
| 1051 | if ((blockingType == BLOCKING) && |
---|
| 1052 | (timeOutPeriod == NO_TIME_OUT)) |
---|
| 1053 | error = select(clientSockFd+1,NULL,&writefds,NULL, NULL); |
---|
| 1054 | else |
---|
| 1055 | error = select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay); |
---|
| 1056 | if (error == SOCKET_ERROR) { |
---|
| 1057 | errno = WSAGetLastError(); |
---|
| 1058 | return errno; |
---|
| 1059 | } |
---|
| 1060 | #else |
---|
| 1061 | // This one may not work as it should be.. |
---|
| 1062 | // Unix socket's man page doesn't say select will alter the |
---|
| 1063 | // value of errno... |
---|
| 1064 | if ((blockingType == BLOCKING) && |
---|
| 1065 | (timeOutPeriod == NO_TIME_OUT)) |
---|
| 1066 | select(clientSockFd+1,NULL,&writefds,NULL, NULL); |
---|
| 1067 | else |
---|
| 1068 | select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay); |
---|
| 1069 | if (errno) { |
---|
| 1070 | return errno; |
---|
| 1071 | } |
---|
| 1072 | #endif |
---|
| 1073 | |
---|
| 1074 | // If fd is not set |
---|
| 1075 | if (!(FD_ISSET(clientSockFd, &writefds))) { |
---|
| 1076 | |
---|
| 1077 | // If non blocking then it simply means there is no data. |
---|
| 1078 | if (blockingType == NON_BLOCKING) |
---|
| 1079 | return NON_BLOCKING_NOT_READY_TO_WRITE; |
---|
| 1080 | |
---|
| 1081 | // If blocking then timed out waiting for data. |
---|
| 1082 | if (blockingType == BLOCKING) { |
---|
| 1083 | timeOutStatus = 1; |
---|
| 1084 | return TIMED_OUT; |
---|
| 1085 | } |
---|
| 1086 | } |
---|
| 1087 | |
---|
| 1088 | |
---|
| 1089 | // If it passes this stage then things are set as blocking. |
---|
| 1090 | blockingType = BLOCKING; |
---|
| 1091 | |
---|
| 1092 | #ifdef WIN32 |
---|
| 1093 | DWORD noOfBytesSent; |
---|
| 1094 | DWORD WSAFlags = 0; |
---|
| 1095 | |
---|
| 1096 | nwrite = ::WSASend(clientSockFd,send_iovec.getBuffer(),send_iovec.size(),&noOfBytesSent,WSAFlags,NULL,NULL); |
---|
| 1097 | if (nwrite == SOCKET_ERROR) { |
---|
| 1098 | //*nbytes = noOfBytesSent; |
---|
| 1099 | errno = WSAGetLastError(); |
---|
| 1100 | return errno; |
---|
| 1101 | } |
---|
| 1102 | else { |
---|
| 1103 | nwrite = noOfBytesSent; |
---|
| 1104 | } |
---|
| 1105 | #else |
---|
| 1106 | errno = 0; |
---|
| 1107 | nwrite = ::writev(clientSockFd, send_iovec.getBuffer(), send_iovec.size()); |
---|
| 1108 | |
---|
| 1109 | if (errno != 0) { |
---|
| 1110 | //*nbytes = nwrite; |
---|
| 1111 | return errno; |
---|
| 1112 | } |
---|
| 1113 | #endif |
---|
| 1114 | |
---|
| 1115 | if (nwrite == 0) |
---|
| 1116 | { |
---|
| 1117 | return CONNECTION_TERMINATED; |
---|
| 1118 | } |
---|
| 1119 | |
---|
| 1120 | if (nwrite > 0) |
---|
| 1121 | { |
---|
| 1122 | pmonitor.incrementDataSent(nwrite); |
---|
| 1123 | } |
---|
| 1124 | |
---|
| 1125 | return OK; |
---|
| 1126 | } |
---|
| 1127 | |
---|