00001
00002
00003
00004
00005
00006
00007 #include "rcs_defs.hh"
00008 #include "sokintrf.h"
00009
00010 #ifdef EXTERN_C_STD_HEADERS
00011 extern "C"
00012 {
00013 #endif
00014
00015 #include <string.h>
00016 #include <stdlib.h>
00017
00018 #ifndef UNDER_CE
00019 #include <errno.h>
00020 #include <signal.h>
00021 #endif
00022
00023 #if (defined(__CENTERLINE__) && !defined(VXWORKS)) || defined(sunos5) || defined(sparcworks)
00024 #include <sys/filio.h>
00025 char *strerror (int errnum);
00026 char *dl_inet_ntoa (struct in_addr);
00027 #endif
00028
00029 #ifdef EXTERN_C_STD_HEADERS
00030 }
00031 #endif
00032
00033 #include "cms.hh"
00034 #include "nml.hh"
00035 #include "tcp_srv.hh"
00036 #include "rcs_prnt.hh"
00037 #include "linklist.hh"
00038 #include "tcp_opts.hh"
00039 #include "timer.hh"
00040 #include "cmsdiag.hh"
00041
00042 extern "C"
00043 {
00044 #include "recvn.h"
00045 #include "sendn.h"
00046 }
00047
00048
00049 #ifdef VXWORKS
00050 #include "vxWorks.h"
00051 #include "taskLib.h"
00052 #endif
00053
00054 #ifdef UNIX_LIKE_PLAT
00055 #include <sys/types.h>
00056 #include <sys/wait.h>
00057 #endif
00058
00059 #ifndef NO_THREADS
00060 #ifdef SGI
00061 #include <sys/resource.h>
00062 #include <sys/prctl.h>
00063 #endif
00064
00065
00066 #ifdef WIN32
00067 #ifdef MULTITHREADED
00068 #ifndef UNDER_CE
00069 #include <process.h>
00070 #else
00071 #include <winbase.h>
00072 #endif
00073 #else
00074 #define NO_THREADS
00075 #endif
00076 #endif
00077 #endif
00078
// Bookkeeping counters for the handlers that service blocking reads.
// In this file: "killed" is bumped by blocking_thread_kill(), "exited" by a
// handler that completes normally, and "returned_early" by a handler that
// bails out before finishing.  "created" is not modified in the code shown
// here (presumably incremented where handlers are spawned -- TODO confirm).
int tcpsvr_threads_created = 0;
int tcpsvr_threads_killed = 0;
int tcpsvr_threads_exited = 0;
int tcpsvr_threads_returned_early = 0;
00083
00084 TCPSVR_BLOCKING_READ_REQUEST::TCPSVR_BLOCKING_READ_REQUEST ()
00085 {
00086 access_type = CMS_READ_ACCESS;
00087 last_id_read = 0;
00088
00089
00090 timeout_millis = -1;
00091
00092 _client_tcp_port = NULL;
00093 remport = NULL;
00094 server = NULL;
00095 _nml = NULL;
00096 _reply = NULL;
00097 _data = NULL;
00098 read_reply = NULL;
00099 }
00100
00101
// Returns a double whose 8 bytes are the bytes of `in` in reverse order.
// Used to convert doubles for a peer of the opposite byte order.
static inline double
tcp_svr_reverse_double (double in)
{
  double swapped;
  const char *src = (const char *) &in;
  char *dst = (char *) &swapped;

  // Byte k of the result is byte (7 - k) of the input.
  for (int k = 0; k < 8; k++)
    {
      dst[k] = src[7 - k];
    }
  return swapped;
}
00118
00119
00120 TCPSVR_BLOCKING_READ_REQUEST::~TCPSVR_BLOCKING_READ_REQUEST ()
00121 {
00122 if (NULL != _nml)
00123 {
00124 NML *nmlcopy = (NML *) _nml;
00125 _nml = NULL;
00126 delete nmlcopy;
00127 }
00128 if (NULL != _data)
00129 {
00130 DEBUG_FREE (_data);
00131 _data = NULL;
00132 }
00133 if (NULL != _reply)
00134 {
00135 DEBUG_FREE (_reply);
00136 _reply = NULL;
00137 read_reply = NULL;
00138 }
00139 if (NULL != read_reply)
00140 {
00141 if (NULL != read_reply->data)
00142 {
00143 DEBUG_FREE (read_reply->data);
00144 read_reply->data = NULL;
00145 }
00146 delete read_reply;
00147 read_reply = NULL;
00148 }
00149 }
00150
00151
00152 CMS_SERVER_REMOTE_TCP_PORT::CMS_SERVER_REMOTE_TCP_PORT (CMS_SERVER *
00153 _cms_server):
00154 CMS_SERVER_REMOTE_PORT (_cms_server)
00155 {
00156 client_ports = (RCS_LINKED_LIST *) NULL;
00157 connection_socket = 0;
00158 connection_port = 0;
00159 maxfdpl = 0;
00160 dtimeout = 20.0;
00161 if (load_socket_interface () < 0)
00162 {
00163 rcs_print_error ("Can't load socket interface.\n");
00164 return;
00165 }
00166
00167 memset (&server_socket_address, 0, sizeof (server_socket_address));
00168 server_socket_address.sin_family = AF_INET;
00169 server_socket_address.sin_addr.s_addr = dl_htonl (INADDR_ANY);
00170 server_socket_address.sin_port = 0;
00171
00172 client_ports = new RCS_LINKED_LIST;
00173 if (NULL == client_ports)
00174 {
00175 rcs_print_error ("Can not create linked list for client ports.\n");
00176 return;
00177 }
00178 polling_enabled = 0;
00179 memset (&select_timeout, 0, sizeof (select_timeout));
00180 select_timeout.tv_sec = 30;
00181 select_timeout.tv_usec = 30;
00182 subscription_buffers = NULL;
00183 current_poll_interval_millis = 30000;
00184 memset (&read_fd_set, 0, sizeof (read_fd_set));
00185 memset (&write_fd_set, 0, sizeof (write_fd_set));
00186 }
00187
00188 CMS_SERVER_REMOTE_TCP_PORT::~CMS_SERVER_REMOTE_TCP_PORT ()
00189 {
00190 unregister_port ();
00191 if (NULL != client_ports)
00192 {
00193 delete client_ports;
00194 client_ports = (RCS_LINKED_LIST *) NULL;
00195 }
00196 }
00197
00198 void
00199 blocking_thread_kill (long int id)
00200 {
00201
00202 if (id <= 0)
00203 {
00204 return;
00205 }
00206 #if defined(sunos5) && !defined(NO_THREADS)
00207 thr_kill (id, SIGINT);
00208 thr_join (id, NULL, NULL);
00209 #endif
00210 #ifdef POSIX_THREADS
00211 pthread_kill (id, SIGINT);
00212 pthread_join (id, NULL);
00213 #endif
00214 #ifdef VXWORKS
00215 if (taskIdVerify (id) != OK)
00216 {
00217 return;
00218 }
00219 kill (id, SIGINT);
00220 taskDelay (1);
00221 if (taskIdVerify (id) == OK)
00222 {
00223 taskDelete (id);
00224 }
00225 #endif
00226 #if defined(NO_THREADS) || defined(SGI)
00227 kill (id, SIGINT);
00228 waitpid (id, NULL, 0);
00229 #endif
00230 #ifdef WIN32
00231 TerminateThread ((HANDLE) id, -1);
00232 #endif
00233 tcpsvr_threads_killed++;
00234 }
00235
00236 void
00237 CMS_SERVER_REMOTE_TCP_PORT::unregister_port ()
00238 {
00239 CLIENT_TCP_PORT *client;
00240 int number_of_connected_clients = 0;
00241 #if defined(sunos5) || defined(VXWORKS)
00242 client = (CLIENT_TCP_PORT *) client_ports->get_head ();
00243 while (NULL != client)
00244 {
00245 if (client->threadId > 0 && client->blocking)
00246 {
00247 blocking_thread_kill (client->threadId);
00248 client->threadId = 0;
00249 }
00250 client = (CLIENT_TCP_PORT *) client_ports->get_next ();
00251 }
00252 #endif
00253
00254 client = (CLIENT_TCP_PORT *) client_ports->get_head ();
00255 while (NULL != client)
00256 {
00257 rcs_print ("Exiting even though client on %s is still connected.\n",
00258 dl_inet_ntoa (client->address.sin_addr));
00259 client = (CLIENT_TCP_PORT *) client_ports->get_next ();
00260 number_of_connected_clients++;
00261 }
00262 client = (CLIENT_TCP_PORT *) client_ports->get_head ();
00263 while (NULL != client)
00264 {
00265 delete client;
00266 client_ports->delete_current_node ();
00267 client = (CLIENT_TCP_PORT *) client_ports->get_next ();
00268 }
00269 if (NULL != subscription_buffers)
00270 {
00271 TCP_BUFFER_SUBSCRIPTION_INFO *sub_info =
00272 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head ();
00273 while (NULL != sub_info)
00274 {
00275 delete sub_info;
00276 sub_info =
00277 (TCP_BUFFER_SUBSCRIPTION_INFO *)
00278 subscription_buffers->get_next ();
00279 }
00280 delete subscription_buffers;
00281 subscription_buffers = NULL;
00282 }
00283 if (number_of_connected_clients > 0)
00284 {
00285 esleep (2.0);
00286 }
00287 if (connection_socket > 0)
00288 {
00289 dl_closesocket (connection_socket);
00290 connection_socket = 0;
00291 }
00292 }
00293
00294 int
00295 CMS_SERVER_REMOTE_TCP_PORT::accept_local_port_cms (CMS * _cms)
00296 {
00297 if (NULL == _cms)
00298 {
00299 return 0;
00300 }
00301 if (_cms->remote_port_type != CMS_TCP_REMOTE_PORT_TYPE)
00302 {
00303 return 0;
00304 }
00305 if (NULL != _cms)
00306 {
00307 if (min_compatible_version < 1e-6 ||
00308 (min_compatible_version > _cms->min_compatible_version &&
00309 _cms->min_compatible_version > 1e-6))
00310 {
00311 min_compatible_version = _cms->min_compatible_version;
00312 }
00313 if (_cms->confirm_write)
00314 {
00315 confirm_write = _cms->confirm_write;
00316 }
00317 }
00318 if (_cms->total_subdivisions > max_total_subdivisions)
00319 {
00320 max_total_subdivisions = _cms->total_subdivisions;
00321 }
00322 if (server_socket_address.sin_port == 0)
00323 {
00324 server_socket_address.sin_port =
00325 dl_htons (((u_short) _cms->tcp_port_number));
00326 port_num = _cms->tcp_port_number;
00327 return 1;
00328 }
00329 if (server_socket_address.sin_port ==
00330 dl_htons (((u_short) _cms->tcp_port_number)))
00331 {
00332 port_num = _cms->tcp_port_number;
00333 return 1;
00334 }
00335 return 0;
00336 }
00337
00338 void
00339 CMS_SERVER_REMOTE_TCP_PORT::register_port ()
00340 {
00341 port_registered = 0;
00342 rcs_print_debug (PRINT_CMS_CONFIG_INFO,
00343 "Registering server on TCP port %d.\n",
00344 dl_ntohs (server_socket_address.sin_port));
00345 if (server_socket_address.sin_port == 0)
00346 {
00347 rcs_print_error ("server can not register on port number 0.\n");
00348 return;
00349 }
00350 if ((connection_socket = dl_socket (AF_INET, SOCK_STREAM, 0)) < 0)
00351 {
00352 #ifndef UNDER_CE
00353 rcs_print_error ("socket error: %d -- %s\n", errno, strerror (errno));
00354 #endif
00355 rcs_print_error ("Server can not open stream socket.\n");
00356 return;
00357 }
00358
00359 if (set_tcp_socket_options (connection_socket) < 0)
00360 {
00361 return;
00362 }
00363 if (dl_bind (connection_socket, (struct sockaddr *) &server_socket_address,
00364 sizeof (server_socket_address)) < 0)
00365 {
00366 #ifndef UNDER_CE
00367 rcs_print_error ("bind error: %d -- %s\n", errno, strerror (errno));
00368 #endif
00369 rcs_print_error
00370 ("Server can not bind the connection socket on port %d.\n",
00371 dl_ntohs (server_socket_address.sin_port));
00372 return;
00373 }
00374 if (dl_listen (connection_socket, 5) < 0)
00375 {
00376 #ifndef UNDER_CE
00377 rcs_print_error ("listen error: %d -- %s\n", errno, strerror (errno));
00378 #endif
00379 rcs_print_error ("TCP Server: error on call to listen for port %d.\n",
00380 dl_ntohs (server_socket_address.sin_port));
00381 return;
00382 }
00383 port_registered = 1;
00384
00385 }
00386
00387 static int last_pipe_signum = 0;
00388
00389 static void
00390 handle_pipe_error (int signum)
00391 {
00392 last_pipe_signum = signum;
00393 rcs_print_error ("SIGPIPE intercepted.\n");
00394 }
00395
00396 void
00397 CMS_SERVER_REMOTE_TCP_PORT::run ()
00398 {
00399 unsigned long bytes_ready;
00400 int ready_descriptors;
00401 if (NULL == client_ports)
00402 {
00403 rcs_print_error ("CMS_SERVER: List of client ports is NULL.\n");
00404 return;
00405 }
00406 CLIENT_TCP_PORT *new_client_port, *client_port_to_check;
00407 FD_ZERO (&read_fd_set);
00408 FD_ZERO (&write_fd_set);
00409 RCS_FD_SET (connection_socket, &read_fd_set);
00410 maxfdpl = connection_socket + 1;
00411 #ifndef DOS_WINDOWS
00412 signal (SIGPIPE, handle_pipe_error);
00413 #endif
00414 rcs_print_debug (PRINT_CMS_CONFIG_INFO,
00415 "running server for TCP port %d (connection_socket = %d).\n",
00416 dl_ntohs (server_socket_address.sin_port),
00417 connection_socket);
00418
00419 cms_server_count++;
00420 fd_set read_fd_set_copy, write_fd_set_copy;
00421 FD_ZERO (&read_fd_set_copy);
00422 FD_ZERO (&write_fd_set_copy);
00423 RCS_FD_SET (connection_socket, &read_fd_set_copy);
00424
00425 while (1)
00426 {
00427 if (polling_enabled)
00428 {
00429 memcpy (&read_fd_set_copy, &read_fd_set, sizeof (fd_set));
00430 memcpy (&write_fd_set_copy, &write_fd_set, sizeof (fd_set));
00431 ready_descriptors =
00432 dl_select (maxfdpl, &read_fd_set, &write_fd_set, (fd_set *) NULL,
00433 (timeval *) & select_timeout);
00434 if (ready_descriptors == 0)
00435 {
00436 update_subscriptions ();
00437 memcpy (&read_fd_set, &read_fd_set_copy, sizeof (fd_set));
00438 memcpy (&write_fd_set, &write_fd_set_copy, sizeof (fd_set));
00439 continue;
00440 }
00441 }
00442 else
00443 {
00444 ready_descriptors =
00445 dl_select (maxfdpl, &read_fd_set, &write_fd_set, (fd_set *) NULL,
00446 (timeval *) NULL);
00447
00448 }
00449 #ifndef UNDER_CE
00450 if (ready_descriptors < 0)
00451 {
00452 rcs_print_error ("server: select error.(errno = %d | %s)\n",
00453 errno, strerror (errno));
00454 }
00455 #endif
00456 if (NULL == client_ports)
00457 {
00458 rcs_print_error ("CMS_SERVER: List of client ports is NULL.\n");
00459 return;
00460 }
00461 client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head ();
00462 while (NULL != client_port_to_check)
00463 {
00464 if (dl_fd_isset (client_port_to_check->socket_fd, &read_fd_set))
00465 {
00466 #ifdef WIN32
00467 dl_ioctlsocket (client_port_to_check->socket_fd, FIONREAD,
00468 &bytes_ready);
00469 #else
00470 #ifndef VXWORKS
00471 ioctl (client_port_to_check->socket_fd, FIONREAD,
00472 (caddr_t) & bytes_ready);
00473 #else
00474 ioctl (client_port_to_check->socket_fd, FIONREAD,
00475 (int) &bytes_ready);
00476 #endif
00477 #endif
00478 if (bytes_ready <= 0)
00479 {
00480 rcs_print_debug (PRINT_SOCKET_CONNECT,
00481 "Socket closed by host with IP address %s.\n",
00482 dl_inet_ntoa
00483 (client_port_to_check->address.sin_addr));
00484 if (NULL != client_port_to_check->subscriptions)
00485 {
00486 TCP_CLIENT_SUBSCRIPTION_INFO *clnt_sub_info =
00487 (TCP_CLIENT_SUBSCRIPTION_INFO *)
00488 client_port_to_check->subscriptions->get_head ();
00489 while (NULL != clnt_sub_info)
00490 {
00491 if (NULL != clnt_sub_info->sub_buf_info &&
00492 clnt_sub_info->subscription_list_id >= 0)
00493 {
00494 if (NULL !=
00495 clnt_sub_info->sub_buf_info->sub_clnt_info)
00496 {
00497 clnt_sub_info->sub_buf_info->sub_clnt_info->
00498 delete_node (clnt_sub_info->
00499 subscription_list_id);
00500 if (clnt_sub_info->sub_buf_info->
00501 sub_clnt_info->list_size < 1)
00502 {
00503 delete clnt_sub_info->sub_buf_info->
00504 sub_clnt_info;
00505 clnt_sub_info->sub_buf_info->
00506 sub_clnt_info = NULL;
00507 if (NULL != subscription_buffers
00508 && clnt_sub_info->sub_buf_info->
00509 list_id >= 0)
00510 {
00511 subscription_buffers->delete_node
00512 (clnt_sub_info->sub_buf_info->
00513 list_id);
00514 delete clnt_sub_info->sub_buf_info;
00515 clnt_sub_info->sub_buf_info = NULL;
00516 }
00517 }
00518 clnt_sub_info->sub_buf_info = NULL;
00519 }
00520 delete clnt_sub_info;
00521 clnt_sub_info =
00522 (TCP_CLIENT_SUBSCRIPTION_INFO *)
00523 client_port_to_check->subscriptions->
00524 get_next ();
00525 }
00526 delete client_port_to_check->subscriptions;
00527 client_port_to_check->subscriptions = NULL;
00528 recalculate_polling_interval ();
00529 }
00530 }
00531 if (client_port_to_check->threadId > 0
00532 && client_port_to_check->blocking)
00533 {
00534 blocking_thread_kill (client_port_to_check->threadId);
00535 }
00536 dl_closesocket (client_port_to_check->socket_fd);
00537 RCS_FD_CLR (client_port_to_check->socket_fd, &read_fd_set);
00538 client_port_to_check->socket_fd = -1;
00539 delete client_port_to_check;
00540 client_ports->delete_current_node ();
00541 }
00542 else
00543 {
00544 if (client_port_to_check->blocking)
00545 {
00546 if (client_port_to_check->threadId > 0)
00547 {
00548 rcs_print_debug (PRINT_SERVER_THREAD_ACTIVITY,
00549 "Data recieved from %s:%d when it should be blocking (bytes_ready=%d).\n",
00550 dl_inet_ntoa
00551 (client_port_to_check->address.
00552 sin_addr),
00553 client_port_to_check->socket_fd,
00554 bytes_ready);
00555 rcs_print_debug (PRINT_SERVER_THREAD_ACTIVITY,
00556 "Killing handler %d.\n",
00557 client_port_to_check->threadId);
00558
00559 blocking_thread_kill
00560 (client_port_to_check->threadId);
00561 #if 0
00562 *((u_long *) temp_buffer) =
00563 dl_htonl (client_port_to_check->serial_number);
00564 *((u_long *) temp_buffer + 1) =
00565 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
00566 *((u_long *) temp_buffer + 2) = dl_htonl (0);
00567 *((u_long *) temp_buffer + 3) = dl_htonl (0);
00568 *((u_long *) temp_buffer + 4) = dl_htonl (0);
00569 sendn (client_port_to_check->socket_fd, temp_buffer,
00570 20, 0, dtimeout);
00571 #endif
00572 client_port_to_check->threadId = 0;
00573 client_port_to_check->blocking = 0;
00574 }
00575 }
00576 handle_request (client_port_to_check);
00577 }
00578 ready_descriptors--;
00579 }
00580 else
00581 {
00582 RCS_FD_SET (client_port_to_check->socket_fd, &read_fd_set);
00583 }
00584 client_port_to_check =
00585 (CLIENT_TCP_PORT *) client_ports->get_next ();
00586 }
00587 if (dl_fd_isset (connection_socket, &read_fd_set)
00588 && ready_descriptors > 0)
00589 {
00590 ready_descriptors--;
00591 int client_address_length;
00592 new_client_port = new CLIENT_TCP_PORT ();
00593 client_address_length = sizeof (new_client_port->address);
00594 new_client_port->socket_fd = dl_accept (connection_socket,
00595 (struct sockaddr *)
00596 &new_client_port->address,
00597 &client_address_length);
00598 current_clients++;
00599 if (current_clients > max_clients)
00600 {
00601 max_clients = current_clients;
00602 }
00603 if (new_client_port->socket_fd < 0)
00604 {
00605 #ifndef UNDER_CE
00606 rcs_print_error ("server: accept error -- %d %s \n", errno,
00607 strerror (errno));
00608 #endif
00609 continue;
00610 }
00611 rcs_print_debug (PRINT_SOCKET_CONNECT,
00612 "Socket opened by host with IP address %s.\n",
00613 dl_inet_ntoa (new_client_port->address.sin_addr));
00614 new_client_port->serial_number = 0;
00615 new_client_port->blocking = 0;
00616 if (NULL != client_ports)
00617 {
00618 client_ports->store_at_tail (new_client_port,
00619 sizeof (new_client_port), 0);
00620 }
00621 if (maxfdpl < new_client_port->socket_fd + 1)
00622 {
00623 maxfdpl = new_client_port->socket_fd + 1;
00624 }
00625 RCS_FD_SET (new_client_port->socket_fd, &read_fd_set);
00626 }
00627 else
00628 {
00629 RCS_FD_SET (connection_socket, &read_fd_set);
00630 }
00631 if (0 != ready_descriptors)
00632 {
00633 rcs_print_error ("%d descriptors ready but not serviced.\n",
00634 ready_descriptors);
00635 }
00636 update_subscriptions ();
00637 }
00638 }
00639
// How many times the blocking-request SIGINT handler has run.
static int tcpsvr_handle_blocking_request_sigint_count = 0;
// Signal number passed to the most recent invocation of that handler.
static int tcpsvr_last_sig = 0;

