00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "vfer_control.h"
00038 #include "vfer_ccontrol.h"
00039
00040 static pthread_t ControlT_tid;
00041
00042
00043 inline static int Control_Write (packet* p, int fd);
00044 static int Control_RecvAll (vfer_sock* sock);
00045 inline static int Control_SendCtl (vfer_sock* sock, packet* p);
00046 inline static int Control_Send_Ack (vfer_sock* s);
00047 static int Control_SendAllData (vfer_sock* sock);
00048 static void ControlT (void* ptr);
00049 static void Control_Remove_Socket (vfer_sock* sock);
00050 static int Unconnect_Rebind (vfer_sock* sock);
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073 int Control_Accept(vfer_sock* sock, vfer_sock* ret_sock)
00074 {
00075 int new_port;
00076 struct sockaddr_in sa;
00077 char buf[INET_ADDRSTRLEN];
00078 socklen_t socklen;
00079 struct timespec timedwait_timeout;
00080 struct timeval start_time;
00081 int ret;
00082
00083 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Accept", "Entered with listening sock->fd[%d]", sock->fd);
00084
00085 ret_sock->fd = socket(AF_INET, sock->type, 0);
00086 if (ret_sock->fd == -1) {
00087 perror("server_accept :: socket");
00088 return VFER_IMPL;
00089 }
00090
00091 if (fcntl(ret_sock->fd, F_SETFD, O_NONBLOCK) == -1) {
00092 perror("server_accept :: fcntl");
00093 return VFER_IMPL;
00094 }
00095
00096
00097
00098
00099 bzero((char*)(&sa), sizeof(sa));
00100 sa.sin_port = htons(INADDR_ANY);
00101 sa.sin_addr.s_addr = htonl(INADDR_ANY);
00102 sa.sin_family = AF_INET;
00103
00104 if (bind(ret_sock->fd, (SA*)(&sa), sizeof(sa)) < 0) {
00105 perror("server_accept: bind for the new socket failed");
00106 close(ret_sock->fd);
00107 return VFER_IMPL;
00108 }
00109
00110 socklen = sizeof(sa);
00111 if (getsockname(ret_sock->fd, (SA*)(&sa), &socklen) == -1) {
00112 perror("server_accept: getsockname for new socket failed");
00113 close(ret_sock->fd);
00114 return VFER_IMPL;
00115 }
00116
00117 new_port = ntohs(sa.sin_port);
00118
00119 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Accept", "New port socket: %s.%d",
00120 inet_ntop(AF_INET, &(sa.sin_addr), buf, INET_ADDRSTRLEN),
00121 new_port);
00122
00123 ret_sock->state = CONN_ACCEPTING;
00124 ret_sock->listening_sock = sock;
00125 ret_sock->last_response.tv_sec = 0;
00126 ret_sock->response = NULL;
00127 ret_sock->sa = sa;
00128 ret_sock->response_tries = MAX_RESPONSE_TRIES;
00129
00130 pthread_mutex_unlock(&(ret_sock->mutex));
00131 Control_Register_Socket(ret_sock);
00132 pthread_mutex_lock(&(ret_sock->mutex));
00133
00134 GET_TIME_OF_DAY(&(start_time));
00135
00136 if (sock->accept_timeout != 0) {
00137
00138 timedwait_timeout.tv_sec = start_time.tv_sec + (sock->accept_timeout / 1000000);
00139 timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->accept_timeout - (sock->accept_timeout / 1000000)) * 1000;
00140 do {
00141 ret = pthread_cond_timedwait(&(ret_sock->cond), &(ret_sock->mutex), &(timedwait_timeout));
00142
00143 if (ret == ETIMEDOUT) {
00144 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Accept", "timedout while trying to accept a new connection");
00145 ret_sock->err = VFER_TIMEOUT;
00146 close(ret_sock->fd);
00147 return VFER_TIMEOUT;
00148 }
00149 } while (ret_sock->state == CONN_ACCEPTING);
00150 } else {
00151 do {
00152 pthread_cond_wait(&(ret_sock->cond), &(ret_sock->mutex));
00153 } while (ret_sock->state == CONN_ACCEPTING);
00154 }
00155
00156 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Accept", "reached non conn_accepting state[%d]", ret_sock->state);
00157
00158 if (ret_sock->state != CONN_CONNECTED) {
00159 close(ret_sock->fd);
00160 return ret_sock->err;
00161 }
00162 return 0;
00163 }
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184 int Control_Connect(vfer_sock* sock) {
00185 SA *serv_addr;
00186 int addrlen;
00187 struct timespec timedwait_timeout;
00188 struct timeval start_time;
00189 int ret;
00190
00191 serv_addr = (SA*)(&(sock->addr));
00192 addrlen = sizeof(sock->addr);
00193
00194 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "Entered sock->fd[%d]", sock->fd);
00195
00196
00197 if (connect(sock->fd, serv_addr, addrlen) < 0) {
00198 perror("Control_Connect :: connect");
00199 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "connect failed");
00200 sock->err = VFER_NOCONN;
00201 return -1;
00202 }
00203 memcpy(&sock->addr, serv_addr, addrlen);
00204 sock->connect_tries = 0;
00205 sock->request = NULL;
00206 sock->state = CONN_CONNECTING;
00207 bzero(&(sock->last_request), sizeof(struct timeval));
00208 pthread_mutex_unlock(&(sock->mutex));
00209
00210
00211 Control_Register_Socket(sock);
00212
00213 if (SOCK_OPT_ISSET(sock, VFERSO_NONBLOCK)) {
00214 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "non blocking socket, connection in progress");
00215 return -2;
00216 }
00217
00218
00219
00220 pthread_mutex_lock(&(sock->mutex));
00221 GET_TIME_OF_DAY(&(start_time));
00222 if (sock->connect_timeout != 0) {
00223
00224 timedwait_timeout.tv_sec = start_time.tv_sec + (sock->connect_timeout / 1000000);
00225 timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->connect_timeout - (sock->connect_timeout / 1000000)) * 1000;
00226 do {
00227
00228 pthread_cond_timedwait(&(sock->cond), &(sock->mutex), &(timedwait_timeout));
00229
00230 if (ret == ETIMEDOUT) {
00231 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "timedout trying to connect");
00232 sock->err = VFER_TIMEOUT;
00233 return -1;
00234 }
00235 } while (sock->state == CONN_CONNECTING);
00236 } else {
00237 do {
00238 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "waiting on sock->mutex until sock's state changes from CONN_CONNECTING");
00239 pthread_cond_wait(&(sock->cond), &(sock->mutex));
00240 } while (sock->state == CONN_CONNECTING);
00241 }
00242
00243 if (sock->state != CONN_CONNECTED) {
00244 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "reached a non conn_connecting state[%d]", sock->state);
00245 return -1;
00246 }
00247
00248 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Connect", "reached connected state");
00249 return 0;
00250 }
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265 int Control_Close(vfer_sock* sock) {
00266 struct timespec timedwait_timeout;
00267 struct timeval start_time;
00268 int ret;
00269
00270 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Close", "Entered sock->fd[%d]", sock->fd);
00271 sock->state = CONN_DISCONNECTING;
00272 if (SOCK_OPT_ISSET(sock, VFERSO_NONBLOCK)) {
00273 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Close", "close() : VFER_INPROGRESS, send_queue[%d]", sock->send_frames.num);
00274 return -2;
00275 }
00276
00277 GET_TIME_OF_DAY(&(start_time));
00278 if (sock->close_timeout != 0) {
00279
00280 timedwait_timeout.tv_sec = start_time.tv_sec + (sock->close_timeout / 1000000);
00281 timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->close_timeout - (sock->close_timeout / 1000000)) * 1000;
00282 do {
00283 pthread_cond_timedwait(&(sock->cond), &(sock->mutex), &(timedwait_timeout));
00284 if (ret == ETIMEDOUT) {
00285 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Close", "timedout trying to close");
00286 sock->err = VFER_TIMEOUT;
00287 return -1;
00288 }
00289 } while (sock->state != CONN_DISCONNECTED);
00290 } else {
00291 do {
00292 pthread_cond_wait(&(sock->cond), &(sock->mutex));
00293 } while (sock->state != CONN_DISCONNECTED);
00294 }
00295
00296
00297 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Close", "reached disconnected|local_closed|err state");
00298 return 0;
00299 }
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313 size_t Control_Send(vfer_sock* sock, const void* buffer, int len) {
00314 struct timespec timedwait_timeout;
00315 struct timeval start_time;
00316 int ret;
00317
00318 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Send", "Entered with sock->fd[%d] buffer_len[%d]", sock->fd, len);
00319 if (len > MAX_SEND_WINDOW) {
00320 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Send", "buffer length is more than MAX_SEND_WINDOW");
00321 return -1;
00322 }
00323
00324 if (sock->state != CONN_CONNECTED) {
00325 pthread_mutex_unlock(&(sock->mutex));
00326 SET_ERROR_RETURN (sock, VFER_INVAL, -1);
00327 }
00328
00329 if ((sock->send_frames.length + len) > MAX_SEND_WINDOW) {
00330
00331 if ((sock->opts & VFERSO_NONBLOCK) == 1) {
00332 pthread_mutex_unlock(&(sock->mutex));
00333 SET_ERROR_RETURN (sock, VFER_WOULDBLOCK, -1);
00334 }
00335
00336 GET_TIME_OF_DAY(&(start_time));
00337 if (sock->snd_timeout != 0) {
00338
00339 timedwait_timeout.tv_sec = start_time.tv_sec + (sock->snd_timeout / 1000000);
00340 timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->snd_timeout - (sock->snd_timeout / 1000000)) * 1000;
00341 do {
00342 ret = pthread_cond_timedwait(&(sock->cond), &(sock->mutex), &(timedwait_timeout));
00343
00344 if (ret == ETIMEDOUT) {
00345 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Send", "timedout while sending a frame");
00346 pthread_mutex_unlock(&(sock->mutex));
00347 SET_ERROR_RETURN (sock, VFER_TIMEOUT, -1);
00348 }
00349
00350 } while ((sock->send_frames.length + len) > MAX_SEND_WINDOW && sock->state == CONN_CONNECTED);
00351 } else {
00352 do {
00353 pthread_cond_wait(&(sock->cond), &(sock->mutex));
00354 } while ((sock->send_frames.length + len) > MAX_SEND_WINDOW && sock->state == CONN_CONNECTED);
00355 }
00356
00357 if (sock->state != CONN_CONNECTED) {
00358 pthread_mutex_unlock(&(sock->mutex));
00359 SET_ERROR_RETURN (sock, VFER_INVAL, -1);
00360 }
00361 }
00362
00363 if (Datagram_Enqueue(&sock->send_frames, buffer, len, sock->l_frame_num) == -1) {
00364 pthread_mutex_unlock(&(sock->mutex));
00365 SET_ERROR_RETURN (sock, VFER_BADSOCK, -1);
00366 }
00367 sock->l_frame_num++;
00368 sock->stats.data_bytes_sender_sent += len;
00369
00370 pthread_mutex_unlock(&(sock->mutex));
00371
00372 return (size_t)(len);
00373 }
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393 int Control_Recv(vfer_sock* sock, void* buffer, int len) {
00394 int ret;
00395 struct timespec timedwait_timeout;
00396 struct timeval start_time;
00397
00398 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "Entered sock->fd[%d] ; buffer len[%d]", sock->fd, len);
00399
00400 if (sock->recvd_frames.num == 0) {
00401
00402
00403
00404 if (sock->state != CONN_CONNECTED) {
00405 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "Returning 0 as socket state doesn't allow receiving");
00406 pthread_mutex_unlock(&(sock->mutex));
00407 return VFER_UNCONN;
00408 }
00409
00410 if ((sock->opts & VFERSO_NONBLOCK) == 1) {
00411 pthread_mutex_unlock(&(sock->mutex));
00412 return VFER_WOULDBLOCK;
00413 }
00414
00415 GET_TIME_OF_DAY(&(start_time));
00416
00417 if (sock->rcv_timeout != 0) {
00418
00419 timedwait_timeout.tv_sec = start_time.tv_sec + (sock->rcv_timeout / 1000000);
00420 timedwait_timeout.tv_nsec = (start_time.tv_usec + sock->rcv_timeout - (sock->rcv_timeout / 1000000)) * 1000;
00421 while (sock->recvd_frames.num == 0 && sock->state == CONN_CONNECTED) {
00422 ret = pthread_cond_timedwait(&(sock->cond), &(sock->mutex), &(timedwait_timeout));
00423 if (ret == ETIMEDOUT) {
00424 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "timedout while receiving a new frame");
00425 pthread_mutex_unlock(&(sock->mutex));
00426 return VFER_TIMEOUT;
00427 }
00428 }
00429 } else {
00430 while (sock->recvd_frames.num == 0 && sock->state == CONN_CONNECTED) {
00431 ret = pthread_cond_wait(&(sock->cond), &(sock->mutex));
00432 }
00433 }
00434
00435 if (sock->state != CONN_CONNECTED) {
00436 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "Returning 0 as socket state doesn't allow receiving");
00437 pthread_mutex_unlock(&(sock->mutex));
00438 return VFER_UNCONN;
00439 }
00440 }
00441
00442
00443 ret = Datagram_Dequeue(&sock->recvd_frames, buffer, len);
00444 if (ret == -1) {
00445 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Recv", "Can't return frame: buffer too small");
00446 pthread_mutex_unlock(&(sock->mutex));
00447 return VFER_INVAL;
00448 }
00449 pthread_mutex_unlock(&(sock->mutex));
00450
00451 return ret;
00452 }
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463 void Control_Register_Socket(vfer_sock* sock) {
00464 sigset_t set;
00465
00466
00467 pthread_mutex_lock(&(sockets.mutex));
00468 sockets.num_active++;
00469
00470 FD_SET(sock->fd, &(sockets.fds));
00471 if (sockets.max_fd < sock->fd) {
00472 sockets.max_fd = sock->fd;
00473 }
00474
00475
00476 if (sockets.num_active == 1) {
00477 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Register_Socket", "Starting ControlT thread, num_active = 1");
00478 if (pthread_create(&ControlT_tid, NULL, (void (*))ControlT, NULL) != 0) {
00479 ERROR_PRINT("control.c", "Control_Register_Socket", "ERROR: Couldn't create ControlT thread");
00480 } else {
00481 pthread_sigmask(SIG_BLOCK, NULL, &set);
00482 sigaddset(&set, SIGINT);
00483 pthread_sigmask(SIG_BLOCK, &set, NULL);
00484 pthread_detach(ControlT_tid);
00485 }
00486 }
00487 pthread_mutex_unlock(&(sockets.mutex));
00488
00489 }
00490
00491
00492
00493
00494
00495
00496
00497
00498 void Control_Unregister_Socket(vfer_sock* sock) {
00499 int i, j;
00500
00501
00502 pthread_mutex_lock(&(sockets.mutex));
00503 FD_CLR(sock->fd, &(sockets.fds));
00504 sockets.num_active--;
00505
00506
00507 if(sock->icmp_fd != -1){
00508 close(sock->icmp_fd);
00509 }
00510
00511
00512 i = sockets.max_fd;
00513 if (sock->fd == i) {
00514
00515 sockets.max_fd = 0;
00516 i = j = 0;
00517 while (j < sockets.num_active) {
00518 pthread_mutex_lock(&(sockets.s[i].mutex));
00519 if (((sockets.s[i].state == CONN_DISCONNECTING) ||
00520 (sockets.s[i].state == CONN_CONNECTED) ||
00521 (sockets.s[i].state == CONN_CONNECTING) ||
00522 (sockets.s[i].state == CONN_ACCEPTING) ||
00523 (sockets.s[i].state == CONN_LISTENING)) &&
00524
00525 (sockets.max_fd < sockets.s[i].fd)) {
00526 j++;
00527 sockets.max_fd = sockets.s[i].fd;
00528 }
00529 pthread_mutex_unlock(&(sockets.s[i].mutex));
00530 i++;
00531 }
00532 }
00533 pthread_mutex_unlock(&(sockets.mutex));
00534
00535 }
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547
00548
00549 inline static int Control_Write(packet* p, int fd) {
00550 int ret;
00551
00552 ret = Packet_Write(p, fd, NULL, 0);
00553 switch (p->type) {
00554 case DATA:
00555
00556 RELEASE_IOV(p->iov, 1);
00557 RELEASE(p, sizeof(packet));
00558 break;
00559 default:
00560 RELEASE_PACK(p);
00561 break;
00562 }
00563
00564 if (ret != 0) {
00565 if (ret == EAGAIN) return 0;
00566 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Write", "Packet_Write returned error[%d]", ret);
00567 return -1;
00568 }
00569 return 0;
00570 }
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584 static int Control_RecvAll(vfer_sock* sock) {
00585 frame_link* frame_l;
00586 frame_link* frame_l2;
00587 frame_link* frame_prev;
00588 packet* p;
00589 packet* ack;
00590 uint32_t pframe_num;
00591 int i;
00592 int ret_val;
00593 int is_phantom;
00594 ccontrol_t* c;
00595 int n;
00596 interval_t *interval_ptr, *interval_ptr2;
00597
00598 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Entered with sock->fd[%d]", sock->fd);
00599 ret_val = 0;
00600 c = &(sock->ccontrol);
00601
00602 while ((Packet_Read_ICMP(sock->icmp_fd, &(sock->pmtu)) >= 0) && ((n = Packet_Read(sock->fd, &p, NULL, NULL)) != EAGAIN)) {
00603 switch (n) {
00604 case -1:
00605 case -2:
00606 case -3:
00607 case -4:
00608
00609 continue;
00610 }
00611 if (n != 0) {
00612
00613 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Packet_Read returned error[%d]", n);
00614 return -1;
00615 }
00616
00617
00618
00619 if (SOCK_OPT_ISSET(sock, VFERSO_CLIENT_SOCK)) {
00620 p->delay *= -1;
00621 }
00622
00623 DEBUG_PACK(DEBUG_CTL, "control.c", "Control_RecvAll", p, "Received packet on sock->fd[%d]", sock->fd);
00624
00625
00626 sock->stats.packets_recvd++;
00627
00628 sock->stats.bytes_recvd += p->iov[0].iov_len;
00629
00630
00631 switch (p->type) {
00632 case REQUEST:
00633 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received REQUEST during wrong state ; ignoring");
00634 break;
00635 case RESPONSE:
00636 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received RESPONSE during wrong state ; ignoring");
00637
00638
00639
00640 ack = Packet_Form_CCAck(0, sock->request->seq + 1, p->seq, 0,
00641 p->u.res.init_frame_num, p->u.res.init_frame_num,
00642 0, p->delay, 0, 0, NULL);
00643 if (Packet_Write(ack, sock->fd, NULL, 0) != 0) {
00644 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet_Write failed");
00645 sock->err = VFER_NOCONN;
00646 RELEASE_PACK(ack);
00647 }
00648 break;
00649 case CC_ACK:
00650 CC_Recvd_Ack(p, sock);
00651
00652
00653
00654
00655
00656
00657
00658
00659 while (sock->send_frames.first != NULL &&
00660 sock->send_frames.first->frame.frame_num < p->u.ccack.oldest_frame) {
00661 frame_l = sock->send_frames.first;
00662
00663
00664 Datagram_Remove (&(sock->send_frames), NULL, frame_l);
00665
00666
00667 if(sock->pmtu.probe_state == PROBE_SENT && sock->pmtu.frame_num == frame_l->frame.frame_num){
00668 c->mtu = *(sock->pmtu.probe_size);
00669 sock->pmtu.search_low = sock->pmtu.probe_size;
00670 if(sock->pmtu.search_low == sock->pmtu.search_high){
00671
00672 sock->pmtu.probe_state = MTU_FOUND;
00673 struct timeval tv;
00674 GET_TIME_OF_DAY(&tv);
00675 sock->pmtu.next_probe_time = tv.tv_sec + PMTUD_TIME_INTERVAL;
00676 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()",
00677 "PMTU of size %d found [oldest frame]. Next probe to be begin at %lu.", c->mtu, (long int)sock->pmtu.next_probe_time);
00678 }else{
00679 sock->pmtu.probe_size++;
00680 sock->pmtu.probe_state = NO_PROBE_SENT;
00681 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()", "Increasing PMTU to size %d. [oldest frame]", c->mtu);
00682 }
00683 }
00684
00685 RELEASE(frame_l->frame.data, frame_l->frame.len);
00686 RELEASE(frame_l, sizeof(frame_link));
00687 }
00688
00689
00690
00691 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received CC_ACK ; newest_frame[%d] ; send_frames.first_frame_num[%d]",
00692 p->u.ccack.newest_frame, (sock->send_frames.first == NULL ? -0 : sock->send_frames.first->frame.frame_num));
00693
00694
00695
00696 if (p->u.ccack.last_seq == (sock->ccontrol.s_last_seq - 1)) {
00697
00698 c->s_last_unacked_data.tv_sec = 0;
00699 }
00700
00701
00702 if (ret_val < 2) ret_val+=2;
00703
00704 if (p->u.ccack.interval_count == 0) {
00705 break;
00706 }
00707
00708
00709
00710
00711
00712 frame_l = sock->send_frames.first;
00713 while (frame_l != NULL) {
00714 if (frame_l->frame.frame_num == p->u.ccack.intervals_frame_num && frame_l->frame.done == 0) {
00715 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Updating intervals for frame[%d]", p->u.ccack.intervals_frame_num);
00716
00717 if (p->u.ccack.interval_count == 1 &&
00718 ntohl(p->u.ccack.raw_intervals[0]) == 0 &&
00719 ntohl(p->u.ccack.raw_intervals[1]) == 0) {
00720
00721
00722 p->u.ccack.raw_intervals[1] = htonl(frame_l->frame.len);
00723 }
00724
00725
00726 int net_interval_count = p->u.ccack.interval_count - frame_l->frame.missing_data.count;
00727 if(sock->pmtu.probe_state == PROBE_SENT && frame_l->frame.frame_num == sock->pmtu.frame_num && net_interval_count > 0){
00728 i = 0;
00729 while(i < p->u.ccack.interval_count){
00730 int start = ntohl(p->u.ccack.raw_intervals[2*i]);
00731 int end = ntohl(p->u.ccack.raw_intervals[(2*i) + 1]);
00732 if(start == sock->pmtu.data_offset
00733 && end == (sock->pmtu.data_offset + *(sock->pmtu.probe_size))
00734 && net_interval_count == 1){
00735 sock->pmtu.search_high = sock->pmtu.probe_size;
00736 sock->pmtu.probe_state = MTU_FOUND;
00737 struct timeval tv;
00738 GET_TIME_OF_DAY(&tv);
00739 sock->pmtu.next_probe_time = tv.tv_sec + PMTUD_TIME_INTERVAL;
00740 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()",
00741 "PMTU of size %d found [bit vector indicated failure]. Next probe to be begin at %lu.",
00742 c->mtu, (long int)sock->pmtu.next_probe_time);
00743 break;
00744 }else if(start <= sock->pmtu.data_offset
00745 && end >= (sock->pmtu.data_offset + *(sock->pmtu.probe_size))){
00746
00747 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()", "Probe needs to be resent");
00748 sock->pmtu.probe_state = NO_PROBE_SENT;
00749 break;
00750 }
00751 i++;
00752 }
00753
00754
00755
00756
00757 if(sock->pmtu.probe_state == PROBE_SENT){
00758 c->mtu = *(sock->pmtu.probe_size);
00759 sock->pmtu.search_low = sock->pmtu.probe_size;
00760 if(sock->pmtu.search_low == sock->pmtu.search_high){
00761
00762 sock->pmtu.probe_state = MTU_FOUND;
00763 struct timeval tv;
00764 GET_TIME_OF_DAY(&tv);
00765 sock->pmtu.next_probe_time = tv.tv_sec + PMTUD_TIME_INTERVAL;
00766 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()",
00767 "PMTU of size %d found [oldest frame]. Next probe to begin at %lu.", c->mtu, (long int)tv.tv_sec);
00768 }else{
00769 sock->pmtu.probe_size++;
00770 sock->pmtu.probe_state = NO_PROBE_SENT;
00771 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_RecvAll()", "Increasing PMTU to size %d. [oldest frame]", c->mtu);
00772 }
00773 }
00774
00775 }
00776
00777
00778 interval_ptr = frame_l->frame.missing_data.head;
00779 interval_ptr2 = interval_ptr;
00780 i = 0;
00781
00782
00783 while (i < frame_l->frame.missing_data.count && i < p->u.ccack.interval_count) {
00784
00785
00786 interval_ptr->start = ntohl(p->u.ccack.raw_intervals[2*i]);
00787 interval_ptr->end = ntohl(p->u.ccack.raw_intervals[(2*i)+1]);
00788
00789
00790
00791
00792 interval_ptr2 = interval_ptr;
00793 interval_ptr = interval_ptr->next;
00794 i++;
00795 }
00796
00797 interval_ptr = interval_ptr2;
00798
00799 if (i == frame_l->frame.missing_data.count &&
00800 i < p->u.ccack.interval_count) {
00801
00802 for (; i < p->u.ccack.interval_count; i++) {
00803
00804 if (frame_l->frame.missing_data.head == NULL) {
00805
00806
00807 ALLOC(interval_ptr, interval_t*, interval_t, 1);
00808 frame_l->frame.missing_data.head = interval_ptr;
00809 interval_ptr->prev = NULL;
00810 } else {
00811
00812
00813 ALLOC((interval_ptr->next), interval_t*, interval_t, 1);
00814 interval_ptr->next->prev = interval_ptr;
00815 interval_ptr = interval_ptr->next;
00816 }
00817
00818 interval_ptr->next = NULL;
00819 interval_ptr->start = ntohl(p->u.ccack.raw_intervals[2*i]);
00820 interval_ptr->end = ntohl(p->u.ccack.raw_intervals[(2*i)+1]);
00821
00822
00823 }
00824 } else if (i < frame_l->frame.missing_data.count &&
00825 i == p->u.ccack.interval_count) {
00826
00827 interval_ptr = interval_ptr->next;
00828 if (interval_ptr != NULL) {
00829 interval_ptr->prev->next = NULL;
00830 }
00831 for (; i < frame_l->frame.missing_data.count; i++) {
00832
00833
00834
00835 interval_ptr2 = interval_ptr->next;
00836 RELEASE(interval_ptr, sizeof(interval_t));
00837 interval_ptr = interval_ptr2;
00838 }
00839 }
00840
00841 frame_l->frame.missing_data.count = p->u.ccack.interval_count;
00842
00843 break;
00844 }
00845 frame_l = frame_l->next;
00846 }
00847 break;
00848 case CLOSE:
00849 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received CLOSE");
00850 GET_TIME_OF_DAY(&(sock->stats.tv_end));
00851 sock->state = CONN_DISCONNECTED;
00852 CC_Recvd_Close(p, sock);
00853 if (ret_val < 2) ret_val+=2;
00854 return ret_val;
00855 case DATA:
00856 goto data_packet;
00857 default:
00858 if (p->type == DATA_ACK) {
00859 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received DATA_ACK ; ignoring for now");
00860 break;
00861 } else {
00862 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received unknown packet type!");
00863 }
00864 }
00865 RELEASE_PACK(p);
00866 continue;
00867
00868 data_packet:
00869
00870
00871 if ((p->opts & OPT_SYNC) == OPT_SYNC) {
00872
00873
00874 Control_Send_Ack (sock);
00875
00876 }
00877
00878
00879 pframe_num = p->u.data.frame_num;
00880
00881
00882
00883 if (pframe_num < sock->recv_frames.first_frame_num) {
00884
00885 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet with old frame number[%d] outside bounds: [%d,inf], ignoring",
00886 pframe_num, sock->recv_frames.first_frame_num);
00887 break;
00888 } else if (p->u.data.frame_len == 0) {
00889
00890 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet has no frame_len ; ignoring");
00891 RELEASE_PACK(p);
00892 continue;
00893 } else if (p->u.data.frame_len > MAX_FRAME_SIZE) {
00894
00895 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet has too large a frame_len[%d] ; my max[%d]", p->u.data.frame_len, MAX_FRAME_SIZE);
00896 RELEASE_PACK(p);
00897 continue;
00898
00899
00900
00901
00902
00903 }
00904
00905
00906
00907 CC_Recvd_Data(p, sock);
00908 sock->stats.data_bytes_recvd+=p->u.data.data_len;
00909
00910
00911 if (sock->recv_frames.last_frame_num < pframe_num) {
00912 sock->recv_frames.last_frame_num = pframe_num;
00913 }
00914
00915
00916 if (sock->recv_frames.num == 0) {
00917 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "recv_frames is empty, creating phantoms [first[%d], %d]",
00918 sock->recv_frames.first_frame_num, pframe_num);
00919
00920 frame_prev = NULL;
00921
00922 for (i = sock->recv_frames.first_frame_num; i <= pframe_num; i++) {
00923
00924 frame_prev = Datagram_Insert_Phantom(&(sock->recv_frames), frame_prev, i);
00925 }
00926 is_phantom = 1;
00927 frame_l = frame_prev;
00928 } else {
00929
00930
00931
00932
00933 frame_l = sock->recv_frames.first;
00934
00935 while (frame_l != NULL) {
00936 if (frame_l->frame.frame_num == pframe_num) {
00937
00938 is_phantom = 0;
00939 break;
00940 } else if (frame_l->next == NULL) {
00941 if (frame_l->frame.frame_num >= pframe_num) {
00942 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "ERR: Inconsistent recv_frame list");
00943 }
00944
00945
00946
00947 frame_prev = frame_l;
00948 for (i = frame_l->frame.frame_num + 1; i <= pframe_num; i++) {
00949 frame_prev = Datagram_Insert_Phantom(&(sock->recv_frames), frame_prev, i);
00950 }
00951
00952 frame_l = frame_prev;
00953 is_phantom = 1;
00954 break;
00955 } else if (frame_l->frame.frame_num < pframe_num && frame_l->next->frame.frame_num > pframe_num) {
00956
00957
00958
00959
00960 frame_prev = frame_l;
00961 for (i = frame_l->frame.frame_num + 1; i < frame_l->next->frame.frame_num; i++) {
00962 frame_prev = Datagram_Insert_Phantom(&(sock->recv_frames), frame_prev, i);
00963 if (i == pframe_num) {
00964
00965 frame_l = frame_prev;
00966 }
00967 }
00968 is_phantom = 1;
00969 break;
00970 }
00971 frame_l = frame_l->next;
00972 }
00973 }
00974
00975
00976
00977
00978
00979
00980
00981
00982 if (is_phantom == 1) {
00983
00984 ALLOC(frame_l->frame.data, void*, char, p->u.data.frame_len);
00985 frame_l->frame.len = p->u.data.frame_len;
00986
00987 ALLOC(frame_l->frame.missing_data.head, interval_t*, interval_t, 1);
00988 frame_l->frame.missing_data.head->next = NULL;
00989 frame_l->frame.missing_data.head->prev = NULL;
00990 frame_l->frame.missing_data.head->end = frame_l->frame.len;
00991 frame_l->frame.missing_data.head->start = 0;
00992 frame_l->frame.missing_data.count = 1;
00993 frame_l->frame.frame_num = p->u.data.frame_num;
00994 frame_l->frame.urgency = 0;
00995 frame_l->frame.done = 0;
00996
00997 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "New recv_frame allocated: id[%d] len[%d] ; recv_frames->num[%d]",
00998 frame_l->frame.frame_num, frame_l->frame.len, sock->recv_frames.num);
00999
01000 } else {
01001
01002 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Found the frame in recv_frames");
01003
01004 if (frame_l->frame.len == 0) {
01005 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Received first packet to a previously phantom frame[%d]", frame_l->frame.frame_num);
01006
01007 frame_l->frame.len = p->u.data.frame_len;
01008 if (ALLOC(frame_l->frame.data, void*, char, p->u.data.frame_len) == 0) {
01009 frame_l->frame.len = 0;
01010 RELEASE_PACK(p);
01011 continue;
01012 }
01013 ALLOC(frame_l->frame.missing_data.head, interval_t*, interval_t, 1);
01014 frame_l->frame.missing_data.head->next = NULL;
01015 frame_l->frame.missing_data.head->prev = NULL;
01016 frame_l->frame.missing_data.head->end = frame_l->frame.len;
01017 frame_l->frame.missing_data.head->start = 0;
01018 frame_l->frame.missing_data.count = 1;
01019
01020 }
01021 }
01022
01023
01024
01025 interval_ptr = frame_l->frame.missing_data.head;
01026
01027 while (interval_ptr != NULL) {
01028
01029 if ((interval_ptr->start == p->u.data.data_offset) &&
01030 (interval_ptr->end == (p->u.data.data_offset + p->u.data.data_len))) {
01031
01032
01033 if (interval_ptr->prev != NULL) {
01034 interval_ptr->prev->next = interval_ptr->next;
01035 if (interval_ptr->next != NULL) {
01036 interval_ptr->next->prev = interval_ptr->prev;
01037 }
01038 } else {
01039 frame_l->frame.missing_data.head = interval_ptr->next;
01040 if (interval_ptr->next != NULL) {
01041 interval_ptr->next->prev = NULL;
01042 }
01043 }
01044 RELEASE(interval_ptr, sizeof(interval_t));
01045 frame_l->frame.missing_data.count--;
01046 break;
01047 } else if ((interval_ptr->start < p->u.data.data_offset) &&
01048 (interval_ptr->end == (p->u.data.data_offset + p->u.data.data_len))) {
01049
01050
01051 interval_ptr->end = p->u.data.data_offset;
01052 break;
01053 } else if ((interval_ptr->start == p->u.data.data_offset) &&
01054 (interval_ptr->end > (p->u.data.data_offset + p->u.data.data_len))) {
01055
01056
01057 interval_ptr->start = interval_ptr->start + p->u.data.data_len;
01058 break;
01059 } else if ((interval_ptr->start < p->u.data.data_offset) &&
01060 (interval_ptr->end > (p->u.data.data_offset + p->u.data.data_len))) {
01061
01062
01063 ALLOC(interval_ptr2, interval_t*, interval_t, 1);
01064 interval_ptr2->next = interval_ptr->next;
01065 interval_ptr2->prev = interval_ptr;
01066 interval_ptr->next = interval_ptr2;
01067 interval_ptr2->start = p->u.data.data_offset + p->u.data.data_len;
01068 interval_ptr2->end = interval_ptr->end;
01069 interval_ptr->end = p->u.data.data_offset;
01070 frame_l->frame.missing_data.count++;
01071 break;
01072 }
01073
01074 interval_ptr = interval_ptr->next;
01075 }
01076
01077
01078 if (interval_ptr == NULL) {
01079
01080
01081
01082
01083
01084
01085 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "DATA Packet with offset[%d] len[%d] ignored",
01086 p->u.data.data_offset, p->u.data.data_len);
01087 RELEASE_PACK(p);
01088 continue;
01089 }
01090
01091
01092 memcpy(frame_l->frame.data + p->u.data.data_offset,
01093 p->u.data.data,
01094 p->u.data.data_len);
01095
01096 sock->recv_frames.length += p->u.data.data_len;
01097
01098 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Added data to frame[%d] at offset[%d] with frame_len[%d] ",
01099 frame_l->frame.frame_num, p->u.data.data_offset, frame_l->frame.len);
01100
01101
01102
01103
01104
01105
01106 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Updating urgencies");
01107 frame_l2 = sock->recv_frames.first;
01108 while (frame_l2 != frame_l) {
01109 if (frame_l2->frame.urgency >= 0 && frame_l2->frame.done == 0 &&
01110 (frame_l2->frame.urgency < URGENCY_THRESHOLD)) {
01111 frame_l2->frame.urgency++;
01112
01113
01114 }
01115 frame_l2 = frame_l2->next;
01116 }
01117
01118 frame_l2 = frame_l;
01119 while (frame_l2 != NULL) {
01120 if (frame_l2->frame.urgency > 0 && frame_l2->frame.done == 0) {
01121
01122 frame_l2->frame.urgency = 0;
01123 }
01124
01125 frame_l2 = frame_l2->next;
01126 }
01127
01128
01129
01130
01131 if (frame_l->frame.len > 0 && (frame_l->frame.missing_data.count == 0)) {
01132
01133 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "frame[%d] len[%d] DONE", frame_l->frame.frame_num, frame_l->frame.len);
01134 frame_l->frame.done = 1;
01135 if (sock->recv_frames.first == frame_l) {
01136 frame_l2 = frame_l;
01137 while (frame_l2 != NULL) {
01138 if (frame_l2->frame.len == 0) break;
01139
01140
01141
01142
01143
01144
01145
01146
01147
01148
01149
01150
01151
01152
01153
01154
01155 frame_l = frame_l2;
01156 frame_l2 = frame_l->next;
01157
01158 if (frame_l->frame.missing_data.count == 0) {
01159
01160 sock->recv_frames.first = frame_l->next;
01161 if (sock->recv_frames.last == frame_l) {
01162 sock->recv_frames.last = NULL;
01163 sock->recv_frames.last_frame_num = frame_l->frame.frame_num + 1;
01164 }
01165
01166 sock->recv_frames.first_frame_num = frame_l->frame.frame_num + 1;
01167 sock->recv_frames.num--;
01168
01169
01170 if (sock->recvd_frames.num == 0) {
01171 sock->recvd_frames.first = frame_l;
01172 } else {
01173 sock->recvd_frames.last->next = frame_l;
01174 }
01175
01176 sock->recvd_frames.last = frame_l;
01177 frame_l->next = NULL;
01178
01179 sock->recvd_frames.num++;
01180 sock->recvd_frames.length += frame_l->frame.len;
01181 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_RecvAll", "Frame[%d] moved into received queue; recv_frames->num[%d] ; recvd_frames->num[%d]",
01182 frame_l->frame.frame_num, sock->recv_frames.num, sock->recvd_frames.num);
01183
01184
01185 } else {
01186 break;
01187 }
01188
01189 }
01190 }
01191 }
01192
01193 if (ret_val == 0 || ret_val == 2) ret_val+=1;
01194
01195 RELEASE_PACK(p);
01196 }
01197 return ret_val;
01198 }
01199
01200
01201
01202
01203
01204
01205
01206
01207
01208
01209
01210
01211 inline static int Control_SendCtl (vfer_sock* sock, packet* p) {
01212
01213 Packet_Set_Seq(p, sock->l_seq);
01214 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlSendCtl", p, "-");
01215
01216 sock->stats.bytes_sent+=p->iov[0].iov_len;
01217 sock->stats.packets_sent++;
01218
01219
01220 if (Control_Write(p, sock->fd) != 0) {
01221 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_SendAllCtl", "Control_Write failed; sock->fd[%d] ; CONN_ERR", sock->fd);
01222 sock->state = CONN_ERR;
01223 return -1;
01224 }
01225 CC_Sent_CtlPack(p, sock);
01226 sock->l_seq++;
01227 return 0;
01228 }
01229
01230
01231
01232
01233
01234
01235
01236
01237
01238
01239
01240
01241 inline static int Control_Send_Ack (vfer_sock* s) {
01242 packet* p;
01243 ccontrol_t* c;
01244 struct timeval tv;
01245 intervals_t* missing;
01246
01247 c = &(s->ccontrol);
01248 GET_TIME_OF_DAY(&tv);
01249
01250 missing = NULL;
01251 if (s->recv_frames.first != NULL &&
01252 s->recv_frames.first->frame.missing_data.count != 0) {
01253
01254 missing = &(s->recv_frames.first->frame.missing_data);
01255 }
01256
01257 p = Packet_Form_CCAck(0, 0, c->r_last_seq - 1, c->r_packets_lost,
01258 s->recv_frames.first_frame_num, s->recv_frames.last_frame_num,
01259 Tree_Max(&(c->rev_path_curr_delayhist), tv) - Tree_Min(&(c->rev_path_base_delayhist), tv),
01260 Tree_Max(&(c->rev_path_curr_delayhist), tv), c->r_bytes_total, s->recv_frames.first_frame_num, missing);
01261
01262 if (Control_SendCtl(s, p) != 0) {
01263 return -1;
01264 }
01265 c->r_first_seq = c->r_last_seq;
01266 return 0;
01267 }
01268
01269
01270
01271
01272
01273
01274
01275
01276
01277 static int Control_SendAllData (vfer_sock* sock) {
01278
01279 frame_link* frame_l;
01280 frame* frame;
01281 interval_t* interval_ptr;
01282 interval_t* interval_ptr2;
01283 uint32_t opts;
01284 uint32_t data_size;
01285 uint32_t frame_len;
01286 int packets_sent;
01287 packet* pack;
01288 int ret;
01289 ccontrol_t* c;
01290
01291 c = &(sock->ccontrol);
01292 packets_sent = 0;
01293 frame_l = sock->send_frames.first;
01294 while (frame_l != NULL) {
01295
01296
01297
01298 frame = &(frame_l->frame);
01299 interval_ptr = frame->missing_data.head;
01300 while (interval_ptr != NULL) {
01301
01302
01303
01304
01305
01306
01307
01308 opts = 0;
01309 frame_len = frame->len;
01310 while ((interval_ptr->end - interval_ptr->start) != 0) {
01311 if(sock->pmtu.probe_state == NO_PROBE_SENT && (interval_ptr->end - interval_ptr->start) >= *(sock->pmtu.probe_size)){
01312
01313 data_size = *(sock->pmtu.probe_size);
01314 sock->pmtu.frame_num = frame->frame_num;
01315 sock->pmtu.data_offset = interval_ptr->start;
01316 }else if ((interval_ptr->end - interval_ptr->start) >= c->mtu) {
01317 data_size = c->mtu;
01318 } else {
01319
01320 data_size = interval_ptr->end - interval_ptr->start;
01321 }
01322
01323
01324 if (CC_Can_Send_Data(data_size, sock) == 0) {
01325
01326 return packets_sent;
01327 }
01328
01329
01330
01331 if (c->syncing == 1) {
01332
01333 opts = opts | OPT_SYNC;
01334 c->syncing = 0;
01335 }
01336
01337 if ((pack = Packet_Form_Data(opts,
01338 sock->l_seq,
01339 frame->frame_num,
01340 interval_ptr->start,
01341 frame_len,
01342 (frame->data + interval_ptr->start),
01343 data_size)) == NULL) {
01344 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_SendAllData", "Couldn't form data packet for frame[%d] sock->fd[%d]", frame->frame_num, sock->fd);
01345 } else {
01346 DEBUG_PACK(DEBUG_CTL, "control.c", "Control_SendAllData", pack, "Sending DATA packet on skt->fd[%d]", sock->fd);
01347
01348 ret = Control_Write(pack, sock->fd);
01349
01350 if(ret != 0 && sock->pmtu.probe_state == NO_PROBE_SENT && *(sock->pmtu.probe_size) == data_size){
01351
01352
01353 int err_val;
01354 socklen_t err_val_len = 4;
01355 if (getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &err_val, &err_val_len) == -1) perror("getsockopt");
01356
01357 sock->pmtu.search_high = sock->pmtu.probe_size;
01358 sock->pmtu.probe_state = MTU_FOUND;
01359 struct timeval tv;
01360 GET_TIME_OF_DAY(&tv);
01361 sock->pmtu.next_probe_time = tv.tv_sec + PMTUD_TIME_INTERVAL;
01362
01363 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_SendAllData",
01364 "PMTU of size %d found [Probe write failed]. Next probe to be begin at %lu.", c->mtu, (long int)sock->pmtu.next_probe_time);
01365 continue;
01366 }else if(sock->pmtu.probe_state == NO_PROBE_SENT && *(sock->pmtu.probe_size) == data_size){
01367
01368 sock->pmtu.probe_state = PROBE_SENT;
01369 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_SendAllData",
01370 "Probe of size %d sent frame_num[%d] data_offset[%d]", data_size, sock->pmtu.frame_num, sock->pmtu.data_offset);
01371 }else if (ret != 0) {
01372 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_SendAllData", "Control_Write failed; skt->fd[%d] ; CONN_ERR", sock->fd);
01373 sock->state = CONN_ERR;
01374 return -1;
01375 }
01376
01377
01378
01379 sock->stats.bytes_sent+=pack->iov[0].iov_len;
01380 sock->stats.bytes_sent+=pack->iov[1].iov_len;
01381 sock->stats.data_bytes_sent+=pack->iov[1].iov_len;
01382 sock->stats.packets_sent++;
01383
01384 CC_Sent_Data(pack, frame_l, sock);
01385 packets_sent = 1;
01386 sock->l_seq++;
01387 }
01388
01389 interval_ptr->start += data_size;
01390 }
01391
01392
01393 if (interval_ptr->prev != NULL) {
01394 interval_ptr->prev->next = interval_ptr->next;
01395 if (interval_ptr->next != NULL) {
01396 interval_ptr->next->prev = interval_ptr->prev;
01397 }
01398 } else {
01399 frame_l->frame.missing_data.head = interval_ptr->next;
01400 if (interval_ptr->next != NULL) {
01401 interval_ptr->next->prev = NULL;
01402 }
01403 }
01404 frame_l->frame.missing_data.count--;
01405 interval_ptr2 = interval_ptr->next;
01406 RELEASE(interval_ptr, sizeof(interval_t));
01407 interval_ptr = interval_ptr2;
01408 }
01409 frame_l = frame_l->next;
01410 }
01411 return packets_sent;
01412 }
01413
01414
01415
01416
01417
01418
01419
01420
01421
01422
01423
01424
01425 static void ControlT(void* ptr) {
01426 int int_optval;
01427 socklen_t int_optlen;
01428 struct timeval tv, tv_const;
01429
01430
01431
01432
01433 packet* p;
01434 uint32_t rtt;
01435 ccontrol_t *c;
01436 fd_set fds;
01437 int max_fd, num_active, i, j, ret;
01438 vfer_sock* sock;
01439 int val;
01440 int num_packets;
01441
01442 struct sockaddr_in addr;
01443 packet* response;
01444 packet* ack;
01445 socklen_t addr_len;
01446 char buf[INET_ADDRSTRLEN];
01447 int err;
01448 interval_t *interval_ptr;
01449
01450 tv_const.tv_sec = 0;
01451 tv_const.tv_usec = CONTROLT_SLEEPTIME;
01452
01453
01454 pthread_mutex_lock(&(sockets.mutex));
01455 fds = sockets.fds;
01456 max_fd = sockets.max_fd;
01457 num_active = sockets.num_active;
01458 pthread_mutex_unlock(&(sockets.mutex));
01459
01460 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Entered with num_active[%d]", num_active);
01461
01462 while (1) {
01463 select:
01464 tv = tv_const;
01465
01466
01467 if (select(max_fd + 1, &fds, 0, 0, &tv) == -1) {
01468 if (errno == EINTR) {
01469
01470 pthread_mutex_lock(&(sockets.mutex));
01471 fds = sockets.fds;
01472 max_fd = sockets.max_fd;
01473 num_active = sockets.num_active;
01474 pthread_mutex_unlock(&(sockets.mutex));
01475 goto select;
01476 }
01477 }
01478
01479 i = j = 0;
01480 while (j < num_active && i < MAX_SOCKETS) {
01481 pthread_mutex_lock(&(sockets.s[i].mutex));
01482 c = &(sockets.s[i].ccontrol);
01483 sock = &(sockets.s[i]);
01484 i++;
01485
01486
01487 switch (sock->state) {
01488 case CONN_DISCONNECTING:
01489 case CONN_ERR:
01490 if (sock->send_frames.num == 0) {
01491 j++;
01492 p = Packet_Form_Close(0, 0);
01493 Control_SendCtl(sock, p);
01494 goto disconnect_socket;
01495
01496 }
01497
01498
01499
01500
01501 case CONN_CONNECTED:
01502 j++;
01503
01504 if (! FD_ISSET(sock->fd, &fds)) {
01505
01506 goto skip_recvall;
01507 }
01508
01509 ret = Control_RecvAll(sock);
01510 if (ret < 0 || sock->state == CONN_DISCONNECTED) {
01511 goto disconnect_socket;
01512 }
01513
01514 if (ret != 0) {
01515 GET_TIME_OF_DAY(&(tv));
01516 if (ret == 1) {
01517 c->r_last_data = tv;
01518 } else if (ret == 2) {
01519 c->r_last_ccack = tv;
01520 } else if (ret == 3) {
01521 c->r_last_ccack = tv;
01522 c->r_last_data = tv;
01523 }
01524
01525
01526 if ((ret == 1 || ret == 3) &&
01527 (c->r_unacked_bytes >= UNACKED_THRESH * c->mtu)) {
01528
01529 if (Control_Send_Ack(sock) != 0) {
01530 goto disconnect_socket;
01531 }
01532 c->r_unacked_bytes = 0;
01533 c->s_last_ack = tv;
01534 }
01535 }
01536
01537 skip_recvall:
01538
01539 GET_TIME_OF_DAY(&(tv));
01540
01541
01542 rtt = RTT(sock, tv);
01543 if (rtt == 0) {
01544 printf("RTT EMPTY\n");
01545 rtt = 5000;
01546 }
01547
01548 if (TIMEVAL_US_DIFF(c->s_rtt_start, tv) >= rtt) {
01549 c->s_rtt_start = tv;
01550 c->s_cwnd_sent = 0;
01551 }
01552
01553
01554 if (c->r_last_loss.tv_sec != 0 &&
01555 TIMEVAL_US_DIFF(c->r_last_loss, tv) >= rtt) {
01556 c->r_last_loss.tv_sec = 0;
01557 }
01558
01559
01560
01561 if (c->r_unacked_bytes != 0 &&
01562 TIMEVAL_US_DIFF(c->s_last_ack, tv) >= rtt/2.0) {
01563 if (Control_Send_Ack(sock) != 0) {
01564 goto disconnect_socket;
01565 }
01566 c->r_unacked_bytes = 0;
01567 c->s_last_ack = tv;
01568 }
01569
01570
01571
01572
01573
01574
01575
01576
01577
01578
01579
01580
01581
01582
01583
01584
01585
01586
01587
01588
01589
01590
01591
01592
01593
01594
01595
01596
01597
01598
01599
01600
01601
01602
01603
01604
01605
01606
01607
01608
01609
01610
01611
01612
01613
01614
01615
01616
01617
01618
01619
01620
01621
01622
01623
01624
01625 if (c->s_last_unacked_data.tv_sec != 0 &&
01626 (TIMEVAL_US_DIFF(c->s_last_unacked_data, tv) >= 5*rtt)) {
01627
01628
01629 c->s_last_unacked_data.tv_sec = 0;
01630
01631 interval_ptr = c->s_last_data_frame->frame.missing_data.head;
01632 if (interval_ptr == NULL) {
01633 ALLOC(interval_ptr, interval_t*, interval_t, 1);
01634 interval_ptr->next = NULL;
01635 interval_ptr->prev = NULL;
01636 interval_ptr->start = c->s_last_data_offset;
01637 interval_ptr->end = c->s_last_data_offset + c->mtu;
01638 c->s_last_data_frame->frame.missing_data.head = interval_ptr;
01639 c->s_last_data_frame->frame.missing_data.count = 1;
01640 }
01641 c->syncing = 1;
01642 }
01643
01644
01645
01646 if (sock->send_frames.length != 0 && CC_Can_Send_Data(c->mtu, sock) != 0) {
01647
01648
01649 ret = Control_SendAllData(sock);
01650 switch(ret) {
01651 case 1:
01652
01653 GET_TIME_OF_DAY(&(c->s_last_unacked_data));
01654 break;
01655 case -1:
01656 goto disconnect_socket;
01657 }
01658 }
01659
01660
01661 if (sock->recvd_frames.num != 0) {
01662 if (sock->select_active == 1 && (sock->select_mark & VFER_READABLE) == VFER_READABLE) {
01663 sock->select_result |= VFER_READABLE;
01664 pthread_mutex_unlock(&(sock->mutex));
01665 pthread_mutex_lock(&(sockets.select_mutex));
01666 pthread_cond_signal(&(sockets.select_cond));
01667 } else {
01668 pthread_mutex_unlock(&(sock->mutex));
01669 }
01670 pthread_cond_broadcast(&(sock->cond));
01671 } else if ((sock->send_frames.length + c->mtu) <= MAX_SEND_WINDOW) {
01672 if (sock->select_active == 1 && (sock->select_mark & VFER_WRITABLE) == VFER_WRITABLE) {
01673 sock->select_result |= VFER_WRITABLE;
01674 pthread_mutex_unlock(&(sock->mutex));
01675 pthread_mutex_lock(&(sockets.select_mutex));
01676 pthread_cond_signal(&(sockets.select_cond));
01677 } else {
01678 pthread_mutex_unlock(&(sock->mutex));
01679 }
01680 pthread_cond_broadcast(&(sock->cond));
01681 } else {
01682 pthread_mutex_unlock(&(sock->mutex));
01683 }
01684 continue;
01685
01686 case CONN_LISTENING:
01687
01688 j++;
01689 if (! FD_ISSET(sock->fd, &fds)) {
01690
01691 break;
01692 }
01693
01694 addr_len = sizeof(SA);
01695 if ((err = Packet_Read(sock->fd, &sock->request, (SA*)(&addr), &addr_len)) != 0) {
01696 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet_Read returned error[%d]", err);
01697 break;
01698 }
01699
01700 if (sock->request->type != REQUEST) {
01701 RELEASE_PACK(sock->request);
01702 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet is not REQUEST, ignoring");
01703 break;
01704 }
01705 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "received request: [%s : %d]",
01706 inet_ntop(AF_INET, &(addr.sin_addr), buf, INET_ADDRSTRLEN), ntohs(addr.sin_port));
01707
01708 if (Accept_Queue_Enqueue (sock->accept_queue, sock->fd, *((SA*)(&addr)), sock->request) == -1) {
01709
01710 RELEASE_PACK(sock->request);
01711 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "couldn't enqueue new connection");
01712 break;
01713 }
01714
01715 pthread_mutex_unlock(&(sock->mutex));
01716 pthread_cond_signal(&(sock->cond));
01717 continue;
01718
01719 case CONN_CONNECTING:
01720
01721 j++;
01722
01723
01724 bzero((char*)(&addr), sizeof(addr));
01725 addr.sin_family = AF_UNSPEC;
01726 addr_len = sizeof(addr);
01727
01728
01729 if (FD_ISSET(sock->fd, &(fds))) {
01730 ret = Packet_Read(sock->fd, &response, NULL, NULL);
01731 if (ret != 0) {
01732 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_ConnectT", "Packet_Read failed[%d]", ret);
01733 switch (ret) {
01734 case EAGAIN:
01735
01736 break;
01737 case ECONNREFUSED:
01738 sock->err = VFER_REFUSED;
01739 goto could_not_connect;
01740 default:
01741 sock->err = VFER_NOCONN;
01742 goto could_not_connect;
01743 }
01744 RELEASE_PACK(sock->request);
01745 break;
01746 }
01747
01748
01749 response->delay *= -1;
01750 DEBUG_PACK(DEBUG_CTL, "control.c", "Control_ConnectT",
01751 response, "Received [2/3 handshake: Response] on sock->fd[%d]", sock->fd);
01752
01753 if (response->type != RESPONSE) {
01754 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_ConnectT",
01755 "Received wrong packet type[%d] for [2/3 handshake] ; expected RESPONSE[%d]",
01756 response->type, RESPONSE);
01757 RELEASE_PACK(response);
01758 goto could_not_connect;
01759 }
01760
01761
01762
01763
01764 getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &int_optval, &int_optlen);
01765
01766
01767 sock->addr.sin_port = htons(response->u.res.connection_port);
01768 sock->addr_len = sizeof(sock->addr);
01769
01770
01771
01772
01773
01774 if (connect(sock->fd, (SA*) &sock->addr, sock->addr_len) < 0 ) {
01775 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_ConnectT", "connecting to port specified in response packet failed, %s",
01776 strerror(errno));
01777 sock->err = VFER_NOCONN;
01778 RELEASE_PACK(response);
01779 goto could_not_connect;
01780 }
01781
01782
01783 ack = Packet_Form_CCAck(0, sock->request->seq + 1, response->seq, 0,
01784 response->u.res.init_frame_num, response->u.res.init_frame_num,
01785 0, response->delay, 0, 0, NULL);
01786
01787 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", ack,
01788 "Sending [3/3 handshake: Ack] on sock->fd[%d]", sock->fd);
01789
01790
01791 if (Packet_Write(ack, sock->fd, NULL, 0) != 0) {
01792
01793
01794 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet_Write failed");
01795 sock->err = VFER_NOCONN;
01796 RELEASE_PACK(response);
01797 RELEASE_PACK(ack);
01798 goto could_not_connect;
01799 }
01800
01801
01802
01803 sock->opts |= VFERSO_CLIENT_SOCK;
01804 sock->l_seq = ack->seq + 1;
01805
01806 pthread_mutex_unlock(&(sock->mutex));
01807 Control_Add_Socket(sock, sock->request, response, ack, 1);
01808 pthread_cond_signal(&(sock->cond));
01809
01810 RELEASE_PACK(sock->request);
01811 RELEASE_PACK(response);
01812 RELEASE_PACK(ack);
01813
01814 continue;
01815 }
01816
01817
01818
01819 if (sock->connect_tries == MAX_REQUEST_TRIES) {
01820 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Reached MAX_REQUEST_TRIES[%d]", MAX_REQUEST_TRIES);
01821 goto could_not_connect;
01822 }
01823
01824
01825 GET_TIME_OF_DAY(&(tv));
01826 if ((unsigned int)(TIMEVAL_US_DIFF(sock->last_request, tv)) < (unsigned int)(REQUEST_RETRY_INTERVAL)) {
01827 break;
01828 }
01829
01830 sock->last_request = tv;
01831
01832 sock->connect_tries++;
01833
01834 if (sock->request == NULL) {
01835 sock->request = Packet_Form_Request (0, Packet_Gen_SeqNum(), MAX_FRAME_SIZE,
01836 sock->udp_recv_buf_size, sock->udp_send_buf_size, Packet_Gen_FrameNum());
01837 }
01838
01839 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", sock->request,
01840 "Sending [1/3 handshake: Request] on sock->fd[%d]", sock->fd);
01841
01842
01843 ret = Packet_Write(sock->request, sock->fd, NULL, 0);
01844 if (ret != 0) {
01845
01846 switch (ret) {
01847 case ECONNREFUSED:
01848 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "connection refused on Packet_Write");
01849 sock->err = VFER_REFUSED;
01850 break;
01851 default:
01852 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Packet_Write failed with error[%d]", ret);
01853 sock->err = VFER_NOCONN;
01854 }
01855 RELEASE_PACK(sock->request);
01856 goto could_not_connect;
01857 }
01858 break;
01859
01860 could_not_connect:
01861
01862 connect(sock->fd, (SA*) &addr, addr_len);
01863 sock->state = CONN_BOUND;
01864 getsockopt(sock->fd, SOL_SOCKET, SO_ERROR, &int_optval, &int_optlen);
01865
01866 if (sock->select_active == 1 && (sock->select_mark & VFER_EXCEPTION) == VFER_EXCEPTION) {
01867 sock->select_result |= VFER_EXCEPTION;
01868 pthread_mutex_unlock(&(sock->mutex));
01869 pthread_mutex_lock(&(sockets.select_mutex));
01870 pthread_cond_signal(&(sockets.select_cond));
01871 } else {
01872 pthread_mutex_unlock(&(sock->mutex));
01873 }
01874 pthread_cond_signal(&(sock->cond));
01875 break;
01876
01877 case CONN_ACCEPTING:
01878
01879 j++;
01880
01881 if (sock->listening_sock->state != CONN_LISTENING) {
01882
01883 sock->err = VFER_IMPL;
01884 goto disconnect_socket;
01885 }
01886
01887 if (FD_ISSET(sock->fd, &(fds))) {
01888
01889 sock->addr_len = sizeof(SA);
01890
01891 if ((err = Packet_Read(sock->fd, &ack, (SA*) &(sock->addr), &(sock->addr_len))) != 0
01892 || ack->type != CC_ACK) {
01893 if (err != 0) {
01894 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] Packet_Read returned error[%d]", sock->response_tries, err);
01895 } else {
01896
01897 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", ack,
01898 "Try[%d] Received a non CC_ACK packet as [3/3 handshake] on sock->fd[%d]", sock->response_tries, sock->fd);
01899 RELEASE_PACK(ack);
01900 }
01901 if (Unconnect_Rebind(sock) == -1) {
01902
01903 close(sock->fd);
01904 RELEASE_PACK(sock->qc->request);
01905 RELEASE(sock->qc, sizeof(queued_conn));
01906 RELEASE_PACK(sock->response);
01907 sock->err = VFER_IMPL;
01908 goto disconnect_socket;
01909 }
01910
01911 goto try_response;
01912 }
01913
01914
01915 if (memcmp(&(sock->qc->sa), &(sock->addr), sizeof(sock->qc->sa)) != 0) {
01916 RELEASE_PACK(ack);
01917 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT",
01918 "Try[%d] Mismatched addresses of remote host ignoring this conn request", sock->response_tries);
01919 if (Unconnect_Rebind(sock) == -1) {
01920
01921 close(sock->fd);
01922 RELEASE_PACK(sock->qc->request);
01923 RELEASE(sock->qc, sizeof(queued_conn));
01924 RELEASE_PACK(sock->response);
01925 sock->err = VFER_IMPL;
01926 goto disconnect_socket;
01927 }
01928
01929 goto try_response;
01930 }
01931
01932 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", ack, "Try[%d] Received [3/3 handshake] on sock->fd[%d]", sock->response_tries, sock->fd);
01933 err = 0;
01934 if (ack->seq != sock->qc->request->seq + 1) {
01935 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] bad seq number on cc_ack[%u] should be[%u]", sock->response_tries, ack->seq,
01936 (sock->qc->request->seq + 1));
01937 err = -1;
01938 } else if (ack->u.ccack.last_seq != sock->response->seq) {
01939 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] bad last_seq_recv number on cc_ack", sock->response_tries);
01940 err = -1;
01941 } else if (ack->u.ccack.packets_lost != 0) {
01942 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] bad packets lost on cc_ack", sock->response_tries);
01943 err = -1;
01944 } else if (ack->u.ccack.oldest_frame != sock->response->u.res.init_frame_num) {
01945 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] bad oldest_frame on cc_ack", sock->response_tries);
01946 err = -1;
01947 }
01948
01949 if (err == -1) {
01950 RELEASE_PACK(ack);
01951 if (Unconnect_Rebind(sock) == -1) {
01952
01953 close(sock->fd);
01954 RELEASE_PACK(sock->qc->request);
01955 RELEASE(sock->qc, sizeof(queued_conn));
01956 RELEASE_PACK(sock->response);
01957 sock->err = VFER_IMPL;
01958 goto disconnect_socket;
01959 }
01960
01961 goto try_response;
01962 }
01963
01964
01965 sock->r_seq = ack->seq + 1;
01966
01967
01968 sock->type = SOCK_DGRAM;
01969 sock->opts = 0;
01970 sock->err = 0;
01971 sock->select_mark = 0;
01972 sock->select_result = 0;
01973 sock->select_active = 0;
01974
01975
01976 bzero(&addr, sock->addr_len);
01977 memcpy(&addr, &sock->addr, sock->addr_len);
01978
01979
01980
01981 pthread_mutex_unlock(&(sock->mutex));
01982 Control_Add_Socket(sock, sock->qc->request, sock->response, ack, 0);
01983 pthread_cond_signal(&(sock->cond));
01984
01985 RELEASE_PACK(sock->qc->request);
01986 RELEASE(sock->qc, sizeof(queued_conn));
01987 RELEASE_PACK(sock->response);
01988 RELEASE_PACK(ack);
01989 continue;
01990 }
01991
01992
01993
01994
01995
01996
01997 try_response:
01998 GET_TIME_OF_DAY(&(tv));
01999 if (sock->last_response.tv_sec != 0 &&
02000 (unsigned int)(TIMEVAL_US_DIFF(sock->last_response, tv)) < (unsigned int)(HANDSHAKE_ACK_WAIT_INTERVAL)) {
02001 break;
02002 }
02003
02004 if (sock->response_tries == MAX_RESPONSE_TRIES) {
02005 try_next_conn:
02006
02007 sock->qc = Accept_Queue_Dequeue (sock->listening_sock->accept_queue);
02008 if (sock->qc == NULL) {
02009
02010 if (SOCK_OPT_ISSET(sock, VFERSO_NONBLOCK)) {
02011 sock->err = VFER_WOULDBLOCK;
02012 goto disconnect_socket;
02013 }
02014
02015
02016 break;
02017 }
02018
02019 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", sock->qc->request,
02020 "Dequeued [1/3 handshake] from acceptQ backlog for server_sock->fd[%d]", sock->fd);
02021
02022
02023 if (connect(sock->fd, &(sock->qc->sa), sizeof(sock->qc->sa)) != 0) {
02024 perror("ControlT[connect]");
02025 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "connect on new socket failed");
02026 close(sock->fd);
02027 RELEASE_PACK(sock->qc->request);
02028 RELEASE(sock->qc, sizeof(queued_conn));
02029 sock->err = VFER_IMPL;
02030 goto disconnect_socket;
02031 }
02032
02033
02034
02035 num_packets = DEFAULT_RECV_BUF_MAXPACKETS;
02036 val = CALC_UDP_RECV_BUF_SIZE(num_packets);
02037 do {
02038 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Trying to set SO_RCVBUF to %d for accepting socket", val);
02039 ret = setsockopt(sock->fd, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val));
02040 if (ret != 0) {
02041 if (errno == ENOBUFS && num_packets > 100) {
02042 num_packets -= 100;
02043 val = CALC_UDP_RECV_BUF_SIZE(num_packets);
02044 } else {
02045 perror("ControlT :: setsockopt");
02046 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Could not set RCVBUF size to [%d]", val);
02047 close(sock->fd);
02048 RELEASE_PACK(sock->qc->request);
02049 RELEASE(sock->qc, sizeof(queued_conn));
02050 sock->err = VFER_IMPL;
02051 goto disconnect_socket;
02052 }
02053 }
02054 } while (ret != 0);
02055 sock->udp_recv_buf_size = val;
02056
02057
02058 num_packets = DEFAULT_SEND_BUF_MAXPACKETS;
02059 val = CALC_UDP_SEND_BUF_SIZE(num_packets);
02060 do {
02061 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Trying to set SO_SNDBUF to %d for accepting socket", val);
02062 ret = setsockopt(sock->fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val));
02063 if (ret != 0) {
02064 if (errno == ENOBUFS && num_packets > 100) {
02065 num_packets -= 100;
02066 val = CALC_UDP_SEND_BUF_SIZE(num_packets);
02067 } else {
02068 perror("ControlT :: setsockopt");
02069 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Could not set SNDBUF size to [%d]", val);
02070 close(sock->fd);
02071 RELEASE_PACK(sock->qc->request);
02072 RELEASE(sock->qc, sizeof(queued_conn));
02073 sock->err = VFER_IMPL;
02074 goto disconnect_socket;
02075 }
02076 }
02077 } while (ret != 0);
02078 sock->udp_send_buf_size = val;
02079
02080
02081 if (sock->response != NULL) {
02082
02083 RELEASE_PACK(sock->response);
02084 }
02085
02086 sock->response = Packet_Form_Response(0, Packet_Gen_SeqNum(), Packet_Gen_FrameNum(), ntohs(sock->sa.sin_port),
02087 MAX_FRAME_SIZE, sock->udp_recv_buf_size, sock->udp_send_buf_size, sock->qc->request->delay);
02088 sock->response_tries = 0;
02089 }
02090
02091 sock->last_response = tv;
02092 sock->response_tries++;
02093 DEBUG_PACK(DEBUG_CTL, "control.c", "ControlT", sock->response, "Try[%d] Sending [2/3 handshake] on sock->fd[%d]", sock->response_tries, sock->fd);
02094
02095
02096
02097
02098
02099
02100 if ((ret = Packet_Write(sock->response, sock->listening_sock->fd, &(sock->qc->sa), sizeof(sock->qc->sa))) != 0) {
02101
02102 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "Try[%d] Packet_Write failed with error[%d]", sock->response_tries, ret);
02103
02104 if (Unconnect_Rebind(sock) == -1) {
02105
02106 close(sock->fd);
02107 RELEASE_PACK(sock->qc->request);
02108 RELEASE(sock->qc, sizeof(queued_conn));
02109 RELEASE_PACK(sock->response);
02110 sock->err = VFER_IMPL;
02111 goto disconnect_socket;
02112 }
02113
02114 RELEASE_PACK(sock->response);
02115 sock->response = NULL;
02116 sock->response_tries = MAX_RESPONSE_TRIES;
02117 goto try_next_conn;
02118 }
02119 break;
02120
02121 case CONN_DISCONNECTED:
02122 case CONN_ACQUIRED:
02123 case CONN_BOUND:
02124
02125 pthread_mutex_unlock(&(sock->mutex));
02126 continue;
02127 }
02128
02129 pthread_mutex_unlock(&(sock->mutex));
02130 continue;
02131
02132 disconnect_socket:
02133 GET_TIME_OF_DAY(&(sock->stats.tv_end));
02134 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "fd[%d] CONN_DISCONNECTED", sock->fd);
02135 sock->state = CONN_DISCONNECTED;
02136 if (sock->select_active == 1 && (sock->select_mark & VFER_EXCEPTION) == VFER_EXCEPTION) {
02137 sock->select_result |= VFER_EXCEPTION;
02138 pthread_mutex_unlock(&(sock->mutex));
02139 pthread_mutex_lock(&(sockets.select_mutex));
02140 pthread_cond_signal(&(sockets.select_cond));
02141 } else {
02142 pthread_mutex_unlock(&(sock->mutex));
02143 }
02144 Control_Remove_Socket(sock);
02145 pthread_cond_signal(&(sock->cond));
02146
02147 }
02148
02149
02150 pthread_mutex_lock(&(sockets.mutex));
02151 if (sockets.num_active == 0) {
02152
02153 DEBUG_PRINT(DEBUG_CTL, "control.c", "ControlT", "reached 0 active sockets, exiting control thread");
02154 sockets.max_fd = 0;
02155 pthread_mutex_unlock(&(sockets.mutex));
02156 i = 0;
02157 pthread_exit(&i);
02158 }
02159
02160 fds = sockets.fds;
02161 max_fd = sockets.max_fd;
02162 num_active = sockets.num_active;
02163 pthread_mutex_unlock(&(sockets.mutex));
02164
02165 }
02166 }
02167
02168
02169
02170
02171
02172
02173
02174
02175
02176
02177
02178
02179
02180
02181 void Control_Add_Socket(vfer_sock* sock, packet* request, packet* response, packet* ack, char client) {
02182 ccontrol_t *c;
02183 struct timeval tv;
02184 int32_t t;
02185 int common_mtus[] = COMMON_MTU_LIST;
02186 int j;
02187
02188 c = &(sock->ccontrol);
02189
02190
02191 c->mtu = INIT_MTU;
02192 sock->relfun = rf_always_reliable;
02193
02194
02195 if(sock->is_df_set != 1){
02196
02197 sock->icmp_fd = -1;
02198 sock->pmtu.probe_state = NO_PMTUD;
02199 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_Add_Socket", "Not performing PMTUD");
02200 }else{
02201 sock->pmtu.probe_state = NO_PROBE_SENT;
02202 DEBUG_PMTUD_PRINT(DEBUG_PMTUD, "control.c", "Control_Add_Socket", "Initial PMTUD values set.");
02203 for(j = 0; j <= COMMON_MTU_MAX_INDEX; j++){
02204 sock->pmtu.common_mtus[j] = common_mtus[j];
02205 }
02206 sock->pmtu.search_low = sock->pmtu.common_mtus;
02207 sock->pmtu.search_high = sock->pmtu.common_mtus + COMMON_MTU_MAX_INDEX;
02208 sock->pmtu.probe_size = sock->pmtu.search_low + 1;
02209 }
02210
02211 CLR_SOCKET_STATS(&(sock->stats));
02212
02213
02214 GET_TIME_OF_DAY(&(sock->stats.tv_start));
02215
02216
02217 if (client == 1) {
02218
02219
02220
02221
02222 sock->send_frames.first_frame_num = sock->send_frames.last_frame_num = request->u.req.init_frame_num;
02223 sock->recv_frames.first_frame_num = sock->recv_frames.last_frame_num = response->u.res.init_frame_num;
02224 sock->r_opts = response->opts;
02225 sock->l_frame_num = request->u.req.init_frame_num;
02226 sock->r_seq = response->seq + 1;
02227 sock->r_frame_num = response->u.res.init_frame_num;
02228 sock->r_recv_buf_size = response->u.res.recv_buf_size;
02229 sock->r_send_buf_size = response->u.res.send_buf_size;
02230 sock->r_max_frame_size = response->u.res.max_frame_size;
02231 sock->stats.max_rtt = abs(response->u.res.request_delay - response->delay);
02232 c->delay = response->u.res.request_delay;
02233 if (sock->stats.max_rtt == 0) {
02234 printf("RTT EMPTY\n");
02235 sock->stats.max_rtt = 5000;
02236 }
02237
02238
02239
02240
02241 t = sock->stats.max_rtt * 100;
02242 TREE_INIT(&(c->rev_path_base_delayhist), t, tv);
02243
02244
02245 t = sock->stats.max_rtt * 10;
02246 TREE_INIT(&(c->rev_path_curr_delayhist), t, tv);
02247
02248
02249 Tree_Insert(&(sock->ccontrol.rev_path_base_delayhist), response->delay, sock->stats.tv_start);
02250 Tree_Insert(&(sock->ccontrol.rev_path_curr_delayhist), response->delay, sock->stats.tv_start);
02251
02252 } else {
02253
02254
02255
02256
02257 sock->send_frames.first_frame_num = sock->send_frames.last_frame_num = response->u.res.init_frame_num;
02258 sock->recv_frames.first_frame_num = sock->recv_frames.last_frame_num = request->u.req.init_frame_num;
02259 sock->r_opts = request->opts;
02260 sock->r_max_frame_size = request->u.req.max_frame_size;
02261 sock->r_recv_buf_size = request->u.req.recv_buf_size;
02262 sock->r_send_buf_size = request->u.req.send_buf_size;
02263 sock->r_frame_num = request->u.req.init_frame_num;
02264 sock->l_seq = response->seq + 1;
02265 sock->l_frame_num = response->u.res.init_frame_num;
02266 sock->stats.max_rtt = abs(ack->u.ccack.delay - ack->delay);
02267 c->delay = ack->u.ccack.delay;
02268 if (sock->stats.max_rtt == 0) {
02269 printf("RTT EMPTY\n");
02270 sock->stats.max_rtt = 1;
02271 }
02272
02273
02274 t = sock->stats.max_rtt * 100;
02275 TREE_INIT(&(c->rev_path_base_delayhist), t, tv);
02276
02277
02278 t = sock->stats.max_rtt * 10;
02279 TREE_INIT(&(c->rev_path_curr_delayhist), t, tv);
02280
02281
02282 Tree_Insert(&(sock->ccontrol.rev_path_base_delayhist), request->delay, sock->stats.tv_start);
02283 Tree_Insert(&(sock->ccontrol.rev_path_curr_delayhist), request->delay, sock->stats.tv_start);
02284
02285 Tree_Insert(&(sock->ccontrol.rev_path_base_delayhist), ack->delay, sock->stats.tv_start);
02286 Tree_Insert(&(sock->ccontrol.rev_path_curr_delayhist), ack->delay, sock->stats.tv_start);
02287
02288
02289
02290 }
02291
02292
02293 sock->stats.avg_rtt = sock->stats.max_rtt;
02294 sock->stats.min_rtt = sock->stats.max_rtt;
02295 sock->ccontrol.rtt = sock->stats.max_rtt;
02296 sock->ccontrol.syncing = 0;
02297 sock->stats.rtt_measurements = 1;
02298 sock->stats.data_bytes_sender_sent = 0;
02299
02300
02301
02302
02303 if (client == 1) {
02304 c->s_first_seq = request->seq + 2;
02305 c->s_last_seq = request->seq + 2;
02306 } else {
02307 c->s_first_seq = response->seq + 1;
02308 c->s_last_seq = response->seq + 1;
02309 }
02310
02311 if (sock->udp_send_buf_size < sock->r_recv_buf_size) {
02312 c->s_maxw = (int)(sock->udp_send_buf_size);
02313 } else {
02314 c->s_maxw = (int)(sock->r_recv_buf_size);
02315 }
02316
02317 c->s_cwnd = INIT_MTU * 4;
02318 c->s_cwnd_sent = 0;
02319 c->s_inflight = 0;
02320 c->delay_delta = 0;
02321 c->r_packets_lost = 0;
02322 c->last_packets_lost = 0;
02323 c->r_bytes_total = 0;
02324 c->last_bytes_total = 0;
02325
02326 if (client == 1) {
02327 c->r_first_seq = response->seq+1;
02328 c->r_last_seq = response->seq+1;
02329 } else {
02330 c->r_first_seq = request->seq+2;
02331 c->r_last_seq = request->seq+2;
02332 }
02333 GET_TIME_OF_DAY(&(tv));
02334 c->r_last_ccack = tv;
02335 c->r_last_data = tv;
02336 c->s_rtt_start = tv;
02337 c->s_last_ack = tv;
02338 c->s_last_unacked_data.tv_sec = 0;
02339
02340 c->r_last_loss.tv_sec = 0;
02341 c->s_need_ccack = 0;
02342
02343 DEBUG_CC(DEBUG_CTL, "control.c", "Add_Socket", sock, "CC_Init");
02344
02345
02346 sock->send_frames.first = sock->send_frames.last = NULL;
02347 sock->recv_frames.first = sock->recv_frames.last = NULL;
02348 sock->recvd_frames.first = sock->recvd_frames.last = NULL;
02349 sock->send_frames.num = 0;
02350 sock->recv_frames.num = 0;
02351 sock->recvd_frames.num = 0;
02352 sock->recvd_frames.length = 0;
02353 sock->recv_frames.length = 0;
02354 sock->send_frames.length = 0;
02355
02356
02357 pthread_mutex_lock(&(sock->mutex));
02358 sock->state = CONN_CONNECTED;
02359 pthread_mutex_unlock(&(sock->mutex));
02360
02361
02362 pthread_mutex_lock(&(sockets.mutex));
02363 FD_SET(sock->fd, &(sockets.fds));
02364 if (sockets.max_fd < sock->fd) {
02365 sockets.max_fd = sock->fd;
02366 }
02367 pthread_mutex_unlock(&(sockets.mutex));
02368 }
02369
02370
02371
02372
02373
02374
02375
02376 static void Control_Remove_Socket(vfer_sock* sock) {
02377
02378 DEBUG_PRINT(DEBUG_CTL, "control.c", "Control_Remove_Socket", "Entered with sock->fd[%d]", sock->fd);
02379
02380 Control_Unregister_Socket(sock);
02381
02382
02383 pthread_mutex_lock(&(sock->mutex));
02384
02385 Datagrams_Clear(&(sock->send_frames));
02386
02387 Datagrams_Clear(&(sock->recv_frames));
02388
02389
02390 pthread_mutex_unlock(&(sock->mutex));
02391
02392 return;
02393 }
02394
02395
02396
02397
02398
02399
02400
02401
02402
02403
02404
02405 static int Unconnect_Rebind(vfer_sock* sock) {
02406
02407
02408 socklen_t socklen;
02409
02410 bzero((char*)(&sock->sa), sizeof(sock->sa));
02411 sock->sa.sin_family = AF_UNSPEC;
02412
02413
02414
02415 if (connect(sock->fd, (SA*)(&sock->sa), sizeof(sock->sa)) == -1) {
02416 perror("unconnect_rebind[unconnect]");
02417 }
02418
02419
02420 bzero((char*)(&sock->sa), sizeof(sock->sa));
02421 sock->sa.sin_port = htons(INADDR_ANY);
02422 sock->sa.sin_addr.s_addr = htonl(INADDR_ANY);
02423 sock->sa.sin_family = AF_INET;
02424 if (bind(sock->fd, (SA*)(&sock->sa), sizeof(sock->sa)) < 0) {
02425 perror("unconnect_rebind[bind for the new socket]");
02426 return -1;
02427 }
02428
02429 socklen = sizeof(sock->sa);
02430 if (getsockname(sock->fd, (SA*)&(sock->sa), &socklen) == -1) {
02431 perror("unconnect_rebind[getsockname for new socket]");
02432 return -1;
02433 }
02434
02435 return 0;
02436 }