Main Page   Class Hierarchy   Alphabetical List   Data Structures   File List   Data Fields   Globals  

tcp_srv.cc

Go to the documentation of this file.
00001 /****************************************************************************
00002 * File: tcp_srv.cc
00003 * Purpose: Provides the functions for the class CMS_SERVER_REMOTE_T_PORT
00004 *  which provides TCP specific overrides of the CMS_SERVER_REMOTE_PORT class.
00005 ****************************************************************************/
00006 
00007 #include "rcs_defs.hh"          /* EXTERN_C_STD_HEADERS */
00008 #include "sokintrf.h"           /* dl_ioctl() */
00009 
00010 #ifdef EXTERN_C_STD_HEADERS
00011 extern "C"
00012 {
00013 #endif
00014 
00015 #include <string.h>             /* memset(), strerror() */
00016 #include <stdlib.h>             // malloc(), free()
00017 
00018 #ifndef UNDER_CE
00019 #include <errno.h>              /* errno */
00020 #include <signal.h>             // SIGPIPE, signal()
00021 #endif
00022 
00023 #if (defined(__CENTERLINE__) && !defined(VXWORKS)) || defined(sunos5) || defined(sparcworks)
00024 #include <sys/filio.h>          /* FIONREAD */
00025   char *strerror (int errnum);
00026   char *dl_inet_ntoa (struct in_addr);
00027 #endif
00028 
00029 #ifdef EXTERN_C_STD_HEADERS
00030 }
00031 #endif
00032 
00033 #include "cms.hh"               /* class CMS */
00034 #include "nml.hh"               // class NML
00035 #include "tcp_srv.hh"           /* class CMS_SERVER_REMOTE_TCP_PORT */
00036 #include "rcs_prnt.hh"          /* rcs_print_error() */
00037 #include "linklist.hh"          /* class RCS_LINKED_LIST */
00038 #include "tcp_opts.hh"          /* SET_TCP_NODELAY */
00039 #include "timer.hh"             // esleep()
00040 #include "cmsdiag.hh"           // class CMS_DIAGNOSTICS_INFO
00041 
00042 extern "C"
00043 {
00044 #include "recvn.h"              /* recvn() */
00045 #include "sendn.h"              /* sendn() */
00046 }
00047 
00048 
00049 #ifdef VXWORKS
00050 #include "vxWorks.h"
00051 #include "taskLib.h"            // taskSpawn
00052 #endif
00053 
00054 #ifdef UNIX_LIKE_PLAT
00055 #include <sys/types.h>
00056 #include <sys/wait.h>           // waitpid
00057 #endif
00058 
00059 #ifndef NO_THREADS
00060 #ifdef SGI
00061 #include <sys/resource.h>
00062 #include <sys/prctl.h>          // sproc(), prctl()
00063 #endif
00064 
00065 
00066 #ifdef WIN32
00067 #ifdef MULTITHREADED
00068 #ifndef UNDER_CE
00069 #include <process.h>            // _beginthread
00070 #else
00071 #include <winbase.h>            // CreateThread
00072 #endif
00073 #else
00074 #define NO_THREADS
00075 #endif
00076 #endif
00077 #endif
00078 
00079 int tcpsvr_threads_created = 0;
00080 int tcpsvr_threads_killed = 0;
00081 int tcpsvr_threads_exited = 0;
00082 int tcpsvr_threads_returned_early = 0;
00083 
00084 TCPSVR_BLOCKING_READ_REQUEST::TCPSVR_BLOCKING_READ_REQUEST ()
00085 {
00086   access_type = CMS_READ_ACCESS;        /* read or just peek */
00087   last_id_read = 0;             /* The server can compare with id from buffer */
00088   /* to determine if the buffer is new */
00089   /* to this client */
00090   timeout_millis = -1;          /* Milliseconds for blocking_timeout or -1
00091                                  * to wait forever */
00092   _client_tcp_port = NULL;
00093   remport = NULL;
00094   server = NULL;
00095   _nml = NULL;
00096   _reply = NULL;
00097   _data = NULL;
00098   read_reply = NULL;
00099 }
00100 
00101 
00102 static inline double
00103 tcp_svr_reverse_double (double in)
00104 {
00105   double out;
00106   char *c1, *c2;
00107 
00108   c1 = ((char *) &in) + 7;
00109   c2 = (char *) &out;
00110   for (int i = 0; i < 8; i++)
00111     {
00112       *c2 = *c1;
00113       c1--;
00114       c2++;
00115     }
00116   return out;
00117 }
00118 
00119 
00120 TCPSVR_BLOCKING_READ_REQUEST::~TCPSVR_BLOCKING_READ_REQUEST ()
00121 {
00122   if (NULL != _nml)
00123     {
00124       NML *nmlcopy = (NML *) _nml;
00125       _nml = NULL;
00126       delete nmlcopy;
00127     }
00128   if (NULL != _data)
00129     {
00130       DEBUG_FREE (_data);
00131       _data = NULL;
00132     }
00133   if (NULL != _reply)
00134     {
00135       DEBUG_FREE (_reply);
00136       _reply = NULL;
00137       read_reply = NULL;
00138     }
00139   if (NULL != read_reply)
00140     {
00141       if (NULL != read_reply->data)
00142         {
00143           DEBUG_FREE (read_reply->data);
00144           read_reply->data = NULL;
00145         }
00146       delete read_reply;
00147       read_reply = NULL;
00148     }
00149 }
00150 
00151 
00152 CMS_SERVER_REMOTE_TCP_PORT::CMS_SERVER_REMOTE_TCP_PORT (CMS_SERVER *
00153 _cms_server):
00154 CMS_SERVER_REMOTE_PORT (_cms_server)
00155 {
00156   client_ports = (RCS_LINKED_LIST *) NULL;
00157   connection_socket = 0;
00158   connection_port = 0;
00159   maxfdpl = 0;
00160   dtimeout = 20.0;
00161   if (load_socket_interface () < 0)
00162     {
00163       rcs_print_error ("Can't load socket interface.\n");
00164       return;
00165     }
00166 
00167   memset (&server_socket_address, 0, sizeof (server_socket_address));
00168   server_socket_address.sin_family = AF_INET;
00169   server_socket_address.sin_addr.s_addr = dl_htonl (INADDR_ANY);
00170   server_socket_address.sin_port = 0;
00171 
00172   client_ports = new RCS_LINKED_LIST;
00173   if (NULL == client_ports)
00174     {
00175       rcs_print_error ("Can not create linked list for client ports.\n");
00176       return;
00177     }
00178   polling_enabled = 0;
00179   memset (&select_timeout, 0, sizeof (select_timeout));
00180   select_timeout.tv_sec = 30;
00181   select_timeout.tv_usec = 30;
00182   subscription_buffers = NULL;
00183   current_poll_interval_millis = 30000;
00184   memset (&read_fd_set, 0, sizeof (read_fd_set));
00185   memset (&write_fd_set, 0, sizeof (write_fd_set));
00186 }
00187 
00188 CMS_SERVER_REMOTE_TCP_PORT::~CMS_SERVER_REMOTE_TCP_PORT ()
00189 {
00190   unregister_port ();
00191   if (NULL != client_ports)
00192     {
00193       delete client_ports;
00194       client_ports = (RCS_LINKED_LIST *) NULL;
00195     }
00196 }
00197 
00198 void
00199 blocking_thread_kill (long int id)
00200 {
00201 
00202   if (id <= 0)
00203     {
00204       return;
00205     }
00206 #if defined(sunos5) && !defined(NO_THREADS)
00207   thr_kill (id, SIGINT);
00208   thr_join (id, NULL, NULL);
00209 #endif
00210 #ifdef POSIX_THREADS
00211   pthread_kill (id, SIGINT);
00212   pthread_join (id, NULL);
00213 #endif
00214 #ifdef VXWORKS
00215   if (taskIdVerify (id) != OK)
00216     {
00217       return;
00218     }
00219   kill (id, SIGINT);
00220   taskDelay (1);
00221   if (taskIdVerify (id) == OK)
00222     {
00223       taskDelete (id);
00224     }
00225 #endif
00226 #if defined(NO_THREADS) || defined(SGI)
00227   kill (id, SIGINT);
00228   waitpid (id, NULL, 0);
00229 #endif
00230 #ifdef WIN32
00231   TerminateThread ((HANDLE) id, -1);
00232 #endif
00233   tcpsvr_threads_killed++;
00234 }
00235 
00236 void
00237 CMS_SERVER_REMOTE_TCP_PORT::unregister_port ()
00238 {
00239   CLIENT_TCP_PORT *client;
00240   int number_of_connected_clients = 0;
00241 #if defined(sunos5) || defined(VXWORKS)
00242   client = (CLIENT_TCP_PORT *) client_ports->get_head ();
00243   while (NULL != client)
00244     {
00245       if (client->threadId > 0 && client->blocking)
00246         {
00247           blocking_thread_kill (client->threadId);
00248           client->threadId = 0;
00249         }
00250       client = (CLIENT_TCP_PORT *) client_ports->get_next ();
00251     }
00252 #endif
00253 
00254   client = (CLIENT_TCP_PORT *) client_ports->get_head ();
00255   while (NULL != client)
00256     {
00257       rcs_print ("Exiting even though client on %s is still connected.\n",
00258                  dl_inet_ntoa (client->address.sin_addr));
00259       client = (CLIENT_TCP_PORT *) client_ports->get_next ();
00260       number_of_connected_clients++;
00261     }
00262   client = (CLIENT_TCP_PORT *) client_ports->get_head ();
00263   while (NULL != client)
00264     {
00265       delete client;
00266       client_ports->delete_current_node ();
00267       client = (CLIENT_TCP_PORT *) client_ports->get_next ();
00268     }
00269   if (NULL != subscription_buffers)
00270     {
00271       TCP_BUFFER_SUBSCRIPTION_INFO *sub_info =
00272         (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head ();
00273       while (NULL != sub_info)
00274         {
00275           delete sub_info;
00276           sub_info =
00277             (TCP_BUFFER_SUBSCRIPTION_INFO *)
00278             subscription_buffers->get_next ();
00279         }
00280       delete subscription_buffers;
00281       subscription_buffers = NULL;
00282     }
00283   if (number_of_connected_clients > 0)
00284     {
00285       esleep (2.0);
00286     }
00287   if (connection_socket > 0)
00288     {
00289       dl_closesocket (connection_socket);
00290       connection_socket = 0;
00291     }
00292 }
00293 
00294 int
00295 CMS_SERVER_REMOTE_TCP_PORT::accept_local_port_cms (CMS * _cms)
00296 {
00297   if (NULL == _cms)
00298     {
00299       return 0;
00300     }
00301   if (_cms->remote_port_type != CMS_TCP_REMOTE_PORT_TYPE)
00302     {
00303       return 0;
00304     }
00305   if (NULL != _cms)
00306     {
00307       if (min_compatible_version < 1e-6 ||
00308           (min_compatible_version > _cms->min_compatible_version &&
00309            _cms->min_compatible_version > 1e-6))
00310         {
00311           min_compatible_version = _cms->min_compatible_version;
00312         }
00313       if (_cms->confirm_write)
00314         {
00315           confirm_write = _cms->confirm_write;
00316         }
00317     }
00318   if (_cms->total_subdivisions > max_total_subdivisions)
00319     {
00320       max_total_subdivisions = _cms->total_subdivisions;
00321     }
00322   if (server_socket_address.sin_port == 0)
00323     {
00324       server_socket_address.sin_port =
00325         dl_htons (((u_short) _cms->tcp_port_number));
00326       port_num = _cms->tcp_port_number;
00327       return 1;
00328     }
00329   if (server_socket_address.sin_port ==
00330       dl_htons (((u_short) _cms->tcp_port_number)))
00331     {
00332       port_num = _cms->tcp_port_number;
00333       return 1;
00334     }
00335   return 0;
00336 }
00337 
00338 void
00339 CMS_SERVER_REMOTE_TCP_PORT::register_port ()
00340 {
00341   port_registered = 0;
00342   rcs_print_debug (PRINT_CMS_CONFIG_INFO,
00343                    "Registering server on TCP port %d.\n",
00344                    dl_ntohs (server_socket_address.sin_port));
00345   if (server_socket_address.sin_port == 0)
00346     {
00347       rcs_print_error ("server can not register on port number 0.\n");
00348       return;
00349     }
00350   if ((connection_socket = dl_socket (AF_INET, SOCK_STREAM, 0)) < 0)
00351     {
00352 #ifndef UNDER_CE
00353       rcs_print_error ("socket error: %d -- %s\n", errno, strerror (errno));
00354 #endif
00355       rcs_print_error ("Server can not open stream socket.\n");
00356       return;
00357     }
00358 
00359   if (set_tcp_socket_options (connection_socket) < 0)
00360     {
00361       return;
00362     }
00363   if (dl_bind (connection_socket, (struct sockaddr *) &server_socket_address,
00364                sizeof (server_socket_address)) < 0)
00365     {
00366 #ifndef UNDER_CE
00367       rcs_print_error ("bind error: %d -- %s\n", errno, strerror (errno));
00368 #endif
00369       rcs_print_error
00370         ("Server can not bind the connection socket on port %d.\n",
00371          dl_ntohs (server_socket_address.sin_port));
00372       return;
00373     }
00374   if (dl_listen (connection_socket, 5) < 0)
00375     {
00376 #ifndef UNDER_CE
00377       rcs_print_error ("listen error: %d -- %s\n", errno, strerror (errno));
00378 #endif
00379       rcs_print_error ("TCP Server: error on call to listen for port %d.\n",
00380                        dl_ntohs (server_socket_address.sin_port));
00381       return;
00382     }
00383   port_registered = 1;
00384 
00385 }
00386 
00387 static int last_pipe_signum = 0;
00388 
00389 static void
00390 handle_pipe_error (int signum)
00391 {
00392   last_pipe_signum = signum;
00393   rcs_print_error ("SIGPIPE intercepted.\n");
00394 }
00395 
00396 void
00397 CMS_SERVER_REMOTE_TCP_PORT::run ()
00398 {
00399   unsigned long bytes_ready;
00400   int ready_descriptors;
00401   if (NULL == client_ports)
00402     {
00403       rcs_print_error ("CMS_SERVER: List of client ports is NULL.\n");
00404       return;
00405     }
00406   CLIENT_TCP_PORT *new_client_port, *client_port_to_check;
00407   FD_ZERO (&read_fd_set);
00408   FD_ZERO (&write_fd_set);
00409   RCS_FD_SET (connection_socket, &read_fd_set);
00410   maxfdpl = connection_socket + 1;
00411 #ifndef DOS_WINDOWS
00412   signal (SIGPIPE, handle_pipe_error);
00413 #endif
00414   rcs_print_debug (PRINT_CMS_CONFIG_INFO,
00415                    "running server for TCP port %d (connection_socket = %d).\n",
00416                    dl_ntohs (server_socket_address.sin_port),
00417                    connection_socket);
00418 
00419   cms_server_count++;
00420   fd_set read_fd_set_copy, write_fd_set_copy;
00421   FD_ZERO (&read_fd_set_copy);
00422   FD_ZERO (&write_fd_set_copy);
00423   RCS_FD_SET (connection_socket, &read_fd_set_copy);
00424 
00425   while (1)
00426     {
00427       if (polling_enabled)
00428         {
00429           memcpy (&read_fd_set_copy, &read_fd_set, sizeof (fd_set));
00430           memcpy (&write_fd_set_copy, &write_fd_set, sizeof (fd_set));
00431           ready_descriptors =
00432             dl_select (maxfdpl, &read_fd_set, &write_fd_set, (fd_set *) NULL,
00433                        (timeval *) & select_timeout);
00434           if (ready_descriptors == 0)
00435             {
00436               update_subscriptions ();
00437               memcpy (&read_fd_set, &read_fd_set_copy, sizeof (fd_set));
00438               memcpy (&write_fd_set, &write_fd_set_copy, sizeof (fd_set));
00439               continue;
00440             }
00441         }
00442       else
00443         {
00444           ready_descriptors =
00445             dl_select (maxfdpl, &read_fd_set, &write_fd_set, (fd_set *) NULL,
00446                        (timeval *) NULL);
00447 
00448         }
00449 #ifndef UNDER_CE
00450       if (ready_descriptors < 0)
00451         {
00452           rcs_print_error ("server: select error.(errno = %d | %s)\n",
00453                            errno, strerror (errno));
00454         }
00455 #endif
00456       if (NULL == client_ports)
00457         {
00458           rcs_print_error ("CMS_SERVER: List of client ports is NULL.\n");
00459           return;
00460         }
00461       client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head ();
00462       while (NULL != client_port_to_check)
00463         {
00464           if (dl_fd_isset (client_port_to_check->socket_fd, &read_fd_set))
00465             {
00466 #ifdef WIN32
00467               dl_ioctlsocket (client_port_to_check->socket_fd, FIONREAD,
00468                               &bytes_ready);
00469 #else
00470 #ifndef VXWORKS
00471               ioctl (client_port_to_check->socket_fd, FIONREAD,
00472                      (caddr_t) & bytes_ready);
00473 #else
00474               ioctl (client_port_to_check->socket_fd, FIONREAD,
00475                      (int) &bytes_ready);
00476 #endif
00477 #endif
00478               if (bytes_ready <= 0)
00479                 {
00480                   rcs_print_debug (PRINT_SOCKET_CONNECT,
00481                                    "Socket closed by host with IP address %s.\n",
00482                                    dl_inet_ntoa
00483                                    (client_port_to_check->address.sin_addr));
00484                   if (NULL != client_port_to_check->subscriptions)
00485                     {
00486                       TCP_CLIENT_SUBSCRIPTION_INFO *clnt_sub_info =
00487                         (TCP_CLIENT_SUBSCRIPTION_INFO *)
00488                         client_port_to_check->subscriptions->get_head ();
00489                       while (NULL != clnt_sub_info)
00490                         {
00491                           if (NULL != clnt_sub_info->sub_buf_info &&
00492                               clnt_sub_info->subscription_list_id >= 0)
00493                             {
00494                               if (NULL !=
00495                                   clnt_sub_info->sub_buf_info->sub_clnt_info)
00496                                 {
00497                                   clnt_sub_info->sub_buf_info->sub_clnt_info->
00498                                     delete_node (clnt_sub_info->
00499                                                  subscription_list_id);
00500                                   if (clnt_sub_info->sub_buf_info->
00501                                       sub_clnt_info->list_size < 1)
00502                                     {
00503                                       delete clnt_sub_info->sub_buf_info->
00504                                         sub_clnt_info;
00505                                       clnt_sub_info->sub_buf_info->
00506                                         sub_clnt_info = NULL;
00507                                       if (NULL != subscription_buffers
00508                                           && clnt_sub_info->sub_buf_info->
00509                                           list_id >= 0)
00510                                         {
00511                                           subscription_buffers->delete_node
00512                                             (clnt_sub_info->sub_buf_info->
00513                                              list_id);
00514                                           delete clnt_sub_info->sub_buf_info;
00515                                           clnt_sub_info->sub_buf_info = NULL;
00516                                         }
00517                                     }
00518                                   clnt_sub_info->sub_buf_info = NULL;
00519                                 }
00520                               delete clnt_sub_info;
00521                               clnt_sub_info =
00522                                 (TCP_CLIENT_SUBSCRIPTION_INFO *)
00523                                 client_port_to_check->subscriptions->
00524                                 get_next ();
00525                             }
00526                           delete client_port_to_check->subscriptions;
00527                           client_port_to_check->subscriptions = NULL;
00528                           recalculate_polling_interval ();
00529                         }
00530                     }
00531                   if (client_port_to_check->threadId > 0
00532                       && client_port_to_check->blocking)
00533                     {
00534                       blocking_thread_kill (client_port_to_check->threadId);
00535                     }
00536                   dl_closesocket (client_port_to_check->socket_fd);
00537                   RCS_FD_CLR (client_port_to_check->socket_fd, &read_fd_set);
00538                   client_port_to_check->socket_fd = -1;
00539                   delete client_port_to_check;
00540                   client_ports->delete_current_node ();
00541                 }
00542               else
00543                 {
00544                   if (client_port_to_check->blocking)
00545                     {
00546                       if (client_port_to_check->threadId > 0)
00547                         {
00548                           rcs_print_debug (PRINT_SERVER_THREAD_ACTIVITY,
00549                                            "Data recieved from %s:%d when it should be blocking (bytes_ready=%d).\n",
00550                                            dl_inet_ntoa
00551                                            (client_port_to_check->address.
00552                                             sin_addr),
00553                                            client_port_to_check->socket_fd,
00554                                            bytes_ready);
00555                           rcs_print_debug (PRINT_SERVER_THREAD_ACTIVITY,
00556                                            "Killing handler %d.\n",
00557                                            client_port_to_check->threadId);
00558 
00559                           blocking_thread_kill
00560                             (client_port_to_check->threadId);
00561 #if 0
00562                           *((u_long *) temp_buffer) =
00563                             dl_htonl (client_port_to_check->serial_number);
00564                           *((u_long *) temp_buffer + 1) =
00565                             dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
00566                           *((u_long *) temp_buffer + 2) = dl_htonl (0); /* size */
00567                           *((u_long *) temp_buffer + 3) = dl_htonl (0); /* write_id */
00568                           *((u_long *) temp_buffer + 4) = dl_htonl (0); /* was_read */
00569                           sendn (client_port_to_check->socket_fd, temp_buffer,
00570                                  20, 0, dtimeout);
00571 #endif
00572                           client_port_to_check->threadId = 0;
00573                           client_port_to_check->blocking = 0;
00574                         }
00575                     }
00576                   handle_request (client_port_to_check);
00577                 }
00578               ready_descriptors--;
00579             }
00580           else
00581             {
00582               RCS_FD_SET (client_port_to_check->socket_fd, &read_fd_set);
00583             }
00584           client_port_to_check =
00585             (CLIENT_TCP_PORT *) client_ports->get_next ();
00586         }
00587       if (dl_fd_isset (connection_socket, &read_fd_set)
00588           && ready_descriptors > 0)
00589         {
00590           ready_descriptors--;
00591           int client_address_length;
00592           new_client_port = new CLIENT_TCP_PORT ();
00593           client_address_length = sizeof (new_client_port->address);
00594           new_client_port->socket_fd = dl_accept (connection_socket,
00595                                                   (struct sockaddr *)
00596                                                   &new_client_port->address,
00597                                                   &client_address_length);
00598           current_clients++;
00599           if (current_clients > max_clients)
00600             {
00601               max_clients = current_clients;
00602             }
00603           if (new_client_port->socket_fd < 0)
00604             {
00605 #ifndef UNDER_CE
00606               rcs_print_error ("server: accept error -- %d %s \n", errno,
00607                                strerror (errno));
00608 #endif
00609               continue;
00610             }
00611           rcs_print_debug (PRINT_SOCKET_CONNECT,
00612                            "Socket opened by host with IP address %s.\n",
00613                            dl_inet_ntoa (new_client_port->address.sin_addr));
00614           new_client_port->serial_number = 0;
00615           new_client_port->blocking = 0;
00616           if (NULL != client_ports)
00617             {
00618               client_ports->store_at_tail (new_client_port,
00619                                            sizeof (new_client_port), 0);
00620             }
00621           if (maxfdpl < new_client_port->socket_fd + 1)
00622             {
00623               maxfdpl = new_client_port->socket_fd + 1;
00624             }
00625           RCS_FD_SET (new_client_port->socket_fd, &read_fd_set);
00626         }
00627       else
00628         {
00629           RCS_FD_SET (connection_socket, &read_fd_set);
00630         }
00631       if (0 != ready_descriptors)
00632         {
00633           rcs_print_error ("%d descriptors ready but not serviced.\n",
00634                            ready_descriptors);
00635         }
00636       update_subscriptions ();
00637     }
00638 }
00639 
00640 static int tcpsvr_handle_blocking_request_sigint_count = 0;
00641 static int tcpsvr_last_sig = 0;
00642 
00643 void
00644 tcpsvr_handle_blocking_request_sigint_handler (int sig)
00645 {
00646   tcpsvr_last_sig = sig;
00647   tcpsvr_handle_blocking_request_sigint_count++;
00648 }
00649 
00650 #if defined(sunos5) || defined(VXWORKS) || defined(POSIX_THREADS) || defined(NO_THREADS) || defined(WIN32) || defined(SGI)
00651 #ifdef VXWORKS
00652 int
00653 tcpsvr_handle_blocking_request (void *_req)
00654 #else
00655 #ifdef WIN32
00656 #ifndef UNDER_CE
00657 void *__cdecl
00658 tcpsvr_handle_blocking_request (void *_req)
00659 #else
00660 unsigned long __stdcall
00661 tcpsvr_handle_blocking_request (void *_req)
00662 #endif
00663 #else
00664 #ifdef SGI
00665 void
00666 tcpsvr_handle_blocking_request (void *_req)
00667 #else
00668 void *
00669 tcpsvr_handle_blocking_request (void *_req)
00670 #endif
00671 #endif
00672 #endif
00673 {
00674 #ifndef UNDER_CE
00675   signal (SIGINT, tcpsvr_handle_blocking_request_sigint_handler);
00676 #endif
00677   TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req =
00678     (TCPSVR_BLOCKING_READ_REQUEST *) _req;
00679   char temp_buffer[0x2000];
00680   if (_req == NULL)
00681     {
00682       tcpsvr_threads_returned_early++;
00683 #ifndef SGI
00684       return 0;
00685 #else
00686       return;
00687 #endif
00688     }
00689   double dtimeout =
00690     ((double) (blocking_read_req->timeout_millis + 10)) / 1000.0;
00691   if (dtimeout < 0)
00692     {
00693       dtimeout = 600.0;
00694     }
00695   if (dtimeout < 0.5)
00696     {
00697       dtimeout = 0.5;
00698     }
00699   if (dtimeout > 600.0)
00700     {
00701       dtimeout = 600.0;
00702     }
00703   CLIENT_TCP_PORT *_client_tcp_port = blocking_read_req->_client_tcp_port;
00704   CMS_SERVER *server = blocking_read_req->server;
00705 
00706   if (NULL == server || NULL == _client_tcp_port)
00707     {
00708       tcpsvr_threads_returned_early++;
00709 #ifndef SGI
00710       return 0;
00711 #else
00712       return;
00713 #endif
00714     }
00715   memset (temp_buffer, 0, 0x2000);
00716   REMOTE_BLOCKING_READ_REPLY *read_reply;
00717 
00718 
00719   if (NULL != _client_tcp_port->diag_info)
00720     {
00721       _client_tcp_port->diag_info->buffer_number =
00722         blocking_read_req->buffer_number;
00723       server->set_diag_info (_client_tcp_port->diag_info);
00724     }
00725   else if (server->diag_enabled)
00726     {
00727       server->reset_diag_info (blocking_read_req->buffer_number);
00728     }
00729 
00730   read_reply =
00731     (REMOTE_BLOCKING_READ_REPLY *)
00732     server->process_request (blocking_read_req);
00733   blocking_read_req->read_reply = read_reply;
00734   if (NULL == read_reply)
00735     {
00736       _client_tcp_port->blocking = 0;
00737       rcs_print_error ("Server could not process request.\n");
00738       *((u_long *) temp_buffer) = dl_htonl (_client_tcp_port->serial_number);
00739       *((u_long *) temp_buffer + 1) =
00740         dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
00741       *((u_long *) temp_buffer + 2) = dl_htonl (0);     /* size */
00742       *((u_long *) temp_buffer + 3) = dl_htonl (0);     /* write_id */
00743       *((u_long *) temp_buffer + 4) = dl_htonl (0);     /* was_read */
00744       sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
00745       _client_tcp_port->errors++;
00746       _client_tcp_port->blocking_read_req = NULL;
00747       delete blocking_read_req;
00748       _client_tcp_port->threadId = 0;
00749       tcpsvr_threads_returned_early++;
00750 #ifndef SGI
00751       return 0;
00752 #else
00753       return;
00754 #endif
00755     }
00756   *((u_long *) temp_buffer) = dl_htonl (_client_tcp_port->serial_number);
00757   *((u_long *) temp_buffer + 1) = dl_htonl (read_reply->status);
00758   *((u_long *) temp_buffer + 2) = dl_htonl (read_reply->size);
00759   *((u_long *) temp_buffer + 3) = dl_htonl (read_reply->write_id);
00760   *((u_long *) temp_buffer + 4) = dl_htonl (read_reply->was_read);
00761   if (read_reply->size < (0x2000 - 20) && read_reply->size > 0)
00762     {
00763       memcpy (temp_buffer + 20, read_reply->data, read_reply->size);
00764       _client_tcp_port->blocking = 0;
00765       if (sendn
00766           (_client_tcp_port->socket_fd, temp_buffer, 20 + read_reply->size, 0,
00767            dtimeout) < 0)
00768         {
00769           _client_tcp_port->blocking = 0;
00770           _client_tcp_port->errors++;
00771           _client_tcp_port->blocking_read_req = NULL;
00772           delete blocking_read_req;
00773           _client_tcp_port->threadId = 0;
00774           tcpsvr_threads_returned_early++;
00775 #ifndef SGI
00776           return 0;
00777 #else
00778           return;
00779 #endif
00780         }
00781     }
00782   else
00783     {
00784       _client_tcp_port->blocking = 0;
00785       if (sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout) <
00786           0)
00787         {
00788           _client_tcp_port->blocking = 0;
00789           _client_tcp_port->errors++;
00790           _client_tcp_port->blocking_read_req = NULL;
00791           delete blocking_read_req;
00792           _client_tcp_port->threadId = 0;
00793           tcpsvr_threads_returned_early++;
00794 #ifndef SGI
00795           return 0;
00796 #else
00797           return;
00798 #endif
00799         }
00800       if (read_reply->size > 0)
00801         {
00802           if (sendn
00803               (_client_tcp_port->socket_fd, read_reply->data,
00804                read_reply->size, 0, dtimeout) < 0)
00805             {
00806               _client_tcp_port->blocking = 0;
00807               _client_tcp_port->errors++;
00808               _client_tcp_port->blocking_read_req = NULL;
00809               delete blocking_read_req;
00810               _client_tcp_port->threadId = 0;
00811               tcpsvr_threads_returned_early++;
00812 #ifndef SGI
00813               return 0;
00814 #else
00815               return;
00816 #endif
00817             }
00818         }
00819     }
00820   _client_tcp_port->blocking_read_req = NULL;
00821   delete blocking_read_req;
00822   _client_tcp_port->threadId = 0;
00823   tcpsvr_threads_exited++;
00824 #if defined(sunos5) && !defined(NO_THREADS)
00825   thr_exit (0);
00826 #endif
00827 #ifdef POSIX_THREADS
00828   pthread_exit (0);
00829 #endif
00830 #ifdef NO_THREADS
00831   exit (0);
00832 #endif
00833 #ifndef SGI
00834   return 0;
00835 #endif
00836 }
00837 
00838 #endif
00839 
00840 void
00841 CMS_SERVER_REMOTE_TCP_PORT::handle_request (CLIENT_TCP_PORT *
00842                                             _client_tcp_port)
00843 {
00844   CLIENT_TCP_PORT *client_port_to_check = NULL;
00845 #if defined(WIN32) && !defined(gnuwin32)
00846   DWORD pid = GetCurrentProcessId ();
00847   DWORD tid = GetCurrentThreadId ();
00848 #else
00849 #ifdef VXWORKS
00850   int pid = taskIdSelf ();
00851 
00852   int tid = 0;
00853 #else
00854   pid_t pid = getpid ();
00855   pid_t tid = 0;
00856 #endif
00857 #endif
00858   CMS_SERVER *server;
00859   server = find_server (pid, tid);
00860   if (NULL == server)
00861     {
00862       rcs_print_error
00863         ("CMS_SERVER_REMOTE_TCP_PORT::handle_request() Cannot find server object for pid = %d.\n",
00864          pid);
00865       return;
00866     }
00867 
00868   if (server->using_passwd_file)
00869     {
00870       current_user_info = get_connected_user (_client_tcp_port->socket_fd);
00871     }
00872 
00873   if (_client_tcp_port->errors >= _client_tcp_port->max_errors)
00874     {
00875       rcs_print_error ("Too many errors - closing connection(%d)\n",
00876                        _client_tcp_port->socket_fd);
00877       client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head ();
00878       while (NULL != client_port_to_check)
00879         {
00880           if (client_port_to_check->socket_fd == _client_tcp_port->socket_fd)
00881             {
00882               delete client_port_to_check;
00883               client_ports->delete_current_node ();
00884             }
00885           client_port_to_check =
00886             (CLIENT_TCP_PORT *) client_ports->get_next ();
00887         }
00888       dl_closesocket (_client_tcp_port->socket_fd);
00889       current_clients--;
00890       RCS_FD_CLR (_client_tcp_port->socket_fd, &read_fd_set);
00891       _client_tcp_port->socket_fd = -1;
00892     }
00893 
00894   if (recvn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, -1, NULL) < 0)
00895     {
00896       rcs_print_error ("Can not read from client port (%d) from %s\n",
00897                        _client_tcp_port->socket_fd,
00898                        dl_inet_ntoa (_client_tcp_port->address.sin_addr));
00899       _client_tcp_port->errors++;
00900       return;
00901     }
00902   long request_type, buffer_number, received_serial_number;
00903   received_serial_number = dl_ntohl (*((u_long *) temp_buffer));
00904   if (received_serial_number != _client_tcp_port->serial_number)
00905     {
00906       rcs_print_error
00907         ("received_serial_number (%d) does not equal expected serial number.(%d)\n",
00908          received_serial_number, _client_tcp_port->serial_number);
00909       _client_tcp_port->serial_number = received_serial_number;
00910       _client_tcp_port->errors++;
00911     }
00912   _client_tcp_port->serial_number++;
00913   request_type = dl_ntohl (*((u_long *) temp_buffer + 1));
00914   buffer_number = dl_ntohl (*((u_long *) temp_buffer + 2));
00915 
00916   rcs_print_debug (PRINT_ALL_SOCKET_REQUESTS,
00917                    "TCPSVR request recieved: fd = %d, serial_number=%d, request_type=%d, buffer_number=%d\n",
00918                    _client_tcp_port->socket_fd,
00919                    _client_tcp_port->serial_number, request_type,
00920                    buffer_number);
00921 
00922   if (NULL != _client_tcp_port->diag_info)
00923     {
00924       _client_tcp_port->diag_info->buffer_number = buffer_number;
00925       server->set_diag_info (_client_tcp_port->diag_info);
00926     }
00927   else if (server->diag_enabled)
00928     {
00929       server->reset_diag_info (buffer_number);
00930     }
00931 
00932   switch_function (_client_tcp_port,
00933                    server,
00934                    request_type, buffer_number, received_serial_number);
00935 
00936   if (NULL != _client_tcp_port->diag_info &&
00937       NULL != server->last_local_port_used && server->diag_enabled)
00938     {
00939       if (NULL != server->last_local_port_used->cms)
00940         {
00941           if (NULL !=
00942               server->last_local_port_used->cms->handle_to_global_data)
00943             {
00944               _client_tcp_port->diag_info->bytes_moved =
00945                 server->last_local_port_used->cms->handle_to_global_data->
00946                 total_bytes_moved;
00947             }
00948         }
00949     }
00950 }
00951 
00952 void
00953 CMS_SERVER_REMOTE_TCP_PORT::switch_function (CLIENT_TCP_PORT *
00954                                              _client_tcp_port,
00955                                              CMS_SERVER * server,
00956                                              long request_type,
00957                                              long buffer_number,
00958                                              long received_serial_number)
00959 {
00960   int total_subdivisions = 1;
00961   CLIENT_TCP_PORT *client_port_to_check = NULL;
00962   switch (request_type)
00963     {
00964     case REMOTE_CMS_SET_DIAG_INFO_REQUEST_TYPE:
00965       {
00966         if (NULL == _client_tcp_port->diag_info)
00967           {
00968             _client_tcp_port->diag_info = new REMOTE_SET_DIAG_INFO_REQUEST ();
00969           }
00970         if (recvn
00971             (_client_tcp_port->socket_fd, server->set_diag_info_buf, 68, 0,
00972              -1, NULL) < 0)
00973           {
00974             rcs_print_error ("Can not read from client port (%d) from %s\n",
00975                              _client_tcp_port->socket_fd,
00976                              dl_inet_ntoa (_client_tcp_port->address.
00977                                            sin_addr));
00978             _client_tcp_port->errors++;
00979             return;
00980           }
00981         _client_tcp_port->diag_info->bytes_moved = 0.0;
00982         _client_tcp_port->diag_info->buffer_number = buffer_number;
00983         memcpy (_client_tcp_port->diag_info->process_name,
00984                 server->set_diag_info_buf, 16);
00985         memcpy (_client_tcp_port->diag_info->host_sysinfo,
00986                 server->set_diag_info_buf + 16, 32);
00987         _client_tcp_port->diag_info->pid =
00988           dl_htonl (*((u_long *) (server->set_diag_info_buf + 48)));
00989         _client_tcp_port->diag_info->c_num =
00990           dl_htonl (*((u_long *) (server->set_diag_info_buf + 52)));
00991         memcpy (&(_client_tcp_port->diag_info->rcslib_ver),
00992                 server->set_diag_info_buf + 56, 8);
00993         _client_tcp_port->diag_info->reverse_flag =
00994           *((int *) ((char *) server->set_diag_info_buf + 64));
00995         if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
00996           {
00997             _client_tcp_port->diag_info->rcslib_ver =
00998               (double) tcp_svr_reverse_double ((double)
00999                                                _client_tcp_port->
01000                                                diag_info->rcslib_ver);
01001           }
01002       }
01003       break;
01004 
01005 
01006     case REMOTE_CMS_GET_DIAG_INFO_REQUEST_TYPE:
01007       {
01008         REMOTE_GET_DIAG_INFO_REQUEST diagreq;
01009         diagreq.buffer_number = buffer_number;
01010         REMOTE_GET_DIAG_INFO_REPLY *diagreply = NULL;
01011         diagreply =
01012           (REMOTE_GET_DIAG_INFO_REPLY *) server->process_request (&diagreq);
01013         if (NULL == diagreply)
01014           {
01015             *((u_long *) temp_buffer) =
01016               dl_htonl (_client_tcp_port->serial_number);
01017             *((u_long *) temp_buffer + 1) =
01018               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01019             if (sendn
01020                 (_client_tcp_port->socket_fd, temp_buffer, 24, 0,
01021                  dtimeout) < 0)
01022               {
01023                 _client_tcp_port->errors++;
01024               }
01025             return;
01026           }
01027         if (NULL == diagreply->cdi)
01028           {
01029             *((u_long *) temp_buffer) =
01030               dl_htonl (_client_tcp_port->serial_number);
01031             *((u_long *) temp_buffer + 1) =
01032               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01033             if (sendn
01034                 (_client_tcp_port->socket_fd, temp_buffer, 24, 0,
01035                  dtimeout) < 0)
01036               {
01037                 _client_tcp_port->errors++;
01038               }
01039             return;
01040           }
01041         memset (temp_buffer, 0, 0x2000);
01042         unsigned long dpi_offset = 32;
01043         *((u_long *) temp_buffer) =
01044           dl_htonl (_client_tcp_port->serial_number);
01045         *((u_long *) temp_buffer + 1) = dl_htonl (diagreply->status);
01046         *((u_long *) temp_buffer + 2) =
01047           dl_htonl (diagreply->cdi->last_writer);
01048         *((u_long *) temp_buffer + 3) =
01049           dl_htonl (diagreply->cdi->last_reader);
01050         double curtime = etime ();
01051         double reversed_temp = 0.0;
01052         if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01053           {
01054             reversed_temp =
01055               (double) tcp_svr_reverse_double ((double) curtime);
01056             memcpy (temp_buffer + 16, &reversed_temp, 8);
01057           }
01058         else
01059           {
01060             memcpy (temp_buffer + 16, &(curtime), 8);
01061           }
01062         int dpi_count = 0;
01063         if (NULL != diagreply->cdi->dpis)
01064           {
01065             CMS_DIAG_PROC_INFO *dpi =
01066               (CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis->get_head ();
01067             while ((dpi_offset < ((int) 0x2000 - sizeof (CMS_DIAG_PROC_INFO)))
01068                    && dpi != NULL)
01069               {
01070                 dpi_count++;
01071                 memcpy (temp_buffer + dpi_offset, dpi->name, 16);
01072                 dpi_offset += 16;
01073                 memcpy (temp_buffer + dpi_offset, dpi->host_sysinfo, 32);
01074                 dpi_offset += 32;
01075                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01076                   dl_htonl (dpi->pid);
01077                 dpi_offset += 4;
01078                 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01079                   {
01080                     reversed_temp =
01081                       (double) tcp_svr_reverse_double ((double)
01082                                                        dpi->rcslib_ver);
01083                     memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01084                   }
01085                 else
01086                   {
01087                     memcpy (temp_buffer + dpi_offset, &(dpi->rcslib_ver), 8);
01088                   }
01089                 dpi_offset += 8;
01090                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01091                   dl_htonl (dpi->access_type);
01092                 dpi_offset += 4;
01093                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01094                   dl_htonl (dpi->msg_id);
01095                 dpi_offset += 4;
01096                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01097                   dl_htonl (dpi->msg_size);
01098                 dpi_offset += 4;
01099                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01100                   dl_htonl (dpi->msg_type);
01101                 dpi_offset += 4;
01102                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01103                   dl_htonl (dpi->number_of_accesses);
01104                 dpi_offset += 4;
01105                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01106                   dl_htonl (dpi->number_of_new_messages);
01107                 dpi_offset += 4;
01108                 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01109                   {
01110                     reversed_temp =
01111                       (double) tcp_svr_reverse_double ((double)
01112                                                        dpi->bytes_moved);
01113                     memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01114                   }
01115                 else
01116                   {
01117                     memcpy (temp_buffer + dpi_offset, &(dpi->bytes_moved), 8);
01118                   }
01119                 dpi_offset += 8;
01120                 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01121                   {
01122                     reversed_temp =
01123                       (double) tcp_svr_reverse_double ((double)
01124                                                        dpi->
01125                                                        bytes_moved_across_socket);
01126                     memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01127                   }
01128                 else
01129                   {
01130                     memcpy (temp_buffer + dpi_offset,
01131                             &(dpi->bytes_moved_across_socket), 8);
01132                   }
01133                 dpi_offset += 8;
01134                 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01135                   {
01136                     reversed_temp =
01137                       (double) tcp_svr_reverse_double ((double)
01138                                                        dpi->last_access_time);
01139                     memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01140                   }
01141                 else
01142                   {
01143                     memcpy (temp_buffer + dpi_offset,
01144                             &(dpi->last_access_time), 8);
01145                   }
01146                 dpi_offset += 8;
01147                 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01148                   {
01149                     reversed_temp =
01150                       (double) tcp_svr_reverse_double ((double)
01151                                                        dpi->
01152                                                        first_access_time);
01153                     memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01154                   }
01155                 else
01156                   {
01157                     memcpy (temp_buffer + dpi_offset,
01158                             &(dpi->first_access_time), 8);
01159                   }
01160                 dpi_offset += 8;
01161                 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01162                   {
01163                     reversed_temp =
01164                       (double) tcp_svr_reverse_double ((double)
01165                                                        dpi->min_difference);
01166                     memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01167                   }
01168                 else
01169                   {
01170                     memcpy (temp_buffer + dpi_offset, &(dpi->min_difference),
01171                             8);
01172                   }
01173                 dpi_offset += 8;
01174                 if (_client_tcp_port->diag_info->reverse_flag == 0x44332211)
01175                   {
01176                     reversed_temp =
01177                       (double) tcp_svr_reverse_double ((double)
01178                                                        dpi->max_difference);
01179                     memcpy (temp_buffer + dpi_offset, &reversed_temp, 8);
01180                   }
01181                 else
01182                   {
01183                     memcpy (temp_buffer + dpi_offset, &(dpi->max_difference),
01184                             8);
01185                   }
01186                 dpi_offset += 8;
01187                 int is_last_writer = (dpi == diagreply->cdi->last_writer_dpi);
01188                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01189                   dl_htonl (is_last_writer);
01190                 dpi_offset += 4;
01191                 int is_last_reader = (dpi == diagreply->cdi->last_reader_dpi);
01192                 *((u_long *) ((char *) temp_buffer + dpi_offset)) =
01193                   dl_htonl (is_last_reader);
01194                 dpi_offset += 4;
01195                 dpi =
01196                   (CMS_DIAG_PROC_INFO *) diagreply->cdi->dpis->get_next ();
01197               }
01198           }
01199         *((u_long *) temp_buffer + 6) = dl_htonl (dpi_count);
01200         *((u_long *) temp_buffer + 7) = dl_htonl (dpi_offset);
01201         if (sendn
01202             (_client_tcp_port->socket_fd, temp_buffer, dpi_offset, 0,
01203              dtimeout) < 0)
01204           {
01205             _client_tcp_port->errors++;
01206             return;
01207           }
01208       }
01209       break;
01210 
01211     case REMOTE_CMS_GET_MSG_COUNT_REQUEST_TYPE:
01212       {
01213       }
01214       break;
01215 
01216     case REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE:
01217       {
01218         REMOTE_GET_BUF_NAME_REQUEST namereq;
01219         namereq.buffer_number = buffer_number;
01220         REMOTE_GET_BUF_NAME_REPLY *namereply = NULL;
01221         namereply =
01222           (REMOTE_GET_BUF_NAME_REPLY *) server->process_request (&namereq);
01223         memset (temp_buffer, 0, 40);
01224         if (NULL != namereply)
01225           {
01226             *((u_long *) temp_buffer) =
01227               dl_htonl (_client_tcp_port->serial_number);
01228             *((u_long *) temp_buffer + 1) = dl_htonl (namereply->status);
01229             strncpy (temp_buffer + 8, namereply->name, 31);
01230             if (sendn
01231                 (_client_tcp_port->socket_fd, temp_buffer, 40, 0,
01232                  dtimeout) < 0)
01233               {
01234                 _client_tcp_port->errors++;
01235                 return;
01236               }
01237           }
01238         else
01239           {
01240             *((u_long *) temp_buffer) =
01241               dl_htonl (_client_tcp_port->serial_number);
01242             *((u_long *) temp_buffer + 1) =
01243               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01244             if (sendn
01245                 (_client_tcp_port->socket_fd, temp_buffer, 40, 0,
01246                  dtimeout) < 0)
01247               {
01248                 _client_tcp_port->errors++;
01249                 return;
01250               }
01251           }
01252       }
01253       break;
01254 
01255     case REMOTE_CMS_BLOCKING_READ_REQUEST_TYPE:
01256       {
01257         if (NULL == _client_tcp_port->blocking_read_req)
01258           {
01259             _client_tcp_port->blocking_read_req =
01260               new TCPSVR_BLOCKING_READ_REQUEST ();
01261           }
01262         TCPSVR_BLOCKING_READ_REQUEST *blocking_read_req =
01263           _client_tcp_port->blocking_read_req;
01264         blocking_read_req->buffer_number = buffer_number;
01265         blocking_read_req->access_type =
01266           dl_ntohl (*((u_long *) temp_buffer + 3));
01267         blocking_read_req->last_id_read =
01268           dl_ntohl (*((u_long *) temp_buffer + 4));
01269         total_subdivisions = 1;
01270         if (max_total_subdivisions > 1)
01271           {
01272             total_subdivisions =
01273               server->get_total_subdivisions (buffer_number);
01274           }
01275         if (total_subdivisions > 1)
01276           {
01277             if (recvn
01278                 (_client_tcp_port->socket_fd,
01279                  (char *) (((u_long *) temp_buffer) + 5), 8, 0, -1, NULL) < 0)
01280               {
01281                 rcs_print_error
01282                   ("Can not read from client port (%d) from %s\n",
01283                    _client_tcp_port->socket_fd,
01284                    dl_inet_ntoa (_client_tcp_port->address.sin_addr));
01285                 _client_tcp_port->errors++;
01286                 return;
01287               }
01288             blocking_read_req->subdiv =
01289               dl_ntohl (*((u_long *) temp_buffer + 6));
01290           }
01291         else
01292           {
01293             if (recvn
01294                 (_client_tcp_port->socket_fd,
01295                  (char *) (((u_long *) temp_buffer) + 5), 4, 0, -1, NULL) < 0)
01296               {
01297                 rcs_print_error
01298                   ("Can not read from client port (%d) from %s\n",
01299                    _client_tcp_port->socket_fd,
01300                    dl_inet_ntoa (_client_tcp_port->address.sin_addr));
01301                 _client_tcp_port->errors++;
01302                 return;
01303               }
01304           }
01305         blocking_read_req->timeout_millis =
01306           dl_ntohl (*((u_long *) temp_buffer + 5));
01307         blocking_read_req->server = server;
01308         blocking_read_req->remport = this;
01309         _client_tcp_port->blocking = 1;
01310         blocking_read_req->_client_tcp_port = _client_tcp_port;
01311 #if defined(sunos5) && !defined(NO_THREADS)
01312         int thr_retval = thr_create (NULL,      // stack_base, Have system allocate it.
01313                                      0, // stack_size, default = 1Mb
01314                                      tcpsvr_handle_blocking_request,    // start_func
01315                                      blocking_read_req, // arg for start_func
01316                                      THR_BOUND | THR_NEW_LWP,   // flags
01317                                      &(_client_tcp_port->threadId)      // ptr to new-thread-id
01318           );
01319         if (thr_retval != 0)
01320           {
01321             _client_tcp_port->blocking = 1;
01322             rcs_print_error ("thr_create error: thr_retval = %d\n",
01323                              thr_retval);
01324             rcs_print_error ("thr_create error: %d %s\n", errno,
01325                              strerror (errno));
01326             *((u_long *) temp_buffer) =
01327               dl_htonl (_client_tcp_port->serial_number);
01328             *((u_long *) temp_buffer + 1) =
01329               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01330             *((u_long *) temp_buffer + 2) = dl_htonl (0);       /* size */
01331             *((u_long *) temp_buffer + 3) = dl_htonl (0);       /* write_id */
01332             *((u_long *) temp_buffer + 4) = dl_htonl (0);       /* was_read */
01333             sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01334             return;
01335           }
01336         //      rcs_print("Thread spawned (%d)\n", _client_tcp_port->threadId);
01337 #else
01338 #ifdef VXWORKS
01339         _client_tcp_port->threadId = taskSpawn (NULL,   // name, let OS choose the name
01340                                                 cms_server_task_priority,       //priority
01341                                                 VX_FP_TASK,     // options
01342                                                 cms_server_task_stack_size,     // stackSize
01343                                                 ((FUNCPTR) tcpsvr_handle_blocking_request),     // entryPt
01344                                                 blocking_read_req,      // arg for entryPt
01345                                                 0, 0, 0,        // 9 unused args
01346                                                 0, 0, 0, 0, 0, 0);
01347         if (_client_tcp_port->threadId == ERROR)
01348           {
01349             _client_tcp_port->blocking = 1;
01350             rcs_print_error ("taskSpawn error: %d %s\n",
01351                              errno, strerror (errno));
01352             *((u_long *) temp_buffer) =
01353               dl_htonl (_client_tcp_port->serial_number);
01354             *((u_long *) temp_buffer + 1) =
01355               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01356             *((u_long *) temp_buffer + 2) = dl_htonl (0);       /* size */
01357             *((u_long *) temp_buffer + 3) = dl_htonl (0);       /* write_id */
01358             *((u_long *) temp_buffer + 4) = dl_htonl (0);       /* was_read */
01359             sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01360             return;
01361           }
01362 #else
01363 #ifdef POSIX_THREADS
01364         int thr_retval = pthread_create (&(_client_tcp_port->threadId), // ptr to new-thread-id
01365                                          NULL,  // pthread_attr_t *, ptr to attributes
01366                                          tcpsvr_handle_blocking_request,        // start_func
01367                                          blocking_read_req      // arg for start_func
01368           );
01369         if (thr_retval != 0)
01370           {
01371             _client_tcp_port->blocking = 0;
01372             rcs_print_error ("pthread_create error: thr_retval = %d\n",
01373                              thr_retval);
01374             rcs_print_error ("pthread_create error: %d %s\n", errno,
01375                              strerror (errno));
01376             *((u_long *) temp_buffer) =
01377               dl_htonl (_client_tcp_port->serial_number);
01378             *((u_long *) temp_buffer + 1) =
01379               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01380             *((u_long *) temp_buffer + 2) = dl_htonl (0);       /* size */
01381             *((u_long *) temp_buffer + 3) = dl_htonl (0);       /* write_id */
01382             *((u_long *) temp_buffer + 4) = dl_htonl (0);       /* was_read */
01383             sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01384             return;
01385           }
01386 #else
01387 #ifdef NO_THREADS
01388         int fork_ret = fork ();
01389         switch (fork_ret)
01390           {
01391           case 0:               // child
01392             _client_tcp_port->threadId = getpid ();
01393             tcpsvr_handle_blocking_request (blocking_read_req);
01394             exit (0);
01395             break;
01396 
01397           case -1:              // Error
01398 #ifndef UNDER_CE
01399             rcs_print_error ("fork error: %d %s\n", errno, strerror (errno));
01400 #endif
01401             _client_tcp_port->blocking = 0;
01402             *((u_long *) temp_buffer) =
01403               dl_htonl (_client_tcp_port->serial_number);
01404             *((u_long *) temp_buffer + 1) =
01405               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01406             *((u_long *) temp_buffer + 2) = dl_htonl (0);       /* size */
01407             *((u_long *) temp_buffer + 3) = dl_htonl (0);       /* write_id */
01408             *((u_long *) temp_buffer + 4) = dl_htonl (0);       /* was_read */
01409             sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01410             break;
01411 
01412           default:              // parent;
01413             _client_tcp_port->threadId = fork_ret;
01414             break;
01415           }
01416 #else
01417 #ifdef WIN32
01418 #ifndef UNDER_CE
01419         _client_tcp_port->threadId = _beginthread ((void (__cdecl *) (void *)) tcpsvr_handle_blocking_request,  // start_address
01420                                                    0,   // stack_size
01421                                                    blocking_read_req    // arglist
01422           );
01423 #else
01424         unsigned long temp_id;
01425         HANDLE temp_handle = CreateThread (NULL,        // SECURITY_ATTRIBUTES
01426                                            4096,        // stack size
01427                                            tcpsvr_handle_blocking_request,      // thread start routine
01428                                            blocking_read_req,   //  parameter
01429                                            0,   // creation flats
01430                                            &temp_id);   // address to store id
01431         _client_tcp_port->threadId = (int) temp_id;
01432         if (NULL == temp_handle)
01433           {
01434             rcs_print_error ("CreateThread failed. GetLastError = %d\n",
01435                              GetLastError ());
01436           }
01437 #endif
01438         if (_client_tcp_port->threadId < 1)
01439           {
01440             _client_tcp_port->blocking = 0;
01441             rcs_print_sys_error (ERRNO_ERROR_SOURCE, "CreateTask error");
01442             *((u_long *) temp_buffer) =
01443               dl_htonl (_client_tcp_port->serial_number);
01444             *((u_long *) temp_buffer + 1) =
01445               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01446             *((u_long *) temp_buffer + 2) = dl_htonl (0);       /* size */
01447             *((u_long *) temp_buffer + 3) = dl_htonl (0);       /* write_id */
01448             *((u_long *) temp_buffer + 4) = dl_htonl (0);       /* was_read */
01449             sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01450           }
01451 #else
01452 #ifdef SGI
01453         _client_tcp_port->threadId = sproc (tcpsvr_handle_blocking_request,     // entry
01454                                             0,  // inh
01455                                             blocking_read_req   // arglist
01456           );
01457         if (_client_tcp_port->threadId < 1)
01458           {
01459             _client_tcp_port->blocking = 0;
01460             rcs_print_sys_error (ERRNO_ERROR_SOURCE, "sproc error");
01461             *((u_long *) temp_buffer) =
01462               dl_htonl (_client_tcp_port->serial_number);
01463             *((u_long *) temp_buffer + 1) =
01464               dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01465             *((u_long *) temp_buffer + 2) = dl_htonl (0);       /* size */
01466             *((u_long *) temp_buffer + 3) = dl_htonl (0);       /* write_id */
01467             *((u_long *) temp_buffer + 4) = dl_htonl (0);       /* was_read */
01468             sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01469           }
01470 #else
01471         rcs_print_error ("Blocking read not supported on this platform.\n");
01472         *((u_long *) temp_buffer) =
01473           dl_htonl (_client_tcp_port->serial_number);
01474         *((u_long *) temp_buffer + 1) =
01475           dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01476         *((u_long *) temp_buffer + 2) = dl_htonl (0);   /* size */
01477         *((u_long *) temp_buffer + 3) = dl_htonl (0);   /* write_id */
01478         *((u_long *) temp_buffer + 4) = dl_htonl (0);   /* was_read */
01479         sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01480         return;
01481 
01482 // SGI
01483 #endif
01484 // WIN32
01485 #endif
01486 // NO_THREADS
01487 #endif
01488 // POSIX_THREADS
01489 #endif
01490 // VXWORKS
01491 #endif
01492 // sunos5
01493 #endif
01494         tcpsvr_threads_created++;
01495       }
01496       break;
01497 
01498 
01499     case REMOTE_CMS_READ_REQUEST_TYPE:
01500       server->read_req.buffer_number = buffer_number;
01501       server->read_req.access_type = dl_ntohl (*((u_long *) temp_buffer + 3));
01502       server->read_req.last_id_read =
01503         dl_ntohl (*((u_long *) temp_buffer + 4));
01504       server->read_reply =
01505         (REMOTE_READ_REPLY *) server->process_request (&server->read_req);
01506       if (max_total_subdivisions > 1)
01507         {
01508           total_subdivisions = server->get_total_subdivisions (buffer_number);
01509         }
01510       if (total_subdivisions > 1)
01511         {
01512           if (recvn
01513               (_client_tcp_port->socket_fd,
01514                (char *) (((u_long *) temp_buffer) + 5), 4, 0, -1, NULL) < 0)
01515             {
01516               rcs_print_error ("Can not read from client port (%d) from %s\n",
01517                                _client_tcp_port->socket_fd,
01518                                dl_inet_ntoa (_client_tcp_port->address.
01519                                              sin_addr));
01520               _client_tcp_port->errors++;
01521               return;
01522             }
01523           server->read_req.subdiv = dl_ntohl (*((u_long *) temp_buffer + 5));
01524         }
01525       else
01526         {
01527           server->read_req.subdiv = 0;
01528         }
01529       if (NULL == server->read_reply)
01530         {
01531           rcs_print_error ("Server could not process request.\n");
01532           *((u_long *) temp_buffer) =
01533             dl_htonl (_client_tcp_port->serial_number);
01534           *((u_long *) temp_buffer + 1) =
01535             dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01536           *((u_long *) temp_buffer + 2) = dl_htonl (0); /* size */
01537           *((u_long *) temp_buffer + 3) = dl_htonl (0); /* write_id */
01538           *((u_long *) temp_buffer + 4) = dl_htonl (0); /* was_read */
01539           sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01540           return;
01541         }
01542       *((u_long *) temp_buffer) = dl_htonl (_client_tcp_port->serial_number);
01543       *((u_long *) temp_buffer + 1) = dl_htonl (server->read_reply->status);
01544       *((u_long *) temp_buffer + 2) = dl_htonl (server->read_reply->size);
01545       *((u_long *) temp_buffer + 3) = dl_htonl (server->read_reply->write_id);
01546       *((u_long *) temp_buffer + 4) = dl_htonl (server->read_reply->was_read);
01547       if (server->read_reply->size < (0x2000 - 20)
01548           && server->read_reply->size > 0)
01549         {
01550           memcpy (temp_buffer + 20, server->read_reply->data,
01551                   server->read_reply->size);
01552           if (sendn
01553               (_client_tcp_port->socket_fd, temp_buffer,
01554                20 + server->read_reply->size, 0, dtimeout) < 0)
01555             {
01556               _client_tcp_port->errors++;
01557               return;
01558             }
01559         }
01560       else
01561         {
01562           if (sendn
01563               (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout) < 0)
01564             {
01565               _client_tcp_port->errors++;
01566               return;
01567             }
01568           if (server->read_reply->size > 0)
01569             {
01570               if (sendn
01571                   (_client_tcp_port->socket_fd, server->read_reply->data,
01572                    server->read_reply->size, 0, dtimeout) < 0)
01573                 {
01574                   _client_tcp_port->errors++;
01575                   return;
01576                 }
01577             }
01578         }
01579       break;
01580 
01581     case REMOTE_CMS_WRITE_REQUEST_TYPE:
01582       server->write_req.buffer_number = buffer_number;
01583       server->write_req.access_type =
01584         dl_ntohl (*((u_long *) temp_buffer + 3));
01585       server->write_req.size = dl_ntohl (*((u_long *) temp_buffer + 4));
01586       total_subdivisions = 1;
01587       if (max_total_subdivisions > 1)
01588         {
01589           total_subdivisions = server->get_total_subdivisions (buffer_number);
01590         }
01591       if (total_subdivisions > 1)
01592         {
01593           if (recvn
01594               (_client_tcp_port->socket_fd,
01595                (char *) (((u_long *) temp_buffer) + 5), 4, 0, -1, NULL) < 0)
01596             {
01597               rcs_print_error ("Can not read from client port (%d) from %s\n",
01598                                _client_tcp_port->socket_fd,
01599                                dl_inet_ntoa (_client_tcp_port->address.
01600                                              sin_addr));
01601               _client_tcp_port->errors++;
01602               return;
01603             }
01604           server->write_req.subdiv = dl_ntohl (*((u_long *) temp_buffer + 5));
01605         }
01606       else
01607         {
01608           server->write_req.subdiv = 0;
01609         }
01610       if (server->write_req.size > 0)
01611         {
01612           if (recvn
01613               (_client_tcp_port->socket_fd, server->write_req.data,
01614                server->write_req.size, 0, -1, NULL) < 0)
01615             {
01616               _client_tcp_port->errors++;
01617               return;
01618             }
01619         }
01620       server->write_reply =
01621         (REMOTE_WRITE_REPLY *) server->process_request (&server->write_req);
01622       if (min_compatible_version < 2.58 && min_compatible_version > 1e-6
01623           || confirm_write)
01624         {
01625           if (NULL == server->write_reply)
01626             {
01627               rcs_print_error ("Server could not process request.\n");
01628               *((u_long *) temp_buffer) =
01629                 dl_htonl (_client_tcp_port->serial_number);
01630               *((u_long *) temp_buffer + 1) =
01631                 dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01632               *((u_long *) temp_buffer + 2) = dl_htonl (0);     /* was_read */
01633               sendn (_client_tcp_port->socket_fd, temp_buffer, 12, 0,
01634                      dtimeout);
01635               return;
01636             }
01637           *((u_long *) temp_buffer) =
01638             dl_htonl (_client_tcp_port->serial_number);
01639           *((u_long *) temp_buffer + 1) =
01640             dl_htonl (server->write_reply->status);
01641           *((u_long *) temp_buffer + 2) =
01642             dl_htonl (server->write_reply->was_read);
01643           if (sendn
01644               (_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) < 0)
01645             {
01646               _client_tcp_port->errors++;
01647             }
01648         }
01649       else
01650         {
01651           if (NULL == server->write_reply)
01652             {
01653               rcs_print_error ("Server could not process request.\n");
01654             }
01655         }
01656       break;
01657 
01658     case REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE:
01659       server->check_if_read_req.buffer_number = buffer_number;
01660       server->check_if_read_req.subdiv =
01661         dl_ntohl (*((u_long *) temp_buffer + 3));
01662       server->check_if_read_reply =
01663         (REMOTE_CHECK_IF_READ_REPLY *) server->process_request (&server->
01664                                                                 check_if_read_req);
01665       if (NULL == server->check_if_read_reply)
01666         {
01667           rcs_print_error ("Server could not process request.\n");
01668           *((u_long *) temp_buffer) =
01669             dl_htonl (_client_tcp_port->serial_number);
01670           *((u_long *) temp_buffer + 1) =
01671             dl_htonl ((unsigned long) CMS_SERVER_SIDE_ERROR);
01672           *((u_long *) temp_buffer + 2) = dl_htonl (0); /* was_read */
01673           sendn (_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout);
01674           return;
01675         }
01676       *((u_long *) temp_buffer) = dl_htonl (_client_tcp_port->serial_number);
01677       *((u_long *) temp_buffer + 1) =
01678         dl_htonl (server->check_if_read_reply->status);
01679       *((u_long *) temp_buffer + 2) =
01680         dl_htonl (server->check_if_read_reply->was_read);
01681       if (sendn (_client_tcp_port->socket_fd, temp_buffer, 12, 0, dtimeout) <
01682           0)
01683         {
01684           _client_tcp_port->errors++;
01685         }
01686       break;
01687 
01688     case REMOTE_CMS_CLEAR_REQUEST_TYPE:
01689       server->clear_req.buffer_number = buffer_number;
01690       server->clear_req.subdiv = dl_ntohl (*((u_long *) temp_buffer + 3));
01691       server->clear_reply =
01692         (REMOTE_CLEAR_REPLY *) server->process_request (&server->clear_req);
01693       if (NULL == server->clear_reply)
01694         {
01695           rcs_print_error ("Server could not process request.\n");
01696           *((u_long *) temp_buffer) =
01697             dl_htonl (_client_tcp_port->serial_number);
01698           *((u_long *) temp_buffer + 1) =
01699             dl_htonl ((u_long) CMS_SERVER_SIDE_ERROR);
01700           sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01701           return;
01702         }
01703       *((u_long *) temp_buffer) = dl_htonl (_client_tcp_port->serial_number);
01704       *((u_long *) temp_buffer + 1) = dl_htonl (server->clear_reply->status);
01705       if (sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout) <
01706           0)
01707         {
01708           _client_tcp_port->errors++;
01709         }
01710       break;
01711 
01712     case REMOTE_CMS_CLEAN_REQUEST_TYPE:
01713       server->spawner_pid = server->server_pid;
01714       server->kill_server ();
01715       break;
01716 
01717     case REMOTE_CMS_CLOSE_CHANNEL_REQUEST_TYPE:
01718       client_port_to_check = (CLIENT_TCP_PORT *) client_ports->get_head ();
01719       while (NULL != client_port_to_check)
01720         {
01721           if (client_port_to_check->socket_fd == _client_tcp_port->socket_fd)
01722             {
01723               break;
01724             }
01725           client_port_to_check =
01726             (CLIENT_TCP_PORT *) client_ports->get_next ();
01727         }
01728       RCS_FD_CLR (_client_tcp_port->socket_fd, &read_fd_set);
01729       dl_closesocket (_client_tcp_port->socket_fd);
01730       current_clients--;
01731       if (NULL != _client_tcp_port->subscriptions)
01732         {
01733           remove_subscription_client (_client_tcp_port, buffer_number);
01734         }
01735       _client_tcp_port->socket_fd = -1;
01736       delete _client_tcp_port;
01737       client_ports->delete_current_node ();
01738       break;
01739 
01740 #ifndef UNDER_CE
01741     case REMOTE_CMS_GET_KEYS_REQUEST_TYPE:
01742       server->get_keys_req.buffer_number = buffer_number;
01743       if (recvn (_client_tcp_port->socket_fd,
01744                  server->get_keys_req.name, 16, 0, -1, NULL) < 0)
01745         {
01746           _client_tcp_port->errors++;
01747           return;
01748         }
01749       server->get_keys_reply =
01750         (REMOTE_GET_KEYS_REPLY *) server->process_request (&server->
01751                                                            get_keys_req);
01752       if (NULL == server->get_keys_reply)
01753         {
01754           rcs_print_error ("Server could not process request.\n");
01755           memset (temp_buffer, 0, 20);
01756           *((u_long *) temp_buffer) =
01757             dl_htonl (_client_tcp_port->serial_number);
01758           server->gen_random_key (((char *) temp_buffer) + 4, 2);
01759           server->gen_random_key (((char *) temp_buffer) + 12, 2);
01760           sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01761           return;
01762         }
01763       else
01764         {
01765           *((u_long *) temp_buffer) =
01766             dl_htonl (_client_tcp_port->serial_number);
01767           memcpy (((char *) temp_buffer) + 4, server->get_keys_reply->key1,
01768                   8);
01769           memcpy (((char *) temp_buffer) + 12, server->get_keys_reply->key2,
01770                   8);
01771           /*  successful ? */
01772           sendn (_client_tcp_port->socket_fd, temp_buffer, 20, 0, dtimeout);
01773           return;
01774         }
01775       break;
01776 
01777     case REMOTE_CMS_LOGIN_REQUEST_TYPE:
01778       server->login_req.buffer_number = buffer_number;
01779       if (recvn (_client_tcp_port->socket_fd,
01780                  server->login_req.name, 16, 0, -1, NULL) < 0)
01781         {
01782           _client_tcp_port->errors++;
01783           return;
01784         }
01785       if (recvn (_client_tcp_port->socket_fd,
01786                  server->login_req.passwd, 16, 0, -1, NULL) < 0)
01787         {
01788           _client_tcp_port->errors++;
01789           return;
01790         }
01791       server->login_reply =
01792         (REMOTE_LOGIN_REPLY *) server->process_request (&server->login_req);
01793       if (NULL == server->login_reply)
01794         {
01795           rcs_print_error ("Server could not process request.\n");
01796           *((u_long *) temp_buffer) =
01797             dl_htonl (_client_tcp_port->serial_number);
01798           *((u_long *) temp_buffer + 1) = dl_htonl (0); /* not successful */
01799           sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01800           return;
01801         }
01802       else
01803         {
01804           *((u_long *) temp_buffer) =
01805             dl_htonl (_client_tcp_port->serial_number);
01806           *((u_long *) temp_buffer + 1) =
01807             dl_htonl (server->login_reply->success);
01808           /*  successful ? */
01809           sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01810           return;
01811         }
01812       break;
01813 
01814 #endif
01815 
01816 
01817     case REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE:
01818       server->set_subscription_req.buffer_number = buffer_number;
01819       server->set_subscription_req.subscription_type =
01820         dl_ntohl (*((u_long *) temp_buffer + 3));
01821       server->set_subscription_req.poll_interval_millis =
01822         dl_ntohl (*((u_long *) temp_buffer + 4));
01823       server->set_subscription_reply =
01824         (REMOTE_SET_SUBSCRIPTION_REPLY *) server->process_request (&server->
01825                                                                    set_subscription_req);
01826       if (NULL == server->set_subscription_reply)
01827         {
01828           rcs_print_error ("Server could not process request.\n");
01829           *((u_long *) temp_buffer) =
01830             dl_htonl (_client_tcp_port->serial_number);
01831           *((u_long *) temp_buffer + 1) = dl_htonl (0); /* not successful */
01832           sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01833           return;
01834         }
01835       else
01836         {
01837           if (server->set_subscription_reply->success)
01838             {
01839               if (server->set_subscription_req.subscription_type ==
01840                   CMS_POLLED_SUBSCRIPTION
01841                   || server->set_subscription_req.subscription_type ==
01842                   CMS_VARIABLE_SUBSCRIPTION)
01843                 {
01844                   add_subscription_client (buffer_number,
01845                                            server->set_subscription_req.
01846                                            subscription_type,
01847                                            server->set_subscription_req.
01848                                            poll_interval_millis,
01849                                            _client_tcp_port);
01850                 }
01851               if (server->set_subscription_req.subscription_type ==
01852                   CMS_NO_SUBSCRIPTION)
01853                 {
01854                   remove_subscription_client (_client_tcp_port,
01855                                               buffer_number);
01856                 }
01857             }
01858           *((u_long *) temp_buffer) =
01859             dl_htonl (_client_tcp_port->serial_number);
01860           *((u_long *) temp_buffer + 1) =
01861             dl_htonl (server->set_subscription_reply->success);
01862           /*  successful ? */
01863           sendn (_client_tcp_port->socket_fd, temp_buffer, 8, 0, dtimeout);
01864           return;
01865         }
01866       break;
01867 
01868     default:
01869       _client_tcp_port->errors++;
01870       rcs_print_error ("Unrecognized request type received.(%ld)\n",
01871                        request_type);
01872       break;
01873     }
01874 }
01875 
01876 
01877 
01878 
01879 
01880 
01881 void
01882 CMS_SERVER_REMOTE_TCP_PORT::add_subscription_client (int buffer_number,
01883                                                      int subscription_type,
01884                                                      int poll_interval_millis,
01885                                                      CLIENT_TCP_PORT * clnt)
01886 {
01887   if (NULL == subscription_buffers)
01888     {
01889       subscription_buffers = new RCS_LINKED_LIST ();
01890     }
01891   if (NULL == subscription_buffers)
01892     {
01893       rcs_print_error ("Can`t create subscription_buffers list.\n");
01894     }
01895 
01896   TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
01897     (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head ();
01898   while (NULL != buf_info)
01899     {
01900       if (buf_info->buffer_number == buffer_number)
01901         {
01902           break;
01903         }
01904       buf_info =
01905         (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next ();
01906     }
01907   if (NULL == buf_info)
01908     {
01909       buf_info = new TCP_BUFFER_SUBSCRIPTION_INFO ();
01910       buf_info->buffer_number = buffer_number;
01911       buf_info->sub_clnt_info = new RCS_LINKED_LIST ();
01912       buf_info->list_id =
01913         subscription_buffers->store_at_tail (buf_info, sizeof (*buf_info), 0);
01914     }
01915   buf_info->min_last_id = 0;
01916   if (NULL == clnt->subscriptions)
01917     {
01918       clnt->subscriptions = new RCS_LINKED_LIST ();
01919     }
01920   TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
01921     (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head ();
01922   while (temp_clnt_info != NULL)
01923     {
01924       if (temp_clnt_info->buffer_number == buffer_number)
01925         {
01926           break;
01927         }
01928       temp_clnt_info =
01929         (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next ();
01930     }
01931   if (NULL == temp_clnt_info)
01932     {
01933       temp_clnt_info = new TCP_CLIENT_SUBSCRIPTION_INFO ();
01934       temp_clnt_info->last_sub_sent_time = 0.0;
01935       temp_clnt_info->buffer_number = buffer_number;
01936       temp_clnt_info->subscription_paused = 0;
01937       temp_clnt_info->last_id_read = 0;
01938       temp_clnt_info->sub_buf_info = buf_info;
01939       temp_clnt_info->clnt_port = clnt;
01940       temp_clnt_info->last_sub_sent_time = etime ();
01941       temp_clnt_info->subscription_list_id =
01942         clnt->subscriptions->store_at_tail (temp_clnt_info,
01943                                             sizeof (*temp_clnt_info), 0);
01944       buf_info->sub_clnt_info->store_at_tail (temp_clnt_info,
01945                                               sizeof (*temp_clnt_info), 0);
01946     }
01947   temp_clnt_info->subscription_type = subscription_type;
01948   temp_clnt_info->poll_interval_millis = poll_interval_millis;
01949   recalculate_polling_interval ();
01950 }
01951 
01952 
01953 void
01954 CMS_SERVER_REMOTE_TCP_PORT::remove_subscription_client (CLIENT_TCP_PORT *
01955                                                         clnt,
01956                                                         int buffer_number)
01957 {
01958   TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
01959     (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_head ();
01960   while (temp_clnt_info != NULL)
01961     {
01962       if (temp_clnt_info->buffer_number == buffer_number)
01963         {
01964           if (NULL != temp_clnt_info->sub_buf_info)
01965             {
01966               if (NULL != temp_clnt_info->sub_buf_info->sub_clnt_info)
01967                 {
01968                   temp_clnt_info->sub_buf_info->sub_clnt_info->
01969                     delete_node (temp_clnt_info->subscription_list_id);
01970                   if (temp_clnt_info->sub_buf_info->sub_clnt_info->
01971                       list_size == 0)
01972                     {
01973                       subscription_buffers->delete_node (temp_clnt_info->
01974                                                          sub_buf_info->
01975                                                          list_id);
01976                       delete temp_clnt_info->sub_buf_info->sub_clnt_info;
01977                       temp_clnt_info->sub_buf_info->sub_clnt_info = NULL;
01978                       delete temp_clnt_info->sub_buf_info;
01979                       temp_clnt_info->sub_buf_info = NULL;
01980                     }
01981                 }
01982             }
01983           delete temp_clnt_info;
01984           temp_clnt_info = NULL;
01985           break;
01986         }
01987       temp_clnt_info =
01988         (TCP_CLIENT_SUBSCRIPTION_INFO *) clnt->subscriptions->get_next ();
01989     }
01990   recalculate_polling_interval ();
01991 }
01992 
01993 void
01994 CMS_SERVER_REMOTE_TCP_PORT::recalculate_polling_interval ()
01995 {
01996   int min_poll_interval_millis = 30000;
01997   polling_enabled = 0;
01998   TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
01999     (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head ();
02000   while (NULL != buf_info)
02001     {
02002       TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
02003         (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->get_head ();
02004       while (temp_clnt_info != NULL)
02005         {
02006           if (temp_clnt_info->poll_interval_millis < min_poll_interval_millis
02007               && temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION)
02008             {
02009               min_poll_interval_millis = temp_clnt_info->poll_interval_millis;
02010               polling_enabled = 1;
02011             }
02012           temp_clnt_info =
02013             (TCP_CLIENT_SUBSCRIPTION_INFO *)
02014             buf_info->sub_clnt_info->get_next ();
02015         }
02016       buf_info =
02017         (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next ();
02018     }
02019   if (min_poll_interval_millis >= ((int) (clk_tck () * 1000.0)))
02020     {
02021       current_poll_interval_millis = min_poll_interval_millis;
02022     }
02023   else
02024     {
02025       current_poll_interval_millis = ((int) (clk_tck () * 1000.0));
02026     }
02027   select_timeout.tv_sec = current_poll_interval_millis / 1000;
02028   select_timeout.tv_usec = (current_poll_interval_millis % 1000) * 1000;
02029   dtimeout = (current_poll_interval_millis + 10) * 1000.0;
02030   if (dtimeout < 0.5)
02031     {
02032       dtimeout = 0.5;
02033     }
02034 }
02035 
02036 
02037 void
02038 CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions ()
02039 {
02040 #if defined(WIN32) && !defined(gnuwin32)
02041   DWORD pid = GetCurrentProcessId ();
02042   DWORD tid = GetCurrentThreadId ();
02043 #else
02044 #ifdef VXWORKS
02045   int pid = taskIdSelf ();
02046   int tid = 0;
02047 #else
02048   pid_t pid = getpid ();
02049   pid_t tid = 0;
02050 #endif
02051 #endif
02052   CMS_SERVER *server;
02053   server = find_server (pid, tid);
02054   if (NULL == server)
02055     {
02056       rcs_print_error
02057         ("CMS_SERVER_REMOTE_TCP_PORT::update_subscriptions Cannot find server object for pid = %d.\n",
02058          pid);
02059       return;
02060     }
02061   if (NULL == subscription_buffers)
02062     {
02063       return;
02064     }
02065   double cur_time = etime ();
02066   TCP_BUFFER_SUBSCRIPTION_INFO *buf_info =
02067     (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_head ();
02068   while (NULL != buf_info)
02069     {
02070       server->read_req.buffer_number = buf_info->buffer_number;
02071       server->read_req.access_type = CMS_READ_ACCESS;
02072       server->read_req.last_id_read = buf_info->min_last_id;
02073       server->read_reply =
02074         (REMOTE_READ_REPLY *) server->process_request (&server->read_req);
02075       if (NULL == server->read_reply)
02076         {
02077           rcs_print_error ("Server could not process request.\n");
02078           buf_info =
02079             (TCP_BUFFER_SUBSCRIPTION_INFO *)
02080             subscription_buffers->get_next ();
02081           continue;
02082         }
02083       if (server->read_reply->write_id == buf_info->min_last_id ||
02084           server->read_reply->size < 1)
02085         {
02086           buf_info =
02087             (TCP_BUFFER_SUBSCRIPTION_INFO *)
02088             subscription_buffers->get_next ();
02089           continue;
02090         }
02091       *((u_long *) temp_buffer) = 0;
02092       *((u_long *) temp_buffer + 1) = dl_htonl (server->read_reply->status);
02093       *((u_long *) temp_buffer + 2) = dl_htonl (server->read_reply->size);
02094       *((u_long *) temp_buffer + 3) = dl_htonl (server->read_reply->write_id);
02095       *((u_long *) temp_buffer + 4) = dl_htonl (server->read_reply->was_read);
02096       TCP_CLIENT_SUBSCRIPTION_INFO *temp_clnt_info =
02097         (TCP_CLIENT_SUBSCRIPTION_INFO *) buf_info->sub_clnt_info->get_head ();
02098       buf_info->min_last_id = server->read_reply->write_id;
02099       while (temp_clnt_info != NULL)
02100         {
02101           double time_diff = cur_time - temp_clnt_info->last_sub_sent_time;
02102           int time_diff_millis = (int) ((double) time_diff * 1000.0);
02103           rcs_print_debug (PRINT_SERVER_SUBSCRIPTION_ACTIVITY,
02104                            "Subscription time_diff_millis=%d\n",
02105                            time_diff_millis);
02106           if (
02107               ((temp_clnt_info->subscription_type == CMS_POLLED_SUBSCRIPTION
02108                 && time_diff_millis + 10 >=
02109                 temp_clnt_info->poll_interval_millis)
02110                || temp_clnt_info->subscription_type ==
02111                CMS_VARIABLE_SUBSCRIPTION)
02112               && temp_clnt_info->last_id_read != server->read_reply->write_id)
02113             {
02114               temp_clnt_info->last_id_read = server->read_reply->write_id;
02115               temp_clnt_info->last_sub_sent_time = cur_time;
02116               temp_clnt_info->clnt_port->serial_number++;
02117               *((u_long *) temp_buffer) =
02118                 dl_htonl (temp_clnt_info->clnt_port->serial_number);
02119               if (server->read_reply->size < 0x2000 - 20
02120                   && server->read_reply->size > 0)
02121                 {
02122                   memcpy (temp_buffer + 20, server->read_reply->data,
02123                           server->read_reply->size);
02124                   if (sendn
02125                       (temp_clnt_info->clnt_port->socket_fd, temp_buffer,
02126                        20 + server->read_reply->size, 0, dtimeout) < 0)
02127                     {
02128                       temp_clnt_info->clnt_port->errors++;
02129                       return;
02130                     }
02131                 }
02132               else
02133                 {
02134                   if (sendn (temp_clnt_info->clnt_port->socket_fd,
02135                              temp_buffer, 20, 0, dtimeout) < 0)
02136                     {
02137                       temp_clnt_info->clnt_port->errors++;
02138                       return;
02139                     }
02140                   if (server->read_reply->size > 0)
02141                     {
02142                       if (sendn (temp_clnt_info->clnt_port->socket_fd,
02143                                  server->read_reply->data,
02144                                  server->read_reply->size, 0, dtimeout) < 0)
02145                         {
02146                           temp_clnt_info->clnt_port->errors++;
02147                           return;
02148                         }
02149                     }
02150                 }
02151             }
02152           if (temp_clnt_info->last_id_read < buf_info->min_last_id)
02153             {
02154               buf_info->min_last_id = temp_clnt_info->last_id_read;
02155             }
02156           temp_clnt_info =
02157             (TCP_CLIENT_SUBSCRIPTION_INFO *)
02158             buf_info->sub_clnt_info->get_next ();
02159         }
02160       buf_info =
02161         (TCP_BUFFER_SUBSCRIPTION_INFO *) subscription_buffers->get_next ();
02162     }
02163 }
02164 
02165 
02166 TCP_BUFFER_SUBSCRIPTION_INFO::TCP_BUFFER_SUBSCRIPTION_INFO ()
02167 {
02168   buffer_number = -1;
02169   min_last_id = 0;
02170   list_id = -1;
02171   sub_clnt_info = NULL;
02172 }
02173 
02174 TCP_BUFFER_SUBSCRIPTION_INFO::~TCP_BUFFER_SUBSCRIPTION_INFO ()
02175 {
02176   buffer_number = -1;
02177   min_last_id = 0;
02178   list_id = -1;
02179   if (NULL != sub_clnt_info)
02180     {
02181       delete sub_clnt_info;
02182       sub_clnt_info = NULL;
02183     }
02184 }
02185 
02186 TCP_CLIENT_SUBSCRIPTION_INFO::TCP_CLIENT_SUBSCRIPTION_INFO ()
02187 {
02188   subscription_type = CMS_NO_SUBSCRIPTION;
02189   poll_interval_millis = 30000;
02190   last_sub_sent_time = 0.0;
02191   subscription_list_id = -1;
02192   buffer_number = -1;
02193   subscription_paused = 0;
02194   last_id_read = 0;
02195   sub_buf_info = NULL;
02196   clnt_port = NULL;
02197 }
02198 
02199 TCP_CLIENT_SUBSCRIPTION_INFO::~TCP_CLIENT_SUBSCRIPTION_INFO ()
02200 {
02201   subscription_type = CMS_NO_SUBSCRIPTION;
02202   poll_interval_millis = 30000;
02203   last_sub_sent_time = 0.0;
02204   subscription_list_id = -1;
02205   buffer_number = -1;
02206   subscription_paused = 0;
02207   last_id_read = 0;
02208   sub_buf_info = NULL;
02209   clnt_port = NULL;
02210 }
02211 
02212 
02213 CLIENT_TCP_PORT::CLIENT_TCP_PORT ()
02214 {
02215   serial_number = 0;
02216   errors = 0;
02217   max_errors = 50;
02218   address.sin_port = 0;
02219   address.sin_family = AF_INET;
02220   address.sin_addr.s_addr = dl_htonl (INADDR_ANY);
02221   socket_fd = -1;
02222   subscriptions = NULL;
02223   tid = -1;
02224   pid = -1;
02225   blocking_read_req = NULL;
02226   threadId = -1;
02227   diag_info = NULL;
02228 }
02229 
02230 CLIENT_TCP_PORT::~CLIENT_TCP_PORT ()
02231 {
02232   if (socket_fd > 0)
02233     {
02234       dl_closesocket (socket_fd);
02235       socket_fd = -1;
02236     }
02237   if (NULL != subscriptions)
02238     {
02239       TCP_CLIENT_SUBSCRIPTION_INFO *sub_info =
02240         (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_head ();
02241       while (NULL != sub_info)
02242         {
02243           delete sub_info;
02244           sub_info =
02245             (TCP_CLIENT_SUBSCRIPTION_INFO *) subscriptions->get_next ();
02246         }
02247       delete subscriptions;
02248       subscriptions = NULL;
02249     }
02250   if (NULL != blocking_read_req)
02251     {
02252       delete blocking_read_req;
02253       blocking_read_req = NULL;
02254     }
02255   if (NULL != diag_info)
02256     {
02257       delete diag_info;
02258       diag_info = NULL;
02259     }
02260 }

Generated on Sun Dec 2 15:56:52 2001 for rcslib by doxygen1.2.11.1 written by Dimitri van Heesch, © 1997-2001