// Signal handler installed by tcpsvr_handle_blocking_request() for SIGINT.
// It only records that the signal arrived.
void
tcpsvr_handle_blocking_request_sigint_handler (int sig)
{
  ++tcpsvr_handle_blocking_request_sigint_count;
  tcpsvr_last_sig = sig;
}
00649
#if defined(sunos5) || defined(VXWORKS) || defined(POSIX_THREADS) || defined(NO_THREADS) || defined(WIN32) || defined(SGI)

// Shared failure clean-up for tcpsvr_handle_blocking_request(): marks the
// client as no longer blocking, counts the error, detaches and destroys the
// request, clears the handler id and counts an early return.
static void
tcpsvr_abandon_blocking_request (CLIENT_TCP_PORT * port,
                                 TCPSVR_BLOCKING_READ_REQUEST * request)
{
  port->blocking = 0;
  port->errors++;
  port->blocking_read_req = NULL;
  delete request;
  port->threadId = 0;
  tcpsvr_threads_returned_early++;
}

// Entry point of the handler that services one blocking read.
//
// _req: a TCPSVR_BLOCKING_READ_REQUEST*; the handler destroys it before
//       finishing, except when it bails out because the request, its server
//       or its client port is NULL.
//
// The request is passed to the server; the reply is sent to the client as
// a 20-byte header (serial number, status, size, write id, was_read)
// followed by the message data.  The return type depends on the platform's
// thread/task API; where a value is returned it is always 0.
#ifdef VXWORKS
int
tcpsvr_handle_blocking_request (void *_req)
#else
#ifdef WIN32
#ifndef UNDER_CE
void *__cdecl
tcpsvr_handle_blocking_request (void *_req)
#else
unsigned long __stdcall
tcpsvr_handle_blocking_request (void *_req)
#endif
#else
#ifdef SGI
void
tcpsvr_handle_blocking_request (void *_req)
#else
void *
tcpsvr_handle_blocking_request (void *_req)
#endif
#endif
#endif
{
#ifndef UNDER_CE
  // blocking_thread_kill() interrupts handlers with SIGINT.
  signal (SIGINT, tcpsvr_handle_blocking_request_sigint_handler);
#endif
  TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req =
    (TCPSVR_BLOCKING_READ_REQUEST *) _req;
  char temp_buffer[0x2000];
  if (NULL == _req)
    {
      tcpsvr_threads_returned_early++;
#ifndef SGI
      return 0;
#else
      return;
#endif
    }

  // Timeout (seconds) for the reply sends: request timeout plus 10 ms,
  // kept within [0.5, 600]; a negative value maps to 600.
  // NOTE(review): because of the +10, timeout_millis values in [-10, 0)
  // (including the default -1) end up at 0.5 rather than 600 -- confirm
  // that this is intended.
  double dtimeout =
    ((double) (blocking_read_req->timeout_millis + 10)) / 1000.0;
  if (dtimeout < 0)
    {
      dtimeout = 600.0;
    }
  else if (dtimeout < 0.5)
    {
      dtimeout = 0.5;
    }
  else if (dtimeout > 600.0)
    {
      dtimeout = 600.0;
    }

  CLIENT_TCP_PORT *_client_tcp_port = blocking_read_req->_client_tcp_port;
  CMS_SERVER *server = blocking_read_req->server;

  if (NULL == server || NULL == _client_tcp_port)
    {
      tcpsvr_threads_returned_early++;
#ifndef SGI
      return 0;
#else
      return;
#endif
    }
  memset (temp_buffer, 0, 0x2000);
  REMOTE_BLOCKING_READ_REPLY *read_reply;

  if (NULL != _client_tcp_port->diag_info)
    {
      _client_tcp_port->diag_info->buffer_number =
        blocking_read_req->buffer_number;
      server->set_diag_info (_client_tcp_port->diag_info);
    }
  else if (server->diag_enabled)
    {
      server->reset_diag_info (blocking_read_req->buffer_number);
    }

  read_reply =
    (REMOTE_BLOCKING_READ_REPLY *)
    server->process_request (blocking_read_req);
  blocking_read_req->read_reply = read_reply;

  // The reply header is written as five consecutive u_long slots.
  u_long *header = (u_long *) temp_buffer;

  if (NULL == read_reply)
    {
      // No reply from the server: send an error header to the client.
      _client_tcp_port->blocking = 0;
      rcs_print_error ("Server could not process request.\n");
      header[0] = dl_htonl (_client_tcp_port->serial_number);
      header[1] = dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
      header[2] = dl_htonl (0);
      header[3] = dl_htonl (0);
      header[4] = dl_htonl (0);
      sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
      tcpsvr_abandon_blocking_request (_client_tcp_port, blocking_read_req);
#ifndef SGI
      return 0;
#else
      return;
#endif
    }

  header[0] = dl_htonl (_client_tcp_port->serial_number);
  header[1] = dl_htonl (read_reply->status);
  header[2] = dl_htonl (read_reply->size);
  header[3] = dl_htonl (read_reply->write_id);
  header[4] = dl_htonl (read_reply->was_read);

  if (read_reply->size < (0x2000 - 20) && read_reply->size > 0)
    {
      // Small message: header and data fit in temp_buffer, one send.
      memcpy (temp_buffer + 20, read_reply->data, read_reply->size);
      _client_tcp_port->blocking = 0;
      if (sendn
          (_client_tcp_port->socket_fd, temp_buffer, 20 + read_reply->size, 0,
           dtimeout) < 0)
        {
          tcpsvr_abandon_blocking_request (_client_tcp_port,
                                           blocking_read_req);
#ifndef SGI
          return 0;
#else
          return;
#endif
        }
    }
  else
    {
      // Empty or large message: header first, then the data (if any)
      // straight from the reply buffer.
      _client_tcp_port->blocking = 0;
      if (sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout) <
          0)
        {
          tcpsvr_abandon_blocking_request (_client_tcp_port,
                                           blocking_read_req);
#ifndef SGI
          return 0;
#else
          return;
#endif
        }
      if (read_reply->size > 0)
        {
          if (sendn
              (_client_tcp_port->socket_fd, read_reply->data,
               read_reply->size, 0, dtimeout) < 0)
            {
              tcpsvr_abandon_blocking_request (_client_tcp_port,
                                               blocking_read_req);
#ifndef SGI
              return 0;
#else
              return;
#endif
            }
        }
    }

  // Normal completion.
  _client_tcp_port->blocking_read_req = NULL;
  delete blocking_read_req;
  _client_tcp_port->threadId = 0;
  tcpsvr_threads_exited++;
