src/vfer_control.c

Go to the documentation of this file.
00001 /*
00002  * Copyright 2005, 2006, Internet2
00003  * Legal conditions are in file LICENSE
00004  * (MD5 = c434f2e53b8089d8b4d0172c7ce07360).
00005  */
00006 /**
00007  *
00008  * @file   vfer_control.c
00009  * @author Ivan Beschastnikh
00010  * @brief  Implements flow controls, fragmentation, sending, receiving, and acking logic.
00011  *
00012  * This file implements the brains of VFER
00013  *
00014  * -     05/13/06        ivan            Integrated most of logic from Control_Accept into ControlT, added new state: CONN_ACCEPTING
00015  * -     05/11/06        ivan            Removed the server and connect threads and made ControlT thread handle listening and connecting sockets
00016  * -     05/04-06/06     ivan            Removed reader and writer threads and made ControlT killable. Many other changes, notably to CC.
00017  * -     02/19/06        ivan            Fixed shift by 32 bug in SendAllData
00018  * -     02/18/06        ivan            Factored out CC managing functionality into ccontrol.c
00019  * -     02/18/06        ivan            Factored out frame managing functionality to datagram.c and datagram.h, corrected relfun for accepted sockets bug
00020  * -     02/05/06        ivan            Corrected some minor CC bugs to get test_rcp and test_rcpd working again
00021  * -     02/04/06        ivan            Updated for new return codes of Writer and Reader (in writer.c and reader.c)
00022  * -     12/30/05        ivan            Changed to use byte accounting instead of packet accounting for cc window
00023  * -     12/29/05        ivan            Exponential ramp in window size & Flow control
00024  * -     12/29/05        ivan            Control_Close_Conn now acts correctly in all cases- supports nonblocking sockets and timeouts. Added ccack receipt action to purge old frames.
00025  *                                       added Control_ConnectT to act as thread \ nonthread func for nonblocking \ blocking vfer_connect calls from api.c ;
00026  *                                       fixed state transitions for disconnected and closed states
00027  * -     12/28/05        ivan            static function defs which modularize the scheduling ; data_timeout and more
00028  * -     10/30/05        ivan            renamed to control.c
00029  * -     10/15/05        ivan            added close_req and reset packet sending
00030  * -     09/01/05        ivan            modified to use new packet structures
00031  * -     08/05/05        ivan            changed C_Control_New_Conn to connect to reader and writer
00032  * -     07/28/05        ivan            Send,Recv added: fragmentation + hooks to reader,writer
00033  * -     07/27/05        ivan            ControlT and many list internal functions added
00034  * -     07/21/05        ivan            created
00035  */
00036 
00037 #include "vfer_control.h"
00038 #include "vfer_ccontrol.h"
00039 
00040 static pthread_t ControlT_tid;
00041 
00042 /* private function prototypes */
00043 inline  static int              Control_Write           (packet* p, int fd);
00044         static int              Control_RecvAll         (vfer_sock* sock);
00045 inline  static int              Control_SendCtl         (vfer_sock* sock, packet* p);
00046 inline  static int              Control_Send_Ack        (vfer_sock* s);
00047         static int              Control_SendAllData     (vfer_sock* sock);
00048         static void             ControlT                (void* ptr);
00049         static void             Control_Remove_Socket   (vfer_sock* sock);
00050         static int              Unconnect_Rebind        (vfer_sock* sock);
00051 
00052 
00053 /******************
00054  * Public Functions
00055  ******************/
00056 
00057 /**
00058  * Returns the top connection on te accept queue for a socket.
00059  *
00060  * This function changes the state of ret_sock to CONN_ACCEPTING and
00061  * blocks in a cond_wait until ControlT threads goes through the
00062  * handshake, changes the state of ret_sock to CONN_CONNECTED (or
00063  * something else on error) and signals to this thread to wake up.
00064  *
00065  * @param sock a pointer to a socket structure
00066  * @param ret_sock a pointer to a socket structure that is 'connected' as a result of this call
00067  * @return
00068  *      - 0 on success
00069  *      - VFER_BADSOCK bad vfer_sock
00070  *      - VFER_WOULDBLOCK socket is non-blocking and there are no waiting connections
00071  *      - VFER_IMPL underlying socket error
00072  */
00073 int Control_Accept(vfer_sock* sock, vfer_sock* ret_sock)
00074 {
00075         int new_port;                   /* contains the port we will use for the rest of connection (not handshake) */
00076         struct sockaddr_in sa;
00077         char buf[INET_ADDRSTRLEN];      /* holds remote addr as string */
00078         socklen_t socklen;
00079         struct timespec timedwait_timeout; /* used as argument to pthread_cond_timedwait */
00080         struct timeval start_time;
00081         int ret;
00082 
00083         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Accept", "Entered with listening sock->fd[%d]", sock->fd);
00084 
00085         ret_sock->fd = socket(AF_INET, sock->type, 0);
00086         if (ret_sock->fd == -1) {
00087                 perror("server_accept :: socket");
00088                 return VFER_IMPL;
00089         }
00090 
00091         if (fcntl(ret_sock->fd, F_SETFD, O_NONBLOCK) == -1) {
00092                 perror("server_accept :: fcntl");
00093                 return VFER_IMPL;
00094         }
00095 
00096         /* we bind the local port, wait for a connection, and
00097            explicitly connect to the remote address */
00098 
00099         bzero((char*)(&sa), sizeof(sa));
00100         sa.sin_port             = htons(INADDR_ANY);
00101         sa.sin_addr.s_addr      = htonl(INADDR_ANY);
00102         sa.sin_family           = AF_INET;
00103 
00104         if (bind(ret_sock->fd, (SA*)(&sa), sizeof(sa)) < 0) {
00105                 perror("server_accept: bind for the new socket failed");
00106                 close(ret_sock->fd);
00107                 return VFER_IMPL;
00108         }
00109 
00110         socklen = sizeof(sa);
00111         if (getsockname(ret_sock->fd, (SA*)(&sa), &socklen) == -1) {
00112                 perror("server_accept: getsockname for new socket failed");
00113                 close(ret_sock->fd);
00114                 return VFER_IMPL;
00115         }
00116 
00117         new_port = ntohs(sa.sin_port);
00118 
00119         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Accept", "New port socket: %s.%d",
00120                     inet_ntop(AF_INET, &(sa.sin_addr), buf, INET_ADDRSTRLEN),
00121                     new_port);
00122 
00123         ret_sock->state = CONN_ACCEPTING;
00124         ret_sock->listening_sock = sock;
00125         ret_sock->last_response.tv_sec = 0;
00126         ret_sock->response = NULL;
00127         ret_sock->sa = sa;
00128         ret_sock->response_tries = MAX_RESPONSE_TRIES;
00129 
00130         pthread_mutex_unlock(&(ret_sock->mutex));
00131         Control_Register_Socket(ret_sock);
00132         pthread_mutex_lock(&(ret_sock->mutex));
00133         /***************/
00134         GET_TIME_OF_DAY(&(start_time));
00135 
00136         if (sock->accept_timeout != 0) {
00137                 /* compute the timeout for cond_timedwait */
00138                 timedwait_timeout.tv_sec = start_time.tv_sec + (sock->accept_timeout / 1000000); /* seconds */
00139                 timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->accept_timeout - (sock->accept_timeout / 1000000)) * 1000; /* nanoseconds */
00140                 do {
00141                         ret = pthread_cond_timedwait(&(ret_sock->cond), &(ret_sock->mutex), &(timedwait_timeout));
00142                         /* pthread_cond_wait(&(ret_sock->cond), &(ret_sock->mutex)); */
00143                         if (ret == ETIMEDOUT) {
00144                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Accept", "timedout while trying to accept a new connection");
00145                                 ret_sock->err = VFER_TIMEOUT;
00146                                 close(ret_sock->fd);
00147                                 return VFER_TIMEOUT;
00148                         }
00149                 } while (ret_sock->state == CONN_ACCEPTING);
00150         } else {
00151                 do {
00152                         pthread_cond_wait(&(ret_sock->cond), &(ret_sock->mutex));
00153                 } while (ret_sock->state == CONN_ACCEPTING);
00154         }
00155 
00156         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Accept", "reached non conn_accepting state[%d]", ret_sock->state);
00157 
00158         if (ret_sock->state != CONN_CONNECTED) {
00159                 close(ret_sock->fd);
00160                 return ret_sock->err;
00161         }
00162         return 0;
00163 } /* Control_Accept() */
00164 
00165 
00166 /**
00167  * This function changes the state of the socket to CONN_CONNECTING
00168  * and either return immediately if the socket is non blocking or
00169  * blocks waiting on a condition variable for the ControlT thread to
00170  * signal on it indicating that the socket is connected and is in
00171  * CONN_CONNECTED state. Follows CONNECT_TIMEOUT (defined in
00172  * globals.h) for timing out.
00173 
00174  * @param sock is a pointer to a socket structure on which to connect.
00175  * The socket must have its addr field set to the address to connect
00176  * to.
00177  * @return
00178  *      - 0  on connection succes
00179  *      - -1 on connection failure- system connect() call failed
00180  *      - -2 if the socket is nonblocking- the caller can find out about
00181  *           the success or failure of the connection via vfer_select()-
00182  *           NOTE: this is broken right now.
00183  */
00184 int Control_Connect(vfer_sock* sock) {
00185         SA *serv_addr;
00186         int addrlen;
00187         struct timespec timedwait_timeout; /* used as argument to pthread_cond_timedwait */
00188         struct timeval start_time;
00189         int ret;
00190 
00191         serv_addr = (SA*)(&(sock->addr));
00192         addrlen = sizeof(sock->addr);
00193 
00194         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "Entered sock->fd[%d]", sock->fd);
00195 
00196         /* try to connect the udp socket to proceed with the handshake*/
00197         if (connect(sock->fd, serv_addr, addrlen) < 0)  {
00198                 perror("Control_Connect :: connect");
00199                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "connect failed");
00200                 sock->err = VFER_NOCONN;
00201                 return -1;
00202         }
00203         memcpy(&sock->addr, serv_addr, addrlen);
00204         sock->connect_tries = 0;
00205         sock->request = NULL;
00206         sock->state = CONN_CONNECTING;
00207         bzero(&(sock->last_request), sizeof(struct timeval));
00208         pthread_mutex_unlock(&(sock->mutex));
00209         /*************/
00210 
00211         Control_Register_Socket(sock);
00212 
00213         if (SOCK_OPT_ISSET(sock, VFERSO_NONBLOCK)) {
00214                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "non blocking socket, connection in progress");
00215                 return -2;
00216         }
00217 
00218         /*************/
00219         /* lock the socket's mutex prior to calling pthread_cond_wait */
00220         pthread_mutex_lock(&(sock->mutex));
00221         GET_TIME_OF_DAY(&(start_time));
00222         if (sock->connect_timeout != 0) {
00223                 /* compute the timeout for cond_timedwait */
00224                 timedwait_timeout.tv_sec = start_time.tv_sec + (sock->connect_timeout / 1000000); /* seconds */
00225                 timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->connect_timeout - (sock->connect_timeout / 1000000)) * 1000; /* nanoseconds */
00226                 do {
00227                         // DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "waiting on sock->mutex until sock's state changes from CONN_CONNECTING");
00228                         pthread_cond_timedwait(&(sock->cond), &(sock->mutex), &(timedwait_timeout));
00229                         /* pthread_cond_wait(&(sock->cond), &(sock->mutex)); */
00230                         if (ret == ETIMEDOUT) {
00231                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "timedout trying to connect");
00232                                 sock->err = VFER_TIMEOUT;
00233                                 return -1;
00234                         }
00235                 } while (sock->state == CONN_CONNECTING);
00236         } else {
00237                 do {
00238                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "waiting on sock->mutex until sock's state changes from CONN_CONNECTING");
00239                         pthread_cond_wait(&(sock->cond), &(sock->mutex));
00240                 } while (sock->state == CONN_CONNECTING);
00241         }
00242 
00243         if (sock->state != CONN_CONNECTED) {
00244                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "reached a non conn_connecting state[%d]", sock->state);
00245                 return -1;
00246         }
00247 
00248         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "reached connected state");
00249         return 0;
00250 } /* Control_Connect() */
00251 
00252 /**
00253  * Sets the socket's state to DISCONNECTED and blocks (if blocking
00254  * socket) until ControlT thread set socket's state to DISCONNECTED or
00255  * returns if nonblocking socket Socket's mutex must be locked before
00256  * calling this function Socket's state must be CONN_CONNECTED before
00257  * calling this function, this function does not unlock the mutex of
00258  * the socket
00259  *
00260  * @param sock is a socket pointer
00261  * @return
00262  *      0  on success
00263  *      -2 if socket is nonblocking, will disconnect eventually
00264  */
00265 int Control_Close(vfer_sock* sock) {
00266         struct timespec timedwait_timeout; /* used as argument to pthread_cond_timedwait */
00267         struct timeval start_time;
00268         int ret;
00269 
00270         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Close", "Entered sock->fd[%d]", sock->fd);
00271         sock->state = CONN_DISCONNECTING;
00272         if (SOCK_OPT_ISSET(sock, VFERSO_NONBLOCK)) {
00273                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Close", "close() : VFER_INPROGRESS, send_queue[%d]", sock->send_frames.num);
00274                 return -2;
00275         }
00276 
00277         GET_TIME_OF_DAY(&(start_time));
00278         if (sock->close_timeout != 0) {
00279                 /* compute the timeout for cond_timedwait */
00280                 timedwait_timeout.tv_sec = start_time.tv_sec + (sock->close_timeout / 1000000); /* seconds */
00281                 timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->close_timeout - (sock->close_timeout / 1000000)) * 1000; /* nanoseconds */
00282                 do {
00283                         pthread_cond_timedwait(&(sock->cond), &(sock->mutex), &(timedwait_timeout));
00284                         if (ret == ETIMEDOUT) {
00285                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Close", "timedout trying to close");
00286                                 sock->err = VFER_TIMEOUT;
00287                                 return -1;
00288                         }
00289                 } while (sock->state != CONN_DISCONNECTED);
00290         } else {
00291                 do {
00292                         pthread_cond_wait(&(sock->cond), &(sock->mutex));
00293                 } while (sock->state != CONN_DISCONNECTED);
00294         }
00295 
00296 
00297         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Close", "reached disconnected|local_closed|err state");
00298         return 0;
00299 } /* Control_Close() */
00300 
00301 /**
00302  * Sends data on a socket. This function is called by the client via
00303  * the api vfer_send call. The socket should be locked before the call
00304  * to this function. This function unlocks the socket before returning.
00305  *
00306  * @param sock          a socket pointer
00307  * @param buffer        a pointer to data to send
00308  * @param len           the length of data in this buffer to send
00309  * @return
00310  *      -1 on error
00311  *      size of the buffer sent on succcess
00312  */
00313 size_t Control_Send(vfer_sock* sock, const void* buffer, int len) {
00314         struct timespec timedwait_timeout; /* used as argument to pthread_cond_timedwait */
00315         struct timeval start_time;
00316         int ret;
00317 
00318         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Send", "Entered with sock->fd[%d] buffer_len[%d]", sock->fd, len);
00319         if (len > MAX_SEND_WINDOW) {
00320                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Send", "buffer length is more than MAX_SEND_WINDOW");
00321                 return -1;
00322         }
00323 
00324         if (sock->state != CONN_CONNECTED) {
00325                 pthread_mutex_unlock(&(sock->mutex));
00326                 SET_ERROR_RETURN (sock, VFER_INVAL, -1);
00327         }
00328 
00329         if ((sock->send_frames.length + len) > MAX_SEND_WINDOW) {
00330                 /* return if this is a non blocking socket */
00331                 if ((sock->opts & VFERSO_NONBLOCK) == 1) {
00332                         pthread_mutex_unlock(&(sock->mutex));
00333                         SET_ERROR_RETURN (sock, VFER_WOULDBLOCK, -1);
00334                 }
00335                 /* block until we have enough space in datagram list to enqueue the buffer */
00336                 GET_TIME_OF_DAY(&(start_time));
00337                 if (sock->snd_timeout != 0) {
00338                         /* compute the timeout for cond_timedwait */
00339                         timedwait_timeout.tv_sec = start_time.tv_sec + (sock->snd_timeout / 1000000); /* seconds */
00340                         timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->snd_timeout - (sock->snd_timeout / 1000000)) * 1000; /* nanoseconds */
00341                         do {
00342                                 ret = pthread_cond_timedwait(&(sock->cond), &(sock->mutex), &(timedwait_timeout));
00343                                 /* pthread_cond_wait(&(sock->cond), &(sock->mutex)); */
00344                                 if (ret == ETIMEDOUT) {
00345                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Send", "timedout while sending a frame");
00346                                         pthread_mutex_unlock(&(sock->mutex));
00347                                         SET_ERROR_RETURN (sock, VFER_TIMEOUT, -1);
00348                                 }
00349 
00350                         } while ((sock->send_frames.length + len) > MAX_SEND_WINDOW && sock->state == CONN_CONNECTED);
00351                 } else {
00352                         do {
00353                                 pthread_cond_wait(&(sock->cond), &(sock->mutex));
00354                         } while ((sock->send_frames.length + len) > MAX_SEND_WINDOW && sock->state == CONN_CONNECTED);
00355                 }
00356 
00357                 if (sock->state != CONN_CONNECTED) {
00358                         pthread_mutex_unlock(&(sock->mutex));
00359                         SET_ERROR_RETURN (sock, VFER_INVAL, -1);
00360                 }
00361         }
00362 
00363         if (Datagram_Enqueue(&sock->send_frames, buffer, len, sock->l_frame_num) == -1) {
00364                 pthread_mutex_unlock(&(sock->mutex));
00365                 SET_ERROR_RETURN (sock, VFER_BADSOCK, -1);
00366         }
00367         sock->l_frame_num++;
00368         sock->stats.data_bytes_sender_sent += len;
00369 
00370         pthread_mutex_unlock(&(sock->mutex));
00371         /****************/
00372         return (size_t)(len);
00373 } /* Control_Send() */
00374 
00375 /**
00376  * Receive data on a socket. This is a function that is called by the
00377  * client via the api call vfer_recv. The socket's mutex should be
00378  * locked before calling this function. This function unlocks the
00379  * socket's mutex before returning. Socket's state must be
00380  * conn_connected/conn_disconnected/conn_disconnecting on entry
00381  *
00382  * @param sock          a socket pointer
00383  * @param buffer        a pointer to buffer to which to write received data
00384  * @param len           buffer size
00385  * @return
00386  *      - size of the buffer received on succcess
00387  *      - VFER_BADSOCK bad vfer_sock
00388  *      - VFER_TIMEOUT timed out while waiting to receive
00389  *      - VFER_UNCONN sock is not connected
00390  *      - VFER_INVAL invalid buf or len
00391  *      - VFER_WOULDBLOCK socket is non-blocking and requested operation would block
00392  */
00393 int Control_Recv(vfer_sock* sock, void* buffer, int len) {
00394         int ret;
00395         struct timespec timedwait_timeout; /* used as argument to pthread_cond_timedwait */
00396         struct timeval start_time;
00397 
00398         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "Entered sock->fd[%d] ; buffer len[%d]", sock->fd, len);
00399 
00400         if (sock->recvd_frames.num == 0) {
00401                 /* in order to receive more recvd frames, we need to be in connected state
00402                    note: to recv already recvd frames, this isn't true */
00403 
00404                 if (sock->state != CONN_CONNECTED) {
00405                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "Returning 0 as socket state doesn't allow receiving");
00406                         pthread_mutex_unlock(&(sock->mutex));
00407                         return VFER_UNCONN;
00408                 }
00409                 /* block or return error if nonblocking opt is set */
00410                 if ((sock->opts & VFERSO_NONBLOCK) == 1) {
00411                         pthread_mutex_unlock(&(sock->mutex));
00412                         return VFER_WOULDBLOCK;
00413                 }
00414 
00415                 GET_TIME_OF_DAY(&(start_time));
00416 
00417                 if (sock->rcv_timeout != 0) {
00418                         /* compute the timeout for cond_timedwait */
00419                         timedwait_timeout.tv_sec = start_time.tv_sec + (sock->rcv_timeout / 1000000); /* seconds */
00420                         timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->rcv_timeout - (sock->rcv_timeout / 1000000)) * 1000; /* nanoseconds */
00421                         while (sock->recvd_frames.num == 0 && sock->state == CONN_CONNECTED) {
00422                                 ret = pthread_cond_timedwait(&(sock->cond), &(sock->mutex), &(timedwait_timeout));
00423                                 if (ret == ETIMEDOUT) {
00424                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "timedout while receiving a new frame");
00425                                         pthread_mutex_unlock(&(sock->mutex));
00426                                         return VFER_TIMEOUT;
00427                                 }
00428                         }
00429                 } else {
00430                         while (sock->recvd_frames.num == 0 && sock->state == CONN_CONNECTED) {
00431                                 ret = pthread_cond_wait(&(sock->cond), &(sock->mutex));
00432                         }
00433                 }
00434 
00435                 if (sock->state != CONN_CONNECTED) {
00436                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "Returning 0 as socket state doesn't allow receiving");
00437                         pthread_mutex_unlock(&(sock->mutex));
00438                         return VFER_UNCONN;
00439                 }
00440         }
00441 
00442         /* if we are here there is at least one frame in the recv_frames list */
00443         ret = Datagram_Dequeue(&sock->recvd_frames, buffer, len);
00444         if (ret == -1) {
00445                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "Can't return frame: buffer too small");
00446                 pthread_mutex_unlock(&(sock->mutex));
00447                 return VFER_INVAL;
00448         }
00449         pthread_mutex_unlock(&(sock->mutex));
00450         /****************/
00451         return ret;
00452 } /* Control_Recv() */
00453 
00454 
00455 /**
00456  * This is a helper function that makes the socket passed as argument
00457  * 'active' so that the ConnectT thread will pay attention to it. It
00458  * also maintains the sockets.max_fd and the sockets.fds vars which
00459  * are used as args to the select() call in ControlT thread
00460  *
00461  * @param sock socket pointer to socket we would like listed as active
00462  */
00463 void Control_Register_Socket(vfer_sock* sock) {
00464         sigset_t set;
00465 
00466         /*************/
00467         pthread_mutex_lock(&(sockets.mutex));
00468         sockets.num_active++;
00469 
00470         FD_SET(sock->fd, &(sockets.fds));
00471         if (sockets.max_fd < sock->fd) {
00472                 sockets.max_fd = sock->fd;
00473         }
00474 
00475         /* printf("register_socket new num_active[%d]\n", sockets.num_active); */
00476         if (sockets.num_active == 1) {
00477                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Register_Socket", "Starting ControlT thread, num_active = 1");
00478                 if (pthread_create(&ControlT_tid, NULL, (void (*))ControlT, NULL) != 0) {
00479                         ERROR_PRINT("control.c", "Control_Register_Socket", "ERROR: Couldn't create ControlT thread");
00480                 } else {
00481                         pthread_sigmask(SIG_BLOCK, NULL, &set);
00482                         sigaddset(&set, SIGINT);
00483                         pthread_sigmask(SIG_BLOCK, &set, NULL);
00484                         pthread_detach(ControlT_tid);
00485                 }
00486         }
00487         pthread_mutex_unlock(&(sockets.mutex));
00488         /*************/
00489 } /* Control_Register_Socket() */
00490 
00491 /**
00492  * This function removes socket sock from the 'active' list of sockets
00493  * that are observed by the ControlT thread. This thread takes care to
00494  * maintain sockets.max_fd and sockets.fds vars
00495  *
00496  * @param sock is a socket pointer that will be removed from the 'active' list of sockets
00497  */
00498 void Control_Unregister_Socket(vfer_sock* sock) {
00499         int i, j;
00500 
00501         /****************/
00502         pthread_mutex_lock(&(sockets.mutex));
00503         FD_CLR(sock->fd, &(sockets.fds));
00504         sockets.num_active--;
00505         
00506         /* close icmp socket */
00507         if(sock->icmp_fd != -1){
00508                 close(sock->icmp_fd);
00509         }
00510         
00511         /* printf("unregister_socket new num_active[%d]\n", sockets.num_active); */
00512         i = sockets.max_fd;
00513         if (sock->fd == i) {
00514                 /* recompute max_fd */
00515                 sockets.max_fd = 0;
00516                 i = j = 0;
00517                 while (j < sockets.num_active) {
00518                         pthread_mutex_lock(&(sockets.s[i].mutex));
00519                         if (((sockets.s[i].state == CONN_DISCONNECTING) ||
00520                              (sockets.s[i].state == CONN_CONNECTED) ||
00521                              (sockets.s[i].state == CONN_CONNECTING) ||
00522                              (sockets.s[i].state == CONN_ACCEPTING) ||
00523                              (sockets.s[i].state == CONN_LISTENING)) &&
00524 
00525                             (sockets.max_fd < sockets.s[i].fd)) {
00526                                 j++;
00527                                 sockets.max_fd = sockets.s[i].fd;
00528                         }
00529                         pthread_mutex_unlock(&(sockets.s[i].mutex));
00530                         i++;
00531                 }
00532         }
00533         pthread_mutex_unlock(&(sockets.mutex));
00534         /****************/
00535 } /* Control_Unregister_Socket() */
00536 
00537 /*************************************************************************
00538  * Private (Internal) functions
00539  *************************************************************************/
00540 
00541 /**
00542  * Writes the iov to an fd and then deallocs the iov (different for
00543  * DATA, DATAACK iovs).
00544  *
00545  * @return
00546  *       0      on success
00547  *       errno  on error (from Packet_Write(), see packet.c)
00548  */
00549 inline static int Control_Write(packet* p, int fd) {
00550         int ret;
00551 
00552         ret = Packet_Write(p, fd, NULL, 0);
00553         switch (p->type) {
00554         case DATA:
00555                 /* we do not want to release the data portion of this packet */
00556                 RELEASE_IOV(p->iov, 1);
00557                 RELEASE(p, sizeof(packet));
00558                 break;
00559          default:
00560                  RELEASE_PACK(p);
00561                  break;
00562         }
00563 
00564         if (ret != 0) {
00565                 if (ret == EAGAIN) return 0; /* as if we were able to send it */
00566                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Write", "Packet_Write returned error[%d]", ret);
00567                 return -1;
00568         }
00569         return 0;
00570 } /* Control_Write() */
00571 
00572 /**
00573  * Receives all the packets on a given socket
00574  * NOTE: socket's mutex must be locked before calling this function
00575  *
00576  * @param sock socket on which to receive packets
00577  * @return
00578  *      -1 on error
00579  *      0 if received no packets
00580  *      1 if received only data packets
00581  *      2 if received only control packets
00582  *      3 if received both data and control packets
00583  */
00584 static int Control_RecvAll(vfer_sock* sock) {
00585         frame_link* frame_l;
00586         frame_link* frame_l2;
00587         frame_link* frame_prev;
00588         packet* p;
00589         packet* ack;
00590         uint32_t pframe_num;    /* packet's frame num- used when receiving a data packet */
00591         int i;
00592         int ret_val;
00593         int is_phantom;         /* used upon receiving a data packet */
00594         ccontrol_t* c;
00595         int n;
00596         interval_t *interval_ptr, *interval_ptr2;
00597 
00598         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Entered with sock->fd[%d]", sock->fd);
00599         ret_val = 0;
00600         c = &(sock->ccontrol);
00601         
00602         while ((Packet_Read_ICMP(sock->icmp_fd, &(sock->pmtu)) >= 0) && ((n = Packet_Read(sock->fd, &p, NULL, NULL)) != EAGAIN)) {
00603                 switch (n) {
00604                 case -1:
00605                 case -2:
00606                 case -3:
00607                 case -4:
00608                         /* malformed packet errors */
00609                         continue;
00610                 }
00611                 if (n != 0) {
00612                         /* stop processing this socket immediately */
00613                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Packet_Read returned error[%d]", n);
00614                         return -1;
00615                 }
00616 
00617                 /* if we are the client, change the sign of the delay measure set by packet.c
00618                    this also lets us compute the RTT in the same way for client\server later */
00619                 if (SOCK_OPT_ISSET(sock, VFERSO_CLIENT_SOCK)) {
00620                         p->delay *= -1;
00621                 }
00622 
00623                 DEBUG_PACK(DEBUG_CTL, "control.c", "Control_RecvAll", p, "Received packet on sock->fd[%d]", sock->fd);
00624 
00625                 /* note that we count packets & bytes that might be malformed and later ignored */
00626                 sock->stats.packets_recvd++;
00627                 /* the whole packet is stored in p->iov[0] */
00628                 sock->stats.bytes_recvd += p->iov[0].iov_len;
00629 
00630                 /* handle the packet depending on its type */
00631                 switch (p->type) {
00632                 case REQUEST:
00633                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received REQUEST during wrong state ; ignoring");
00634                         break;
00635                 case RESPONSE:
00636                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received RESPONSE during wrong state ; ignoring");
00637                         /* this needs to do more than just send an ack back, particularly authenticate the response seq number
00638                            and zero out all the indicator bits so that all the data can be resent, also if we received a packet
00639                            such as ack or data after previous response implies this one is obviously a decoy and should be ignored */
00640                         ack = Packet_Form_CCAck(0, sock->request->seq + 1, p->seq, 0,
00641                                                 p->u.res.init_frame_num, p->u.res.init_frame_num,
00642                                                 0, p->delay, 0, 0, NULL);
00643                         if (Packet_Write(ack, sock->fd, NULL, 0) != 0) {
00644                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet_Write failed");
00645                                 sock->err       = VFER_NOCONN;
00646                                 RELEASE_PACK(ack);
00647                         }
00648                         break;
00649                 case CC_ACK:
00650                         CC_Recvd_Ack(p, sock);
00651 
00652                         /* check if the newest_frame seen field is not the last_frame_num
00653                            for send_frames list, reset all frames >= newest_frame value bit vectors to 0 */
00654 
00655                         /* remove frames from the send queue that are no longer of interest (ie. all frames whose frame_num < last_frame_num in ccack) */
00656 //                      printf("first[%d] oldest_acked[%d] send_frames_len1[%d] ",
00657 //                             (sock->send_frames.first == NULL ? -27 : sock->send_frames.first->frame.frame_num), p->u.ccack.oldest_frame, sock->send_frames.length);
00658 
00659                         while (sock->send_frames.first != NULL &&
00660                                sock->send_frames.first->frame.frame_num < p->u.ccack.oldest_frame) {
00661                                 frame_l = sock->send_frames.first;
00662                                 /* DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "removing frame[%d] implicitly by oldest_frame[%d] in ack",
00663                                    frame_l->frame.frame_num, p->u.ccack.oldest_frame); */
00664                                 Datagram_Remove (&(sock->send_frames), NULL, frame_l);
00665                                 
00666                                 /* PMTU: If probe in completed frame, increase MTU. */
00667                                 if(sock->pmtu.probe_state == PROBE_SENT && sock->pmtu.frame_num == frame_l->frame.frame_num){
00668                                         c->mtu = *(sock->pmtu.probe_size);
00669                                         sock->pmtu.search_low = sock->pmtu.probe_size;
00670                                         if(sock->pmtu.search_low == sock->pmtu.search_high){
00671                                                 /* PMTU found */
00672                                                 sock->pmtu.probe_state = MTU_FOUND;
00673                                                 struct timeval tv;
00674                                                 GET_TIME_OF_DAY(&tv);
00675                                                 sock->pmtu.next_probe_time = tv.tv_sec + PMTUD_TIME_INTERVAL;
00676                                                 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()",
00677                                                                   "PMTU of size %d found [oldest frame]. Next probe to be begin at %lu.", c->mtu, (long int)sock->pmtu.next_probe_time);
00678                                         }else{
00679                                                 sock->pmtu.probe_size++;
00680                                                 sock->pmtu.probe_state = NO_PROBE_SENT;
00681                                                 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()", "Increasing PMTU to size %d. [oldest frame]", c->mtu);
00682                                         }
00683                                 }
00684                                 
00685                                 RELEASE(frame_l->frame.data, frame_l->frame.len);
00686                                 RELEASE(frame_l, sizeof(frame_link));
00687                         }
00688 
00689 //                      printf("send_frames_len2[%d]", sock->send_frames.length);
00690 
00691                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received CC_ACK ; newest_frame[%d] ; send_frames.first_frame_num[%d]",
00692                                     p->u.ccack.newest_frame, (sock->send_frames.first == NULL ? -0 : sock->send_frames.first->frame.frame_num));
00693 
00694                         /* CLEAR the unacked_data var ONLY if this ack acknowledges ALL of the past data we've sent
00695                            if it does not then do NOT reset it! */
00696                         if (p->u.ccack.last_seq == (sock->ccontrol.s_last_seq - 1)) {
00697 //                              printf("clearing last_unacked_data (last_seq onack matches our last_seq)\n");
00698                                 c->s_last_unacked_data.tv_sec = 0; /* clear the unacked data var */
00699                         }
00700 
00701                         /* update the frag_indicator for frames that are outside of the range seen by the received as implied by this ccack */
00702                         if (ret_val < 2) ret_val+=2;
00703 
00704                         if (p->u.ccack.interval_count == 0) {
00705                                 break;
00706                         }
00707                         /* update the missing data ranges for the datagram specified by this ack */
00708                         /* for now don't care what the sequence number is, but later make sure that
00709                            if a previous ack with larger seq number for this datagram was received then
00710                            we don't pay attention to this one */
00711                         // printf("updating from receved missing ranges in ACK!\n");
00712                         frame_l = sock->send_frames.first;
00713                         while (frame_l != NULL) {
00714                                 if (frame_l->frame.frame_num == p->u.ccack.intervals_frame_num && frame_l->frame.done == 0) {
00715                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Updating intervals for frame[%d]", p->u.ccack.intervals_frame_num);
00716                                         
00717                                         if (p->u.ccack.interval_count == 1 &&
00718                                             ntohl(p->u.ccack.raw_intervals[0]) == 0 &&
00719                                             ntohl(p->u.ccack.raw_intervals[1]) == 0) {
00720                                                 /* a (0,0) interval implies that none of the frame has been received
00721                                                    and thus the endhost doesn't even know the size of the frame */
00722                                                 p->u.ccack.raw_intervals[1] = htonl(frame_l->frame.len);
00723                                         }
00724                                         
00725                                         /* PMTU: If only missing interval and entirely consists of probe data, then stop searching upward */
00726                                         int net_interval_count = p->u.ccack.interval_count - frame_l->frame.missing_data.count;
00727                                         if(sock->pmtu.probe_state == PROBE_SENT && frame_l->frame.frame_num == sock->pmtu.frame_num && net_interval_count > 0){
00728                                                 i = 0;
00729                                                 while(i < p->u.ccack.interval_count){
00730                                                         int start = ntohl(p->u.ccack.raw_intervals[2*i]);
00731                                                         int end = ntohl(p->u.ccack.raw_intervals[(2*i) + 1]);
00732                                                         if(start == sock->pmtu.data_offset 
00733                                                                 && end == (sock->pmtu.data_offset + *(sock->pmtu.probe_size))
00734                                                                 && net_interval_count == 1){
00735                                                                         sock->pmtu.search_high = sock->pmtu.probe_size;
00736                                                                         sock->pmtu.probe_state = MTU_FOUND;
00737                                                                         struct timeval tv;
00738                                                                         GET_TIME_OF_DAY(&tv);
00739                                                                         sock->pmtu.next_probe_time = tv.tv_sec + PMTUD_TIME_INTERVAL;
00740                                                                         DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()",
00741                                                                                           "PMTU of size %d found [bit vector indicated failure]. Next probe to be begin at %lu.",
00742                                                                                           c->mtu, (long int)sock->pmtu.next_probe_time);
00743                                                                         break;
00744                                                         }else if(start <= sock->pmtu.data_offset 
00745                                                                 && end >= (sock->pmtu.data_offset + *(sock->pmtu.probe_size))){
00746                                                                         /* PMTU:If probe missing but other data also missing, then probe should be resent */
00747                                                                         DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()", "Probe needs to be resent");
00748                                                                         sock->pmtu.probe_state = NO_PROBE_SENT;
00749                                                                         break;
00750                                                         }
00751                                                         i++;
00752                                                 }
00753                                                 
00754                                                 /* if went through all intervals but probe not missing and state not changed, then probe must have been received. 
00755                                                    In this case we can increase the path MTU. NOTE: Most likely the CC_ACK oldest frame will handle this but in 
00756                                                    case this appears first, can increase in this manner. The probe state will prevent any conflicts */
00757                                                    if(sock->pmtu.probe_state == PROBE_SENT){
00758                                                                 c->mtu = *(sock->pmtu.probe_size);
00759                                                                 sock->pmtu.search_low = sock->pmtu.probe_size;
00760                                                                 if(sock->pmtu.search_low == sock->pmtu.search_high){
00761                                                                         /* PMTU found */
00762                                                                         sock->pmtu.probe_state = MTU_FOUND;
00763                                                                         struct timeval tv;
00764                                                                         GET_TIME_OF_DAY(&tv);
00765                                                                         sock->pmtu.next_probe_time = tv.tv_sec + PMTUD_TIME_INTERVAL;
00766                                                                         DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()",
00767                                                                                           "PMTU of size %d found [oldest frame]. Next probe to begin at %lu.", c->mtu, (long int)tv.tv_sec);
00768                                                                 }else{
00769                                                                         sock->pmtu.probe_size++;
00770                                                                         sock->pmtu.probe_state = NO_PROBE_SENT;
00771                                                                         DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()", "Increasing PMTU to size %d. [oldest frame]", c->mtu);
00772                                                                 }
00773                                                    }
00774                                                    
00775                                         } /* End PMTU */
00776 
00777                                         /* update the intervals list based on the list that's carried in the ack packet */
00778                                         interval_ptr = frame_l->frame.missing_data.head;
00779                                         interval_ptr2 = interval_ptr;
00780                                         i = 0;
00781                                         /* interval_ptr is treated as the previous elem in this loop except at start */
00782                                         // printf("old_count[%d] vec_ack_count[%d]\n", frame_l->frame.missing_data.count, p->u.ccack.interval_count);
00783                                         while (i < frame_l->frame.missing_data.count && i < p->u.ccack.interval_count) {
00784                                                 // printf("1 ");
00785                                                 // printf("old[%d %d] ", interval_ptr->start, interval_ptr->end);
00786                                                 interval_ptr->start = ntohl(p->u.ccack.raw_intervals[2*i]);
00787                                                 interval_ptr->end = ntohl(p->u.ccack.raw_intervals[(2*i)+1]);
00788                                                 // fflush(stdout);
00789                                                 // printf("new[%d %d] ", interval_ptr->start, interval_ptr->end);
00790                                                 // fflush(stdout);
00791 
00792                                                 interval_ptr2 = interval_ptr;
00793                                                 interval_ptr = interval_ptr->next;
00794                                                 i++;
00795                                         }
00796                                         
00797                                         interval_ptr = interval_ptr2;
00798                                         /* interval_ptr points to the last updated interval in the missing_data intervals list */
00799                                         if (i == frame_l->frame.missing_data.count &&
00800                                             i < p->u.ccack.interval_count) {
00801                                                 /* make new intervals at end of interval list */
00802                                                 for (; i < p->u.ccack.interval_count; i++) {
00803                                                         // printf("2 ");
00804                                                         if (frame_l->frame.missing_data.head == NULL) {
00805                                                                 /* missing_data is empty, therefore have to start from head of list */
00806                                                                 // printf(". ");
00807                                                                 ALLOC(interval_ptr, interval_t*, interval_t, 1);
00808                                                                 frame_l->frame.missing_data.head = interval_ptr;
00809                                                                 interval_ptr->prev = NULL;
00810                                                         } else {
00811                                                                 /* missing_data is no empty, start from interval_ptr (where we left off) */
00812                                                                 //printf(", ");
00813                                                                 ALLOC((interval_ptr->next), interval_t*, interval_t, 1);
00814                                                                 interval_ptr->next->prev = interval_ptr;
00815                                                                 interval_ptr = interval_ptr->next;
00816                                                         }
00817                                                         /* interval_ptr here points to the newly created interval */
00818                                                         interval_ptr->next = NULL;
00819                                                         interval_ptr->start = ntohl(p->u.ccack.raw_intervals[2*i]);
00820                                                         interval_ptr->end = ntohl(p->u.ccack.raw_intervals[(2*i)+1]);
00821                                                         //printf("+new[%d %d] ", interval_ptr->start, interval_ptr->end);
00822                                                         // fflush(stdout);
00823                                                 }
00824                                         } else if (i < frame_l->frame.missing_data.count &&
00825                                                    i == p->u.ccack.interval_count) {
00826                                                 /* delete extra intervals at end of the missing_data list, start with interval after interval_ptr */
00827                                                 interval_ptr = interval_ptr->next;
00828                                                 if (interval_ptr != NULL) {
00829                                                         interval_ptr->prev->next = NULL;
00830                                                 }
00831                                                 for (; i < frame_l->frame.missing_data.count; i++) {
00832                                                         //printf("3 ");
00833                                                         //printf("-old[%d %d] ", interval_ptr->start, interval_ptr->end);
00834                                                         // fflush(stdout);
00835                                                         interval_ptr2 = interval_ptr->next;
00836                                                         RELEASE(interval_ptr, sizeof(interval_t));
00837                                                         interval_ptr = interval_ptr2;
00838                                                 }
00839                                         }
00840                                         //printf("\n");
00841                                         frame_l->frame.missing_data.count = p->u.ccack.interval_count;
00842                                         //printf("new_count[%d]\n", frame_l->frame.missing_data.count);
00843                                         break;
00844                                 }
00845                                 frame_l = frame_l->next;
00846                         }
00847                         break;
00848                 case CLOSE:
00849                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received CLOSE");
00850                         GET_TIME_OF_DAY(&(sock->stats.tv_end));
00851                         sock->state = CONN_DISCONNECTED;
00852                         CC_Recvd_Close(p, sock);
00853                         if (ret_val < 2) ret_val+=2;
00854                         return ret_val;
00855                 case DATA:
00856                         goto data_packet;
00857                 default:
00858                         if (p->type == DATA_ACK) {
00859                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received DATA_ACK ; ignoring for now");
00860                                 break;
00861                         } else {
00862                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received unknown packet type!");
00863                         }
00864                 } /* switch (p->type) */
00865                 RELEASE_PACK(p);
00866                 continue;
00867 
00868         data_packet:
00869 
00870                 /* if it's a data packet with sync option set, reply immediately with an ack */
00871                 if ((p->opts & OPT_SYNC) == OPT_SYNC) {
00872                         /* printf("sending ack b/c data packet has OPT_SYNC\n"); */
00873                         // sock->ccontrol.syncing = 1;
00874                         Control_Send_Ack (sock);
00875                         // sock->ccontrol.syncing = 0;
00876                 }
00877 
00878 
00879                 pframe_num = p->u.data.frame_num;
00880 //              DEBUG_PACK(1, "control.c", "Control_RecvAll", p, "DATA: ");
00881 
00882                 /* data packet fields sanity checks */
00883                 if (pframe_num < sock->recv_frames.first_frame_num) {
00884                         /* check if the frame referenced by packet is out of bounds (old frame) and ignore it */
00885                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet with old frame number[%d] outside bounds: [%d,inf], ignoring",
00886                                     pframe_num, sock->recv_frames.first_frame_num);
00887                         break;
00888                 } else if (p->u.data.frame_len == 0) {
00889                         /* check if the packet carries frame size */
00890                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet has no frame_len ; ignoring");
00891                         RELEASE_PACK(p);
00892                         continue;
00893                 } else if (p->u.data.frame_len > MAX_FRAME_SIZE) {
00894                         /* check that the frame doesn't exceed maximum frame size */
00895                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet has too large a frame_len[%d] ; my max[%d]", p->u.data.frame_len, MAX_FRAME_SIZE);
00896                         RELEASE_PACK(p);
00897                         continue;
00898                 /* } else if (p->u.data.data_len > MAX_FRAGMENT_SIZE) {
00899                         // check that the data size of the frame doesn't exceed max fragment size
00900                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet of new frame has frag_size too large[%d] ; my max[%d]", p->u.data.data_len, MAX_FRAME_SIZE);
00901                         RELEASE_PACK(p);
00902                         continue; */
00903                 }
00904 
00905                 /* from here on, the packet's frame_num is greater or equal to first_frame_num */
00906                 /* signal to cc that we have received a data packet */
00907                 CC_Recvd_Data(p, sock);
00908                 sock->stats.data_bytes_recvd+=p->u.data.data_len;
00909 
00910                 /* update the last_frame_num if the frame number in packet is the newest(largest) seen so far */
00911                 if (sock->recv_frames.last_frame_num < pframe_num) {
00912                         sock->recv_frames.last_frame_num = pframe_num;
00913                 }
00914 
00915                 /* add phanton frames to beginning of the queue if they are necessary */
00916                 if (sock->recv_frames.num == 0) {
00917                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "recv_frames is empty, creating phantoms [first[%d], %d]",
00918                                     sock->recv_frames.first_frame_num, pframe_num);
00919 
00920                         frame_prev = NULL;
00921                         /* this loop generates phantom frames and the packet's frame */
00922                         for (i = sock->recv_frames.first_frame_num; i <= pframe_num; i++) {
00923                                 /* create the missing frame as a phantom (see datagram.c) */
00924                                 frame_prev = Datagram_Insert_Phantom(&(sock->recv_frames), frame_prev, i);
00925                         }
00926                         is_phantom = 1;
00927                         frame_l = frame_prev;
00928                 } else {
00929 
00930 //                      printf("before creating phantoms and packet's frame:\n");
00931 //                      Datagrams_Print(&(sock->recv_frames));
00932 
00933                         frame_l = sock->recv_frames.first;
00934                         /* this loop assumes consecutive frame numbers in frames in the recv_frames list */
00935                         while (frame_l != NULL) {
00936                                 if (frame_l->frame.frame_num == pframe_num) {
00937 //                                      DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "###: found frame");
00938                                         is_phantom = 0;
00939                                         break;  /* found frame, it's stored in frame_l */
00940                                 } else if (frame_l->next == NULL) {
00941                                         if (frame_l->frame.frame_num >= pframe_num) {
00942                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "ERR: Inconsistent recv_frame list");
00943                                         }
00944 //                                      DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "###: creating phantoms and frame at end of list");
00945                                         /* need to create the missing frame and possibly phantom frames
00946                                            appearing between pframe_num and the last frame in the recvd list */
00947                                         frame_prev = frame_l;
00948                                         for (i = frame_l->frame.frame_num + 1; i <= pframe_num; i++) {
00949                                                 frame_prev = Datagram_Insert_Phantom(&(sock->recv_frames), frame_prev, i);
00950                                         }
00951                                         /* remember the frame pointer to the packet's frame in frame_l */
00952                                         frame_l = frame_prev;
00953                                         is_phantom = 1;
00954                                         break;
00955                                 } else if (frame_l->frame.frame_num < pframe_num && frame_l->next->frame.frame_num > pframe_num) {
00956                                         /* need to create the missing frame and possibly phantom frames
00957                                            appearing between frame_l's frame_num and pframe_num, and
00958                                            between pframe_num and frame_l->next's frame_num */
00959 //                                      DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "###: creating phantoms and frame in middle of list");
00960                                         frame_prev = frame_l;
00961                                         for (i = frame_l->frame.frame_num + 1; i < frame_l->next->frame.frame_num; i++) {
00962                                                 frame_prev = Datagram_Insert_Phantom(&(sock->recv_frames), frame_prev, i);
00963                                                 if (i == pframe_num) {
00964                                                         /* remember the frame pointer to the packet's frame in frame_l */
00965                                                         frame_l = frame_prev;
00966                                                 }
00967                                         }
00968                                         is_phantom = 1;
00969                                         break;
00970                                 }
00971                                 frame_l = frame_l->next;
00972                         }
00973                 }
00974 
00975                 /* frame_l should not be NULL here ; it should point
00976                  * to either the found frame for the packet or the
00977                  * newly created frame for the packet */
00978 
00979 //              printf("after creating phantoms and packet's frame:\n");
00980 //              Datagrams_Print(&(sock->recv_frames));
00981 
00982                 if (is_phantom == 1) {
00983                         /* packet's frame has just been created as a phantom */
00984                         ALLOC(frame_l->frame.data, void*, char, p->u.data.frame_len);
00985                         frame_l->frame.len = p->u.data.frame_len;
00986 
00987                         ALLOC(frame_l->frame.missing_data.head, interval_t*, interval_t, 1);
00988                         frame_l->frame.missing_data.head->next = NULL;
00989                         frame_l->frame.missing_data.head->prev = NULL;
00990                         frame_l->frame.missing_data.head->end = frame_l->frame.len;
00991                         frame_l->frame.missing_data.head->start = 0;
00992                         frame_l->frame.missing_data.count = 1;
00993                         frame_l->frame.frame_num        = p->u.data.frame_num;
00994                         frame_l->frame.urgency          = 0;
00995                         frame_l->frame.done             = 0;
00996 
00997                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "New recv_frame allocated: id[%d] len[%d] ; recv_frames->num[%d]",
00998                                     frame_l->frame.frame_num, frame_l->frame.len, sock->recv_frames.num);
00999 //                      Datagrams_Print(&(sock->recv_frames));
01000                 } else {
01001                         /* packet's frame was found and was not just created as a phantom */
01002                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Found the frame in recv_frames");
01003                         /* check if this fragment is destined for a previously phantom created frame */
01004                         if (frame_l->frame.len == 0) {
01005                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received first packet to a previously phantom frame[%d]", frame_l->frame.frame_num);
01006                                 /* update the previously blank frame with the correct properties for the frame from the fragment */
01007                                 frame_l->frame.len      = p->u.data.frame_len;
01008                                 if (ALLOC(frame_l->frame.data, void*, char, p->u.data.frame_len) == 0) {
01009                                         frame_l->frame.len = 0;
01010                                         RELEASE_PACK(p);
01011                                         continue;
01012                                 }
01013                                 ALLOC(frame_l->frame.missing_data.head, interval_t*, interval_t, 1);
01014                                 frame_l->frame.missing_data.head->next = NULL;
01015                                 frame_l->frame.missing_data.head->prev = NULL;
01016                                 frame_l->frame.missing_data.head->end = frame_l->frame.len;
01017                                 frame_l->frame.missing_data.head->start = 0;
01018                                 frame_l->frame.missing_data.count = 1;
01019                                 /* urgency is reset in ControlT right after the first nack_vec is sent */
01020                         }
01021                 }
01022 
01023 
01024                 /* check if the frame referenced by packet needs the data interval carried by this packet */
01025                 interval_ptr = frame_l->frame.missing_data.head;
01026                 //printf("intervals: ");
01027                 while (interval_ptr != NULL) {
01028                         //printf("{%d %d ", interval_ptr->start, interval_ptr->end);
01029                         if ((interval_ptr->start == p->u.data.data_offset) &&
01030                             (interval_ptr->end == (p->u.data.data_offset + p->u.data.data_len))) {
01031                                 //printf("1");
01032                                 /* delete the old interval the data for which has been received */
01033                                 if (interval_ptr->prev != NULL) {
01034                                         interval_ptr->prev->next = interval_ptr->next;
01035                                         if (interval_ptr->next != NULL) {
01036                                                 interval_ptr->next->prev = interval_ptr->prev;
01037                                         }
01038                                 } else { /* this is the first interval */
01039                                         frame_l->frame.missing_data.head = interval_ptr->next;
01040                                         if (interval_ptr->next != NULL) {
01041                                                 interval_ptr->next->prev = NULL;
01042                                         }
01043                                 }
01044                                 RELEASE(interval_ptr, sizeof(interval_t));
01045                                 frame_l->frame.missing_data.count--;
01046                                 break;
01047                         } else if ((interval_ptr->start < p->u.data.data_offset) &&
01048                                    (interval_ptr->end == (p->u.data.data_offset + p->u.data.data_len))) {
01049                                 //printf("2");
01050                                 /* update the old interval with a new endpoint */
01051                                 interval_ptr->end = p->u.data.data_offset;
01052                                 break;
01053                         } else if ((interval_ptr->start == p->u.data.data_offset) &&
01054                                    (interval_ptr->end > (p->u.data.data_offset + p->u.data.data_len))) {
01055                                 //printf("3");
01056                                 /* update the old interval with a new start point */
01057                                 interval_ptr->start = interval_ptr->start + p->u.data.data_len;
01058                                 break;
01059                         } else if ((interval_ptr->start < p->u.data.data_offset) &&
01060                                    (interval_ptr->end > (p->u.data.data_offset + p->u.data.data_len))) {
01061                                 //printf("4");
01062                                 /* update the old interval with a new endpoint and create a new interval for the remaining section at the end */
01063                                 ALLOC(interval_ptr2, interval_t*, interval_t, 1);
01064                                 interval_ptr2->next = interval_ptr->next;
01065                                 interval_ptr2->prev = interval_ptr;
01066                                 interval_ptr->next = interval_ptr2;
01067                                 interval_ptr2->start = p->u.data.data_offset + p->u.data.data_len;
01068                                 interval_ptr2->end = interval_ptr->end;
01069                                 interval_ptr->end = p->u.data.data_offset;
01070                                 frame_l->frame.missing_data.count++;
01071                                 break;
01072                         }
01073                         //printf("} ");
01074                         interval_ptr = interval_ptr->next;
01075                 }
01076                 //printf("\n");
01077 
01078                 if (interval_ptr == NULL) {
01079                         /* this means we couldn't find a suitable
01080                            interval for the data [ie. no interval
01081                            exists that include (data_offset,
01082                            data_offset+data_len)] or this data has
01083                            been already received so we ignore this
01084                            packet */
01085                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet with offset[%d] len[%d] ignored",
01086                                     p->u.data.data_offset, p->u.data.data_len);
01087                         RELEASE_PACK(p);
01088                         continue;
01089                 }
01090 
01091                 /* copy fragment :( */
01092                 memcpy(frame_l->frame.data + p->u.data.data_offset,
01093                        p->u.data.data,
01094                        p->u.data.data_len);
01095 
01096                 sock->recv_frames.length += p->u.data.data_len;
01097 
01098                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Added data to frame[%d] at offset[%d] with frame_len[%d] ",
01099                             frame_l->frame.frame_num, p->u.data.data_offset, frame_l->frame.len);
01100 
01101                 /* update the urgency for all the receiving frames for this socket:
01102                    -> increment for all frames < this frame
01103                    -> decrement for all frames >= this frame
01104                 */
01105 
01106                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Updating urgencies");
01107                 frame_l2 = sock->recv_frames.first;
01108                 while (frame_l2 != frame_l) {
01109                         if (frame_l2->frame.urgency >= 0 && frame_l2->frame.done == 0 &&
01110                             (frame_l2->frame.urgency < URGENCY_THRESHOLD)) {
01111                                 frame_l2->frame.urgency++;
01112 //                              printf("incrementing frame[%d] urgency to [%d]\n", frame_l2->frame.frame_num, frame_l2->frame.urgency);
01113 //                              DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "frame[%d].urgency: [%d]", frame_l2->frame.frame_num, frame_l2->frame.urgency);
01114                         }
01115                         frame_l2 = frame_l2->next;
01116                 }
01117 
01118                 frame_l2 = frame_l;
01119                 while (frame_l2 != NULL) {
01120                         if (frame_l2->frame.urgency > 0 && frame_l2->frame.done == 0) {
01121 //                              printf("resetting frame[%d] urgency to 0\n", frame_l2->frame.frame_num);
01122                                 frame_l2->frame.urgency = 0;
01123                         }
01124 //                      DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "frame[%d].urgency: [%d]", frame_l2->frame.frame_num, frame_l2->frame.urgency);
01125                         frame_l2 = frame_l2->next;
01126                 }
01127 
01128                 /* check if the front frame is ready to be delivered (moved to recvd queue)
01129                    but do not move it if it's a phantom frame- a blank
01130                    or if it is incomplete */
01131                 if (frame_l->frame.len > 0 && (frame_l->frame.missing_data.count == 0)) {
01132                         /* no missing data intervals in the frame */
01133                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "frame[%d] len[%d] DONE", frame_l->frame.frame_num, frame_l->frame.len);
01134                         frame_l->frame.done = 1;
01135                         if (sock->recv_frames.first == frame_l) {
01136                                 frame_l2 = frame_l;
01137                                 while (frame_l2 != NULL) {
01138                                         if (frame_l2->frame.len == 0) break; /* break because we reached a phantom frame */
01139 
01140                                         /* if frame completed and in
01141                                          * line to be delivered to the
01142                                          * app - move it to the
01143                                          * recevied queue where we
01144                                          * keep completed frames and
01145                                          * loop through the rest of
01146                                          * the recv_frames list and
01147                                          * move the frames that are
01148                                          * completed and right after
01149                                          * this frame to the recvd
01150                                          * queue (note: the next statement
01151                                          * is evaluated twice but it's
01152                                          * better than twice the
01153                                          * amount of code inside loop
01154                                          * body) */
01155                                         frame_l = frame_l2;
01156                                         frame_l2 = frame_l->next;
01157 
01158                                         if (frame_l->frame.missing_data.count == 0) {
01159                                                 /* recv_frames */
01160                                                 sock->recv_frames.first = frame_l->next;
01161                                                 if (sock->recv_frames.last == frame_l) {
01162                                                         sock->recv_frames.last = NULL;
01163                                                         sock->recv_frames.last_frame_num = frame_l->frame.frame_num + 1;
01164                                                 }
01165 
01166                                                 sock->recv_frames.first_frame_num = frame_l->frame.frame_num + 1;
01167                                                 sock->recv_frames.num--;
01168 
01169                                                 /* recvd_frames */
01170                                                 if (sock->recvd_frames.num == 0) {
01171                                                         sock->recvd_frames.first = frame_l;
01172                                                 } else {
01173                                                         sock->recvd_frames.last->next = frame_l;
01174                                                 }
01175 
01176                                                 sock->recvd_frames.last = frame_l;
01177                                                 frame_l->next = NULL;
01178 
01179                                                 sock->recvd_frames.num++;
01180                                                 sock->recvd_frames.length += frame_l->frame.len;
01181                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Frame[%d] moved into received queue; recv_frames->num[%d] ; recvd_frames->num[%d]",
01182                                                             frame_l->frame.frame_num, sock->recv_frames.num, sock->recvd_frames.num);
01183                                                 /* Datagrams_Print(&(sock->recvd_frames)); */
01184 
01185                                         } else {
01186                                                 break;
01187                                         }
01188                                         /* frame_l2 holds the next element in the recv_frames queue */
01189                                 } /* while (frame_l2 != NULL) */
01190                         } /* if sock->recv_frames.first == frame_l */
01191                 } /* if frame.len > 0 && missing_data.count == 0 */
01192 
01193                 if (ret_val == 0 || ret_val == 2) ret_val+=1;
01194                 /* release the received packet */
01195                 RELEASE_PACK(p);
01196         }
01197         return ret_val;
01198 } /* Control_RecvAll() */
01199 
01200 /**
01201  * Sends all the control packets that are in the control packet queue
01202  * maintained by control.c for each socket
01203  * NOTE: socket's mutex must be locked before calling this function
01204  *
01205  * @param sock vfer_sock pointer
01206  * @param p control packet to send on the socket
01207  * @return
01208  *      - 0 on success
01209  *      - -1 on error
01210  */
01211 inline static int Control_SendCtl (vfer_sock* sock, packet* p) {
01212         /* control packets are not congestion controlled because they do not have a data payload */
01213         Packet_Set_Seq(p,  sock->l_seq);
01214         DEBUG_PACK(DEBUG_CTL, "control.c", "ControlSendCtl", p, "-");
01215 
01216         sock->stats.bytes_sent+=p->iov[0].iov_len;
01217         sock->stats.packets_sent++;
01218 
01219         /* Control_Write releases the packet mem */
01220         if (Control_Write(p, sock->fd) != 0) {
01221                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_SendAllCtl", "Control_Write failed; sock->fd[%d] ; CONN_ERR", sock->fd);
01222                 sock->state = CONN_ERR;
01223                 return -1;
01224         }
01225         CC_Sent_CtlPack(p, sock);
01226         sock->l_seq++;
01227         return 0;
01228 } /* Control_SendCtl() */
01229 
01230 /**
01231  * Sends a congestion control ack packet on socket by creating it and
01232  * then enqueueing it into the control packet queue.
01233  *
01234  * CC is byte accounted (for data carrying packets only) so CC always
01235  * allows for sending CCAcks
01236  *
01237  * @param s socket on which to send the ccack
01238  * @return 0 on success
01239  * @return -1 on error
01240  */
01241 inline static int Control_Send_Ack (vfer_sock* s) {
01242         packet* p;
01243         ccontrol_t* c;
01244         struct timeval tv;
01245         intervals_t* missing;
01246         
01247         c = &(s->ccontrol);
01248         GET_TIME_OF_DAY(&tv);
01249 
01250         missing = NULL;
01251         if (s->recv_frames.first != NULL &&
01252             s->recv_frames.first->frame.missing_data.count != 0) {
01253                 // printf("setting missing_data to ~NULL b/c first->frame.missing_data.count = %d\n", s->recv_frames.first->frame.missing_data.count);
01254                 missing = &(s->recv_frames.first->frame.missing_data);
01255         }
01256         
01257         p = Packet_Form_CCAck(0, 0, c->r_last_seq - 1, c->r_packets_lost,
01258                               s->recv_frames.first_frame_num, s->recv_frames.last_frame_num,
01259                               Tree_Max(&(c->rev_path_curr_delayhist), tv) - Tree_Min(&(c->rev_path_base_delayhist), tv),
01260                               Tree_Max(&(c->rev_path_curr_delayhist), tv), c->r_bytes_total, s->recv_frames.first_frame_num, missing);
01261         
01262         if (Control_SendCtl(s, p) != 0) {
01263                 return -1;
01264         }
01265         c->r_first_seq = c->r_last_seq;
01266         return 0;
01267 } /* Control_Send_Ack() */
01268 
01269 /**
01270  * Sends all data packets that need to be sent on a socket
01271  * @param sock socket link pointer to send data on
01272  * @return
01273  *      0 on success no packets sent
01274  *      1 on success some packets sent
01275  *      -1 on sending error
01276  */
01277 static int Control_SendAllData (vfer_sock* sock) {
01278         /* note that if we are in CONN_CLOSING state, we can still send data packets ; we just can't accept any to send (Control_Send)*/
01279         frame_link* frame_l;
01280         frame* frame;
01281         interval_t* interval_ptr;
01282         interval_t* interval_ptr2;
01283         uint32_t opts;
01284         uint32_t data_size;
01285         uint32_t frame_len;
01286         int packets_sent;
01287         packet* pack;
01288         int ret;
01289         ccontrol_t* c;
01290 
01291         c = &(sock->ccontrol);
01292         packets_sent = 0;
01293         frame_l = sock->send_frames.first;
01294         while (frame_l != NULL) {
01295 
01296                 /* itterate through existing missing data intervals and send data in chunks of current MTU size
01297                    upon every interval send, remove the interval from the list */
01298                 frame = &(frame_l->frame);
01299                 interval_ptr = frame->missing_data.head;
01300                 while (interval_ptr != NULL) {
01301                         /* check if there is space in the bin for this socket for another packet */
01302 
01303                         /* TODO: check if any packets arrived while we are
01304                          * sending data in case we need to adjust the
01305                          * window to be smaller and thereby avoid
01306                          * congestion collapse */
01307 
01308                         opts = 0;
01309                         frame_len = frame->len; /* for now include frame length on all data packets, not just on the initial packet */
01310                         while ((interval_ptr->end - interval_ptr->start) != 0) {
01311                                 if(sock->pmtu.probe_state == NO_PROBE_SENT && (interval_ptr->end - interval_ptr->start) >= *(sock->pmtu.probe_size)){
01312                                         /* PMTU: Send probe if none outstanding and enough data in frame left to fill probe */
01313                                         data_size = *(sock->pmtu.probe_size);
01314                                         sock->pmtu.frame_num = frame->frame_num;
01315                                         sock->pmtu.data_offset = interval_ptr->start;
01316                                 }else if ((interval_ptr->end - interval_ptr->start) >= c->mtu) {
01317                                         data_size = c->mtu;
01318                                 } else {
01319                                         /* send the end piece in the interval that might be less than c->mtu */
01320                                         data_size = interval_ptr->end - interval_ptr->start;
01321                                 }
01322 
01323                                 /* check with CC control whether we can send another packet */
01324                                 if (CC_Can_Send_Data(data_size, sock) == 0) {
01325                                         // printf("sendalldata : CC said we can't send more data; exiting sendalldata function\n");
01326                                         return packets_sent;
01327                                 }
01328 
01329                                 /* printf("sending frame[%d] offset[%d] data_size[%d]\n", frame->frame_num, interval_ptr->start, data_size); */
01330                                 
01331                                 if (c->syncing == 1) {
01332                                         /* printf("SETTING OPT_SYNC\n"); */
01333                                         opts = opts | OPT_SYNC;
01334                                         c->syncing = 0;
01335                                 }
01336 
01337                                 if ((pack = Packet_Form_Data(opts,
01338                                                              sock->l_seq,
01339                                                              frame->frame_num,
01340                                                              interval_ptr->start,
01341                                                              frame_len,
01342                                                              (frame->data + interval_ptr->start),
01343                                                              data_size)) == NULL) {
01344                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_SendAllData", "Couldn't form data packet for frame[%d] sock->fd[%d]", frame->frame_num, sock->fd);
01345                                 } else {
01346                                         DEBUG_PACK(DEBUG_CTL, "control.c", "Control_SendAllData", pack, "Sending DATA packet on skt->fd[%d]", sock->fd);
01347                                         /* note we assume that a data packets is composed of 2 buffers, this is a bad dependency */
01348                                         ret = Control_Write(pack, sock->fd);
01349                                         
01350                                         if(ret != 0 && sock->pmtu.probe_state == NO_PROBE_SENT && *(sock->pmtu.probe_size) == data_size){
01351                                                 /* PMTU: send failed because packet bigger than local MTU. */
01352                                                 //clear error
01353                                                 int err_val;
01354                                                 socklen_t err_val_len = 4;
01355                                                 if (getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &err_val, &err_val_len) == -1)   perror("getsockopt");
01356                                                 
01357                                                 sock->pmtu.search_high = sock->pmtu.probe_size;
01358                                                 sock->pmtu.probe_state = MTU_FOUND;
01359                                                 struct timeval tv;
01360                                                 GET_TIME_OF_DAY(&tv);
01361                                                 sock->pmtu.next_probe_time = tv.tv_sec + PMTUD_TIME_INTERVAL;
01362                                                 
01363                                                 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_SendAllData",
01364                                                                   "PMTU of size %d found [Probe write failed]. Next probe to be begin at %lu.", c->mtu, (long int)sock->pmtu.next_probe_time);
01365                                                 continue;
01366                                         }else if(sock->pmtu.probe_state == NO_PROBE_SENT && *(sock->pmtu.probe_size) == data_size){
01367                                                 /* PMTU: Successful write so set probe as sent */
01368                                                 sock->pmtu.probe_state = PROBE_SENT;
01369                                                 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_SendAllData",
01370                                                                   "Probe of size %d sent frame_num[%d] data_offset[%d]", data_size, sock->pmtu.frame_num, sock->pmtu.data_offset);
01371                                         }else if (ret != 0) {
01372                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_SendAllData", "Control_Write failed; skt->fd[%d] ; CONN_ERR", sock->fd);
01373                                                 sock->state = CONN_ERR;
01374                                                 return -1;
01375                                         }
01376 
01377                                         /* indicates that this packet was sent */
01378                                         /* update the stats */
01379                                         sock->stats.bytes_sent+=pack->iov[0].iov_len;
01380                                         sock->stats.bytes_sent+=pack->iov[1].iov_len;
01381                                         sock->stats.data_bytes_sent+=pack->iov[1].iov_len;
01382                                         sock->stats.packets_sent++;
01383 
01384                                         CC_Sent_Data(pack, frame_l, sock); /* Let CC control update the CC information for this socket */
01385                                         packets_sent = 1; /* set the return value to indicate that we managed to send some data packet */
01386                                         sock->l_seq++;
01387                                 }
01388                                 /* update the interval with the fact that we've sent a piece from the front of size data_size */
01389                                 interval_ptr->start += data_size;
01390                         } /* interval_ptr->end - interval_ptr->start != 0 */
01391 
01392                         /* delete the missing data interval that was just sent in pieces of current MTU */
01393                         if (interval_ptr->prev != NULL) {
01394                                 interval_ptr->prev->next = interval_ptr->next;
01395                                 if (interval_ptr->next != NULL) {
01396                                         interval_ptr->next->prev = interval_ptr->prev;
01397                                 }
01398                         } else { /* this is the first interval */
01399                                 frame_l->frame.missing_data.head = interval_ptr->next;
01400                                 if (interval_ptr->next != NULL) {
01401                                         interval_ptr->next->prev = NULL;
01402                                 }
01403                         }
01404                         frame_l->frame.missing_data.count--;
01405                         interval_ptr2 = interval_ptr->next;
01406                         RELEASE(interval_ptr, sizeof(interval_t));
01407                         interval_ptr = interval_ptr2;
01408                 } /* interval_ptr != NULL */
01409                 frame_l = frame_l->next;
01410         } /* while (frame_l != NULL) */
01411         return packets_sent;
01412 } /* Control_SendAllData() */
01413 
01414 /**
01415  * Main Control thread that selects on incoming fds, receives all
01416  * packets, manages timers for congestion control and does a bunch of
01417  * other logic things for the protocol. This thread goes through the
01418  * socket linked list and performs the necessary operations to send &
01419  * recv frames to and from the api land and all of this is marshalled
01420  * based on the state of each of the sockets.
01421  *
01422  * @param
01423  *      ptr is not used for anything right now
01424  */
01425 static void ControlT(void* ptr) {
01426         int int_optval;
01427         socklen_t int_optlen;
01428         struct timeval tv, tv_const;
01429         // frame_link* frame_prev;
01430         // frame_link* frame_l;
01431         // frame_link* frame_l2;
01432         // context_t context;
01433         packet* p;
01434         uint32_t rtt;
01435         ccontrol_t *c;
01436         fd_set fds;
01437         int max_fd, num_active, i, j, ret;
01438         vfer_sock* sock;
01439         int val;
01440         int num_packets;
01441         /* server vars */
01442         struct sockaddr_in addr;
01443         packet* response;
01444         packet* ack;
01445         socklen_t addr_len;
01446         char buf[INET_ADDRSTRLEN];
01447         int err;
01448         interval_t *interval_ptr;
01449 
01450         tv_const.tv_sec = 0;
01451         tv_const.tv_usec = CONTROLT_SLEEPTIME;
01452 
01453         /* set fds, and max_fd for first itteration */
01454         pthread_mutex_lock(&(sockets.mutex));
01455         fds = sockets.fds;
01456         max_fd = sockets.max_fd;
01457         num_active = sockets.num_active;
01458         pthread_mutex_unlock(&(sockets.mutex));
01459 
01460         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Entered with num_active[%d]", num_active);
01461 
01462         while (1) {
01463         select:
01464                 tv = tv_const;
01465                 /* TODO: maintain a timeout set to smallest rtt of all sockets minus some epsilon */
01466 
01467                 if (select(max_fd + 1, &fds, 0, 0, &tv) == -1) {
01468                         if (errno == EINTR) {
01469                                 /* interrupted select call */
01470                                 pthread_mutex_lock(&(sockets.mutex));
01471                                 fds = sockets.fds;
01472                                 max_fd = sockets.max_fd;
01473                                 num_active = sockets.num_active;
01474                                 pthread_mutex_unlock(&(sockets.mutex));
01475                                 goto select; /* retry */
01476                         }
01477                 }
01478 
01479                 i = j = 0;
01480                 while (j < num_active && i < MAX_SOCKETS) {
01481                         pthread_mutex_lock(&(sockets.s[i].mutex));
01482                         c = &(sockets.s[i].ccontrol);
01483                         sock = &(sockets.s[i]);
01484                         i++;
01485 
01486 
01487                         switch (sock->state) {
01488                         case CONN_DISCONNECTING:
01489                         case CONN_ERR:
01490                                 if (sock->send_frames.num == 0) {
01491                                         j++;
01492                                         p = Packet_Form_Close(0, 0);
01493                                         Control_SendCtl(sock, p);
01494                                         goto disconnect_socket;
01495 
01496                                 }
01497                                 /* note that we don't break but continue to CONN_CONNECTED case
01498                                    because there are datagrams left that are unset
01499                                    note: j will be incremented in CONN_CONNECTED case */
01500 
01501                         case CONN_CONNECTED:
01502                                 j++;
01503 
01504                                 if (! FD_ISSET(sock->fd, &fds)) {
01505                                         /* nothing to read on this socket */
01506                                         goto skip_recvall;
01507                                 }
01508 
01509                                 ret = Control_RecvAll(sock);
01510                                 if (ret < 0 || sock->state == CONN_DISCONNECTED) {
01511                                         goto disconnect_socket;
01512                                 }
01513 
01514                                 if (ret != 0) {
01515                                         GET_TIME_OF_DAY(&(tv));
01516                                         if (ret == 1) { /* received only data packets */
01517                                                 c->r_last_data  = tv;
01518                                         } else if (ret == 2) {  /* received only control packets */
01519                                                 c->r_last_ccack = tv;
01520                                         } else if (ret == 3) {  /* received both data and control packets */
01521                                                 c->r_last_ccack = tv;
01522                                                 c->r_last_data  = tv;
01523                                         }
01524 
01525                                         // note, c->s_last_unacked_data is cleared in Control_RecvAll when CC Ack is received that acks last sent date
01526                                         if ((ret == 1 || ret == 3) &&
01527                                             (c->r_unacked_bytes >= UNACKED_THRESH * c->mtu)) {
01528                                                 /* received data, send an ack if unacked data bytes is above threshold */
01529                                                 if (Control_Send_Ack(sock) != 0) {
01530                                                         goto disconnect_socket;
01531                                                 }
01532                                                 c->r_unacked_bytes = 0;
01533                                                 c->s_last_ack = tv;
01534                                         }
01535                                 }
01536 
01537                         skip_recvall:
01538 
01539                                 GET_TIME_OF_DAY(&(tv));
01540 
01541                                 /* RTT returns rtt in microseconds */
01542                                 rtt = RTT(sock, tv);
01543                                 if (rtt == 0) {
01544                                         printf("RTT EMPTY\n");
01545                                         rtt = 5000; /* 5ms */
01546                                 }
01547 
01548                                 if (TIMEVAL_US_DIFF(c->s_rtt_start, tv) >= rtt) {
01549                                         c->s_rtt_start = tv;
01550                                         c->s_cwnd_sent = 0;
01551                                 }
01552 
01553                                 /* reset the recorded timestamp of last loss event if set and older than rtt */
01554                                 if (c->r_last_loss.tv_sec != 0 &&
01555                                     TIMEVAL_US_DIFF(c->r_last_loss, tv) >= rtt) {
01556                                         c->r_last_loss.tv_sec = 0;
01557                                 }
01558 
01559                                 /* send an ack if we have unacked bytes to ack and
01560                                    time since we sent last ack is >= rtt/2 */
01561                                 if (c->r_unacked_bytes != 0 &&
01562                                     TIMEVAL_US_DIFF(c->s_last_ack, tv) >= rtt/2.0) {
01563                                         if (Control_Send_Ack(sock) != 0) {
01564                                                 goto disconnect_socket;
01565                                         }
01566                                         c->r_unacked_bytes = 0;
01567                                         c->s_last_ack = tv;
01568                                 }
01569 
01570                                 /* reset s_last_vec after X*RTT indicating that we can send another vec ack now */
01571                                 // if (c->s_last_vec.tv_sec != 0 &&
01572                                 // TIMEVAL_US_DIFF(c->s_last_vec, tv) >= (2*rtt)) {
01573                                         // printf("resetting s_last_vec from [%d,%d] and tv[%d,%d] and rtt[%d]\n", c->s_last_vec.tv_sec, c->s_last_vec.tv_usec, tv.tv_sec, tv.tv_usec, rtt);
01574                                 // c->s_last_vec.tv_sec = 0;
01575                                 // }
01576 
01577                                 // if (c->s_last_vec.tv_sec == 1) {
01578                                         /* check urgency values and generate nack vecs as necessary */
01579                                         /* frame_l = sock->recv_frames.first;
01580                                         frame_prev = NULL;
01581                                         while (frame_l != NULL) {
01582                                                 if (frame_l->frame.done == 1) {
01583                                                         frame_prev = frame_l;
01584                                                         frame_l = frame_l->next;
01585                                                         continue;
01586                                                 }
01587                                         */
01588                                                 /* if no data in 2 round trip times then we send a vec ack for first unfinished frame */
01589                                                 // if (frame_l->frame.urgency == URGENCY_THRESHOLD || frame_l->frame.urgency == -1) {
01590                                                         /* send a vec ack for the first occuring threshold frame */
01591                                                         /* evaluate the reliability function to determine if user still interested in the datagram */
01592                                                         /* context.f = frame_l->frame;
01593                                                         sock->relfun(NULL);
01594                                                         context.stats = &(sock->stats);
01595                                                         context.fstats = &(frame_l->frame.stats); */
01596                                                         // if (sock->relfun(&context) == 0) {
01597                                                                 /* discard frame */
01598                                                         /* Datagram_Remove (&(sock->recv_frames), frame_prev, frame_l);
01599                                                                 frame_l2 = frame_l->next;
01600                                                                 RELEASE(frame_l->frame.data, frame_l->frame.len);
01601                                                                 RELEASE(frame_l, sizeof(frame_link));
01602                                                                 frame_l = frame_l2;
01603                                                                 continue; */ /* continue the loop with frame_l set to next frame in list and prev frame unchanged */
01604                                                         // }
01605 
01606                                                         // printf("frame[%d] reached THRESHOLD, sending vec ack\n", frame_l->frame.frame_num);
01607 
01608                                                         /* start the negative counter in the urgency for this frame */
01609                                                         // p = Packet_Form_VecAck(0, 0, frame_l->frame.frame_num, &(frame_l->frame.missing_data)); /* seq filled in by Control_SendAllCtl() */
01610                                         /* if (Control_SendCtl(sock, p) != 0) {
01611                                                                 goto disconnect_socket;
01612                                                                 } */
01613 
01614                                                         /* empty frame created initially with urgency of -1 ;
01615                                                            set it's urgency to THRESHOLD so as to make it behave as a normal frame in the future */
01616                                                         /* frame_l->frame.urgency = URGENCY_THRESHOLD;
01617                                                         c->s_last_vec = tv;
01618                                                 }
01619                                                 frame_prev = frame_l;
01620                                                 frame_l = frame_l->next;
01621                                                 }
01622                                                 } */
01623                         
01624                                 // printf("checking last_unacked_data[%d %d] with [%d %d]\n", c->s_last_unacked_data.tv_sec, c->s_last_unacked_data.tv_usec, tv.tv_sec, tv.tv_usec);
01625                                 if (c->s_last_unacked_data.tv_sec != 0 &&
01626                                     (TIMEVAL_US_DIFF(c->s_last_unacked_data, tv) >= 5*rtt)) {
01627                                         /* consider this a timeout, ie. previous data unacked so we must resend it */
01628                                         /* printf("expired waiting for ack with last_unacked_data[%d,%d] and tv[%d,%d] rtt[%d]\n", c->s_last_unacked_data.tv_sec, c->s_last_unacked_data.tv_usec, tv.tv_sec, tv.tv_usec, rtt); */
01629                                         c->s_last_unacked_data.tv_sec = 0; /* reset the timestamp (it will be set once the repeat data pack leaves */
01630 
01631                                         interval_ptr = c->s_last_data_frame->frame.missing_data.head;
01632                                         if (interval_ptr == NULL) {
01633                                                 ALLOC(interval_ptr, interval_t*, interval_t, 1);
01634                                                 interval_ptr->next = NULL;
01635                                                 interval_ptr->prev = NULL;
01636                                                 interval_ptr->start = c->s_last_data_offset;
01637                                                 interval_ptr->end = c->s_last_data_offset + c->mtu; /* note: this depends on the current MTU */
01638                                                 c->s_last_data_frame->frame.missing_data.head = interval_ptr;
01639                                                 c->s_last_data_frame->frame.missing_data.count = 1;
01640                                         }
01641                                         c->syncing = 1;
01642                                 }
01643 
01644                                 /* send all data packets on every socket */
01645 
01646                                 if (sock->send_frames.length != 0 && CC_Can_Send_Data(c->mtu, sock) != 0) {
01647                                         /* send data packets */
01648                                         // printf("trying to sendall data\n");
01649                                         ret = Control_SendAllData(sock);
01650                                         switch(ret) {
01651                                         case 1:
01652                                                 /* if  did send some data packets remember the time of this data transmission */
01653                                                 GET_TIME_OF_DAY(&(c->s_last_unacked_data));
01654                                                 break;
01655                                         case -1:
01656                                                 goto disconnect_socket;
01657                                         }
01658                                 }
01659 
01660                                 /* alert any threads waiting to dequeue recvd frames or to send frames */
01661                                 if (sock->recvd_frames.num != 0) {
01662                                         if (sock->select_active == 1 && (sock->select_mark & VFER_READABLE) == VFER_READABLE) {
01663                                                 sock->select_result |= VFER_READABLE;
01664                                                 pthread_mutex_unlock(&(sock->mutex));
01665                                                 pthread_mutex_lock(&(sockets.select_mutex));
01666                                                 pthread_cond_signal(&(sockets.select_cond));
01667                                         } else {
01668                                                 pthread_mutex_unlock(&(sock->mutex));
01669                                         }
01670                                         pthread_cond_broadcast(&(sock->cond));
01671                                 } else if ((sock->send_frames.length + c->mtu) <= MAX_SEND_WINDOW) {
01672                                         if (sock->select_active == 1 && (sock->select_mark & VFER_WRITABLE) == VFER_WRITABLE) {
01673                                                 sock->select_result |= VFER_WRITABLE;
01674                                                 pthread_mutex_unlock(&(sock->mutex));
01675                                                 pthread_mutex_lock(&(sockets.select_mutex));
01676                                                 pthread_cond_signal(&(sockets.select_cond));
01677                                         } else {
01678                                                 pthread_mutex_unlock(&(sock->mutex));
01679                                         }
01680                                         pthread_cond_broadcast(&(sock->cond));
01681                                 } else {
01682                                         pthread_mutex_unlock(&(sock->mutex));
01683                                 }
01684                                 continue;
01685 
01686                         case CONN_LISTENING:
01687 
01688                                 j++;
01689                                 if (! FD_ISSET(sock->fd, &fds)) {
01690                                         /* nothing to read on this socket */
01691                                         break;
01692                                 }
01693 
01694                                 addr_len = sizeof(SA);
01695                                 if ((err = Packet_Read(sock->fd, &sock->request, (SA*)(&addr), &addr_len)) != 0) {
01696                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet_Read returned error[%d]", err);
01697                                         break;
01698                                 }
01699 
01700                                 if (sock->request->type != REQUEST) {
01701                                         RELEASE_PACK(sock->request);
01702                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet is not REQUEST, ignoring");
01703                                         break;
01704                                 }
01705                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "received request: [%s : %d]",
01706                                             inet_ntop(AF_INET, &(addr.sin_addr), buf, INET_ADDRSTRLEN), ntohs(addr.sin_port));
01707 
01708                                 if (Accept_Queue_Enqueue (sock->accept_queue, sock->fd, *((SA*)(&addr)), sock->request) == -1) {
01709                                         /* have to drop the connection some kind of queue error */
01710                                         RELEASE_PACK(sock->request);
01711                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "couldn't enqueue new connection");
01712                                         break;
01713                                 }
01714 
01715                                 pthread_mutex_unlock(&(sock->mutex));
01716                                 pthread_cond_signal(&(sock->cond));
01717                                 continue;
01718 
01719                         case CONN_CONNECTING:
01720 
01721                                 j++;
01722 
01723                                 /* set up the addr structure to unconnect the socket if necessary */
01724                                 bzero((char*)(&addr), sizeof(addr));
01725                                 addr.sin_family = AF_UNSPEC;
01726                                 addr_len = sizeof(addr);
01727 
01728                                 // printf("conn_conencting state fd[%d] max_fd[%d]\n", sock->fd, max_fd);
01729                                 if (FD_ISSET(sock->fd, &(fds))) {
01730                                         ret = Packet_Read(sock->fd, &response, NULL, NULL);
01731                                         if (ret != 0) {
01732                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_ConnectT", "Packet_Read failed[%d]", ret);
01733                                                 switch (ret) {
01734                                                 case EAGAIN:
01735                                                         /* try handshake again */
01736                                                         break;
01737                                                 case ECONNREFUSED: /* perhaps a syscall error of some sort */
01738                                                         sock->err = VFER_REFUSED;
01739                                                         goto could_not_connect;
01740                                                 default:
01741                                                         sock->err = VFER_NOCONN;
01742                                                         goto could_not_connect;
01743                                                 }
01744                                                 RELEASE_PACK(sock->request);
01745                                                 break;
01746                                         }
01747 
01748                                         /* since we are the client, change the sign of the delay measure set by packet.c */
01749                                         response->delay *= -1;
01750                                         DEBUG_PACK(DEBUG_CTL, "control.c", "Control_ConnectT",
01751                                                    response, "Received [2/3 handshake: Response] on sock->fd[%d]", sock->fd);
01752                                         
01753                                         if (response->type != RESPONSE) {
01754                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_ConnectT",
01755                                                             "Received wrong packet type[%d] for [2/3 handshake] ; expected RESPONSE[%d]",
01756                                                             response->type, RESPONSE);
01757                                                 RELEASE_PACK(response);
01758                                                 goto could_not_connect;
01759                                         }
01760 
01761                                         /* unconnect the socket - we have to (re)connect to the port the server sent us) */
01762                                         /* This does not work with the VSL code-
01763                                            connect(sock->fd, (SA*) (&addr), addr_len); */
01764                                         getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &int_optval, &int_optlen); /* reset error if any */
01765 
01766                                         /* set up the new addr structure we will connect to */
01767                                         sock->addr.sin_port = htons(response->u.res.connection_port);
01768                                         sock->addr_len = sizeof(sock->addr);
01769 
01770                                         // printf("received response port: %d\n", response->u.res.connection_port);
01771                                         // printf("received response host: %s\n", inet_ntop(AF_INET, &(sock->addr.sin_addr), buf, INET_ADDRSTRLEN));
01772                                         
01773                                         /* open a new connection on the port specified by the server */
01774                                         if (connect(sock->fd, (SA*) &sock->addr, sock->addr_len) < 0 ) {
01775                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_ConnectT", "connecting to port specified in response packet failed, %s",
01776                                                             strerror(errno));
01777                                                 sock->err = VFER_NOCONN;
01778                                                 RELEASE_PACK(response);
01779                                                 goto could_not_connect;
01780                                         }
01781                                         
01782                                         /* fill in the ack packet structure */
01783                                         ack = Packet_Form_CCAck(0, sock->request->seq + 1, response->seq, 0,
01784                                                                 response->u.res.init_frame_num, response->u.res.init_frame_num,
01785                                                                 0, response->delay, 0, 0, NULL);
01786                                         
01787                                         DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", ack,
01788                                                    "Sending  [3/3 handshake: Ack] on sock->fd[%d]", sock->fd);
01789 
01790                                         /* send back an ack packet without specifying address (udp socket already connected) */
01791                                         if (Packet_Write(ack, sock->fd, NULL, 0) != 0) {
01792                                                 /* note: Packet_Write will check to see if system connect() generated an error on this socket
01793                                                    but depending on latency the error might not be caught until later */
01794                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet_Write failed");
01795                                                 sock->err       = VFER_NOCONN;
01796                                                 RELEASE_PACK(response);
01797                                                 RELEASE_PACK(ack);
01798                                                 goto could_not_connect;
01799                                         }
01800 
01801                                         /* set up all the socket parameters stored in the
01802                                          * request\response\ack packets here and in control.c */
01803                                         sock->opts |= VFERSO_CLIENT_SOCK;
01804                                         sock->l_seq = ack->seq + 1;
01805 
01806                                         pthread_mutex_unlock(&(sock->mutex));
01807                                         Control_Add_Socket(sock, sock->request, response, ack, 1);
01808                                         pthread_cond_signal(&(sock->cond));
01809 
01810                                         RELEASE_PACK(sock->request);
01811                                         RELEASE_PACK(response);
01812                                         RELEASE_PACK(ack);
01813                                         /* mutex unlocked, continue to next socket */
01814                                         continue;
01815                                 } /* FD_ISSET */
01816 
01817                                 /* socket not readable (or just made CONN_CONNECTING and therefore not in the reading set */
01818                                 /* check to make sure we are not over max_connect_tries */
01819                                 if (sock->connect_tries == MAX_REQUEST_TRIES) {
01820                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Reached MAX_REQUEST_TRIES[%d]", MAX_REQUEST_TRIES);
01821                                         goto could_not_connect;
01822                                 }
01823 
01824                                 /* make sure we timeout before each request based on the REQUEST_RETRY_INTERVAL */
01825                                 GET_TIME_OF_DAY(&(tv));
01826                                 if ((unsigned int)(TIMEVAL_US_DIFF(sock->last_request, tv)) < (unsigned int)(REQUEST_RETRY_INTERVAL)) {
01827                                         break;
01828                                 }
01829 
01830                                 sock->last_request = tv;
01831 
01832                                 sock->connect_tries++;
01833                                 /* fill in the request packet */
01834                                 if (sock->request == NULL) {
01835                                         sock->request = Packet_Form_Request (0, Packet_Gen_SeqNum(), MAX_FRAME_SIZE,
01836                                                                              sock->udp_recv_buf_size, sock->udp_send_buf_size, Packet_Gen_FrameNum());
01837                                 }
01838 
01839                                 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", sock->request,
01840                                            "Sending  [1/3 handshake: Request] on sock->fd[%d]", sock->fd);
01841 
01842                                 /* send request packet (because the socket is connected we do not specify addr/addrlen) */
01843                                 ret = Packet_Write(sock->request, sock->fd, NULL, 0);
01844                                 if (ret != 0) {
01845                                         /* test the returned pending error on the socket */
01846                                         switch (ret) {
01847                                         case ECONNREFUSED:
01848                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "connection refused on Packet_Write");
01849                                                 sock->err = VFER_REFUSED;
01850                                                 break;
01851                                         default:
01852                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet_Write failed with error[%d]", ret);
01853                                                 sock->err = VFER_NOCONN;
01854                                         }
01855                                         RELEASE_PACK(sock->request);
01856                                         goto could_not_connect;
01857                                 }
01858                                 break;
01859 
01860                         could_not_connect:
01861 
01862                                 connect(sock->fd, (SA*) &addr, addr_len);       /* unconnect the socket */
01863                                 sock->state = CONN_BOUND;               /* reset connection state to bound */
01864                                 getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &int_optval, &int_optlen); /* reset error if any */
01865 
01866                                 if (sock->select_active == 1 && (sock->select_mark & VFER_EXCEPTION) == VFER_EXCEPTION) {
01867                                         sock->select_result |= VFER_EXCEPTION;
01868                                         pthread_mutex_unlock(&(sock->mutex));
01869                                         pthread_mutex_lock(&(sockets.select_mutex));
01870                                         pthread_cond_signal(&(sockets.select_cond));
01871                                 } else {
01872                                         pthread_mutex_unlock(&(sock->mutex));
01873                                 }
01874                                 pthread_cond_signal(&(sock->cond));
01875                                 break;
01876 
01877                         case CONN_ACCEPTING:
01878 
01879                                 j++;
01880 
01881                                 if (sock->listening_sock->state != CONN_LISTENING) {
01882                                         /* race condition here- if listning socket is closed before a connection is fully accepted, vfer_accept() will fail */
01883                                         sock->err = VFER_IMPL;
01884                                         goto disconnect_socket;
01885                                 }
01886 
01887                                 if (FD_ISSET(sock->fd, &(fds))) {
01888                                         /* if socket readable try to receive cc_ack packet on connection fd */
01889                                         sock->addr_len = sizeof(SA);
01890                                         /* printf("conn_accepting readable socket\n"); */
01891                                         if ((err = Packet_Read(sock->fd, &ack, (SA*) &(sock->addr), &(sock->addr_len))) != 0
01892                                             || ack->type != CC_ACK) {
01893                                                 if (err != 0) {
01894                                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] Packet_Read returned error[%d]", sock->response_tries, err);
01895                                                 } else {
01896                                                         /* printf("Try[%d] Received a non CC_ACK packet as [3/3 handshake] on sock->fd[%d]", sock->response_tries, sock->fd); */
01897                                                         DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", ack,
01898                                                                    "Try[%d] Received a non CC_ACK packet as [3/3 handshake] on sock->fd[%d]", sock->response_tries, sock->fd);
01899                                                         RELEASE_PACK(ack);
01900                                                 }
01901                                                 if (Unconnect_Rebind(sock) == -1) {
01902                                                         /* couldn't rebind socket */
01903                                                         close(sock->fd);
01904                                                         RELEASE_PACK(sock->qc->request);
01905                                                         RELEASE(sock->qc, sizeof(queued_conn));
01906                                                         RELEASE_PACK(sock->response);
01907                                                         sock->err = VFER_IMPL;
01908                                                         goto disconnect_socket;
01909                                                 }
01910                                                 /* try sending the response packet again */
01911                                                 goto try_response;
01912                                         }
01913 
01914                                         /* compare the host addresses on the received packet and previously received request packet */
01915                                         if (memcmp(&(sock->qc->sa), &(sock->addr), sizeof(sock->qc->sa)) != 0) {
01916                                                 RELEASE_PACK(ack);
01917                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT",
01918                                                             "Try[%d] Mismatched addresses of remote host ignoring this conn request", sock->response_tries);
01919                                                 if (Unconnect_Rebind(sock) == -1) {
01920                                                         /* couldn't rebind socket */
01921                                                         close(sock->fd);
01922                                                         RELEASE_PACK(sock->qc->request);
01923                                                         RELEASE(sock->qc, sizeof(queued_conn));
01924                                                         RELEASE_PACK(sock->response);
01925                                                         sock->err = VFER_IMPL;
01926                                                         goto disconnect_socket;
01927                                                 }
01928                                                 /* try sending the response packet again */
01929                                                 goto try_response;
01930                                         }
01931 
01932                                         DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", ack, "Try[%d] Received [3/3 handshake] on sock->fd[%d]", sock->response_tries, sock->fd);
01933                                         err = 0;
01934                                         if (ack->seq != sock->qc->request->seq + 1) {
01935                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] bad seq number on cc_ack[%u] should be[%u]", sock->response_tries, ack->seq,
01936                                                             (sock->qc->request->seq + 1));
01937                                                 err = -1;
01938                                         } else if (ack->u.ccack.last_seq != sock->response->seq) {
01939                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] bad last_seq_recv number on cc_ack", sock->response_tries);
01940                                                 err = -1;
01941                                         } else if (ack->u.ccack.packets_lost != 0) {
01942                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] bad packets lost on cc_ack", sock->response_tries);
01943                                                 err = -1;
01944                                         } else if (ack->u.ccack.oldest_frame != sock->response->u.res.init_frame_num) {
01945                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] bad oldest_frame on cc_ack", sock->response_tries);
01946                                                 err = -1;
01947                                         }
01948 
01949                                         if (err == -1) { /* bad cc_ack */
01950                                                 RELEASE_PACK(ack);
01951                                                 if (Unconnect_Rebind(sock) == -1) {
01952                                                         /* couldn't rebind socket */
01953                                                         close(sock->fd);
01954                                                         RELEASE_PACK(sock->qc->request);
01955                                                         RELEASE(sock->qc, sizeof(queued_conn));
01956                                                         RELEASE_PACK(sock->response);
01957                                                         sock->err = VFER_IMPL;
01958                                                         goto disconnect_socket;
01959                                                 }
01960                                                 /* try sending the response packet again */
01961                                                 goto try_response;
01962                                         }
01963 
01964                                         /* fill in the parameters in the sock structure */
01965                                         sock->r_seq = ack->seq + 1; /* the others are filled in control.c from original vfer_accept() call */
01966 
01967                                         /* connected ! */
01968                                         sock->type              = SOCK_DGRAM;
01969                                         sock->opts              = 0;
01970                                         sock->err               = 0;
01971                                         sock->select_mark       = 0;
01972                                         sock->select_result     = 0;
01973                                         sock->select_active     = 0;
01974 
01975                                         /* address structure copying */
01976                                         bzero(&addr, sock->addr_len);
01977                                         memcpy(&addr, &sock->addr, sock->addr_len);
01978 
01979                                         /* signal to ccontrol that we have a new connection to
01980                                            monitor for incoming and outgoing packets */
01981                                         pthread_mutex_unlock(&(sock->mutex));
01982                                         Control_Add_Socket(sock, sock->qc->request, sock->response, ack, 0);
01983                                         pthread_cond_signal(&(sock->cond));
01984 
01985                                         RELEASE_PACK(sock->qc->request);
01986                                         RELEASE(sock->qc, sizeof(queued_conn));
01987                                         RELEASE_PACK(sock->response);
01988                                         RELEASE_PACK(ack);
01989                                         continue;
01990                                 }
01991 
01992                                 /* response packet sending logic, sends a response if either
01993                                    1. This is the first time we enter the loop
01994                                    2. We have waited longer than HANDSHAKE_ACK_WAIT_INTERVAL microsecs since sending a previous response
01995                                 */
01996 
01997                         try_response:
01998                                 GET_TIME_OF_DAY(&(tv));
01999                                 if (sock->last_response.tv_sec != 0 &&
02000                                     (unsigned int)(TIMEVAL_US_DIFF(sock->last_response, tv)) < (unsigned int)(HANDSHAKE_ACK_WAIT_INTERVAL)) {
02001                                         break;
02002                                 }
02003 
02004                                 if (sock->response_tries == MAX_RESPONSE_TRIES) {
02005                                 try_next_conn:
02006                                         /* grab the top connection waiting to be accepted  */
02007                                         sock->qc = Accept_Queue_Dequeue (sock->listening_sock->accept_queue);
02008                                         if (sock->qc == NULL) {
02009                                                 /* server socket is non-blocking => don't wait for new connections */
02010                                                 if (SOCK_OPT_ISSET(sock, VFERSO_NONBLOCK)) {
02011                                                         sock->err = VFER_WOULDBLOCK;
02012                                                         goto disconnect_socket;
02013                                                 }
02014                                                 /* server blocking sockets => wait indefinitly for a connection */
02015                                                 // DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "blocking with empty accepted queue");
02016                                                 break;
02017                                         }
02018 
02019                                         DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", sock->qc->request,
02020                                                    "Dequeued [1/3 handshake] from acceptQ backlog for server_sock->fd[%d]", sock->fd);
02021 
02022                                         /* connect the return socket(already bound to new port) to the host we dequeued from the accept queue */
02023                                         if (connect(sock->fd, &(sock->qc->sa), sizeof(sock->qc->sa)) != 0) {
02024                                                 perror("ControlT[connect]");
02025                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "connect on new socket failed");
02026                                                 close(sock->fd);
02027                                                 RELEASE_PACK(sock->qc->request);
02028                                                 RELEASE(sock->qc, sizeof(queued_conn));
02029                                                 sock->err = VFER_IMPL;
02030                                                 goto disconnect_socket;
02031                                         }
02032 
02033 
02034                                         /* try to set the receive buffer sizes */
02035                                         num_packets = DEFAULT_RECV_BUF_MAXPACKETS;
02036                                         val = CALC_UDP_RECV_BUF_SIZE(num_packets);
02037                                         do {
02038                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Trying to set SO_RCVBUF to %d for accepting socket", val);
02039                                                 ret = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val));
02040                                                 if (ret != 0) {
02041                                                         if (errno == ENOBUFS && num_packets > 100) {
02042                                                                 num_packets -= 100;
02043                                                                 val = CALC_UDP_RECV_BUF_SIZE(num_packets);
02044                                                         } else {
02045                                                                 perror("ControlT :: setsockopt");
02046                                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Could not set RCVBUF size to [%d]", val);
02047                                                                 close(sock->fd);
02048                                                                 RELEASE_PACK(sock->qc->request);
02049                                                                 RELEASE(sock->qc, sizeof(queued_conn));
02050                                                                 sock->err = VFER_IMPL;
02051                                                                 goto disconnect_socket;
02052                                                         }
02053                                                 }
02054                                         } while (ret != 0);
02055                                         sock->udp_recv_buf_size = val;
02056 
02057                                         /* try to set the send buffer size */
02058                                         num_packets = DEFAULT_SEND_BUF_MAXPACKETS;
02059                                         val = CALC_UDP_SEND_BUF_SIZE(num_packets);
02060                                         do {
02061                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Trying to set SO_SNDBUF to %d for accepting socket", val);
02062                                                 ret = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val));
02063                                                 if (ret != 0) {
02064                                                         if (errno == ENOBUFS && num_packets > 100) {
02065                                                                 num_packets -= 100;
02066                                                                 val = CALC_UDP_SEND_BUF_SIZE(num_packets);
02067                                                         } else {
02068                                                                 perror("ControlT :: setsockopt");
02069                                                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Could not set SNDBUF size to [%d]", val);
02070                                                                 close(sock->fd);
02071                                                                 RELEASE_PACK(sock->qc->request);
02072                                                                 RELEASE(sock->qc, sizeof(queued_conn));
02073                                                                 sock->err = VFER_IMPL;
02074                                                                 goto disconnect_socket;
02075                                                         }
02076                                                 }
02077                                         } while (ret != 0);
02078                                         sock->udp_send_buf_size = val;
02079 
02080                                         /* form response */
02081                                         if (sock->response != NULL) {
02082                                                 /* sock->response is NULL on first itteration */
02083                                                 RELEASE_PACK(sock->response);
02084                                         }
02085 
02086                                         sock->response = Packet_Form_Response(0, Packet_Gen_SeqNum(), Packet_Gen_FrameNum(), ntohs(sock->sa.sin_port),
02087                                                                               MAX_FRAME_SIZE, sock->udp_recv_buf_size, sock->udp_send_buf_size, sock->qc->request->delay);
02088                                         sock->response_tries = 0;
02089                                 }
02090 
02091                                 sock->last_response = tv;
02092                                 sock->response_tries++;
02093                                 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", sock->response, "Try[%d] Sending [2/3 handshake] on sock->fd[%d]", sock->response_tries, sock->fd);
02094 
02095                                 /* memcpy(&addr, &(sock->qc->sa), sizeof(sock->qc->sa));
02096                                 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "sending response to [%s : %d]",
02097                                 inet_ntop(AF_INET, &(addr.sin_addr), buf, INET_ADDRSTRLEN), ntohs(addr.sin_port)); */
02098 
02099                                 /* send response packet on handshake fd */
02100                                 if ((ret = Packet_Write(sock->response, sock->listening_sock->fd, &(sock->qc->sa), sizeof(sock->qc->sa))) != 0) {
02101                                         /* ignore this connection, move on */
02102                                         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] Packet_Write failed with error[%d]", sock->response_tries, ret);
02103                                         /* unconnect & rebind the socket */
02104                                         if (Unconnect_Rebind(sock) == -1) {
02105                                                 /* couldn't rebind socket */
02106                                                 close(sock->fd);
02107                                                 RELEASE_PACK(sock->qc->request);
02108                                                 RELEASE(sock->qc, sizeof(queued_conn));
02109                                                 RELEASE_PACK(sock->response);
02110                                                 sock->err = VFER_IMPL;
02111                                                 goto disconnect_socket;
02112                                         }
02113                                         /* try the next queued connection */
02114                                         RELEASE_PACK(sock->response);
02115                                         sock->response = NULL;  /* this is crucial for the RELEASE_PACK above */
02116                                         sock->response_tries = MAX_RESPONSE_TRIES;
02117                                         goto try_next_conn;
02118                                 }
02119                                 break;
02120 
02121                         case CONN_DISCONNECTED:
02122                         case CONN_ACQUIRED:
02123                         case CONN_BOUND:
02124                                 /* not an active socket */
02125                                 pthread_mutex_unlock(&(sock->mutex));
02126                                 continue;
02127                         } /* switch */
02128 
02129                         pthread_mutex_unlock(&(sock->mutex));
02130                         continue;
02131 
02132                 disconnect_socket:
02133                         GET_TIME_OF_DAY(&(sock->stats.tv_end));
02134                         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "fd[%d] CONN_DISCONNECTED", sock->fd);
02135                         sock->state = CONN_DISCONNECTED;
02136                         if (sock->select_active == 1 && (sock->select_mark & VFER_EXCEPTION) == VFER_EXCEPTION) {
02137                                 sock->select_result |= VFER_EXCEPTION;
02138                                 pthread_mutex_unlock(&(sock->mutex));
02139                                 pthread_mutex_lock(&(sockets.select_mutex));
02140                                 pthread_cond_signal(&(sockets.select_cond));
02141                         } else {
02142                                 pthread_mutex_unlock(&(sock->mutex));
02143                         }
02144                         Control_Remove_Socket(sock);
02145                         pthread_cond_signal(&(sock->cond));
02146 
02147                 } /* while (i < num_active) */
02148 
02149                 /****************/
02150                 pthread_mutex_lock(&(sockets.mutex));
02151                 if (sockets.num_active == 0) {
02152                         /* exits the control thread if there aren't any active sockets left */
02153                         DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "reached 0 active sockets, exiting control thread");
02154                         sockets.max_fd = 0;
02155                         pthread_mutex_unlock(&(sockets.mutex));
02156                         i = 0;
02157                         pthread_exit(&i);
02158                 }
02159                 /* set fds, and max_fd for next itteration */
02160                 fds = sockets.fds;
02161                 max_fd = sockets.max_fd;
02162                 num_active = sockets.num_active;
02163                 pthread_mutex_unlock(&(sockets.mutex));
02164                 /****************/
02165         } /* while (1) */
02166 } /* ControlT() */
02167 
02168 
02169 /**
02170  * Sets up the socket structure with default parameters based on the
02171  * initial handshake packets (request/response/ack). It sets the state
02172  * of the socket to CONN_CONNECTED.
02173  *
02174  * @param sock socket pointer to a vfer_sock
02175  * @param request request packet corresponding to the new connection
02176  * @param response packet corresponding to the new connection
02177  * @param ack ack corresponding to the new connection
02178  * @param client specifies whether this connection was initiated with a request (client == 1) OR
02179  *        replied to and accepted with a response (client != 1)
02180  */
02181 void Control_Add_Socket(vfer_sock* sock, packet* request, packet* response, packet* ack, char client) {
02182         ccontrol_t *c;
02183         struct timeval tv;
02184         int32_t t;
02185         int common_mtus[] = COMMON_MTU_LIST;
02186         int j;
02187         
02188         c = &(sock->ccontrol);
02189 
02190         /* sock structure specific settings */
02191         c->mtu = INIT_MTU;
02192         sock->relfun = rf_always_reliable;      /* full reliability by default for all new connections */
02193         
02194         /* PMTU: Initialize path mtu values for socket */
02195         if(sock->is_df_set != 1){
02196                 /* ivan: we know we can set icmp_fd to -1 here, but where else does it get set for accepted sockets if PMTUD is done without ICMP? */
02197                 sock->icmp_fd = -1;
02198                 sock->pmtu.probe_state = NO_PMTUD;
02199                 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_Add_Socket", "Not performing PMTUD");
02200         }else{
02201                 sock->pmtu.probe_state = NO_PROBE_SENT;
02202                 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_Add_Socket", "Initial PMTUD values set.");
02203                 for(j = 0; j <= COMMON_MTU_MAX_INDEX; j++){
02204                         sock->pmtu.common_mtus[j] = common_mtus[j];
02205                 }
02206                 sock->pmtu.search_low = sock->pmtu.common_mtus;
02207                 sock->pmtu.search_high = sock->pmtu.common_mtus + COMMON_MTU_MAX_INDEX;
02208                 sock->pmtu.probe_size = sock->pmtu.search_low + 1;
02209         }
02210         
02211         CLR_SOCKET_STATS(&(sock->stats));
02212 
02213         /* record the current timestamp as a connection start */
02214         GET_TIME_OF_DAY(&(sock->stats.tv_start));
02215 
02216         /* client/server specific settings */
02217         if (client == 1) {
02218                 /* sock->l_seq is set in vfer_connect() ; uses ack packet */
02219                 // printf("CLIENT delay curr_min[%d] base_min[%d]\n", Tree_Min(&(sock->ccontrol.rev_path_curr_delayhist), sock->stats.tv_start),
02220                 // Tree_Min(&(sock->ccontrol.rev_path_base_delayhist), sock->stats.tv_start));
02221 
02222                 sock->send_frames.first_frame_num = sock->send_frames.last_frame_num = request->u.req.init_frame_num;
02223                 sock->recv_frames.first_frame_num = sock->recv_frames.last_frame_num = response->u.res.init_frame_num;
02224                 sock->r_opts            = response->opts;
02225                 sock->l_frame_num       = request->u.req.init_frame_num;
02226                 sock->r_seq             = response->seq + 1;
02227                 sock->r_frame_num       = response->u.res.init_frame_num;
02228                 sock->r_recv_buf_size   = response->u.res.recv_buf_size;
02229                 sock->r_send_buf_size   = response->u.res.send_buf_size;
02230                 sock->r_max_frame_size  = response->u.res.max_frame_size;
02231                 sock->stats.max_rtt     = abs(response->u.res.request_delay - response->delay);
02232                 c->delay                = response->u.res.request_delay;
02233                 if (sock->stats.max_rtt == 0) {
02234                         printf("RTT EMPTY\n");
02235                         sock->stats.max_rtt = 5000; /* 5ms */
02236                 }
02237 
02238                 /* printf("max_rtt [%d]\n", sock->stats.max_rtt); */
02239                 /* set the base delay window, TREE_INIT defined in
02240                    globals.h t is in microseconds just like max_rtt */
02241                 t = sock->stats.max_rtt * 100;
02242                 TREE_INIT(&(c->rev_path_base_delayhist), t, tv);
02243 
02244                 /* set the current delay window */
02245                 t = sock->stats.max_rtt * 10;
02246                 TREE_INIT(&(c->rev_path_curr_delayhist), t, tv);
02247 
02248                 /* set the initial reverse path delay values from the response packet */
02249                 Tree_Insert(&(sock->ccontrol.rev_path_base_delayhist), response->delay, sock->stats.tv_start);
02250                 Tree_Insert(&(sock->ccontrol.rev_path_curr_delayhist), response->delay, sock->stats.tv_start);
02251                 //sock->delay = response->delay;
02252         } else {
02253                 /* sock->r_seq is set in Control_Accept() ; uses ack packet from client */
02254                 // printf("Server delay curr_min[%d] base_min[%d]\n", Tree_Min(&(sock->ccontrol.rev_path_curr_delayhist), sock->stats.tv_start),
02255                 // Tree_Min(&(sock->ccontrol.rev_path_base_delayhist), sock->stats.tv_start));
02256 
02257                 sock->send_frames.first_frame_num = sock->send_frames.last_frame_num = response->u.res.init_frame_num;
02258                 sock->recv_frames.first_frame_num = sock->recv_frames.last_frame_num = request->u.req.init_frame_num;
02259                 sock->r_opts            = request->opts;
02260                 sock->r_max_frame_size  = request->u.req.max_frame_size;
02261                 sock->r_recv_buf_size   = request->u.req.recv_buf_size;
02262                 sock->r_send_buf_size   = request->u.req.send_buf_size;
02263                 sock->r_frame_num       = request->u.req.init_frame_num;
02264                 sock->l_seq             = response->seq + 1;
02265                 sock->l_frame_num       = response->u.res.init_frame_num;
02266                 sock->stats.max_rtt     = abs(ack->u.ccack.delay - ack->delay);
02267                 c->delay                = ack->u.ccack.delay;
02268                 if (sock->stats.max_rtt == 0) {
02269                         printf("RTT EMPTY\n");
02270                         sock->stats.max_rtt = 1;
02271                 }
02272 
02273                 /* set the base delay window */
02274                 t = sock->stats.max_rtt * 100;
02275                 TREE_INIT(&(c->rev_path_base_delayhist), t, tv);
02276 
02277                 /* set the current delay window */
02278                 t = sock->stats.max_rtt * 10;
02279                 TREE_INIT(&(c->rev_path_curr_delayhist), t, tv);
02280 
02281                 /* set the initial reverse path delay values from the response and ack packets */
02282                 Tree_Insert(&(sock->ccontrol.rev_path_base_delayhist), request->delay, sock->stats.tv_start);
02283                 Tree_Insert(&(sock->ccontrol.rev_path_curr_delayhist), request->delay, sock->stats.tv_start);
02284                 
02285                 Tree_Insert(&(sock->ccontrol.rev_path_base_delayhist), ack->delay, sock->stats.tv_start);
02286                 Tree_Insert(&(sock->ccontrol.rev_path_curr_delayhist), ack->delay, sock->stats.tv_start);
02287 
02288                 //sock->delay = ack->delay;
02289                 
02290         }
02291 
02292         /* stats structure settings (max_rtt set above in the conditional blocks) */
02293         sock->stats.avg_rtt                     = sock->stats.max_rtt;
02294         sock->stats.min_rtt                     = sock->stats.max_rtt;
02295         sock->ccontrol.rtt                      = sock->stats.max_rtt;
02296         sock->ccontrol.syncing                  = 0;
02297         sock->stats.rtt_measurements            = 1;
02298         sock->stats.data_bytes_sender_sent      = 0;
02299 
02300         // printf("FIRST ESTIMATED RTT: %f\n", sock->stats.avg_rtt);
02301 
02302         /* ccontrol structure settings */
02303         if (client == 1) {
02304                 c->s_first_seq  = request->seq + 2; /* handshake ack packet seq number */
02305                 c->s_last_seq   = request->seq + 2;
02306         } else {
02307                 c->s_first_seq  = response->seq + 1;
02308                 c->s_last_seq   = response->seq + 1;
02309         }
02310         /* set up max and current window size (in bytes) based on remote recv buf size and local UDP send buf size  */
02311         if (sock->udp_send_buf_size < sock->r_recv_buf_size) {
02312                 c->s_maxw = (int)(sock->udp_send_buf_size);
02313         } else {
02314                 c->s_maxw = (int)(sock->r_recv_buf_size);
02315         }
02316 
02317         c->s_cwnd               = INIT_MTU * 4;
02318         c->s_cwnd_sent          = 0;
02319         c->s_inflight           = 0;
02320         c->delay_delta          = 0;
02321         c->r_packets_lost       = 0;
02322         c->last_packets_lost    = 0;
02323         c->r_bytes_total        = 0;
02324         c->last_bytes_total     = 0;
02325 
02326         if (client == 1) {
02327                 c->r_first_seq  = response->seq+1;
02328                 c->r_last_seq   = response->seq+1;
02329         } else {
02330                 c->r_first_seq  = request->seq+2;
02331                 c->r_last_seq   = request->seq+2;
02332         }
02333         GET_TIME_OF_DAY(&(tv));
02334         c->r_last_ccack = tv;
02335         c->r_last_data  = tv;
02336         c->s_rtt_start  = tv;
02337         c->s_last_ack   = tv;
02338         c->s_last_unacked_data.tv_sec   = 0;
02339 //      c->s_last_vec.tv_sec            = 0;
02340         c->r_last_loss.tv_sec           = 0;
02341         c->s_need_ccack = 0;
02342 
02343         DEBUG_CC(DEBUG_CTL, "control.c", "Add_Socket", sock, "CC_Init");
02344 
02345         /* sock structure settings */
02346         sock->send_frames.first = sock->send_frames.last        = NULL;
02347         sock->recv_frames.first = sock->recv_frames.last        = NULL;
02348         sock->recvd_frames.first        = sock->recvd_frames.last       = NULL;
02349         sock->send_frames.num           = 0;
02350         sock->recv_frames.num           = 0;
02351         sock->recvd_frames.num          = 0;
02352         sock->recvd_frames.length       = 0;
02353         sock->recv_frames.length        = 0;
02354         sock->send_frames.length        = 0;
02355 
02356         /*************/
02357         pthread_mutex_lock(&(sock->mutex));
02358         sock->state = CONN_CONNECTED;
02359         pthread_mutex_unlock(&(sock->mutex));
02360         /*************/
02361 
02362         pthread_mutex_lock(&(sockets.mutex));
02363         FD_SET(sock->fd, &(sockets.fds));
02364         if (sockets.max_fd < sock->fd) {
02365                 sockets.max_fd = sock->fd;
02366         }
02367         pthread_mutex_unlock(&(sockets.mutex));
02368 } /* Add_Socket() */
02369 
02370 /**
02371  * Removes a socket from the sockets linked list Control, a thread
02372  * safe function
02373  *
02374  * @param sock is a pointer to an vfer_sock
02375  */
02376 static void Control_Remove_Socket(vfer_sock* sock) {
02377 
02378         DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Remove_Socket", "Entered with sock->fd[%d]", sock->fd);
02379 
02380         Control_Unregister_Socket(sock);
02381 
02382         /****************/
02383         pthread_mutex_lock(&(sock->mutex));
02384         /* free up all the alloced send frames */
02385         Datagrams_Clear(&(sock->send_frames));
02386         /* free up all the alloced recv frames */
02387         Datagrams_Clear(&(sock->recv_frames));
02388 
02389         /* NOTE: we do NOT clear the recvd_frames list! */
02390         pthread_mutex_unlock(&(sock->mutex));
02391         /****************/
02392         return;
02393 } /* Remove_Socket() */
02394 
02395 /**
02396  * Unconnects an vfer_socket and rebinds it
02397  *
02398  * Internal use only, this function is used only by Control_Accept()
02399  *
02400  * @param sock a pointer to a socket structure
02401  * @return
02402  *      0 on success
02403  *      -1 on error
02404  */
02405 static int Unconnect_Rebind(vfer_sock* sock) {
02406         /* for sake of portability, we'll disconnect the socket using AF_UNSPEC
02407            & also rebind it so that we know it has an ip and port associated with it */
02408         socklen_t socklen;
02409 
02410         bzero((char*)(&sock->sa), sizeof(sock->sa));
02411         sock->sa.sin_family = AF_UNSPEC;
02412         /* per R.Stevens: various platforms respond
02413          * differently, so we ignore the result of
02414          * this call */
02415         if (connect(sock->fd, (SA*)(&sock->sa), sizeof(sock->sa)) == -1) {
02416                 perror("unconnect_rebind[unconnect]");
02417         }
02418 
02419         /* rebind socket as a wildcard */
02420         bzero((char*)(&sock->sa), sizeof(sock->sa));
02421         sock->sa.sin_port               = htons(INADDR_ANY);
02422         sock->sa.sin_addr.s_addr        = htonl(INADDR_ANY);
02423         sock->sa.sin_family             = AF_INET;
02424         if (bind(sock->fd, (SA*)(&sock->sa), sizeof(sock->sa)) < 0) {
02425                 perror("unconnect_rebind[bind for the new socket]");
02426                 return -1;
02427         }
02428 
02429         socklen = sizeof(sock->sa);
02430         if (getsockname(sock->fd, (SA*)&(sock->sa), &socklen) == -1) {
02431                 perror("unconnect_rebind[getsockname for new socket]");
02432                 return -1;
02433         }
02434 
02435         return 0;
02436 } /* Unconnect_Rebind() */

Generated on Tue Aug 8 16:07:19 2006 for VFER by  doxygen 1.4.7