source: trunk/src/testing/QUANTA/src/QUANTAnet_tcp_c.cxx @ 4

Revision 4, 30.3 KB checked in by ajaworski, 13 years ago (diff)

Added modified SAGE sources

Line 
1/******************************************************************************
2 * QUANTA - A toolkit for High Performance Data Sharing
3 * Copyright (C) 2003 Electronic Visualization Laboratory, 
4 * University of Illinois at Chicago
5 *
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 *  * Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 *  * Redistributions in binary form must reproduce the above
14 *    copyright notice, this list of conditions and the following disclaimer
15 *    in the documentation and/or other materials provided with the distribution.
16 *  * Neither the name of the University of Illinois at Chicago nor
17 *    the names of its contributors may be used to endorse or promote
18 *    products derived from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
27 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
28 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
29 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
30 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 * Direct questions, comments etc about Quanta to cavern@evl.uic.edu
33 *****************************************************************************/
34
35#include "QUANTAnet_tcp_c.hxx"
36#include <string.h>
37
38#ifndef WIN32
39#include <netinet/tcp.h>
40#include <signal.h>
41#include <sys/errno.h>
42#include <sys/socket.h>
43#include <sys/time.h>
44#ifndef HAVE_UNISTD_H
45#include <unistd.h>
46#endif
47#ifndef errno
48extern int errno;
49#endif /* errno */
50#endif /* !WIN32 */
51
52#ifdef _WIN32_WCE
53#include <errno.h>
54#endif
55
56const int QUANTAnet_tcpServer_c::OK = 1;
57const int QUANTAnet_tcpServer_c::FAILED = 0;
58const int QUANTAnet_tcpServer_c::READ_BUFFER_SIZE = 2;
59const int QUANTAnet_tcpServer_c::WRITE_BUFFER_SIZE = 3;
60
61#ifdef sun
62long gethostid(void);
63#endif
64
65#ifdef WIN32
66#define MAXHOSTNAME   128   /* Maximum length of a DNS hostname */
67#define DEFAULT_BUFFER_SIZE 32766 /* Default socket bufffer size */
68#else
69#define DEFAULT_BUFFER_SIZE 1024
70#endif /* WIN32 */
71
72#ifdef WIN32
73// This function is excerpted from "Windows Sockets Network Programming"
74// by Bob Quinn and Dave Shute published from Addison-Wesley.
75static long gethostid(void)
76{
77        char szLclHost [MAXHOSTNAME];
78        LPHOSTENT lpstHostent;
79        SOCKADDR_IN stLclAddr;
80        SOCKADDR_IN stRmtAddr;
81        int nAddrSize = sizeof(SOCKADDR);
82        SOCKET hSock;
83        int nRet;
84
85        /* Init local address (to zero) */
86        stLclAddr.sin_addr.s_addr = INADDR_ANY;
87
88        /* Get the local hostname */
89        nRet = gethostname(szLclHost, MAXHOSTNAME);
90        if (nRet != SOCKET_ERROR) {
91                /* Resolve hostname for local address */
92                lpstHostent = gethostbyname((LPSTR)szLclHost);
93                if (lpstHostent)
94                        stLclAddr.sin_addr.s_addr = *((u_long FAR*) (lpstHostent->h_addr));
95        }
96
97        /* If still not resolved, then try second strategy */
98        if (stLclAddr.sin_addr.s_addr == INADDR_ANY) {
99                /* Get a UDP socket */
100                hSock = socket(AF_INET, SOCK_DGRAM, 0);
101                if (hSock != INVALID_SOCKET)  {
102                        /* Connect to arbitrary port and address (NOT loopback) */
103                        stRmtAddr.sin_family = AF_INET;
104                        stRmtAddr.sin_port   = htons(IPPORT_ECHO);
105                        stRmtAddr.sin_addr.s_addr = inet_addr("128.127.50.1");
106                        nRet = connect(hSock, (LPSOCKADDR)&stRmtAddr,
107                                                   sizeof(SOCKADDR));
108                        if (nRet != SOCKET_ERROR) {
109                                /* Get local address */
110                                getsockname(hSock, (LPSOCKADDR)&stLclAddr,
111                                                        (int FAR*)&nAddrSize);
112                        }
113                        closesocket(hSock);   /* we're done with the socket */
114                }
115        }
116        return (stLclAddr.sin_addr.s_addr);
117}
118#endif
119
120QUANTAnet_tcpServer_c::QUANTAnet_tcpServer_c()
121{
122        /***** Open a TCP socket (an Internet stream socket). */
123        /* Notice we want Internet, stream socket and not datagram */
124#ifdef WIN32
125        if ((sockfd = socket(AF_INET,SOCK_STREAM,0)) == INVALID_SOCKET) {
126                printf("QUANTAnet_tcpServer_c::FAILED. Cannot create socket\n");
127        }
128#else
129        if ((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0) {
130                printf("QUANTAnet_tcpServer_c::FAILED. Cannot create socket\n");
131        }
132#endif
133
134        timeOutPeriod = QUANTAnet_tcpClient_c::DEFAULT_TIME_OUT;
135}
136
137int QUANTAnet_tcpServer_c::close()
138{
139  int closeResult;
140#ifdef WIN32
141        if (sockfd >= 0) closeResult = ::closesocket(sockfd);
142#else
143        if (sockfd >= 0) closeResult = ::close(sockfd);
144#endif
145        sockfd = 0;
146  return closeResult;
147}
148
149void QUANTAnet_tcpServer_c::setSockOptions(int option, int buffersize)
150{
151        /*Change the socket buffer size if default is not to be used*/
152        int optlen = sizeof(buffersize);
153
154        if (option == QUANTAnet_tcpServer_c::READ_BUFFER_SIZE)
155                if (setsockopt(sockfd,SOL_SOCKET, SO_RCVBUF, (char *) &buffersize, optlen) < 0)
156                        printf("QUANTAnet_tcpClient_c::setSockOptions: Cannot create a large enough socket buffer for Read.\n");
157
158        if (option == QUANTAnet_tcpServer_c::WRITE_BUFFER_SIZE)
159                if (setsockopt(sockfd,SOL_SOCKET, SO_SNDBUF, (char *) &buffersize, optlen) < 0)
160                        printf("QUANTAnet_tcpClient_c::setSockOptions: Cannot create a large enough socket buffer for Write.\n");
161}
162
163int QUANTAnet_tcpServer_c::init(int port)
164{
165        int optlen;
166
167        /* Set no delay to stop TCP/IP from buffering */
168        int noDelay = 0;// Set to 0 for no buffering. Set to 1 for buffering.
169        optlen = sizeof(noDelay);
170
171        if (setsockopt(sockfd,IPPROTO_TCP,TCP_NODELAY, (char *) &noDelay, optlen) < 0)
172                printf("QUANTAnet_tcpServer_c::init: Cannot set no delay, you may experience sluggish performance.\n");
173
174        /* Use this to try to eliminate the problem of being unable to rebind to a static port */
175        int reuseFlag = 1;
176        optlen = sizeof(reuseFlag);
177        if (setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR, (char *) &reuseFlag, optlen) < 0)
178                printf("QUANTAnet_tcpServer_c::init: Cannot reuse port address.\n");
179        /***** Bind our local address so that the client can send to us. */
180
181        struct linger lingerData;
182        lingerData.l_onoff = 1;
183        lingerData.l_linger = 1000;
184
185        optlen = sizeof(lingerData);
186        if (setsockopt(sockfd,SOL_SOCKET,SO_LINGER, (char *) &lingerData, optlen) < 0)
187                printf("QUANTAnet_tcpServer_c::init: setsockopt: SO_LINGER failed.\n");
188
189        /* Clear out the structure */
190        memset((char *) &serverInfo, 0, sizeof(serverInfo));
191
192        serverInfo.sin_family = AF_INET;
193        serverInfo.sin_addr.s_addr = htonl(INADDR_ANY);
194        serverInfo.sin_port = htons(port);
195
196        if (bind(sockfd, (struct sockaddr *) &serverInfo, sizeof(serverInfo)) < 0) {
197                printf("QUANTAnet_tcpServer_c::init: Can't bind local address.\n");
198                return QUANTAnet_tcpServer_c::FAILED;
199        }
200
201        /* Listen for a connection. Allow queuing for 5 connection requests.
202         */
203        listen(sockfd,5);
204
205        /* Non blocking from now on. This is necessary in order for checkForNewConnections() to work.But this seems to be the main thing that is screwing up
206           with threads.
207         */
208        //      fcntl(sockfd,F_SETFL,FNDELAY);
209
210        return QUANTAnet_tcpServer_c::OK;
211}
212
213const int QUANTAnet_tcpClient_c::BLOCKING = 1;
214const int QUANTAnet_tcpClient_c::NON_BLOCKING= 2;
215const int QUANTAnet_tcpClient_c::NO_TIME_OUT = -1;
216const int QUANTAnet_tcpClient_c::DEFAULT_TIME_OUT = NO_TIME_OUT;
217const int QUANTAnet_tcpClient_c::SOCKET_NOT_OPEN = -1;
218const int QUANTAnet_tcpClient_c::CONNECTION_TERMINATED = -2;
219const int QUANTAnet_tcpClient_c::NON_BLOCKING_HAS_NO_DATA =-3;
220const int QUANTAnet_tcpClient_c::TIMED_OUT = -4;
221const int QUANTAnet_tcpClient_c::OK = 1;
222const int QUANTAnet_tcpClient_c::NON_BLOCKING_NOT_READY_TO_WRITE = -6;
223const int QUANTAnet_tcpClient_c::NOT_READY = -7;
224const int QUANTAnet_tcpClient_c::READY_TO_READ = -8;
225const int QUANTAnet_tcpClient_c::READY_TO_WRITE = -9;
226const int QUANTAnet_tcpClient_c::READY_TO_READ_AND_WRITE = -10;
227const int QUANTAnet_tcpClient_c::READ_BUFFER_SIZE = -11; 
228const int QUANTAnet_tcpClient_c::WRITE_BUFFER_SIZE = -12;
229
230QUANTAnet_tcpClient_c*
231QUANTAnet_tcpServer_c::checkForNewConnections(const int& blockingTime)
232{
233    struct sockaddr_in clientInfo;
234    #ifdef HAVE_SOCKLEN_T
235        socklen_t clilen;
236    #else
237        int clilen;
238    #endif
239    // Perform a non-blocking check to see if socket is ready for
240    // an incoming connection.
241    fd_set read_set;
242    struct timeval timeout;
243    timeout.tv_sec = blockingTime;
244    timeout.tv_usec = 0;
245    int ready_fd;
246
247    FD_ZERO(&read_set);
248    FD_SET(sockfd, &read_set);
249    ready_fd = select( sockfd+1, &read_set, NULL, NULL, &timeout);
250
251    // If it is not ready then return.
252    if ((ready_fd <= 0)  ||  (!FD_ISSET( sockfd, &read_set))) return NULL;
253
254    clilen = sizeof(clientInfo);
255    int newsockfd = accept(sockfd,
256                   (struct sockaddr *) &clientInfo,
257                   (socklen_t*)&clilen);
258    if (newsockfd > 0) {
259        struct linger lingerData;
260        lingerData.l_onoff = 1;
261        lingerData.l_linger = 1000;
262
263        int optlen = sizeof(lingerData);
264
265        if (setsockopt(newsockfd,SOL_SOCKET,SO_LINGER, (char *) &lingerData, optlen) < 0)
266            printf("QUANTAnet_tcpServer_c::checkForNewConnections: setsockopt: SO_LINGER failed.\n");
267
268        QUANTAnet_tcpClient_c *clientObj;
269
270        clientObj = new QUANTAnet_tcpClient_c;
271        clientObj->timeOutPeriod = timeOutPeriod;
272        clientObj->clientInfo = clientInfo;
273        clientObj->clientSockFd = newsockfd;
274        //      clientObj->makeNonBlocking();
275        return clientObj;
276    } else return NULL;
277}
278
279
280QUANTAnet_tcpClient_c*
281QUANTAnet_tcpServer_c::waitForNewConnection()
282{
283  struct sockaddr_in clientInfo;
284#ifdef HAVE_SOCKLEN_T
285  socklen_t clilen;
286#else
287  int clilen;
288#endif
289
290  // Perform a blocking check to see if socket is ready for an
291  // incoming connection.
292  fd_set read_set;
293  int ready_fd;
294
295  FD_ZERO(&read_set);
296  FD_SET(sockfd, &read_set);
297  ready_fd = select( sockfd+1, &read_set, NULL, NULL, NULL);
298
299  // If it is not ready then return.
300  if ((ready_fd <= 0)  ||  (!FD_ISSET( sockfd, &read_set))) return NULL;
301
302  clilen = sizeof(clientInfo);
303  int newsockfd = accept(sockfd,
304                         (struct sockaddr *) &clientInfo,
305                         (socklen_t*)&clilen);
306  if (newsockfd > 0) {
307    struct linger lingerData;
308    lingerData.l_onoff = 1;
309    lingerData.l_linger = 1000;
310
311    int optlen = sizeof(lingerData);
312
313    if (setsockopt(newsockfd, SOL_SOCKET, SO_LINGER,
314                   reinterpret_cast<char*>(&lingerData), optlen) < 0) {
315      printf("QUANTAnet_tcpServer_c::waitForNewConnection: setsockopt: SO_LINGER failed.\n");
316    }
317
318    QUANTAnet_tcpClient_c *clientObj = new QUANTAnet_tcpClient_c;
319    clientObj->timeOutPeriod = timeOutPeriod;
320    clientObj->clientInfo = clientInfo;
321    clientObj->clientSockFd = newsockfd;
322    return clientObj;
323  } else {
324    return NULL;
325  }
326}
327
/*
 * For the blockingType parameter of read() and write():
 *
 * NON_BLOCKING means that if there is no data to be read (or the socket is
 * not ready to be written to) the call will return immediately.
 *
 * BLOCKING means that it will block waiting for data or until the timeout
 * period expires.
 * Change the timeout period (in seconds) by changing the timeOutPeriod
 * member variable. Setting timeOutPeriod to NO_TIME_OUT means the timeout
 * period is bypassed so that the call will continue to wait forever unless
 * there is a break in the connection.
 *
 */