#if defined(sunos5) && !defined(NO_THREADS)
  thr_exit (0);
#endif
#ifdef POSIX_THREADS
  pthread_exit (0);
#endif
#ifdef NO_THREADS
  exit (0);
#endif
#ifndef SGI
  return 0;
#endif
}

#endif
00839
00840 void
00841 CMS_SERVER_REMOTE_TCP_PORT::handle_request (CLIENT_TCP_PORT *
00842 _client_tcp_port)
00843 {
00844 CLIENT_TCP_PORT *client_port_to_check = NULL;
00845 #if defined(WIN32) && !defined(gnuwin32)
00846 DWORD pid = GetCurrentProcessId ();
00847 DWORD tid = GetCurrentThreadId ();
00848 #else
00849 #ifdef VXWORKS
00850 int pid = taskIdSelf ();
00851
00852 int tid = 0;
00853 #else
00854 pid_t pid = getpid ();
00855 pid_t tid = 0;
00856 #endif
00857 #endif
00858 CMS_SERVER *server;
00859 server = find_server (pid, tid);
00860 if (NULL == server)
00861 {
00862 rcs_print_error
00863 ("CMS_SERVER_REMOTE_TCP_PORT::handle_request() Cannot find server object for pid = %d.\n",
00864 pid);
00865 return;
00866 }
00867
00868 if (server->using_passwd_file)
00869 {
00870 current_user_info = get_connected_user (_client_tcp_port->socket_fd);
00871 }
00872
00873 if (_client_tcp_port->errors >= _client_tcp_port->max_errors)
00874 {
00875 rcs_print_error ("Too many errors - closing connection(%d)\n",
00876 _client_tcp_port->socket_fd);
00877 client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head ();
00878 while (NULL != client_port_to_check)
00879 {
00880 if (client_port_to_check->socket_fd == _client_tcp_port->socket_fd)
00881 {
00882 delete client_port_to_check;
00883 client_ports->delete_current_node ();
00884 }
00885 client_port_to_check =
00886 (CLIENT_TCP_PORT *) client_ports->get_next ();
00887 }
00888 dl_closesocket (_client_tcp_port->socket_fd);
00889 current_clients--;
00890 RCS_FD_CLR (_client_tcp_port->socket_fd, &read_fd_set);
00891 _client_tcp_port->socket_fd = -1;
00892 }
00893
00894 if (recvn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, -1, NULL) < 0)
00895 {
00896 rcs_print_error ("Can not read from client port (%d) from %s\n",
00897 _client_tcp_port->socket_fd,
00898 dl_inet_ntoa (_client_tcp_port->address.sin_addr));
00899 _client_tcp_port->errors++;
00900 return;
00901 }
00902 long request_type, buffer_number, received_serial_number;
00903 received_serial_number = dl_ntohl (*((u_long *) temp_buffer));
00904 if (received_serial_number != _client_tcp_port->serial_number)
00905 {
00906 rcs_print_error
00907 ("received_serial_number (%d) does not equal expected serial number.(%d)\n",
00908 received_serial_number, _client_tcp_port->serial_number);
00909 _client_tcp_port->serial_number = received_serial_number;
00910 _client_tcp_port->errors++;
00911 }
00912 _client_tcp_port->serial_number++;
00913 request_type = dl_ntohl (*((u_long *) temp_buffer + 1));
00914 buffer_number = dl_ntohl (*((u_long *) temp_buffer + 2));
00915
00916 rcs_print_debug (PRINT_ALL_SOCKET_REQUESTS,
00917 "TCPSVR request recieved: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n",
00918 _client_tcp_port->socket_fd,
00919 _client_tcp_port->serial_number, request_type,
00920 buffer_number);
00921
00922 if (NULL != _client_tcp_port->diag_info)
00923 {
00924 _client_tcp_port->diag_info->buffer_number = buffer_number;
00925 server->set_diag_info (_client_tcp_port->diag_info);
00926 }
00927 else if (server->diag_enabled)
00928 {
00929 server->reset_diag_info (buffer_number);
00930 }
00931
00932 switch_function (_client_tcp_port,
00933 server,
00934 request_type, buffer_number, received_serial_number);
00935
00936 if (NULL != _client_tcp_port->diag_info &&
00937 NULL != server->last_local_port_used && server->diag_enabled)
00938 {
00939 if (NULL != server->last_local_port_used->cms)
00940 {
00941 if (NULL !=
00942 server->last_local_port_used->cms->handle_to_global_data)
00943 {
00944 _client_tcp_port->diag_info->bytes_moved =
00945 server->last_local_port_used->cms->handle_to_global_data->
00946 total_bytes_moved;
00947 }
00948 }
00949 }
00950 }
00951
00952 void
00953 CMS_SERVER_REMOTE_TCP_PORT::switch_function (CLIENT_TCP_PORT *
00954 _client_tcp_port,
00955 CMS_SERVER * server,
00956 long request_type,
00957 long buffer_number,
00958 long received_serial_number)
00959 {
00960 int total_subdivisions = 1;
00961 CLIENT_TCP_PORT *client_port_to_check = NULL;
00962 switch (request_type)
00963 {
00964 case REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE:
00965 {
00966 if (NULL == _client_tcp_port->diag_info)
00967 {
00968 _client_tcp_port->diag_info = new REMOTE_SET_DIAG_INFO_REQUEST ();
00969 }
00970 if (recvn
00971 (_client_tcp_port->socket_fd, server->set_diag_info_buf, 68, 0,
00972 -1, NULL) < 0)
00973 {
00974 rcs_print_error ("Can not read from client port (%d) from %s\n",
00975 _client_tcp_port->socket_fd,
00976 dl_inet_ntoa (_client_tcp_port->address.
00977 sin_addr));
00978 _client_tcp_port->errors++;
00979 return;
00980 }
00981 _client_tcp_port->diag_info->bytes_moved = 0.0;
00982 _client_tcp_port->diag_info->buffer_number = buffer_number;
00983 memcpy (_client_tcp_port->diag_info->process_name,
00984 server->set_diag_info_buf, 16);
00985 memcpy (_client_tcp_port->diag_info->host_sysinfo,
00986 server->set_diag_info_buf + 16, 32);
00987 _client_tcp_port->diag_info->pid =
00988 dl_htonl (*((u_long *) (server->set_diag_info_buf + 48)));
00989 _client_tcp_port->diag_info->c_num =
00990 dl_htonl (*((u_long *) (server->set_diag_info_buf + 52)));
00991 memcpy (&(_client_tcp_port->diag_info->rcslib_ver),
00992 server->set_diag_info_buf + 56, 8);
00993 _client_tcp_port->diag_info->reverse_flag =
00994 *((int *) ((char *) server->set_diag_info_buf + 64));
00995 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
00996 {
00997 _client_tcp_port->diag_info->rcslib_ver =
00998 (double) tcp_svr_reverse_double ((double)
00999 _client_tcp_port->
01000 diag_info->rcslib_ver);
01001 }
01002 }
01003 break;
01004
01005
01006 case REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE:
01007 {
01008 REMOTE_GET_DIAG_INFO_REQUEST diagreq;
01009 diagreq.buffer_number = buffer_number;
01010 REMOTE_GET_DIAG_INFO_REPLY *diagreply = NULL;
01011 diagreply =
01012 (REMOTE_GET_DIAG_INFO_REPLY *) server->process_request (&diagreq);
01013 if (NULL == diagreply)
01014 {
01015 *((u_long *) temp_buffer) =
01016 dl_htonl (_client_tcp_port->serial_number);
01017 *((u_long *) temp_buffer + 1) =
01018 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01019 if (sendn
01020 (_client_tcp_port->socket_fd, temp_buffer, 24, 0,
01021 dtimeout) < 0)
01022 {
01023 _client_tcp_port->errors++;
01024 }
01025 return;
01026 }
01027 if (NULL == diagreply->cdi)
01028 {
01029 *((u_long *) temp_buffer) =
01030 dl_htonl (_client_tcp_port->serial_number);
01031 *((u_long *) temp_buffer + 1) =
01032 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01033 if (sendn
01034 (_client_tcp_port->socket_fd, temp_buffer, 24, 0,
01035 dtimeout) < 0)
01036 {
01037 _client_tcp_port->errors++;
01038 }
01039 return;
01040 }
01041 memset (temp_buffer, 0, 0x2000);
01042 unsigned long dpi_offset = 32;
01043 *((u_long *) temp_buffer) =
01044 dl_htonl (_client_tcp_port->serial_number);
01045 *((u_long *) temp_buffer + 1) = dl_htonl (diagreply->status);
01046 *((u_long *) temp_buffer + 2) =
01047 dl_htonl (diagreply->cdi->last_writer);
01048 *((u_long *) temp_buffer + 3) =
01049 dl_htonl (diagreply->cdi->last_reader);
01050 double curtime = etime ();
01051 double reversed_temp = 0.0;
01052 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01053 {
01054 reversed_temp =
01055 (double) tcp_svr_reverse_double ((double) curtime);
01056 memcpy (temp_buffer + 16, &reversed_temp, 8);
01057 }
01058 else
01059 {
01060 memcpy (temp_buffer + 16, &(curtime), 8);
01061 }
01062 int dpi_count = 0;
01063 if (NULL != diagreply->cdi->dpis)
01064 {
01065 CMS_DIAG_PROC_INFO *dpi =
01066 (CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis->get_head ();
01067 while ((dpi_offset < ((int) 0x2000 - sizeof (CMS_DIAG_PROC_INFO)))
01068 && dpi != NULL)
01069 {
01070 dpi_count++;
01071 memcpy (temp_buffer + dpi_offset, dpi->name, 16);
01072 dpi_offset += 16;
01073 memcpy (temp_buffer + dpi_offset, dpi->host_sysinfo, 32);
01074 dpi_offset += 32;
01075 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01076 dl_htonl (dpi->pid);
01077 dpi_offset += 4;
01078 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01079 {
01080 reversed_temp =
01081 (double) tcp_svr_reverse_double ((double)
01082 dpi->rcslib_ver);
01083 memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01084 }
01085 else
01086 {
01087 memcpy (temp_buffer + dpi_offset, &(dpi->rcslib_ver), 8);
01088 }
01089 dpi_offset += 8;
01090 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01091 dl_htonl (dpi->access_type);
01092 dpi_offset += 4;
01093 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01094 dl_htonl (dpi->msg_id);
01095 dpi_offset += 4;
01096 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01097 dl_htonl (dpi->msg_size);
01098 dpi_offset += 4;
01099 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01100 dl_htonl (dpi->msg_type);
01101 dpi_offset += 4;
01102 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01103 dl_htonl (dpi->number_of_accesses);
01104 dpi_offset += 4;
01105 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01106 dl_htonl (dpi->number_of_new_messages);
01107 dpi_offset += 4;
01108 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01109 {
01110 reversed_temp =
01111 (double) tcp_svr_reverse_double ((double)
01112 dpi->bytes_moved);
01113 memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01114 }
01115 else
01116 {
01117 memcpy (temp_buffer + dpi_offset, &(dpi->bytes_moved), 8);
01118 }
01119 dpi_offset += 8;
01120 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01121 {
01122 reversed_temp =
01123 (double) tcp_svr_reverse_double ((double)
01124 dpi->
01125 bytes_moved_across_socket);
01126 memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01127 }
01128 else
01129 {
01130 memcpy (temp_buffer + dpi_offset,
01131 &(dpi->bytes_moved_across_socket), 8);
01132 }
01133 dpi_offset += 8;
01134 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01135 {
01136 reversed_temp =
01137 (double) tcp_svr_reverse_double ((double)
01138 dpi->last_access_time);
01139 memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01140 }
01141 else
01142 {
01143 memcpy (temp_buffer + dpi_offset,
01144 &(dpi->last_access_time), 8);
01145 }
01146 dpi_offset += 8;
01147 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01148 {
01149 reversed_temp =
01150 (double) tcp_svr_reverse_double ((double)
01151 dpi->
01152 first_access_time);
01153 memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01154 }
01155 else
01156 {
01157 memcpy (temp_buffer + dpi_offset,
01158 &(dpi->first_access_time), 8);
01159 }
01160 dpi_offset += 8;
01161 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01162 {
01163 reversed_temp =
01164 (double) tcp_svr_reverse_double ((double)
01165 dpi->min_difference);
01166 memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01167 }
01168 else
01169 {
01170 memcpy (temp_buffer + dpi_offset, &(dpi->min_difference),
01171 8);
01172 }
01173 dpi_offset += 8;
01174 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01175 {
01176 reversed_temp =
01177 (double) tcp_svr_reverse_double ((double)
01178 dpi->max_difference);
01179 memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01180 }
01181 else
01182 {
01183 memcpy (temp_buffer + dpi_offset, &(dpi->max_difference),
01184 8);
01185 }
01186 dpi_offset += 8;
01187 int is_last_writer = (dpi == diagreply->cdi->last_writer_dpi);
01188 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01189 dl_htonl (is_last_writer);
01190 dpi_offset += 4;
01191 int is_last_reader = (dpi == diagreply->cdi->last_reader_dpi);
01192 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01193 dl_htonl (is_last_reader);
01194 dpi_offset += 4;
01195 dpi =
01196 (CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis->get_next ();
01197 }
01198 }
01199 *((u_long *) temp_buffer + 6) = dl_htonl (dpi_count);
01200 *((u_long *) temp_buffer + 7) = dl_htonl (dpi_offset);
01201 if (sendn
01202 (_client_tcp_port->socket_fd, temp_buffer, dpi_offset, 0,
01203 dtimeout) < 0)
01204 {
01205 _client_tcp_port->errors++;
01206 return;
01207 }
01208 }
01209 break;
01210
01211 case REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE:
01212 {
01213 }
01214 break;
01215
01216 case REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE:
01217 {
01218 REMOTE_GET_BUF_NAME_REQUEST namereq;
01219 namereq.buffer_number = buffer_number;
01220 REMOTE_GET_BUF_NAME_REPLY *namereply = NULL;
01221 namereply =
01222 (REMOTE_GET_BUF_NAME_REPLY *) server->process_request (&namereq);
01223 memset (temp_buffer, 0, 40);
01224 if (NULL != namereply)
01225 {
01226 *((u_long *) temp_buffer) =
01227 dl_htonl (_client_tcp_port->serial_number);
01228 *((u_long *) temp_buffer + 1) = dl_htonl (namereply->status);
01229 strncpy (temp_buffer + 8, namereply->name, 31);
01230 if (sendn
01231 (_client_tcp_port->socket_fd, temp_buffer, 40, 0,
01232 dtimeout) < 0)
01233 {
01234 _client_tcp_port->errors++;
01235 return;
01236 }
01237 }
01238 else
01239 {
01240 *((u_long *) temp_buffer) =
01241 dl_htonl (_client_tcp_port->serial_number);
01242 *((u_long *) temp_buffer + 1) =
01243 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01244 if (sendn
01245 (_client_tcp_port->socket_fd, temp_buffer, 40, 0,
01246 dtimeout) < 0)
01247 {
01248 _client_tcp_port->errors++;
01249 return;
01250 }
01251 }
01252 }
01253 break;
01254
01255 case REMOTE_CMS_BLOCKING_READ_REQUEST_TYPE:
01256 {
01257 if (NULL == _client_tcp_port->blocking_read_req)
01258 {
01259 _client_tcp_port->blocking_read_req =
01260 new TCPSVR_BLOCKING_READ_REQUEST ();
01261 }
01262 TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req =
01263 _client_tcp_port->blocking_read_req;
01264 blocking_read_req->buffer_number = buffer_number;
01265 blocking_read_req->access_type =
01266 dl_ntohl (*((u_long *) temp_buffer + 3));
01267 blocking_read_req->last_id_read =
01268 dl_ntohl (*((u_long *) temp_buffer + 4));
01269 total_subdivisions = 1;
01270 if (max_total_subdivisions > 1)
01271 {
01272 total_subdivisions =
01273 server->get_total_subdivisions (buffer_number);
01274 }
01275 if (total_subdivisions > 1)
01276 {
01277 if (recvn
01278 (_client_tcp_port->socket_fd,
01279 (char *) (((u_long *) temp_buffer) + 5), 8, 0, -1, NULL) < 0)
01280 {
01281 rcs_print_error
01282 ("Can not read from client port (%d) from %s\n",
01283 _client_tcp_port->socket_fd,
01284 dl_inet_ntoa (_client_tcp_port->address.sin_addr));
01285 _client_tcp_port->errors++;
01286 return;
01287 }
01288 blocking_read_req->subdiv =
01289 dl_ntohl (*((u_long *) temp_buffer + 6));
01290 }
01291 else
01292 {
01293 if (recvn
01294 (_client_tcp_port->socket_fd,
01295 (char *) (((u_long *) temp_buffer) + 5), 4, 0, -1, NULL) < 0)
01296 {
01297 rcs_print_error
01298 ("Can not read from client port (%d) from %s\n",
01299 _client_tcp_port->socket_fd,
01300 dl_inet_ntoa (_client_tcp_port->address.sin_addr));
01301 _client_tcp_port->errors++;
01302 return;
01303 }
01304 }
01305 blocking_read_req->timeout_millis =
01306 dl_ntohl (*((u_long *) temp_buffer + 5));
01307 blocking_read_req->server = server;
01308 blocking_read_req->remport = this;
01309 _client_tcp_port->blocking = 1;
01310 blocking_read_req->_client_tcp_port = _client_tcp_port;
01311 #if defined(sunos5) && !defined(NO_THREADS)
01312 int thr_retval = thr_create (NULL,
01313 0,
01314 tcpsvr_handle_blocking_request,
01315 blocking_read_req,
01316 THR_BOUND | THR_NEW_LWP,
01317 &(_client_tcp_port->threadId)
01318 );
01319 if (thr_retval != 0)
01320 {
01321 _client_tcp_port->blocking = 1;
01322 rcs_print_error ("thr_create error: thr_retval = %d\n",
01323 thr_retval);
01324 rcs_print_error ("thr_create error: %d %s\n", errno,
01325 strerror (errno));
01326 *((u_long *) temp_buffer) =
01327 dl_htonl (_client_tcp_port->serial_number);
01328 *((u_long *) temp_buffer + 1) =
01329 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01330 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01331 *((u_long *) temp_buffer + 3) = dl_htonl (0);
01332 *((u_long *) temp_buffer + 4) = dl_htonl (0);
01333 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01334 return;
01335 }
01336
01337 #else
01338 #ifdef VXWORKS
01339 _client_tcp_port->threadId = taskSpawn (NULL,
01340 cms_server_task_priority,
01341 VX_FP_TASK,
01342 cms_server_task_stack_size,
01343 ((FUNCPTR) tcpsvr_handle_blocking_request),
01344 blocking_read_req,
01345 0, 0, 0,
01346 0, 0, 0, 0, 0, 0);
01347 if (_client_tcp_port->threadId == ERROR)
01348 {
01349 _client_tcp_port->blocking = 1;
01350 rcs_print_error ("taskSpawn error: %d %s\n",
01351 errno, strerror (errno));
01352 *((u_long *) temp_buffer) =
01353 dl_htonl (_client_tcp_port->serial_number);
01354 *((u_long *) temp_buffer + 1) =
01355 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01356 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01357 *((u_long *) temp_buffer + 3) = dl_htonl (0);
01358 *((u_long *) temp_buffer + 4) = dl_htonl (0);
01359 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01360 return;
01361 }
01362 #else
01363 #ifdef POSIX_THREADS
01364 int thr_retval = pthread_create (&(_client_tcp_port->threadId),
01365 NULL,
01366 tcpsvr_handle_blocking_request,
01367 blocking_read_req
01368 );
01369 if (thr_retval != 0)
01370 {
01371 _client_tcp_port->blocking = 0;
01372 rcs_print_error ("pthread_create error: thr_retval = %d\n",
01373 thr_retval);
01374 rcs_print_error ("pthread_create error: %d %s\n", errno,
01375 strerror (errno));
01376 *((u_long *) temp_buffer) =
01377 dl_htonl (_client_tcp_port->serial_number);
01378 *((u_long *) temp_buffer + 1) =
01379 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01380 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01381 *((u_long *) temp_buffer + 3) = dl_htonl (0);
01382 *((u_long *) temp_buffer + 4) = dl_htonl (0);
01383 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01384 return;
01385 }
01386 #else
01387 #ifdef NO_THREADS
01388 int fork_ret = fork ();
01389 switch (fork_ret)
01390 {
01391 case 0:
01392 _client_tcp_port->threadId = getpid ();
01393 tcpsvr_handle_blocking_request (blocking_read_req);
01394 exit (0);
01395 break;
01396
01397 case -1:
01398 #ifndef UNDER_CE
01399 rcs_print_error ("fork error: %d %s\n", errno, strerror (errno));
01400 #endif
01401 _client_tcp_port->blocking = 0;
01402 *((u_long *) temp_buffer) =
01403 dl_htonl (_client_tcp_port->serial_number);
01404 *((u_long *) temp_buffer + 1) =
01405 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01406 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01407 *((u_long *) temp_buffer + 3) = dl_htonl (0);
01408 *((u_long *) temp_buffer + 4) = dl_htonl (0);
01409 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01410 break;
01411
01412 default:
01413 _client_tcp_port->threadId = fork_ret;
01414 break;
01415 }
01416 #else
01417 #ifdef WIN32
01418 #ifndef UNDER_CE
01419 _client_tcp_port->threadId = _beginthread ((void (__cdecl *) (void *)) tcpsvr_handle_blocking_request,
01420 0,
01421 blocking_read_req
01422 );
01423 #else
01424 unsigned long temp_id;
01425 HANDLE temp_handle = CreateThread (NULL,
01426 4096,
01427 tcpsvr_handle_blocking_request,
01428 blocking_read_req,
01429 0,
01430 &temp_id);
01431 _client_tcp_port->threadId = (int) temp_id;
01432 if (NULL == temp_handle)
01433 {
01434 rcs_print_error ("CreateThread failed. GetLastError = %d\n",
01435 GetLastError ());
01436 }
01437 #endif
01438 if (_client_tcp_port->threadId < 1)
01439 {
01440 _client_tcp_port->blocking = 0;
01441 rcs_print_sys_error (ERRNO_ERROR_SOURCE, "CreateTask error");
01442 *((u_long *) temp_buffer) =
01443 dl_htonl (_client_tcp_port->serial_number);
01444 *((u_long *) temp_buffer + 1) =
01445 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01446 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01447 *((u_long *) temp_buffer + 3) = dl_htonl (0);
01448 *((u_long *) temp_buffer + 4) = dl_htonl (0);
01449 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01450 }
01451 #else
01452 #ifdef SGI
01453 _client_tcp_port->threadId = sproc (tcpsvr_handle_blocking_request,
01454 0,
01455 blocking_read_req
01456 );
01457 if (_client_tcp_port->threadId < 1)
01458 {
01459 _client_tcp_port->blocking = 0;
01460 rcs_print_sys_error (ERRNO_ERROR_SOURCE, "sproc error");
01461 *((u_long *) temp_buffer) =
01462 dl_htonl (_client_tcp_port->serial_number);
01463 *((u_long *) temp_buffer + 1) =
01464 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01465 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01466 *((u_long *) temp_buffer + 3) = dl_htonl (0);
01467 *((u_long *) temp_buffer + 4) = dl_htonl (0);
01468 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01469 }
01470 #else
01471 rcs_print_error ("Blocking read not supported on this platform.\n");
01472 *((u_long *) temp_buffer) =
01473 dl_htonl (_client_tcp_port->serial_number);
01474 *((u_long *) temp_buffer + 1) =
01475 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01476 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01477 *((u_long *) temp_buffer + 3) = dl_htonl (0);
01478 *((u_long *) temp_buffer + 4) = dl_htonl (0);
01479 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01480 return;
01481
01482
01483 #endif
01484
01485 #endif
01486
01487 #endif
01488
01489 #endif
01490
01491 #endif
01492
01493 #endif
01494 tcpsvr_threads_created++;
01495 }
01496 break;
01497
01498
01499 case REMOTE_CMS_READ_REQUEST_TYPE:
01500 server->read_req.buffer_number = buffer_number;
01501 server->read_req.access_type = dl_ntohl (*((u_long *) temp_buffer + 3));
01502 server->read_req.last_id_read =
01503 dl_ntohl (*((u_long *) temp_buffer + 4));
01504 server->read_reply =
01505 (REMOTE_READ_REPLY *) server->process_request (&server->read_req);
01506 if (max_total_subdivisions > 1)
01507 {
01508 total_subdivisions = server->get_total_subdivisions (buffer_number);
01509 }
01510 if (total_subdivisions > 1)
01511 {
01512 if (recvn
01513 (_client_tcp_port->socket_fd,
01514 (char *) (((u_long *) temp_buffer) + 5), 4, 0, -1, NULL) < 0)
01515 {
01516 rcs_print_error ("Can not read from client port (%d) from %s\n",
01517 _client_tcp_port->socket_fd,
01518 dl_inet_ntoa (_client_tcp_port->address.
01519 sin_addr));
01520 _client_tcp_port->errors++;
01521 return;
01522 }
01523 server->read_req.subdiv = dl_ntohl (*((u_long *) temp_buffer + 5));
01524 }
01525 else
01526 {
01527 server->read_req.subdiv = 0;
01528 }
01529 if (NULL == server->read_reply)
01530 {
01531 rcs_print_error ("Server could not process request.\n");
01532 *((u_long *) temp_buffer) =
01533 dl_htonl (_client_tcp_port->serial_number);
01534 *((u_long *) temp_buffer + 1) =
01535 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01536 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01537 *((u_long *) temp_buffer + 3) = dl_htonl (0);
01538 *((u_long *) temp_buffer + 4) = dl_htonl (0);
01539 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01540 return;
01541 }
01542 *((u_long *) temp_buffer) = dl_htonl (_client_tcp_port->serial_number);
01543 *((u_long *) temp_buffer + 1) = dl_htonl (server->read_reply->status);
01544 *((u_long *) temp_buffer + 2) = dl_htonl (server->read_reply->size);
01545 *((u_long *) temp_buffer + 3) = dl_htonl (server->read_reply->write_id);
01546 *((u_long *) temp_buffer + 4) = dl_htonl (server->read_reply->was_read);
01547 if (server->read_reply->size < (0x2000 - 20)
01548 && server->read_reply->size > 0)
01549 {
01550 memcpy (temp_buffer + 20, server->read_reply->data,
01551 server->read_reply->size);
01552 if (sendn
01553 (_client_tcp_port->socket_fd, temp_buffer,
01554 20 + server->read_reply->size, 0, dtimeout) < 0)
01555 {
01556 _client_tcp_port->errors++;
01557 return;
01558 }
01559 }
01560 else
01561 {
01562 if (sendn
01563 (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout) < 0)
01564 {
01565 _client_tcp_port->errors++;
01566 return;
01567 }
01568 if (server->read_reply->size > 0)
01569 {
01570 if (sendn
01571 (_client_tcp_port->socket_fd, server->read_reply->data,
01572 server->read_reply->size, 0, dtimeout) < 0)
01573 {
01574 _client_tcp_port->errors++;
01575 return;
01576 }
01577 }
01578 }
01579 break;
01580
01581 case REMOTE_CMS_WRITE_REQUEST_TYPE:
01582 server->write_req.buffer_number = buffer_number;
01583 server->write_req.access_type =
01584 dl_ntohl (*((u_long *) temp_buffer + 3));
01585 server->write_req.size = dl_ntohl (*((u_long *) temp_buffer + 4));
01586 total_subdivisions = 1;
01587 if (max_total_subdivisions > 1)
01588 {
01589 total_subdivisions = server->get_total_subdivisions (buffer_number);
01590 }
01591 if (total_subdivisions > 1)
01592 {
01593 if (recvn
01594 (_client_tcp_port->socket_fd,
01595 (char *) (((u_long *) temp_buffer) + 5), 4, 0, -1, NULL) < 0)
01596 {
01597 rcs_print_error ("Can not read from client port (%d) from %s\n",
01598 _client_tcp_port->socket_fd,
01599 dl_inet_ntoa (_client_tcp_port->address.
01600 sin_addr));
01601 _client_tcp_port->errors++;
01602 return;
01603 }
01604 server->write_req.subdiv = dl_ntohl (*((u_long *) temp_buffer + 5));
01605 }
01606 else
01607 {
01608 server->write_req.subdiv = 0;
01609 }
01610 if (server->write_req.size > 0)
01611 {
01612 if (recvn
01613 (_client_tcp_port->socket_fd, server->write_req.data,
01614 server->write_req.size, 0, -1, NULL) < 0)
01615 {
01616 _client_tcp_port->errors++;
01617 return;
01618 }
01619 }
01620 server->write_reply =
01621 (REMOTE_WRITE_REPLY *) server->process_request (&server->write_req);
01622 if (min_compatible_version < 2.58 && min_compatible_version > 1e-6
01623 || confirm_write)
01624 {
01625 if (NULL == server->write_reply)
01626 {
01627 rcs_print_error ("Server could not process request.\n");
01628 *((u_long *) temp_buffer) =
01629 dl_htonl (_client_tcp_port->serial_number);
01630 *((u_long *) temp_buffer + 1) =
01631 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01632 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01633 sendn (_client_tcp_port->socket_fd, temp_buffer, 12, 0,
01634 dtimeout);
01635 return;
01636 }
01637 *((u_long *) temp_buffer) =
01638 dl_htonl (_client_tcp_port->serial_number);
01639 *((u_long *) temp_buffer + 1) =
01640 dl_htonl (server->write_reply->status);
01641 *((u_long *) temp_buffer + 2) =
01642 dl_htonl (server->write_reply->was_read);
01643 if (sendn
01644 (_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) < 0)
01645 {
01646 _client_tcp_port->errors++;
01647 }
01648 }
01649 else
01650 {
01651 if (NULL == server->write_reply)
01652 {
01653 rcs_print_error ("Server could not process request.\n");
01654 }
01655 }
01656 break;
01657
01658 case REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE:
01659 server->check_if_read_req.buffer_number = buffer_number;
01660 server->check_if_read_req.subdiv =
01661 dl_ntohl (*((u_long *) temp_buffer + 3));
01662 server->check_if_read_reply =
01663 (REMOTE_CHECK_IF_READ_REPLY *) server->process_request (&server->
01664 check_if_read_req);
01665 if (NULL == server->check_if_read_reply)
01666 {
01667 rcs_print_error ("Server could not process request.\n");
01668 *((u_long *) temp_buffer) =
01669 dl_htonl (_client_tcp_port->serial_number);
01670 *((u_long *) temp_buffer + 1) =
01671 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01672 *((u_long *) temp_buffer + 2) = dl_htonl (0);
01673 sendn (_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout);
01674 return;
01675 }
01676 *((u_long *) temp_buffer) = dl_htonl (_client_tcp_port->serial_number);
01677 *((u_long *) temp_buffer + 1) =
01678 dl_htonl (server->check_if_read_reply->status);
01679 *((u_long *) temp_buffer + 2) =
01680 dl_htonl (server->check_if_read_reply->was_read);
01681 if (sendn (_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) <
01682 0)
01683 {
01684 _client_tcp_port->errors++;
01685 }
01686 break;
01687
01688 case REMOTE_CMS_CLEAR_REQUEST_TYPE:
01689 server->clear_req.buffer_number = buffer_number;
01690 server->clear_req.subdiv = dl_ntohl (*((u_long *) temp_buffer + 3));
01691 server->clear_reply =
01692 (REMOTE_CLEAR_REPLY *) server->process_request (&server->clear_req);
01693 if (NULL == server->clear_reply)
01694 {
01695 rcs_print_error ("Server could not process request.\n");
01696 *((u_long *) temp_buffer) =
01697 dl_htonl (_client_tcp_port->serial_number);
01698 *((u_long *) temp_buffer + 1) =
01699 dl_htonl ((u_long) CMS_SERVER_SIDE_ERROR);
01700 sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01701 return;
01702 }
01703 *((u_long *) temp_buffer) = dl_htonl (_client_tcp_port->serial_number);
01704 *((u_long *) temp_buffer + 1) = dl_htonl (server->clear_reply->status);
01705 if (sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout) <
01706 0)
01707 {
01708 _client_tcp_port->errors++;
01709 }
01710 break;
01711
01712 case REMOTE_CMS_CLEAN_REQUEST_TYPE:
01713 server->spawner_pid = server->server_pid;
01714 server->kill_server ();
01715 break;
01716
01717 case REMOTE_CMS_CLOSE_CHANNEL_REQUEST_TYPE:
01718 client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head ();
01719 while (NULL != client_port_to_check)
01720 {
01721 if (client_port_to_check->socket_fd == _client_tcp_port->socket_fd)
01722 {
01723 break;
01724 }
01725 client_port_to_check =
01726 (CLIENT_TCP_PORT *) client_ports->get_next ();
01727 }
01728 RCS_FD_CLR (_client_tcp_port->socket_fd, &read_fd_set);
01729 dl_closesocket (_client_tcp_port->socket_fd);
01730 current_clients--;
01731 if (NULL != _client_tcp_port->subscriptions)
01732 {
01733 remove_subscription_client (_client_tcp_port, buffer_number);
01734 }
01735 _client_tcp_port->socket_fd = -1;
01736 delete _client_tcp_port;
01737 client_ports->delete_current_node ();
01738 break;
01739
01740 #ifndef UNDER_CE
01741 case REMOTE_CMS_GET_KEYS_REQUEST_TYPE:
01742 server->get_keys_req.buffer_number = buffer_number;
01743 if (recvn (_client_tcp_port->socket_fd,
01744 server->get_keys_req.name, 16, 0, -1, NULL) < 0)
01745 {
01746 _client_tcp_port->errors++;
01747 return;
01748 }
01749 server->get_keys_reply =
01750 (REMOTE_GET_KEYS_REPLY *) server->process_request (&server->
01751 get_keys_req);
01752 if (NULL == server->get_keys_reply)
01753 {
01754 rcs_print_error ("Server could not process request.\n");
01755 memset (temp_buffer, 0, 20);
01756 *((u_long *) temp_buffer) =
01757 dl_htonl (_client_tcp_port->serial_number);
01758 server->gen_random_key (((char *) temp_buffer) + 4, 2);
01759 server->gen_random_key (((char *) temp_buffer) + 12, 2);
01760 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01761 return;
01762 }
01763 else
01764 {
01765 *((u_long *) temp_buffer) =
01766 dl_htonl (_client_tcp_port->serial_number);
01767 memcpy (((char *) temp_buffer) + 4, server->get_keys_reply->key1,
01768 8);
01769 memcpy (((char *) temp_buffer) + 12, server->get_keys_reply->key2,
01770 8);
01771
01772 sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01773 return;
01774 }
01775 break;
01776
01777 case REMOTE_CMS_LOGIN_REQUEST_TYPE:
01778 server->login_req.buffer_number = buffer_number;
01779 if (recvn (_client_tcp_port->socket_fd,
01780 server->login_req.name, 16, 0, -1, NULL) < 0)
01781 {
01782 _client_tcp_port->errors++;
01783 return;
01784 }
01785 if (recvn (_client_tcp_port->socket_fd,
01786 server->login_req.passwd, 16, 0, -1, NULL) < 0)
01787 {
01788 _client_tcp_port->errors++;
01789 return;
01790 }
01791 server->login_reply =
01792 (REMOTE_LOGIN_REPLY *) server->process_request (&server->login_req);
01793 if (NULL == server->login_reply)
01794 {
01795 rcs_print_error ("Server could not process request.\n");
01796 *((u_long *) temp_buffer) =
01797 dl_htonl (_client_tcp_port->serial_number);
01798 *((u_long *) temp_buffer + 1) = dl_htonl (0);
01799 sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01800 return;
01801 }
01802 else
01803 {
01804 *((u_long *) temp_buffer) =
01805 dl_htonl (_client_tcp_port->serial_number);
01806 *((u_long *) temp_buffer + 1) =
01807 dl_htonl (server->login_reply->success);
01808
01809 sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01810 return;
01811 }
01812 break;
01813
01814 #endif
01815
01816
01817 case REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE:
01818 server->set_subscription_req.buffer_number = buffer_number;
01819 server->set_subscription_req.subscription_type =
01820 dl_ntohl (*((u_long *) temp_buffer + 3));
01821 server->set_subscription_req.poll_interval_millis =
01822 dl_ntohl (*((u_long *) temp_buffer + 4));
01823 server->set_subscription_reply =
01824 (REMOTE_SET_SUBSCRIPTION_REPLY *) server->process_request (&server->
01825 set_subscription_req);
01826 if (NULL == server->set_subscription_reply)
01827 {
01828 rcs_print_error ("Server could not process request.\n");
01829 *((u_long *) temp_buffer) =
01830 dl_htonl (_client_tcp_port->serial_number);
01831 *((u_long *) temp_buffer + 1) = dl_htonl (0);
01832 sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01833 return;
01834 }
01835 else
01836 {
01837 if (server->set_subscription_reply->success)
01838 {
01839 if (server->set_subscription_req.subscription_type ==
01840 CMS_POLLED_SUBSCRIPTION
01841 || server->set_subscription_req.subscription_type ==
01842 CMS_VARIABLE_SUBSCRIPTION)
01843 {
01844 add_subscription_client (buffer_number,
01845 server->set_subscription_req.
01846 subscription_type,
01847 server->set_subscription_req.
01848 poll_interval_millis,
01849 _client_tcp_port);
01850 }
01851 if (server->set_subscription_req.subscription_type ==
01852 CMS_NO_SUBSCRIPTION)
01853 {
01854 remove_subscription_client (_client_tcp_port,
01855 buffer_number);
01856 }
01857 }
01858 *((u_long *) temp_buffer) =
01859 dl_htonl (_client_tcp_port->serial_number);
01860 *((u_long *) temp_buffer + 1) =
01861 dl_htonl (server->set_subscription_reply->success);
01862
01863 sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01864 return;
01865 }
01866 break;
01867
01868 default:
01869 _client_tcp_port->errors++;
01870 rcs_print_error ("Unrecognized request type received.(%ld)\n",
01871 request_type);
01872 break;
01873 }
01874 }
01875
01876
01877
01878
01879
01880
01881 void
01882 CMS_SERVER_REMOTE_TCP_PORT::add_subscription_client (int buffer_number,
01883 int subscription_type,
01884 int poll_interval_millis,
01885 CLIENT_TCP_PORT * clnt)
01886 {
01887 if (NULL == subscription_buffers)
01888 {
01889 subscription_buffers = new RCS_LINKED_LIST ();
01890 }
01891 if (NULL == subscription_buffers)
01892 {
01893 rcs_print_error ("Can`t create subscription_buffers list.\n");
01894 }
01895
01896 TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
01897 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head ();
01898 while (NULL != buf_info)
01899 {
01900 if (buf_info->buffer_number == buffer_number)
01901 {
01902 break;
01903 }
01904 buf_info =
01905 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next ();
01906 }
01907 if (NULL == buf_info)
01908 {
01909 buf_info = new TCP_BUFFER_SUBSCRIPTION_INFO ();
01910 buf_info->buffer_number = buffer_number;
01911 buf_info->sub_clnt_info = new RCS_LINKED_LIST ();
01912 buf_info->list_id =
01913 subscription_buffers->store_at_tail (buf_info, sizeof (*buf_info), 0);
01914 }
01915 buf_info->min_last_id = 0;
01916 if (NULL == clnt->subscriptions)
01917 {
01918 clnt->subscriptions = new RCS_LINKED_LIST ();
01919 }
01920 TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
01921 (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head ();
01922 while (temp_clnt_info != NULL)
01923 {
01924 if (temp_clnt_info->buffer_number == buffer_number)
01925 {
01926 break;
01927 }
01928 temp_clnt_info =
01929 (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next ();
01930 }
01931 if (NULL == temp_clnt_info)
01932 {
01933 temp_clnt_info = new TCP_CLIENT_SUBSCRIPTION_INFO ();
01934 temp_clnt_info->last_sub_sent_time = 0.0;
01935 temp_clnt_info->buffer_number = buffer_number;
01936 temp_clnt_info->subscription_paused = 0;
01937 temp_clnt_info->last_id_read = 0;
01938 temp_clnt_info->sub_buf_info = buf_info;
01939 temp_clnt_info->clnt_port = clnt;
01940 temp_clnt_info->last_sub_sent_time = etime ();
01941 temp_clnt_info->subscription_list_id =
01942 clnt->subscriptions->store_at_tail (temp_clnt_info,
01943 sizeof (*temp_clnt_info), 0);
01944 buf_info->sub_clnt_info->store_at_tail (temp_clnt_info,
01945 sizeof (*temp_clnt_info), 0);
01946 }
01947 temp_clnt_info->subscription_type = subscription_type;
01948 temp_clnt_info->poll_interval_millis = poll_interval_millis;
01949 recalculate_polling_interval ();
01950 }
01951
01952
01953 void
01954 CMS_SERVER_REMOTE_TCP_PORT::remove_subscription_client (CLIENT_TCP_PORT *
01955 clnt,
01956 int buffer_number)
01957 {
01958 TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
01959 (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head ();
01960 while (temp_clnt_info != NULL)
01961 {
01962 if (temp_clnt_info->buffer_number == buffer_number)
01963 {
01964 if (NULL != temp_clnt_info->sub_buf_info)
01965 {
01966 if (NULL != temp_clnt_info->sub_buf_info->sub_clnt_info)
01967 {
01968 temp_clnt_info->sub_buf_info->sub_clnt_info->
01969 delete_node (temp_clnt_info->subscription_list_id);
01970 if (temp_clnt_info->sub_buf_info->sub_clnt_info->
01971 list_size == 0)
01972 {
01973 subscription_buffers->delete_node (temp_clnt_info->
01974 sub_buf_info->
01975 list_id);
01976 delete temp_clnt_info->sub_buf_info->sub_clnt_info;
01977 temp_clnt_info->sub_buf_info->sub_clnt_info = NULL;
01978 delete temp_clnt_info->sub_buf_info;
01979 temp_clnt_info->sub_buf_info = NULL;
01980 }
01981 }
01982 }
01983 delete temp_clnt_info;
01984 temp_clnt_info = NULL;
01985 break;
01986 }
01987 temp_clnt_info =
01988 (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next ();
01989 }
01990 recalculate_polling_interval ();
01991 }
01992
01993 void
01994 CMS_SERVER_REMOTE_TCP_PORT::recalculate_polling_interval ()
01995 {
01996 int min_poll_interval_millis = 30000;
01997 polling_enabled = 0;
01998 TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
01999 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head ();
02000 while (NULL != buf_info)
02001 {
02002 TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
02003 (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->get_head ();
02004 while (temp_clnt_info != NULL)
02005 {
02006 if (temp_clnt_info->poll_interval_millis < min_poll_interval_millis
02007 && temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION)
02008 {
02009 min_poll_interval_millis = temp_clnt_info->poll_interval_millis;
02010 polling_enabled = 1;
02011 }
02012 temp_clnt_info =
02013 (TCP_CLIENT_SUBSCRIPTION_INFO *)
02014 buf_info->sub_clnt_info->get_next ();
02015 }
02016 buf_info =
02017 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next ();
02018 }
02019 if (min_poll_interval_millis >= ((int) (clk_tck () * 1000.0)))
02020 {
02021 current_poll_interval_millis = min_poll_interval_millis;
02022 }
02023 else
02024 {
02025 current_poll_interval_millis = ((int) (clk_tck () * 1000.0));
02026 }
02027 select_timeout.tv_sec = current_poll_interval_millis / 1000;
02028 select_timeout.tv_usec = (current_poll_interval_millis % 1000) * 1000;
02029 dtimeout = (current_poll_interval_millis + 10) * 1000.0;
02030 if (dtimeout < 0.5)
02031 {
02032 dtimeout = 0.5;
02033 }
02034 }
02035
02036
02037 void
02038 CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions ()
02039 {
02040 #if defined(WIN32) && !defined(gnuwin32)
02041 DWORD pid = GetCurrentProcessId ();
02042 DWORD tid = GetCurrentThreadId ();
02043 #else
02044 #ifdef VXWORKS
02045 int pid = taskIdSelf ();
02046 int tid = 0;
02047 #else
02048 pid_t pid = getpid ();
02049 pid_t tid = 0;
02050 #endif
02051 #endif
02052 CMS_SERVER *server;
02053 server = find_server (pid, tid);
02054 if (NULL == server)
02055 {
02056 rcs_print_error
02057 ("CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions Cannot find server object for pid = %d.\n",
02058 pid);
02059 return;
02060 }
02061 if (NULL == subscription_buffers)
02062 {
02063 return;
02064 }
02065 double cur_time = etime ();
02066 TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
02067 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head ();
02068 while (NULL != buf_info)
02069 {
02070 server->read_req.buffer_number = buf_info->buffer_number;
02071 server->read_req.access_type = CMS_READ_ACCESS;
02072 server->read_req.last_id_read = buf_info->min_last_id;
02073 server->read_reply =
02074 (REMOTE_READ_REPLY *) server->process_request (&server->read_req);
02075 if (NULL == server->read_reply)
02076 {
02077 rcs_print_error ("Server could not process request.\n");
02078 buf_info =
02079 (TCP_BUFFER_SUBSCRIPTION_INFO *)
02080 subscription_buffers->get_next ();
02081 continue;
02082 }
02083 if (server->read_reply->write_id == buf_info->min_last_id ||
02084 server->read_reply->size < 1)
02085 {
02086 buf_info =
02087 (TCP_BUFFER_SUBSCRIPTION_INFO *)
02088 subscription_buffers->get_next ();
02089 continue;
02090 }
02091 *((u_long *) temp_buffer) = 0;
02092 *((u_long *) temp_buffer + 1) = dl_htonl (server->read_reply->status);
02093 *((u_long *) temp_buffer + 2) = dl_htonl (server->read_reply->size);
02094 *((u_long *) temp_buffer + 3) = dl_htonl (server->read_reply->write_id);
02095 *((u_long *) temp_buffer + 4) = dl_htonl (server->read_reply->was_read);
02096 TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
02097 (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->get_head ();
02098 buf_info->min_last_id = server->read_reply->write_id;
02099 while (temp_clnt_info != NULL)
02100 {
02101 double time_diff = cur_time - temp_clnt_info->last_sub_sent_time;
02102 int time_diff_millis = (int) ((double) time_diff * 1000.0);
02103 rcs_print_debug (PRINT_SERVER_SUBSCRIPTION_ACTIVITY,
02104 "Subscription time_diff_millis=%d\n",
02105 time_diff_millis);
02106 if (
02107 ((temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION
02108 && time_diff_millis + 10 >=
02109 temp_clnt_info->poll_interval_millis)
02110 || temp_clnt_info->subscription_type ==
02111 CMS_VARIABLE_SUBSCRIPTION)
02112 && temp_clnt_info->last_id_read != server->read_reply->write_id)
02113 {
02114 temp_clnt_info->last_id_read = server->read_reply->write_id;
02115 temp_clnt_info->last_sub_sent_time = cur_time;
02116 temp_clnt_info->clnt_port->serial_number++;
02117 *((u_long *) temp_buffer) =
02118 dl_htonl (temp_clnt_info->clnt_port->serial_number);
02119 if (server->read_reply->size < 0x2000 - 20
02120 && server->read_reply->size > 0)
02121 {
02122 memcpy (temp_buffer + 20, server->read_reply->data,
02123 server->read_reply->size);
02124 if (sendn
02125 (temp_clnt_info->clnt_port->socket_fd, temp_buffer,
02126 20 + server->read_reply->size, 0, dtimeout) < 0)
02127 {
02128 temp_clnt_info->clnt_port->errors++;
02129 return;
02130 }
02131 }
02132 else
02133 {
02134 if (sendn (temp_clnt_info->clnt_port->socket_fd,
02135 temp_buffer, 20, 0, dtimeout) < 0)
02136 {
02137 temp_clnt_info->clnt_port->errors++;
02138 return;
02139 }
02140 if (server->read_reply->size > 0)
02141 {
02142 if (sendn (temp_clnt_info->clnt_port->socket_fd,
02143 server->read_reply->data,
02144 server->read_reply->size, 0, dtimeout) < 0)
02145 {
02146 temp_clnt_info->clnt_port->errors++;
02147 return;
02148 }
02149 }
02150 }
02151 }
02152 if (temp_clnt_info->last_id_read < buf_info->min_last_id)
02153 {
02154 buf_info->min_last_id = temp_clnt_info->last_id_read;
02155 }
02156 temp_clnt_info =
02157 (TCP_CLIENT_SUBSCRIPTION_INFO *)
02158 buf_info->sub_clnt_info->get_next ();
02159 }
02160 buf_info =
02161 (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next ();
02162 }
02163 }
02164
02165
02166 TCP_BUFFER_SUBSCRIPTION_INFO::TCP_BUFFER_SUBSCRIPTION_INFO ()
02167 {
02168 buffer_number = -1;
02169 min_last_id = 0;
02170 list_id = -1;
02171 sub_clnt_info = NULL;
02172 }
02173
02174 TCP_BUFFER_SUBSCRIPTION_INFO::~TCP_BUFFER_SUBSCRIPTION_INFO ()
02175 {
02176 buffer_number = -1;
02177 min_last_id = 0;
02178 list_id = -1;
02179 if (NULL != sub_clnt_info)
02180 {
02181 delete sub_clnt_info;
02182 sub_clnt_info = NULL;
02183 }
02184 }
02185
02186 TCP_CLIENT_SUBSCRIPTION_INFO::TCP_CLIENT_SUBSCRIPTION_INFO ()
02187 {
02188 subscription_type = CMS_NO_SUBSCRIPTION;
02189 poll_interval_millis = 30000;
02190 last_sub_sent_time = 0.0;
02191 subscription_list_id = -1;
02192 buffer_number = -1;
02193 subscription_paused = 0;
02194 last_id_read = 0;
02195 sub_buf_info = NULL;
02196 clnt_port = NULL;
02197 }
02198
02199 TCP_CLIENT_SUBSCRIPTION_INFO::~TCP_CLIENT_SUBSCRIPTION_INFO ()
02200 {
02201 subscription_type = CMS_NO_SUBSCRIPTION;
02202 poll_interval_millis = 30000;
02203 last_sub_sent_time = 0.0;
02204 subscription_list_id = -1;
02205 buffer_number = -1;
02206 subscription_paused = 0;
02207 last_id_read = 0;
02208 sub_buf_info = NULL;
02209 clnt_port = NULL;
02210 }
02211
02212
02213 CLIENT_TCP_PORT::CLIENT_TCP_PORT ()
02214 {
02215 serial_number = 0;
02216 errors = 0;
02217 max_errors = 50;
02218 address.sin_port = 0;
02219 address.sin_family = AF_INET;
02220 address.sin_addr.s_addr = dl_htonl (INADDR_ANY);
02221 socket_fd = -1;
02222 subscriptions = NULL;
02223 tid = -1;
02224 pid = -1;
02225 blocking_read_req = NULL;
02226 threadId = -1;
02227 diag_info = NULL;
02228 }
02229
02230 CLIENT_TCP_PORT::~CLIENT_TCP_PORT ()
02231 {
02232 if (socket_fd > 0)
02233 {
02234 dl_closesocket (socket_fd);
02235 socket_fd = -1;
02236 }
02237 if (NULL != subscriptions)
02238 {
02239 TCP_CLIENT_SUBSCRIPTION_INFO *sub_info =
02240 (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_head ();
02241 while (NULL != sub_info)
02242 {
02243 delete sub_info;
02244 sub_info =
02245 (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_next ();
02246 }
02247 delete subscriptions;
02248 subscriptions = NULL;
02249 }
02250 if (NULL != blocking_read_req)
02251 {
02252 delete blocking_read_req;
02253 blocking_read_req = NULL;
02254 }
02255 if (NULL != diag_info)
02256 {
02257 delete diag_info;
02258 diag_info = NULL;
02259 }
02260 }