#include <udp_srv.hh>
Inheritance diagram for CMS_SERVER_REMOTE_UDP_PORT:
|
Definition at line 40 of file udp_srv.cc. 00041 : 00042 CMS_SERVER_REMOTE_PORT (_cms_server) 00043 { 00044 client_ports = (RCS_LINKED_LIST *) NULL; 00045 udp_server_count++; 00046 dtimeout = -1.0; 00047 last_subscription_id = 1000 * udp_server_count + 1; 00048 connection_socket = 0; 00049 broadcast_address_set = 0; 00050 connection_port = 0; 00051 maxfdpl = 0; 00052 polling_enabled = 0; 00053 memset (&message_header, 0, sizeof (message_header)); 00054 message_header.msg_name = (caddr_t) NULL; 00055 message_header.msg_namelen = 0; 00056 message_header.msg_iov = (struct iovec *) NULL; 00057 message_header.msg_iovlen = 0; 00058 #if !defined(linux) && !defined(darwin) && !defined(qnx) 00059 message_header.msg_accrights = (caddr_t) NULL; 00060 message_header.msg_accrightslen = 0; 00061 #endif 00062 strcpy (temp_buffer, "UNINITIALIZED"); 00063 00064 memset (&server_socket_address, 0, sizeof (server_socket_address)); 00065 server_socket_address.sin_family = AF_INET; 00066 server_socket_address.sin_addr.s_addr = dl_htonl (INADDR_ANY); 00067 server_socket_address.sin_port = 0; 00068 00069 broadcast_subscriptions = 0; 00070 broadcast_ports = NULL; 00071 00072 subscription_buffers = NULL; 00073 client_ports = new RCS_LINKED_LIST; 00074 if (NULL == client_ports) 00075 { 00076 rcs_print_error ("Can not create linked list for client ports.\n"); 00077 return; 00078 } 00079 if (load_socket_interface () < 0) 00080 { 00081 rcs_print_error ("Can't load socket interface.\n"); 00082 return; 00083 } 00084 00085 request_header_size = 20; 00086 if ((min_compatible_version > 3.13 || min_compatible_version < 1e-6)) 00087 { 00088 request_header_size = 24; 00089 } 00090 reply_header_size = 20; 00091 } |
|
Definition at line 93 of file udp_srv.cc. 00094 { 00095 unregister_port (); 00096 if (NULL != client_ports) 00097 { 00098 delete client_ports; 00099 client_ports = (RCS_LINKED_LIST *) NULL; 00100 } 00101 if (NULL != broadcast_ports) 00102 { 00103 UDP_BROADCAST_DATA *broadcast_data = (UDP_BROADCAST_DATA *) 00104 broadcast_ports->get_head (); 00105 while (NULL != broadcast_data) 00106 { 00107 delete broadcast_data; 00108 broadcast_ports->delete_current_node (); 00109 broadcast_data = (UDP_BROADCAST_DATA *) 00110 broadcast_ports->get_next (); 00111 } 00112 delete broadcast_ports; 00113 broadcast_ports = NULL; 00114 } 00115 if (NULL != subscription_buffers) 00116 { 00117 delete subscription_buffers; 00118 subscription_buffers = NULL; 00119 } 00120 } |
|
Reimplemented from CMS_SERVER_REMOTE_PORT. Definition at line 184 of file udp_srv.cc. 00185 { 00186 int retval = 0; 00187 if (NULL == _cms) 00188 { 00189 return 0; 00190 } 00191 if (_cms->remote_port_type != CMS_UDP_REMOTE_PORT_TYPE) 00192 { 00193 return 0; 00194 } 00195 00196 if (NULL != _cms) 00197 { 00198 if (min_compatible_version < 1e-6 || 00199 (min_compatible_version > _cms->min_compatible_version && 00200 _cms->min_compatible_version > 1e-6)) 00201 { 00202 min_compatible_version = _cms->min_compatible_version; 00203 } 00204 if (_cms->confirm_write) 00205 { 00206 confirm_write = _cms->confirm_write; 00207 } 00208 } 00209 00210 if (server_socket_address.sin_port == 0) 00211 { 00212 server_socket_address.sin_port = 00213 dl_htons (((u_short) _cms->udp_port_number)); 00214 connection_port = _cms->udp_port_number; 00215 set_broadcast_address (_cms); 00216 char *broadcast_clnt_port_eq = 00217 strstr (_cms->buflineupper, "BROADCAST_PORT="); 00218 if (broadcast_clnt_port_eq != NULL) 00219 { 00220 broadcast_subscriptions = 1; 00221 #ifndef UNDER_CE 00222 int broadcast_clnt_port = 00223 strtol (broadcast_clnt_port_eq + 15, 0, 0); 00224 #else 00225 int broadcast_clnt_port = atol (broadcast_clnt_port_eq + 15); 00226 #endif 00227 if (_cms->udp_port_number == broadcast_clnt_port) 00228 { 00229 rcs_print_error 00230 ("Can't broadcast on the same port used to accept requests. (%d)\n", 00231 _cms->udp_port_number); 00232 return -1; 00233 } 00234 UDP_BROADCAST_DATA *broadcast_data = new UDP_BROADCAST_DATA (); 00235 memcpy (&(broadcast_data->broadcast_address), &broadcast_address, 00236 sizeof (sockaddr_in)); 00237 broadcast_data->buffer_number = _cms->buffer_number; 00238 broadcast_data->broadcast_clnt_port = broadcast_clnt_port; 00239 broadcast_data->broadcast_address.sin_port = 00240 dl_htons ((u_short) broadcast_data->broadcast_clnt_port); 00241 if (NULL == broadcast_ports) 00242 { 00243 broadcast_ports = new RCS_LINKED_LIST (); 00244 } 00245 broadcast_ports->store_at_tail (broadcast_data, 00246 sizeof (UDP_BROADCAST_DATA), 0); 00247 } 00248 return 1; 00249 } 00250 else if (server_socket_address.sin_port == 00251 dl_htons (((u_short) _cms->udp_port_number))) 00252 { 00253 int new_broadcast_subscriptions = 0; 00254 int new_broadcast_clnt_port = 0; 00255 char *broadcast_clnt_port_eq = 00256 strstr (_cms->buflineupper, "BROADCAST_PORT="); 00257 if (broadcast_clnt_port_eq != NULL) 00258 { 00259 new_broadcast_subscriptions = 1; 00260 #ifndef UNDER_CE 00261 new_broadcast_clnt_port = 00262 strtol (broadcast_clnt_port_eq + 15, 0, 0); 00263 #else 00264 new_broadcast_clnt_port = atol (broadcast_clnt_port_eq + 15); 00265 #endif 00266 } 00267 if (new_broadcast_subscriptions) 00268 { 00269 if (_cms->udp_port_number == new_broadcast_clnt_port) 00270 { 00271 rcs_print_error 00272 ("Can't broadcast on the same port used to accept requests. (%d)\n", 00273 _cms->udp_port_number); 00274 return -1; 00275 } 00276 if (NULL != broadcast_ports) 00277 { 00278 UDP_BROADCAST_DATA *broadcast_data = (UDP_BROADCAST_DATA *) 00279 broadcast_ports->get_head (); 00280 while (NULL != broadcast_data) 00281 { 00282 if (broadcast_data->buffer_number == _cms->buffer_number) 00283 { 00284 return 1; 00285 } 00286 if (broadcast_data->broadcast_clnt_port == 00287 new_broadcast_clnt_port) 00288 { 00289 rcs_print_error 00290 ("Can't broadcast data for both buffer %d and buffer %d on port %d.\n", 00291 broadcast_data->buffer_number, _cms->buffer_number, 00292 new_broadcast_clnt_port); 00293 return -1; 00294 } 00295 broadcast_data = (UDP_BROADCAST_DATA *) 00296 broadcast_ports->get_next (); 00297 } 00298 } 00299 if (!broadcast_address_set) 00300 { 00301 set_broadcast_address (_cms); 00302 } 00303 UDP_BROADCAST_DATA *broadcast_data = new UDP_BROADCAST_DATA (); 00304 memcpy (&(broadcast_data->broadcast_address), &broadcast_address, 00305 sizeof (sockaddr_in)); 00306 broadcast_data->buffer_number = _cms->buffer_number; 00307 broadcast_data->broadcast_clnt_port = new_broadcast_clnt_port; 00308 broadcast_data->broadcast_address.sin_port = 00309 dl_htons ((u_short) broadcast_data->broadcast_clnt_port); 00310 if (NULL == broadcast_ports) 00311 { 00312 broadcast_ports = new RCS_LINKED_LIST (); 00313 } 00314 broadcast_ports->store_at_tail (broadcast_data, 00315 sizeof (UDP_BROADCAST_DATA), 0); 00316 } 00317 port_num = _cms->udp_port_number; 00318 return 1; 00319 } 00320 00321 00322 return retval; 00323 } |
|
Reimplemented from CMS_SERVER_REMOTE_PORT. Definition at line 361 of file udp_srv.cc. 00362 { 00363 #if defined(WIN32) && !defined(gnuwin32) 00364 DWORD pid = GetCurrentProcessId (); 00365 DWORD tid = GetCurrentThreadId (); 00366 #else 00367 #ifdef VXWORKS 00368 int pid = taskIdSelf (); 00369 int tid = 0; 00370 #else 00371 pid_t pid = getpid (); 00372 pid_t tid = 0; 00373 #endif 00374 #endif 00375 server = find_server (pid, tid); 00376 if (NULL == server) 00377 { 00378 rcs_print_error ("Can`t run. (server = NULL)\n"); 00379 #ifdef UNDER_CE 00380 ExitThread (-1); 00381 #else 00382 exit (-1); 00383 #endif 00384 } 00385 if (NULL == server->write_req.data) 00386 { 00387 rcs_print_error ("Can`t run. (server->write_req.data = NULL)\n"); /* */ 00388 server->clean (2); 00389 #ifdef UNDER_CE 00390 ExitThread (-1); 00391 #else 00392 exit (-1); 00393 #endif 00394 } 00395 if (NULL == client_ports) 00396 { 00397 rcs_print_error ("CMS_SERVER: List of client ports is NULL.\n"); 00398 server->clean (2); 00399 #ifdef UNDER_CE 00400 ExitThread (-1); 00401 #else 00402 exit (-1); 00403 #endif 00404 } 00405 memset (&client_address, 0, sizeof (client_address)); 00406 client_address.sin_family = AF_INET; 00407 client_address.sin_addr.s_addr = dl_htonl (INADDR_ANY); 00408 client_address.sin_port = 0; 00409 00410 message_header.msg_name = (caddr_t) & client_address; 00411 message_header.msg_namelen = client_addresslen = sizeof (client_address); 00412 00413 request_header_size = 20; 00414 if ((min_compatible_version > 3.13 || min_compatible_version < 1e-6)) 00415 { 00416 request_header_size = 24; 00417 } 00418 if ((min_compatible_version > 3.43 || min_compatible_version < 1e-6)) 00419 { 00420 request_header_size = 28; 00421 } 00422 cms_server_count++; 00423 00424 00425 while (1) 00426 { 00427 iov2[0].iov_base = temp_buffer; 00428 iov2[0].iov_len = request_header_size; 00429 iov2[1].iov_base = (char *) server->write_req.data; 00430 iov2[1].iov_len = server->maximum_cms_size * 4; 00431 message_header.msg_iov = iov2; 00432 message_header.msg_iovlen = 2; 00433 if (polling_enabled) 00434 { 00435 request_length = recvmsgtq (connection_socket, 00436 &message_header, 0, dtimeout); 00437 if (request_length == 0 && polling_enabled) 00438 { 00439 update_subscriptions (); 00440 continue; 00441 } 00442 } 00443 else 00444 { 00445 request_length = recvmsgt (connection_socket, 00446 &message_header, 0, -1.0); 00447 } 00448 if (request_length < 0) 00449 { 00450 #ifndef UNDER_CE 00451 rcs_print_error ("server: recvmsg error: %d %s\n", 00452 errno, strerror (errno)); 00453 #endif 00454 continue; 00455 } 00456 00457 if (request_length < request_header_size) 00458 { 00459 rcs_print_error 00460 ("server: recvmsg error: recieved only %d bytes when atleast %d were expected.\n", 00461 request_length, request_header_size); 00462 continue; 00463 } 00464 handle_request (); 00465 if (polling_enabled) 00466 { 00467 update_subscriptions (); 00468 } 00469 } 00470 } |
|
Reimplemented from CMS_SERVER_REMOTE_PORT. Definition at line 326 of file udp_srv.cc. 00327 { 00328 port_registered = 0; 00329 if (server_socket_address.sin_port == 0) 00330 { 00331 rcs_print_error ("server can not register on port number 0.\n"); 00332 return; 00333 } 00334 if ((connection_socket = dl_socket (AF_INET, SOCK_DGRAM, 0)) < 0) 00335 { 00336 rcs_print_error ("Server can not open stream socket.\n"); 00337 return; 00338 } 00339 00340 if (set_udp_socket_options (connection_socket) < 0) 00341 { 00342 return; 00343 } 00344 if (broadcast_subscriptions) 00345 { 00346 if (make_udp_socket_broadcast (connection_socket) < 0) 00347 { 00348 return; 00349 } 00350 } 00351 if (dl_bind (connection_socket, (struct sockaddr *) &server_socket_address, 00352 sizeof (server_socket_address)) < 0) 00353 { 00354 rcs_print_error ("Server can not bind the connection socket.\n"); 00355 return; 00356 } 00357 port_registered = 1; 00358 } |
|
Reimplemented from CMS_SERVER_REMOTE_PORT. Definition at line 123 of file udp_srv.cc. Referenced by ~CMS_SERVER_REMOTE_UDP_PORT().
00124 { 00125 if (connection_socket > 0) 00126 { 00127 dl_closesocket (connection_socket); 00128 connection_socket = 0; 00129 } 00130 } |
|
Definition at line 500 of file udp_srv.cc. Referenced by run().
00501 { 00502 u_long request_type, buffer_number, subdiv, serial_number; 00503 long reply_length; 00504 request_type = dl_ntohl (*((u_long *) temp_buffer)); 00505 buffer_number = dl_ntohl (*((u_long *) temp_buffer + 1)); 00506 serial_number = dl_ntohl (*((u_long *) temp_buffer + 2)); 00507 subdiv = 0; 00508 if (min_compatible_version > 3.43 || min_compatible_version < 1e-6) 00509 { 00510 subdiv = dl_ntohl (*((u_long *) temp_buffer + 6)); 00511 } 00512 *((u_long *) temp_buffer) = dl_htonl (serial_number); /* used to check for missing or out or order packets */ 00513 iov2[0].iov_base = temp_buffer; 00514 iov2[0].iov_len = reply_header_size; 00515 00516 switch (request_type) 00517 { 00518 case REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE: 00519 { 00520 REMOTE_GET_BUF_NAME_REQUEST namereq; 00521 namereq.buffer_number = buffer_number; 00522 REMOTE_GET_BUF_NAME_REPLY *namereply = NULL; 00523 namereply = 00524 (REMOTE_GET_BUF_NAME_REPLY *) server->process_request (&namereq); 00525 memset (temp_buffer, 0, 32); 00526 if (NULL == namereply) 00527 { 00528 rcs_print_error ("Server could not process request.\n"); 00529 *((u_long *) temp_buffer) = dl_htonl (serial_number); /* used to check for missing or out or order packets */ 00530 *((u_long *) temp_buffer + 1) = 00531 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR); 00532 *((u_long *) temp_buffer + 2) = dl_htonl (0); /* size */ 00533 *((u_long *) temp_buffer + 3) = dl_htonl (0); /* write_id */ 00534 *((u_long *) temp_buffer + 4) = dl_htonl (0); /* was_read */ 00535 iov2[1].iov_len = 0; 00536 message_header.msg_iovlen = 1; 00537 sendmsgt (connection_socket, &message_header, 0, -1.0); 00538 return; 00539 } 00540 long name_data_len = strlen (namereply->name) + 1; 00541 *((u_long *) temp_buffer) = dl_htonl (serial_number); /* used to check for missing or out or order packets */ 00542 *((u_long *) temp_buffer + 1) = dl_htonl (namereply->status); 00543 *((u_long *) temp_buffer + 2) = dl_htonl (name_data_len); 00544 iov2[1].iov_base = (char *) namereply->name; 00545 iov2[1].iov_len = name_data_len; 00546 message_header.msg_iovlen = 2; 00547 reply_length = sendmsgt (connection_socket, &message_header, 0, -1.0); 00548 if (reply_length != name_data_len + 20) 00549 { 00550 rcs_print_error ("reply_length = %ld, name_data_len=%ld\n", 00551 reply_length, name_data_len); 00552 } 00553 } 00554 break; 00555 00556 00557 case REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE: 00558 { 00559 int subscription_type = dl_ntohl (*((u_long *) temp_buffer + 3)); 00560 int poll_interval_millis = dl_ntohl (*((u_long *) temp_buffer + 4)); 00561 int last_read_id = dl_ntohl (*((u_long *) temp_buffer + 5)); 00562 CLIENT_UDP_PORT *new_clnt_port = get_client_port (&client_address); 00563 memcpy (&(new_clnt_port->client_socket_address), &client_address, 00564 sizeof (struct sockaddr_in)); 00565 memcpy (&(new_clnt_port->reply_message_header), &message_header, 00566 sizeof (struct msghdr)); 00567 new_clnt_port->reply_message_header.msg_name = 00568 (caddr_t) & (new_clnt_port->client_socket_address); 00569 new_clnt_port->reply_message_header.msg_namelen = 00570 sizeof (struct sockaddr_in); 00571 new_clnt_port->reply_iov2[0].iov_base = temp_buffer; 00572 new_clnt_port->reply_iov2[0].iov_len = 20; 00573 new_clnt_port->reply_iov2[1].iov_base = 00574 (char *) server->write_req.data; 00575 new_clnt_port->reply_iov2[1].iov_len = server->maximum_cms_size * 4; 00576 new_clnt_port->reply_message_header.msg_iov = 00577 new_clnt_port->reply_iov2; 00578 new_clnt_port->reply_message_header.msg_iovlen = 2; 00579 new_clnt_port->serial_number = serial_number + 1; 00580 new_clnt_port->last_read_id = last_read_id; 00581 add_subscription_client (buffer_number, subdiv, 00582 subscription_type, 00583 poll_interval_millis, new_clnt_port); 00584 *((u_long *) temp_buffer) = dl_htonl (serial_number); /* used to check for missing or out or order packets */ 00585 *((u_long *) temp_buffer + 1) = dl_htonl (1); 00586 *((u_long *) temp_buffer + 2) = 00587 dl_htonl (new_clnt_port->subscription_id); 00588 new_clnt_port->reply_iov2[1].iov_base = 0; 00589 new_clnt_port->reply_iov2[1].iov_len = 0; 00590 new_clnt_port->reply_message_header.msg_iovlen = 1; 00591 reply_length = 00592 sendmsgt (connection_socket, &(new_clnt_port->reply_message_header), 00593 0, -1.0); 00594 if (reply_length != 20) 00595 { 00596 rcs_print_error ("reply_length = %ld, (expected 20) \n", 00597 reply_length); 00598 } 00599 } 00600 break; 00601 00602 case REMOTE_CMS_CANCEL_SUBSCRIPTION_REQUEST_TYPE: 00603 { 00604 int subscription_id = dl_ntohl (*((u_long *) temp_buffer + 3)); 00605 00606 remove_subscription (subscription_id, buffer_number, subdiv); 00607 *((u_long *) temp_buffer) = dl_htonl (serial_number); /* used to check for missing or out or order packets */ 00608 *((u_long *) temp_buffer + 1) = dl_htonl (1); 00609 *((u_long *) temp_buffer + 2) = dl_htonl (subscription_id); 00610 iov2[1].iov_base = 0; 00611 iov2[1].iov_len = 0; 00612 message_header.msg_iovlen = 1; 00613 reply_length = 00614 sendmsgt (connection_socket, &(message_header), 0, -1.0); 00615 if (reply_length != 20) 00616 { 00617 rcs_print_error ("reply_length = %ld, (expected 20) \n", 00618 reply_length); 00619 } 00620 } 00621 break; 00622 00623 case REMOTE_CMS_READ_REQUEST_TYPE: 00624 server->read_req.buffer_number = buffer_number; 00625 server->read_req.access_type = dl_ntohl (*((u_long *) temp_buffer + 3)); 00626 server->read_req.last_id_read = 00627 dl_ntohl (*((u_long *) temp_buffer + 4)); 00628 server->read_req.subdiv = subdiv; 00629 server->read_reply = 00630 (REMOTE_READ_REPLY *) server->process_request (&server->read_req); 00631 if (NULL == server->read_reply) 00632 { 00633 rcs_print_error ("Server could not process request.\n"); 00634 *((u_long *) temp_buffer + 1) = 00635 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR); 00636 *((u_long *) temp_buffer + 2) = dl_htonl (0); /* size */ 00637 *((u_long *) temp_buffer + 3) = dl_htonl (0); /* write_id */ 00638 *((u_long *) temp_buffer + 4) = dl_htonl (0); /* was_read */ 00639 iov2[1].iov_len = 0; 00640 message_header.msg_iovlen = 1; 00641 sendmsgt (connection_socket, &message_header, 0, -1.0); 00642 return; 00643 } 00644 *((u_long *) temp_buffer + 1) = dl_htonl (server->read_reply->status); 00645 *((u_long *) temp_buffer + 2) = dl_htonl (server->read_reply->size); 00646 *((u_long *) temp_buffer + 3) = dl_htonl (server->read_reply->write_id); 00647 *((u_long *) temp_buffer + 4) = dl_htonl (server->read_reply->was_read); 00648 iov2[1].iov_base = (char *) server->read_reply->data; 00649 iov2[1].iov_len = server->read_reply->size; 00650 message_header.msg_iovlen = 2; 00651 reply_length = sendmsgt (connection_socket, &message_header, 0, -1.0); 00652 if (reply_length != server->read_reply->size + 20) 00653 { 00654 rcs_print_error 00655 ("reply_length = %ld, server->read_reply->size+20=%ld\n", 00656 reply_length, server->read_reply->size + 20); 00657 } 00658 break; 00659 00660 case REMOTE_CMS_WRITE_REQUEST_TYPE: 00661 server->write_req.buffer_number = buffer_number; 00662 server->write_req.access_type = 00663 dl_ntohl (*((u_long *) temp_buffer + 3)); 00664 server->write_req.size = dl_ntohl (*((u_long *) temp_buffer + 4)); 00665 server->write_req.subdiv = subdiv; 00666 server->write_reply = 00667 (REMOTE_WRITE_REPLY *) server->process_request (&server->write_req); 00668 if (request_length < server->write_req.size + 20) 00669 { 00670 rcs_print_error 00671 ("request_length = %ld, server->write_req.size+20 = %ld\n", 00672 request_length, server->write_req.size + 20); 00673 } 00674 /* if(NULL == server->write_reply) 00675 { 00676 rcs_print_error("Server could not process request.\n"); 00677 *((u_long *)temp_buffer+1) = dl_htonl((unsigned long) CMS_SERVER_SIDE_ERROR); 00678 *((u_long *)temp_buffer+2) = dl_htonl(0); 00679 iov2[1].iov_len = 0; 00680 message_header.msg_iovlen = 1; 00681 sendmsgt(connection_socket, &message_header, 0, -1.0); 00682 return; 00683 } 00684 00685 *((u_long *)temp_buffer+1) = dl_htonl(server->write_reply->status); 00686 *((u_long *)temp_buffer+2) = dl_htonl(server->write_reply->was_read); 00687 iov2[1].iov_len = 0; 00688 message_header.msg_iovlen = 1; 00689 sendmsgt(connection_socket, &message_header, 0, -1.0); */ 00690 break; 00691 00692 case REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE: 00693 server->check_if_read_req.buffer_number = buffer_number; 00694 server->check_if_read_req.subdiv = subdiv; 00695 server->check_if_read_reply = (REMOTE_CHECK_IF_READ_REPLY *) 00696 server->process_request (&server->check_if_read_req); 00697 if (NULL == server->check_if_read_reply) 00698 { 00699 rcs_print_error ("Server could not process request.\n"); 00700 *((u_long *) temp_buffer + 1) = 00701 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR); 00702 *((u_long *) temp_buffer + 2) = dl_htonl (0); /* was_read */ 00703 iov2[1].iov_len = 0; 00704 sendmsgt (connection_socket, &message_header, 0, -1.0); 00705 return; 00706 } 00707 *((u_long *) temp_buffer + 1) = 00708 dl_htonl (server->check_if_read_reply->status); 00709 *((u_long *) temp_buffer + 2) = 00710 dl_htonl (server->check_if_read_reply->was_read); 00711 iov2[1].iov_len = 0; 00712 message_header.msg_iovlen = 1; 00713 sendmsgt (connection_socket, &message_header, 0, -1.0); 00714 break; 00715 00716 case REMOTE_CMS_CLEAR_REQUEST_TYPE: 00717 server->clear_req.buffer_number = buffer_number; 00718 server->clear_req.subdiv = subdiv; 00719 server->clear_reply = 00720 (REMOTE_CLEAR_REPLY *) server->process_request (&server->clear_req); 00721 if (NULL == server->clear_reply) 00722 { 00723 rcs_print_error ("Server could not process request.\n"); 00724 *((u_long *) temp_buffer + 1) = 00725 dl_htonl ((u_long) CMS_SERVER_SIDE_ERROR); 00726 iov2[1].iov_len = 0; 00727 sendmsgt (connection_socket, &message_header, 0, -1); 00728 return; 00729 } 00730 *((u_long *) temp_buffer + 1) = dl_htonl (server->clear_reply->status); 00731 iov2[1].iov_len = 0; 00732 message_header.msg_iovlen = 1; 00733 //sendmsgt(connection_socket, &message_header, 0, -1); 00734 break; 00735 00736 case REMOTE_CMS_CLEAN_REQUEST_TYPE: 00737 server->spawner_pid = server->server_pid; 00738 server->kill_server (); 00739 break; 00740 00741 case REMOTE_CMS_CLOSE_CHANNEL_REQUEST_TYPE: 00742 *((u_long *) temp_buffer + 1) = dl_htonl (CMS_CLEAR_OK); 00743 iov2[1].iov_len = 0; 00744 message_header.msg_iovlen = 1; 00745 //sendmsgt(connection_socket, &message_header, 0, -1); 00746 break; 00747 00748 default: 00749 rcs_print_error ("Unrecognized request type received.(%ld)\n", 00750 request_type); 00751 break; 00752 } 00753 } |
|
Definition at line 1064 of file udp_srv.cc. Referenced by run().
01065 { 01066 #if defined(WIN32) && !defined(gnuwin32) 01067 DWORD pid = GetCurrentProcessId (); 01068 DWORD tid = GetCurrentThreadId (); 01069 #else 01070 #ifdef VXWORKS 01071 int pid = taskIdSelf (); 01072 int tid = 0; 01073 #else 01074 pid_t pid = getpid (); 01075 pid_t tid = 0; 01076 #endif 01077 #endif 01078 CMS_SERVER *server; 01079 server = find_server (pid, tid); 01080 if (NULL == server) 01081 { 01082 rcs_print_error ("Cannot find server object.\n"); 01083 return; 01084 } 01085 if (NULL == subscription_buffers) 01086 { 01087 return; 01088 } 01089 double cur_time = etime (); 01090 UDP_BUFFER_SUBSCRIPTION_INFO *buf_info = 01091 (UDP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head (); 01092 while (NULL != buf_info) 01093 { 01094 if (cur_time - buf_info->last_update_time < 01095 buf_info->min_update_interval) 01096 { 01097 buf_info = 01098 (UDP_BUFFER_SUBSCRIPTION_INFO *) 01099 subscription_buffers->get_next (); 01100 continue; 01101 } 01102 buf_info->last_update_time = cur_time; 01103 server->read_req.buffer_number = buf_info->buffer_number; 01104 server->read_req.subdiv = buf_info->subdiv; 01105 server->read_req.access_type = CMS_READ_ACCESS; 01106 server->read_req.last_id_read = buf_info->min_last_id; 01107 server->read_reply = 01108 (REMOTE_READ_REPLY *) server->process_request (&server->read_req); 01109 if (NULL == server->read_reply) 01110 { 01111 rcs_print_error ("Server could not process request.\n"); 01112 buf_info = 01113 (UDP_BUFFER_SUBSCRIPTION_INFO *) 01114 subscription_buffers->get_next (); 01115 continue; 01116 } 01117 *((u_long *) temp_buffer) = 0; 01118 *((u_long *) temp_buffer + 1) = dl_htonl (server->read_reply->status); 01119 *((u_long *) temp_buffer + 2) = dl_htonl (server->read_reply->size); 01120 *((u_long *) temp_buffer + 3) = dl_htonl (server->read_reply->write_id); 01121 *((u_long *) temp_buffer + 4) = dl_htonl (server->read_reply->was_read); 01122 if (server->read_reply->size < 1) 01123 { 01124 buf_info = 01125 (UDP_BUFFER_SUBSCRIPTION_INFO *) 01126 subscription_buffers->get_next (); 01127 continue; 01128 } 01129 if (server->read_reply->write_id == buf_info->min_last_id) 01130 { 01131 buf_info = 01132 (UDP_BUFFER_SUBSCRIPTION_INFO *) 01133 subscription_buffers->get_next (); 01134 continue; 01135 } 01136 buf_info->min_last_id = server->read_reply->write_id; 01137 if (buf_info->broadcast_data != NULL) 01138 { 01139 buf_info->max_serial_number++; 01140 *((u_long *) temp_buffer) = dl_htonl (buf_info->max_serial_number); 01141 iov2[0].iov_len = reply_header_size; 01142 iov2[1].iov_base = (caddr_t) server->read_reply->data; 01143 iov2[1].iov_len = server->read_reply->size; 01144 message_header.msg_iovlen = 2; 01145 message_header.msg_name = 01146 (caddr_t) & (buf_info->broadcast_data->broadcast_address); 01147 if (sendmsgt (connection_socket, &message_header, 0, dtimeout) < 0) 01148 { 01149 //rcs_print_sys_error(ERRNO_ERROR_SOURCE,"sendmsg error"); 01150 } 01151 buf_info = 01152 (UDP_BUFFER_SUBSCRIPTION_INFO *) 01153 subscription_buffers->get_next (); 01154 message_header.msg_name = (caddr_t) & client_address; 01155 continue; 01156 } 01157 UDP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 01158 (UDP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->get_head (); 01159 while (temp_clnt_info != NULL) 01160 { 01161 if (NULL == temp_clnt_info->clnt_port) 01162 { 01163 temp_clnt_info = 01164 (UDP_CLIENT_SUBSCRIPTION_INFO *) 01165 buf_info->sub_clnt_info->get_next (); 01166 continue; 01167 } 01168 double time_diff = cur_time - temp_clnt_info->last_sub_sent_time; 01169 int time_diff_millis = (int) ((double) time_diff * 1000.0); 01170 rcs_print_debug (PRINT_SERVER_SUBSCRIPTION_ACTIVITY, 01171 "Subscription time_diff_millis=%d\n", 01172 time_diff_millis); 01173 if ( 01174 ((temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION 01175 && time_diff_millis + 10 >= 01176 temp_clnt_info->poll_interval_millis) 01177 || temp_clnt_info->subscription_type == 01178 CMS_VARIABLE_SUBSCRIPTION) 01179 && temp_clnt_info->last_id_read != server->read_reply->write_id) 01180 { 01181 temp_clnt_info->last_id_read = server->read_reply->write_id; 01182 temp_clnt_info->last_sub_sent_time = cur_time; 01183 temp_clnt_info->clnt_port->serial_number++; 01184 *((u_long *) temp_buffer) = 01185 dl_htonl (temp_clnt_info->clnt_port->serial_number); 01186 temp_clnt_info->clnt_port->reply_iov2[1].iov_base = 01187 (caddr_t) server->read_reply->data; 01188 temp_clnt_info->clnt_port->reply_iov2[1].iov_len = 01189 server->read_reply->size; 01190 temp_clnt_info->clnt_port->reply_message_header.msg_iovlen = 2; 01191 if (sendmsgt 01192 (connection_socket, 01193 &(temp_clnt_info->clnt_port->reply_message_header), 0, 01194 dtimeout) < 0) 01195 { 01196 temp_clnt_info->clnt_port->errors++; 01197 return; 01198 } 01199 } 01200 if (temp_clnt_info->last_id_read < buf_info->min_last_id) 01201 { 01202 buf_info->min_last_id = temp_clnt_info->last_id_read; 01203 } 01204 if (temp_clnt_info->clnt_port->serial_number < 01205 buf_info->max_serial_number) 01206 { 01207 buf_info->max_serial_number = 01208 temp_clnt_info->clnt_port->serial_number; 01209 } 01210 temp_clnt_info = 01211 (UDP_CLIENT_SUBSCRIPTION_INFO *) 01212 buf_info->sub_clnt_info->get_next (); 01213 } 01214 buf_info = 01215 (UDP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next (); 01216 } 01217 } |
|
Definition at line 757 of file udp_srv.cc. Referenced by handle_request().
00762 { 00763 if (NULL == subscription_buffers) 00764 { 00765 subscription_buffers = new RCS_LINKED_LIST (); 00766 } 00767 if (NULL == subscription_buffers) 00768 { 00769 rcs_print_error ("Can`t create subscription_buffers list.\n"); 00770 } 00771 00772 UDP_BUFFER_SUBSCRIPTION_INFO *buf_info = 00773 (UDP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head (); 00774 while (NULL != buf_info) 00775 { 00776 if (buf_info->buffer_number == buffer_number && 00777 buf_info->subdiv == subdiv) 00778 { 00779 break; 00780 } 00781 buf_info = 00782 (UDP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next (); 00783 } 00784 if (NULL == buf_info) 00785 { 00786 buf_info = new UDP_BUFFER_SUBSCRIPTION_INFO (); 00787 buf_info->buffer_number = buffer_number; 00788 buf_info->subdiv = subdiv; 00789 buf_info->last_update_time = etime (); 00790 buf_info->sub_clnt_info = new RCS_LINKED_LIST (); 00791 buf_info->list_id = 00792 subscription_buffers->store_at_tail (buf_info, sizeof (*buf_info), 0); 00793 buf_info->min_last_id = clnt->last_read_id; 00794 if (NULL != broadcast_ports) 00795 { 00796 UDP_BROADCAST_DATA *broadcast_data = (UDP_BROADCAST_DATA *) 00797 broadcast_ports->get_head (); 00798 while (NULL != broadcast_data) 00799 { 00800 if (broadcast_data->buffer_number == buffer_number) 00801 { 00802 buf_info->broadcast_data = broadcast_data; 00803 break; 00804 } 00805 broadcast_data = (UDP_BROADCAST_DATA *) 00806 broadcast_ports->get_next (); 00807 } 00808 } 00809 } 00810 if (buf_info->max_serial_number < clnt->serial_number) 00811 { 00812 buf_info->max_serial_number = clnt->serial_number; 00813 } 00814 if (buf_info->min_last_id > clnt->last_read_id) 00815 { 00816 buf_info->min_last_id = clnt->last_read_id; 00817 } 00818 if (NULL == clnt->subscriptions) 00819 { 00820 clnt->subscriptions = new RCS_LINKED_LIST (); 00821 } 00822 UDP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 00823 (UDP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head (); 00824 while (temp_clnt_info != NULL) 00825 { 00826 if (temp_clnt_info->buffer_number == buffer_number && 00827 temp_clnt_info->subdiv == subdiv) 00828 { 00829 break; 00830 } 00831 temp_clnt_info = 00832 (UDP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next (); 00833 } 00834 if (NULL == temp_clnt_info) 00835 { 00836 temp_clnt_info = new UDP_CLIENT_SUBSCRIPTION_INFO (); 00837 temp_clnt_info->last_sub_sent_time = etime (); 00838 temp_clnt_info->buffer_number = buffer_number; 00839 temp_clnt_info->subdiv = subdiv; 00840 temp_clnt_info->subscription_paused = 0; 00841 temp_clnt_info->last_id_read = 0; 00842 temp_clnt_info->sub_buf_info = buf_info; 00843 temp_clnt_info->clnt_port = clnt; 00844 temp_clnt_info->last_sub_sent_time = etime (); 00845 temp_clnt_info->subscription_list_id = 00846 clnt->subscriptions->store_at_tail (temp_clnt_info, 00847 sizeof (*temp_clnt_info), 0); 00848 buf_info->sub_clnt_info->store_at_tail (temp_clnt_info, 00849 sizeof (*temp_clnt_info), 0); 00850 temp_clnt_info->subscription_id = clnt->subscription_id = 00851 ++last_subscription_id; 00852 } 00853 temp_clnt_info->subscription_type = subscription_type; 00854 temp_clnt_info->poll_interval_millis = poll_interval_millis; 00855 recalculate_polling_interval (); 00856 } |
|
Definition at line 860 of file udp_srv.cc. Referenced by handle_request().
00863 { 00864 if (NULL == subscription_buffers) 00865 { 00866 return; 00867 } 00868 00869 UDP_BUFFER_SUBSCRIPTION_INFO *buf_info = 00870 (UDP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head (); 00871 while (NULL != buf_info) 00872 { 00873 if (buf_info->buffer_number == buffer_number && 00874 buf_info->subdiv == subdiv) 00875 { 00876 break; 00877 } 00878 buf_info = 00879 (UDP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next (); 00880 } 00881 00882 if (NULL != buf_info) 00883 { 00884 if (NULL != buf_info->sub_clnt_info) 00885 { 00886 UDP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 00887 (UDP_CLIENT_SUBSCRIPTION_INFO *) 00888 buf_info->sub_clnt_info->get_head (); 00889 while (temp_clnt_info != NULL) 00890 { 00891 if (temp_clnt_info->subscription_id == subscription_id 00892 && NULL != temp_clnt_info->clnt_port) 00893 { 00894 remove_subscription_client (temp_clnt_info->clnt_port, 00895 buffer_number, subdiv); 00896 break; 00897 } 00898 if (NULL == buf_info) 00899 { 00900 break; 00901 } 00902 if (NULL == buf_info->sub_clnt_info) 00903 { 00904 break; 00905 } 00906 temp_clnt_info = 00907 (UDP_CLIENT_SUBSCRIPTION_INFO *) 00908 buf_info->sub_clnt_info->get_next (); 00909 } 00910 } 00911 } 00912 } |
|
Definition at line 915 of file udp_srv.cc. Referenced by remove_subscription().
00919 { 00920 if (NULL == clnt) 00921 { 00922 return; 00923 } 00924 if (NULL == clnt->subscriptions) 00925 { 00926 return; 00927 } 00928 UDP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 00929 (UDP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head (); 00930 while (temp_clnt_info != NULL) 00931 { 00932 if (temp_clnt_info->buffer_number == buffer_number && 00933 temp_clnt_info->subdiv == subdiv) 00934 { 00935 if (NULL != temp_clnt_info->sub_buf_info) 00936 { 00937 if (NULL != temp_clnt_info->sub_buf_info->sub_clnt_info) 00938 { 00939 temp_clnt_info->sub_buf_info->sub_clnt_info-> 00940 delete_node (temp_clnt_info->subscription_list_id); 00941 if (temp_clnt_info->sub_buf_info->sub_clnt_info-> 00942 list_size == 0) 00943 { 00944 subscription_buffers->delete_node (temp_clnt_info-> 00945 sub_buf_info-> 00946 list_id); 00947 if (subscription_buffers->list_size == 0) 00948 { 00949 delete subscription_buffers; 00950 subscription_buffers = NULL; 00951 } 00952 delete temp_clnt_info->sub_buf_info->sub_clnt_info; 00953 temp_clnt_info->sub_buf_info->sub_clnt_info = NULL; 00954 delete temp_clnt_info->sub_buf_info; 00955 temp_clnt_info->sub_buf_info = NULL; 00956 } 00957 } 00958 } 00959 clnt->subscriptions->delete_current_node (); 00960 delete temp_clnt_info; 00961 temp_clnt_info = NULL; 00962 break; 00963 } 00964 temp_clnt_info = 00965 (UDP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next (); 00966 } 00967 temp_clnt_info = 00968 (UDP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head (); 00969 while (temp_clnt_info != NULL) 00970 { 00971 if (temp_clnt_info->buffer_number == buffer_number && 00972 temp_clnt_info->subdiv == subdiv) 00973 { 00974 delete temp_clnt_info; 00975 temp_clnt_info = NULL; 00976 break; 00977 } 00978 temp_clnt_info = 00979 (UDP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next (); 00980 } 00981 00982 if (clnt->subscriptions->list_size == 0) 00983 { 00984 delete clnt->subscriptions; 00985 clnt->subscriptions = NULL; 00986 if (NULL != client_ports) 00987 { 00988 client_ports->delete_node (clnt->client_list_id); 00989 delete clnt; 00990 clnt = NULL; 00991 } 00992 if (client_ports->list_size == 0) 00993 { 00994 delete client_ports; 00995 client_ports = NULL; 00996 } 00997 } 00998 recalculate_polling_interval (); 00999 } |
|
Referenced by handle_request().
|
|
Definition at line 1002 of file udp_srv.cc. Referenced by add_subscription_client(), and remove_subscription_client().
01003 { 01004 int min_poll_interval_millis = 30000; 01005 polling_enabled = 0; 01006 dtimeout = -1.0; 01007 if (NULL == subscription_buffers) 01008 { 01009 current_poll_interval_millis = min_poll_interval_millis; 01010 select_timeout.tv_sec = 30; 01011 select_timeout.tv_usec = 0; 01012 return; 01013 } 01014 UDP_BUFFER_SUBSCRIPTION_INFO *buf_info = 01015 (UDP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head (); 01016 while (NULL != buf_info) 01017 { 01018 buf_info->min_update_interval = 3600; 01019 UDP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info = 01020 (UDP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->get_head (); 01021 while (temp_clnt_info != NULL) 01022 { 01023 if (temp_clnt_info->poll_interval_millis < min_poll_interval_millis 01024 && temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION) 01025 { 01026 min_poll_interval_millis = temp_clnt_info->poll_interval_millis; 01027 polling_enabled = 1; 01028 } 01029 if (temp_clnt_info->poll_interval_millis / 1000.0 < 01030 buf_info->min_update_interval) 01031 { 01032 buf_info->min_update_interval = 01033 temp_clnt_info->poll_interval_millis / 1000.0; 01034 } 01035 temp_clnt_info = 01036 (UDP_CLIENT_SUBSCRIPTION_INFO *) 01037 buf_info->sub_clnt_info->get_next (); 01038 } 01039 buf_info = 01040 (UDP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next (); 01041 } 01042 if (min_poll_interval_millis >= ((int) (clk_tck () * 1000.0))) 01043 { 01044 current_poll_interval_millis = min_poll_interval_millis; 01045 select_timeout.tv_sec = current_poll_interval_millis / 1000; 01046 select_timeout.tv_usec = (current_poll_interval_millis % 1000) * 1000; 01047 dtimeout = (current_poll_interval_millis) / 1000.0; 01048 } 01049 else 01050 { 01051 current_poll_interval_millis = ((int) (clk_tck () * 1000.0)); 01052 if (((int) (clk_tck () * 1E6) % 1000) != 0) 01053 { 01054 current_poll_interval_millis++; 01055 } 01056 select_timeout.tv_sec = current_poll_interval_millis / 1000; 01057 select_timeout.tv_usec = (long) (ceil (clk_tck () * 1E6)); 01058 dtimeout = clk_tck (); 01059 } 01060 } |
|
Definition at line 134 of file udp_srv.cc. Referenced by accept_local_port_cms().
00135 { 00136 memset (&broadcast_address, 0, sizeof (broadcast_address)); 00137 broadcast_address.sin_family = AF_INET; 00138 broadcast_address.sin_addr.s_addr = dl_htonl (INADDR_ANY); 00139 broadcast_address.sin_port = 0; 00140 char localhostname[80]; 00141 if (dl_gethostname (localhostname, 80) < 0) 00142 { 00143 if (strcmp (_cms->BufferHost, "localhost") != 0) 00144 { 00145 strncpy (localhostname, _cms->BufferHost, 80); 00146 } 00147 else 00148 { 00149 strncpy (localhostname, _cms->ProcessHost, 80); 00150 } 00151 } 00152 #ifndef VXWORKS 00153 dl_gethostbyname (localhostname, &broadcast_server_host_entry); 00154 if (NULL == broadcast_server_host_entry) 00155 { 00156 rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n", 00157 localhostname); 00158 return; 00159 } 00160 #ifdef __MSDOS__ 00161 broadcast_address.sin_addr.s_addr = 00162 *((u_long *) broadcast_server_host_entry->h_addr_list[0]); 00163 #else 00164 broadcast_address.sin_addr.s_addr = 00165 *((int *) broadcast_server_host_entry->h_addr_list[0]); 00166 #endif 00167 broadcast_address.sin_family = broadcast_server_host_entry->h_addrtype; 00168 #else 00169 broadcast_address.sin_addr.s_addr = hostGetByName (localhostname); 00170 if (broadcast_address.sin_addr.s_addr == ERROR) 00171 { 00172 rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n", 00173 localhostname); 00174 return; 00175 } 00176 #endif 00177 broadcast_address.sin_addr.s_addr |= dl_htonl (0xff); 00178 rcs_print_debug (PRINT_SOCKET_CONNECT, "Broadcasting to IP address %s.\n", 00179 dl_inet_ntoa (broadcast_address.sin_addr)); 00180 broadcast_address_set = 1; 00181 } |
|
Definition at line 78 of file udp_srv.hh. |
|
Definition at line 81 of file udp_srv.hh. |
|
Definition at line 81 of file udp_srv.hh. |
|
Definition at line 83 of file udp_srv.hh. |
|
Definition at line 84 of file udp_srv.hh. |
|
Definition at line 85 of file udp_srv.hh. |
|
Definition at line 86 of file udp_srv.hh. |
|
Definition at line 87 of file udp_srv.hh. |
|
Definition at line 88 of file udp_srv.hh. |
|
Definition at line 89 of file udp_srv.hh. |
|
Definition at line 90 of file udp_srv.hh. |
|
Definition at line 91 of file udp_srv.hh. |
|
Definition at line 92 of file udp_srv.hh. |
|
Definition at line 93 of file udp_srv.hh. |
|
Definition at line 94 of file udp_srv.hh. |
|
Definition at line 95 of file udp_srv.hh. |
|
Definition at line 96 of file udp_srv.hh. |
|
Definition at line 97 of file udp_srv.hh. |
|
Definition at line 98 of file udp_srv.hh. |
|
Definition at line 99 of file udp_srv.hh. |
|
Definition at line 100 of file udp_srv.hh. |
|
Definition at line 112 of file udp_srv.hh. |
|
Definition at line 113 of file udp_srv.hh. |
|
Definition at line 114 of file udp_srv.hh. |
|
Definition at line 115 of file udp_srv.hh. |
|
Definition at line 116 of file udp_srv.hh. |
|
Definition at line 117 of file udp_srv.hh. |
|
Definition at line 120 of file udp_srv.hh. |
|
Definition at line 122 of file udp_srv.hh. |