342
343void QUANTAnet_tcpClient_c::setSockOptions(int option, int buffersize)
344{
345        /*Change the socket buffer size if default is not to be used*/ 
346        int optlen = sizeof(buffersize);
347
348        if (option == QUANTAnet_tcpClient_c::READ_BUFFER_SIZE)
349                if (setsockopt(clientSockFd,SOL_SOCKET, SO_RCVBUF, (char *) &buffersize, optlen) < 0)
350                        printf("QUANTAnet_tcpClient_c::setSockOptions: Cannot create a large enough socket buffer for Read.\n");
351
352        if (option == QUANTAnet_tcpClient_c::WRITE_BUFFER_SIZE)
353                if (setsockopt(clientSockFd,SOL_SOCKET, SO_SNDBUF, (char *) &buffersize, optlen) < 0)
354                        printf("QUANTAnet_tcpClient_c::setSockOptions: Cannot create a large enough socket buffer for Write.\n");
355
356}
357
358int QUANTAnet_tcpClient_c::isReadyToRead()
359{
360        struct timeval timeoutDelay;
361        fd_set readfds;
362
363        timeoutDelay.tv_sec = 0;
364        timeoutDelay.tv_usec = 0;
365
366        FD_ZERO(&readfds);
367        FD_SET(clientSockFd, &readfds);
368        select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay);
369       
370        if (!(FD_ISSET(clientSockFd, &readfds)))
371                return NOT_READY;
372        else
373                return READY_TO_READ;
374}
375
376int QUANTAnet_tcpClient_c::isReadyToWrite()
377{
378        struct timeval timeoutDelay;
379        fd_set writefds;
380
381        timeoutDelay.tv_sec = 0;
382        timeoutDelay.tv_usec = 0;
383
384        FD_ZERO(&writefds);
385        FD_SET(clientSockFd, &writefds);
386        select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay);
387       
388        if (!(FD_ISSET(clientSockFd, &writefds)))
389                return NOT_READY;
390        else
391                return READY_TO_WRITE;
392}
393
394int QUANTAnet_tcpClient_c::isReady()
395{
396        struct timeval timeoutDelay;
397        fd_set writefds, readfds;
398
399        timeoutDelay.tv_sec = 0;
400        timeoutDelay.tv_usec = 0;
401
402        FD_ZERO(&writefds);
403        FD_ZERO(&readfds);
404        FD_SET(clientSockFd, &writefds);
405        FD_SET(clientSockFd, &readfds);
406        select(clientSockFd+1,&readfds,&writefds,NULL, &timeoutDelay);
407       
408        if (FD_ISSET(clientSockFd, &writefds)
409            &&
410            FD_ISSET(clientSockFd, &readfds)) return READY_TO_READ_AND_WRITE;
411        if (FD_ISSET(clientSockFd, &writefds)) return READY_TO_WRITE;
412        if (FD_ISSET(clientSockFd, &readfds)) return READY_TO_READ;
413
414        return NOT_READY;
415}
416
417int QUANTAnet_tcpClient_c::read(char *ptr, int *nbytes, int blockingType)
418{
419
420    //used for bandwidth calculations
421 //   int totalDataRead =0;   
422
423        struct timeval timeoutDelay;
424        fd_set readfds;
425
426        if (clientSockFd < 0) {
427                printf("QUANTAnet_tcpClient_c::Read: This connection is not open.\n");
428                return SOCKET_NOT_OPEN;
429        }
430
431        int nleft, nread;
432        nleft = *nbytes;
433        timeOutStatus = 0;
434
435        while (nleft > 0) {
436
437        /*The totalDataRead is incremented  each time something is read - in case of an error
438                this is passed on to the performance monitoring instance using the incrementDataRead() function
439                for bandwidth calculations - to calculate the bandwidth for the data read till the break in loop*/
440
441        FD_ZERO(&readfds);
442                FD_SET(clientSockFd, &readfds);
443
444                if (blockingType == NON_BLOCKING) {
445                        timeoutDelay.tv_sec = 0;
446                        timeoutDelay.tv_usec = 0;
447                }
448                if (blockingType == BLOCKING) {
449                        timeoutDelay.tv_sec = timeOutPeriod;
450                        timeoutDelay.tv_usec = 0;
451                }
452
453                errno = 0;
454
455                // Wait for some file descriptor to be ready
456                // If no timeout then wait indefinitely until data arrives
457#ifdef WIN32
458                int error;
459
460                if ((blockingType == BLOCKING) &&
461                        (timeOutPeriod == NO_TIME_OUT))
462                                error = select(clientSockFd+1,&readfds,NULL,NULL, NULL);
463                else
464                        error = select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay);
465                if (error == SOCKET_ERROR) {
466                        errno = WSAGetLastError();
467                        return errno;
468                }
469#else
470                // This one may not work as it should be..
471                // Unix socket's man page doesn't say select will alter the
472                // value of errno...
473                if ((blockingType == BLOCKING) &&
474                        (timeOutPeriod == NO_TIME_OUT))
475                                select(clientSockFd+1,&readfds,NULL,NULL, NULL);
476                else
477                        select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay);
478                if (errno) {
479                        return errno;
480                }
481#endif
482
483
484                // If fd is not set
485                if (!(FD_ISSET(clientSockFd, &readfds))) {
486                        // If non blocking then it simply means there is no data.
487                        if (blockingType == NON_BLOCKING)
488                                return NON_BLOCKING_HAS_NO_DATA;
489
490                        // If blocking then timed out waiting for data.
491                        if (blockingType == BLOCKING) {
492                                timeOutStatus = 1;
493                                return TIMED_OUT;
494                        }
495
496                }
497
498                // If it passes this stage then things are set as blocking.
499                blockingType = BLOCKING;
500
501#ifdef WIN32
502                nread = ::recv(clientSockFd, ptr, nleft, 0);
503
504                if (nread == SOCKET_ERROR) {
505                        *nbytes = nread;
506                        errno = WSAGetLastError();
507                        if (errno == WSAECONNRESET)
508                                return CONNECTION_TERMINATED;
509                        return errno;
510                }
511#else
512                errno = 0;
513                nread = ::read(clientSockFd,ptr,nleft);
514
515                if (errno) {
516                        *nbytes = nread;
517                        return errno;
518                }
519#endif
520
521                if (nread == 0) {
522                        *nbytes = (*nbytes - nleft);
523                        return CONNECTION_TERMINATED;
524                }
525
526                if (nread > 0) {
527                        nleft -= nread;
528                        ptr += nread;
529            pmonitor.incrementDataRead(nread);
530                }
531
532        }
533        *nbytes = *nbytes - nleft;
534    return OK;
535}
536
537int QUANTAnet_tcpClient_c::write(const char *ptr, int  *nbytes, int blockingType)
538{
539    //used for bandwidth calculations
540//    int totalDataSent =0;
541
542        // Place here to prevent crashes when write occurs to a socket
543        // closed by an external source.
544#ifndef WIN32
545        signal(SIGPIPE,SIG_IGN);
546#endif
547
548        struct timeval timeoutDelay;
549        fd_set writefds;
550
551        if (clientSockFd < 0) {
552                printf("QUANTAnet_tcpClient_c::Write: This connection is not open.\n");
553                return SOCKET_NOT_OPEN;
554        }
555
556        int nleft, nwrite;
557        nleft = *nbytes;
558        timeOutStatus = 0;
559
560        while (nleft > 0) {
561               
562  /*The totalDataSent is incremented  each time something is written - in case of an error this is passed on to the performance monitoring instance using the incrementDataSent() function for bandwidth calculations - to calculate the bandwidth for the data written till the break in the loop*/
563
564        FD_ZERO(&writefds);
565                FD_SET(clientSockFd, &writefds);
566
567                if (blockingType == NON_BLOCKING) {
568                        timeoutDelay.tv_sec = 0;
569                        timeoutDelay.tv_usec = 0;
570                }
571                if (blockingType == BLOCKING) {
572                        timeoutDelay.tv_sec = timeOutPeriod;
573                        timeoutDelay.tv_usec = 0;
574                }
575
576                errno = 0;
577
578                // Wait for some file descriptor to be ready
579#ifdef WIN32
580                int error;
581
582                if ((blockingType == BLOCKING) &&
583                        (timeOutPeriod == NO_TIME_OUT))
584                        error = select(clientSockFd+1,NULL,&writefds,NULL, NULL);
585                else
586                        error = select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay);
587                if (error == SOCKET_ERROR) {
588                        errno = WSAGetLastError();
589                        return errno;
590                }
591#else
592                // This one may not work as it should be..
593                // Unix socket's man page doesn't say select will alter the
594                // value of errno...
595                if ((blockingType == BLOCKING) &&
596                        (timeOutPeriod == NO_TIME_OUT))
597                        select(clientSockFd+1,NULL,&writefds,NULL, NULL);
598                else
599                        select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay);
600                if (errno) {
601                        return errno;
602                }
603#endif
604
605                // If fd is not set
606                if (!(FD_ISSET(clientSockFd, &writefds))) {
607
608                        // If non blocking then it simply means there is no data.
609                        if (blockingType == NON_BLOCKING)
610                                return NON_BLOCKING_NOT_READY_TO_WRITE;
611
612                        // If blocking then timed out waiting for data.
613                        if (blockingType == BLOCKING) {
614                                timeOutStatus = 1;
615                                return TIMED_OUT;
616                        }
617                }
618
619
620                // If it passes this stage then things are set as blocking.
621                blockingType = BLOCKING;
622
623#ifdef WIN32
624                nwrite = ::send(clientSockFd, ptr, nleft, 0);
625        if (nwrite == SOCKET_ERROR) {
626                    *nbytes = nwrite;
627            errno = WSAGetLastError();
628                    return errno;
629                }
630#else
631                errno = 0;
632                nwrite = ::write(clientSockFd,ptr,nleft);
633
634                if (errno) {
635                        *nbytes = nwrite;
636                        return errno;
637                }
638#endif
639
640                if (nwrite == 0) {
641                        *nbytes = (*nbytes - nleft);
642                        return CONNECTION_TERMINATED;
643                }
644
645                if (nwrite > 0) {
646                        nleft -= nwrite;
647                        ptr += nwrite;
648            pmonitor.incrementDataSent(nwrite);
649                }
650
651        }
652        *nbytes = *nbytes - nleft;
653        return OK;
654
655}
656
657int  QUANTAnet_tcpClient_c::connectToServer(const char* ip, int port)
658{
659
660        char serverAddr[128];
661
662        int status = hostnameToIP(ip, serverAddr);
663       
664
665        if (status == 0) {
666                printf("QUANTAnet_tcpClient_c::Connect: Host IP address %s is invalid.\n", ip);
667                return -1;
668        }
669
670        /* Fill in the structure "clientInfo" with the address of the
671         * server that we want to connect with.
672         */
673
674
675
676        memset((char *) &clientInfo,0, sizeof(clientInfo));
677
678        clientInfo.sin_family = AF_INET;
679        clientInfo.sin_addr.s_addr = inet_addr(serverAddr);
680        clientInfo.sin_port = htons(port);
681
682        int optlen;
683
684        /* Set no delay to stop TCP/IP from buffering */
685        int noDelay = 1;
686        optlen = sizeof(noDelay);
687
688        if (setsockopt(clientSockFd,IPPROTO_TCP,TCP_NODELAY, (char *) &noDelay, optlen) < 0)
689                printf("QUANTAnet_tcpClient_c::Open: Cannot set no delay, you may experience sluggish performance.\n");
690
691        /* Use this to try to eliminate the problem of being unable to rebind to a static port */
692        int reuseFlag = 1;
693        optlen = sizeof(reuseFlag);
694        if (setsockopt(clientSockFd,SOL_SOCKET,SO_REUSEADDR, (char *) &reuseFlag, optlen) < 0)
695                printf("QUANTAnet_tcpClient_c::Open: Cannot reuse port address.\n");
696        /***** Bind our local address so that the client can send to us. */
697
698        struct linger lingerData;
699        lingerData.l_onoff = 1;
700        lingerData.l_linger = 1000;
701
702        optlen = sizeof(lingerData);
703        if (setsockopt(clientSockFd,SOL_SOCKET,SO_LINGER, (char *) &lingerData, optlen) < 0)
704                printf("QUANTAnet_tcpClient_c::Open: setsockopt: SO_LINGER failed.\n");
705
706        /* Connect to server. */
707    int connectResult;
708        if ((connectResult = connect(clientSockFd, (struct sockaddr *) &clientInfo, sizeof(clientInfo))) < 0) {
709                printf("QUANTAnet_tcpClient_c::Connect: Can't connect to server.\n");
710                return connectResult;
711        }
712        return clientSockFd;
713}
714
715void QUANTAnet_tcpClient_c::makeNonBlocking()
716{
717#ifdef __sgi
718        fcntl(clientSockFd, F_SETFL, FNDELAY);
719#endif
720#ifdef linux
721        fcntl(clientSockFd, F_SETFL, O_NONBLOCK);
722#endif
723#ifdef sun
724        fcntl(clientSockFd, F_SETFL, O_NDELAY);
725#endif
726#ifdef WIN32
727    u_long nonBlockingMode = 1L;
728
729    ioctlsocket(clientSockFd, FIONBIO, &nonBlockingMode);
730#endif
731}
732
733int QUANTAnet_tcpClient_c::close()
734{
735  int closeResult;
736#ifdef WIN32
737        if (clientSockFd > 0) closeResult = ::closesocket(clientSockFd);
738#else
739        if (clientSockFd > 0) closeResult = ::close(clientSockFd);
740#endif
741        clientSockFd = 0;
742  return closeResult;
743}
744
745QUANTAnet_tcpClient_c::QUANTAnet_tcpClient_c() {
746        timeOutPeriod = QUANTAnet_tcpClient_c::DEFAULT_TIME_OUT;
747        clientSockFd = 0;
748        #ifdef WIN32_WCE
749                clientSockFd = -1;
750        #endif
751
752        /* Open a TCP socket (an Internet stream socket). */
753#ifdef WIN32
754        if ((clientSockFd = socket(AF_INET,SOCK_STREAM,0)) == INVALID_SOCKET) {
755                printf("QUANTAnet_tcpClient_c: Can't open stream socket.\n");
756        }
757#else
758        if ((clientSockFd = socket(AF_INET,SOCK_STREAM,0)) < 0) {
759                printf("QUANTAnet_tcpClient_c: Can't open stream socket.\n");
760        }
761#endif
762
763}
764
765void QUANTAnet_tcpClient_c::showStatus(int status, int nbytes)
766{
767        switch (status) {
768        case SOCKET_NOT_OPEN:
769                printf("QUANTAnet_tcpClient_c::showStatus: Socket not open.\n");
770                break;
771        case CONNECTION_TERMINATED:
772                printf("QUANTAnet_tcpClient_c::showStatus: Connection terminated with %d bytes transacted.\n", nbytes);
773                break;
774        case NON_BLOCKING_HAS_NO_DATA:
775                printf("QUANTAnet_tcpClient_c::showStatus: Non-blocking read found no data on stream.\n");
776                break;
777        case NON_BLOCKING_NOT_READY_TO_WRITE:
778                printf("QUANTAnet_tcpClient_c::showStatus: Non-blocking connection is not ready to write.\n");
779                break;
780        case TIMED_OUT:
781                printf("QUANTAnet_tcpClient_c::showStatus: Transaction timed out after %d seconds.\n", timeOutPeriod);
782                break;
783                //      case NON_BLOCKING_WROTE_NO_DATA:
784                //              printf("Non-blocking write wrote no data.\n");
785                //              break;
786        case OK:
787                printf("QUANTAnet_tcpClient_c::showStatus: Transaction ok.\n");
788                break;
789        default:
790                printf("QUANTAnet_tcpClient_c::showStatus: UNIX Socket error no: %d\n ",errno);
791                //perror("");
792                break;
793        }
794        fflush(stdout);
795}
796
797
798unsigned int QUANTAnet_tcpClient_c :: getRemoteIP() {
799        return clientInfo.sin_addr.s_addr;
800}
801
802void QUANTAnet_tcpClient_c :: getRemoteIP(char* name)
803{
804        ipNumToString(getRemoteIP(),name);
805
806}
807
808int QUANTAnet_tcpClient_c :: getSelfPort() {
809        struct sockaddr_in name;
810    #ifdef HAVE_SOCKLEN_T
811        socklen_t size;
812    #else
813        int size;
814    #endif
815        size = sizeof(name);
816        getsockname (clientSockFd, (sockaddr *)&name, (socklen_t*)&size);
817        return ntohs(name.sin_port);
818}
819
820/* Get the port # of the client connection */
821int QUANTAnet_tcpClient_c :: getRemotePort() {
822        return ntohs(clientInfo.sin_port);
823}
824
825//Functions added for performance monitoring interface
826
827void QUANTAnet_tcpClient_c ::showStats(char* streamInfo, char* comment)
828{
829    char *selfIp = new char[256];
830    char *remoteIp = new char[256];   
831    QUANTAnet_socketbase_c::getSelfIP(selfIp);
832    getRemoteIP(remoteIp);
833    pmonitor.setIPs(selfIp, remoteIp);
834    pmonitor.setPorts(getSelfPort(), getRemotePort());
835    pmonitor.showStats(streamInfo, comment);
836    delete[] selfIp;
837    delete[] remoteIp;
838}
839
840int QUANTAnet_tcpClient_c ::logStats(char* streamInfo, char* comment, FILE* filePtr)
841{
842    char *selfIp = new char[256];
843    char *remoteIp = new char[256];   
844    QUANTAnet_socketbase_c::getSelfIP(selfIp);
845    getRemoteIP(remoteIp);
846    pmonitor.setIPs(selfIp, remoteIp);
847    pmonitor.setPorts(getSelfPort(), getRemotePort());
848    int result = pmonitor.logStats(streamInfo, comment, filePtr);
849    delete[] selfIp;
850    delete[] remoteIp;
851    return result;
852}
853
854
855int QUANTAnet_tcpClient_c::sendStats(char* streamInfo, char* comment)
856{
857    char *selfIp = new char[256];
858    char *remoteIp = new char[256];   
859    QUANTAnet_socketbase_c::getSelfIP(selfIp);
860    getRemoteIP(remoteIp);
861    pmonitor.setIPs(selfIp, remoteIp);
862    pmonitor.setPorts(getSelfPort(), getRemotePort());
863    int result = pmonitor.sendStats(streamInfo, comment);
864    delete[] selfIp;
865    delete[] remoteIp;
866    return result;
867}
868
869int QUANTAnet_tcpClient_c::initSendStats(const char* monitorClientIP, int port)
870{
871    return pmonitor.initSendStats(monitorClientIP, port);
872}
873
874void QUANTAnet_tcpClient_c::exitSendStats()
875{
876    pmonitor.exitSendStats();
877}
878
879void QUANTAnet_tcpClient_c::setInstantaneousLatency(double latency)
880{
881    pmonitor.setInstantLatency(latency);   
882}
883
884int QUANTAnet_tcpClient_c::getSocketId()
885{
886    return clientSockFd;
887}
888
889
890//Readv() method to read iovec
891
892int QUANTAnet_tcpClient_c::readv(QUANTAnet_iovec_c recv_iovec, int blockingType)
893{
894        struct timeval timeoutDelay;
895        fd_set readfds;
896
897        if (clientSockFd < 0) {
898                printf("QUANTAnet_tcpClient_c::Read: This connection is not open.\n");
899                return SOCKET_NOT_OPEN;
900        }
901
902        int nread;     
903        timeOutStatus = 0;
904
905
906    FD_ZERO(&readfds);
907        FD_SET(clientSockFd, &readfds);
908
909        if (blockingType == NON_BLOCKING) {
910                timeoutDelay.tv_sec = 0;
911                timeoutDelay.tv_usec = 0;
912        }
913        if (blockingType == BLOCKING) {
914                timeoutDelay.tv_sec = timeOutPeriod;
915                timeoutDelay.tv_usec = 0;
916        }
917
918        errno = 0;
919
920                // Wait for some file descriptor to be ready
921                // If no timeout then wait indefinitely until data arrives
922#ifdef WIN32
923        int error;
924
925        if ((blockingType == BLOCKING) &&
926                (timeOutPeriod == NO_TIME_OUT))
927                        error = select(clientSockFd+1,&readfds,NULL,NULL, NULL);
928        else
929                error = select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay);
930        if (error == SOCKET_ERROR) {
931                errno = WSAGetLastError();
932                return errno;
933        }
934#else
935        // This one may not work as it should be..
936        // Unix socket's man page doesn't say select will alter the
937        // value of errno...
938        if ((blockingType == BLOCKING) &&
939                (timeOutPeriod == NO_TIME_OUT))
940                        select(clientSockFd+1,&readfds,NULL,NULL, NULL);
941        else
942                select(clientSockFd+1,&readfds,NULL,NULL, &timeoutDelay);
943        if (errno) {
944                return errno;
945        }
946#endif
947
948
949        // If fd is not set
950        if (!(FD_ISSET(clientSockFd, &readfds))) {
951                // If non blocking then it simply means there is no data.
952                if (blockingType == NON_BLOCKING)
953                        return NON_BLOCKING_HAS_NO_DATA;
954
955                // If blocking then timed out waiting for data.
956                if (blockingType == BLOCKING) {
957                        timeOutStatus = 1;
958                        return TIMED_OUT;
959                }
960
961        }
962
963        // If it passes this stage then things are set as blocking.
964        blockingType = BLOCKING;
965
966#ifdef WIN32
967        DWORD noOfBytesRecvd;
968        DWORD WSAFlags = 0;
969        nread = ::WSARecv(clientSockFd, recv_iovec.getBuffer(),recv_iovec.size(),&noOfBytesRecvd,&WSAFlags,NULL,NULL);
970
971        if (nread == SOCKET_ERROR) {
972                //*nbytes = nread;
973                errno = WSAGetLastError();
974                if (errno == WSAECONNRESET)
975                        return CONNECTION_TERMINATED;
976                return errno;
977        }
978        else
979        {
980                nread = noOfBytesRecvd;
981        }
982#else
983        errno = 0;     
984
985        nread = ::readv(clientSockFd, recv_iovec.getBuffer(), recv_iovec.size());
986                       
987        if(errno != 0)         
988        {
989                perror("QUANTAnet_tcp_c :: readv - could not read");                   
990                return errno;
991        }
992
993#endif
994
995        if (nread == 0)
996        {
997                //*nbytes = (*nbytes - nleft);
998                return CONNECTION_TERMINATED;
999        }       
1000
1001        if (nread > 0)
1002        {
1003      pmonitor.incrementDataRead(nread);
1004        }
1005
1006        return OK;
1007}
1008
1009//writev() method to write iovec
1010
1011int QUANTAnet_tcpClient_c::writev(QUANTAnet_iovec_c send_iovec, int blockingType)
1012{
1013
1014#ifndef WIN32
1015        signal(SIGPIPE,SIG_IGN);
1016#endif
1017
1018        struct timeval timeoutDelay;
1019        fd_set writefds;
1020
1021        if (clientSockFd < 0) {
1022                printf("QUANTAnet_tcpClient_c::Writev: This connection is not open.\n");
1023                return SOCKET_NOT_OPEN;
1024        }
1025       
1026        int     nwrite;
1027        timeOutStatus = 0;     
1028               
1029  /*The totalDataSent is incremented  each time something is written - in case of an error this is passed on to the
1030        performance monitoring instance using the incrementDataSent() function for bandwidth calculations -
1031        to calculate the bandwidth for the data written till the break in the loop*/
1032
1033    FD_ZERO(&writefds);
1034        FD_SET(clientSockFd, &writefds);
1035
1036        if (blockingType == NON_BLOCKING) {
1037                timeoutDelay.tv_sec = 0;
1038                timeoutDelay.tv_usec = 0;
1039        }
1040        if (blockingType == BLOCKING) {
1041                timeoutDelay.tv_sec = timeOutPeriod;
1042                timeoutDelay.tv_usec = 0;
1043        }
1044
1045        errno = 0;
1046
1047                // Wait for some file descriptor to be ready
1048#ifdef WIN32
1049        int error;
1050
1051        if ((blockingType == BLOCKING) &&
1052                (timeOutPeriod == NO_TIME_OUT))
1053                error = select(clientSockFd+1,NULL,&writefds,NULL, NULL);
1054        else
1055                error = select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay);
1056        if (error == SOCKET_ERROR) {
1057                errno = WSAGetLastError();
1058                return errno;
1059        }
1060#else
1061        // This one may not work as it should be..
1062        // Unix socket's man page doesn't say select will alter the
1063        // value of errno...
1064        if ((blockingType == BLOCKING) &&
1065                (timeOutPeriod == NO_TIME_OUT))
1066                select(clientSockFd+1,NULL,&writefds,NULL, NULL);
1067        else
1068                select(clientSockFd+1,NULL,&writefds,NULL, &timeoutDelay);
1069        if (errno) {
1070                return errno;
1071        }
1072#endif
1073
1074        // If fd is not set
1075        if (!(FD_ISSET(clientSockFd, &writefds))) {
1076
1077                // If non blocking then it simply means there is no data.
1078                if (blockingType == NON_BLOCKING)
1079                        return NON_BLOCKING_NOT_READY_TO_WRITE;
1080
1081                // If blocking then timed out waiting for data.
1082                if (blockingType == BLOCKING) {
1083                        timeOutStatus = 1;
1084                        return TIMED_OUT;
1085                }
1086        }
1087
1088
1089        // If it passes this stage then things are set as blocking.
1090        blockingType = BLOCKING;
1091
1092#ifdef WIN32
1093        DWORD noOfBytesSent;
1094        DWORD WSAFlags = 0;
1095
1096        nwrite = ::WSASend(clientSockFd,send_iovec.getBuffer(),send_iovec.size(),&noOfBytesSent,WSAFlags,NULL,NULL);
1097    if (nwrite == SOCKET_ERROR) {
1098                //*nbytes = noOfBytesSent;
1099        errno = WSAGetLastError();
1100                return errno;
1101        }
1102        else {
1103                nwrite = noOfBytesSent;
1104        }
1105#else
1106        errno = 0;
1107        nwrite = ::writev(clientSockFd, send_iovec.getBuffer(), send_iovec.size());
1108
1109        if (errno != 0) {
1110                //*nbytes = nwrite;
1111                return errno;
1112        }
1113#endif
1114
1115        if (nwrite == 0)
1116        {               
1117                return CONNECTION_TERMINATED;
1118        }               
1119
1120        if (nwrite > 0)
1121        {
1122      pmonitor.incrementDataSent(nwrite);
1123        }
1124
1125        return OK;
1126}
1127
Note: See TracBrowser for help on using the repository browser.