Main Page   Class Hierarchy   Alphabetical List   Data Structures   File List   Data Fields   Globals  

udpmem.cc

Go to the documentation of this file.
00001 // This is neccessary to avoid muliple definitions of fd_set, etc when both
00002 // RPC via PCNFS and Windows Sockets are to be available
00003 #ifdef USE_PCNFS
00004 #undef USE_PCNFS
00005 #endif
00006 
00007 
00008 #include "rcs_defs.hh"          /* EXTERN_C_STD_HEADERS */
00009 
00010 
00011 #ifdef EXTERN_C_STD_HEADERS
00012 extern "C"
00013 {
00014 #endif
00015 
00016 #include <stdlib.h>             /* strtod() */
00017 #ifndef UNDER_CE
00018 #include <errno.h>              /* errno  */
00019 #endif
00020 #include <string.h>             // strstr, memset,
00021 #include <math.h>               // ceil()
00022 #include <ctype.h>              // isdigit()
00023 
00024 #ifdef EXTERN_C_STD_HEADERS
00025 }
00026 #endif
00027 
00028 #include "udpmem.hh"
00029 
00030 #include "udp_opts.hh"          /* SET_UDP_NODELAY */
00031 #include "timer.hh"             /* etime() */
00032 
00033 #include "rem_msg.hh"           /* REMOTE_CMS_READ_REQUEST_TYPE, etc. */
00034 #include "rcs_prnt.hh"          /* rcs_print_error() */
00035 #include "sendmsgt.h"           /* sendmsgt() */
00036 #include "recvmsgt.h"           /* recvmsgt() */
00037 #include "sokintrf.h"           /* load_socket_interface(), unload_socket_interface() */
00038                                /* dl_gethostbyname(), dl_socket(), dl_bind() */
00039                                 // dl_WSAGetLastError()
00040                                 // FIONREAD
00041 
00042 #ifdef UNDER_CE
00043 #include "rcs_ce.h"
00044 #endif
00045 
00046 static int number_of_udpmem_objects;
00047 
00048 UDPMEM::UDPMEM (char *_bufline, char *_procline):
00049 CMS (_bufline, _procline)
00050 {
00051   if (load_socket_interface () < 0)
00052     {
00053       rcs_print_error ("UDPMEM: Can not load socket interface.\n");
00054       status = CMS_LIBRARY_UNAVAILABLE_ERROR;
00055       return;
00056     }
00057   init_variables ();
00058   send_request = 1;
00059   send_broadcast = 0;
00060   number_of_udpmem_objects++;
00061   polling = (((char *) NULL) != strstr (ProcessLine, "poll"));
00062   memset (reply_buffer, 0, 32);
00063   memset (request_buffer, 0, 32);
00064   char *broadcast_eq = 0;
00065   broadcast_eq = strstr (ProcessLine, "broadcast_to_svr=");
00066   if (NULL != broadcast_eq)
00067     {
00068       send_broadcast_addr = dl_inet_addr (broadcast_eq + 10);
00069       send_broadcast = 1;
00070     }
00071   else if (0 != strstr (ProcessLine, "broadcast_to_svr"))
00072     {
00073       char localhostname[80];
00074       if (dl_gethostname (localhostname, 80) < 0)
00075         {
00076           if (strcmp (BufferHost, "localhost") != 0)
00077             {
00078               strncpy (localhostname, BufferHost, 80);
00079             }
00080           else
00081             {
00082               strncpy (localhostname, ProcessHost, 80);
00083             }
00084         }
00085 #ifndef VXWORKS
00086       dl_gethostbyname (localhostname, &broadcast_server_host_entry);
00087       if (NULL == broadcast_server_host_entry)
00088         {
00089           status = CMS_CONFIG_ERROR;
00090           rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n",
00091                            localhostname);
00092           return;
00093         }
00094 #ifdef __MSDOS__
00095       broadcast_server_socket_address.sin_addr.s_addr =
00096         *((u_long *) broadcast_server_host_entry->h_addr_list[0]);
00097 #else
00098       broadcast_server_socket_address.sin_addr.s_addr =
00099         *((int *) broadcast_server_host_entry->h_addr_list[0]);
00100 #endif
00101       broadcast_server_socket_address.sin_family =
00102         broadcast_server_host_entry->h_addrtype;
00103 #else
00104       broadcast_server_socket_address.sin_addr.s_addr =
00105         hostGetByName (localhostname);
00106       if (broadcast_server_socket_address.sin_addr.s_addr == ERROR)
00107         {
00108           rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n",
00109                            localhostname);
00110           status = CMS_CONFIG_ERROR;
00111         }
00112 #endif
00113       broadcast_server_socket_address.sin_addr.s_addr |= dl_htonl (0xff);
00114       send_broadcast_addr = broadcast_server_socket_address.sin_addr.s_addr;
00115       send_broadcast = 1;
00116       rcs_print_debug (PRINT_SOCKET_CONNECT,
00117                        "Broadcasting to IP address %s.\n",
00118                        dl_inet_ntoa
00119                        (broadcast_server_socket_address.sin_addr));
00120     }
00121 
00122   socket_fd = 0;
00123 #ifndef VXWORKS
00124   server_host_entry = NULL;
00125 #endif
00126   serial_number = 0;
00127 
00128   /* Set up the socket address stucture. */
00129   memset (&server_socket_address, 0, sizeof (server_socket_address));
00130   server_socket_address.sin_family = AF_INET;
00131   server_socket_address.sin_port = dl_htons (((u_short) udp_port_number));
00132 
00133   /* Get the IP address of the server using it's BufferHost. */
00134   if (send_broadcast)
00135     {
00136       broadcast_server_socket_address.sin_family = AF_INET;
00137       broadcast_server_socket_address.sin_port =
00138         dl_htons (((u_short) udp_port_number));
00139       broadcast_server_socket_address.sin_addr.s_addr = send_broadcast_addr;;
00140     }
00141 
00142   int hostname_was_address = 0;
00143   if (BufferHost[0] >= '0' && BufferHost[0] <= '9')
00144     {
00145       server_socket_address.sin_addr.s_addr = dl_inet_addr (BufferHost);
00146       if (server_socket_address.sin_addr.s_addr != 0 &&
00147           ((long) server_socket_address.sin_addr.s_addr) != -1)
00148         {
00149           hostname_was_address = 1;
00150         }
00151     }
00152   if (!hostname_was_address)
00153     {
00154 #ifndef VXWORKS
00155       dl_gethostbyname (BufferHost, &server_host_entry);
00156       if (NULL == server_host_entry)
00157         {
00158           status = CMS_CONFIG_ERROR;
00159           rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n",
00160                            BufferHost);
00161           return;
00162         }
00163 #ifdef __MSDOS__
00164       server_socket_address.sin_addr.s_addr =
00165         *((u_long *) server_host_entry->h_addr_list[0]);
00166 #else
00167       server_socket_address.sin_addr.s_addr =
00168         *((int *) server_host_entry->h_addr_list[0]);
00169 #endif
00170       server_socket_address.sin_family = server_host_entry->h_addrtype;
00171 #else
00172       server_socket_address.sin_addr.s_addr = hostGetByName (BufferHost);
00173       if (server_socket_address.sin_addr.s_addr == ERROR)
00174         {
00175           rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n",
00176                            BufferHost);
00177           status = CMS_CONFIG_ERROR;
00178         }
00179 #endif
00180     }
00181   rcs_print_debug (PRINT_SOCKET_CONNECT,
00182                    "Using server on %s with IP address %s.\n", BufferHost,
00183                    dl_inet_ntoa (server_socket_address.sin_addr));
00184   socket_fd = dl_socket (AF_INET, SOCK_DGRAM, 0);
00185   if (socket_fd < 0)
00186     {
00187 #ifndef UNDER_CE
00188       rcs_print_error ("UDPMEM: Error from socket() (errno = %d:%s)\n",
00189                        errno, strerror (errno));
00190 #endif
00191       status = CMS_CREATE_ERROR;
00192       return;
00193     }
00194   if (set_udp_socket_options (socket_fd) < 0)
00195     {
00196       status = CMS_MISC_ERROR;
00197       return;
00198     }
00199   char *sub_info_string = NULL;
00200   poll_interval_millis = 30000;
00201   subscription_type = CMS_NO_SUBSCRIPTION;
00202   cli_addr.sin_port = dl_htons (0);
00203 
00204   if ((min_compatible_version > 3.13 || min_compatible_version < 1e-6)
00205       && !send_broadcast)
00206     {
00207       sub_info_string = strstr (ProcessLine, "sub=");
00208       if (NULL != sub_info_string)
00209         {
00210           if (!strncmp (sub_info_string + 4, "none", 4))
00211             {
00212               subscription_type = CMS_NO_SUBSCRIPTION;
00213             }
00214           else if (!strncmp (sub_info_string + 4, "var", 3))
00215             {
00216               subscription_type = CMS_VARIABLE_SUBSCRIPTION;
00217             }
00218           else
00219             {
00220 #ifndef UNDER_CE
00221               poll_interval_millis =
00222                 ((int) (atof (sub_info_string + 4) * 1000.0));
00223 #else
00224               poll_interval_millis =
00225                 ((int) (RCS_CE_ATOF (sub_info_string + 4) * 1000.0));
00226 #endif
00227               subscription_type = CMS_POLLED_SUBSCRIPTION;
00228             }
00229         }
00230       if (poll_interval_millis < ceil (clk_tck () * 1000.0))
00231         {
00232           poll_interval_millis = (int) ceil (clk_tck () * 1000.0);
00233         }
00234       char *broadcast_clnt_port_eq = strstr (buflineupper, "BROADCAST_PORT=");
00235       if (broadcast_clnt_port_eq != NULL)
00236         {
00237           broadcast_subscriptions = 1;
00238 #ifndef UNDER_CE
00239           broadcast_clnt_port = strtol (broadcast_clnt_port_eq + 15, 0, 0);
00240 #else
00241           broadcast_clnt_port = atol (broadcast_clnt_port_eq + 15);
00242 #endif
00243           cli_addr.sin_port = dl_htons (broadcast_clnt_port);
00244         }
00245     }
00246 
00247   if (send_broadcast)
00248     {
00249       if (make_udp_socket_broadcast (socket_fd) < 0)
00250         {
00251           return;
00252         }
00253     }
00254 
00255   cli_addr.sin_family = AF_INET;
00256   cli_addr.sin_addr.s_addr = dl_htonl (INADDR_ANY);
00257   if (dl_bind (socket_fd, (struct sockaddr *) &cli_addr, sizeof (cli_addr)) <
00258       0)
00259     {
00260 #if defined(_Windows) && !defined(USE_PCNFS) && !defined(gnuwin32)
00261       rcs_print_error ("UDPMEM: bind error %d\n", dl_WSAGetLastError ());
00262 #else
00263       rcs_print_error ("UDPMEM: bind error %d = %s\n", errno,
00264                        strerror (errno));
00265 #endif
00266     }
00267   memset (&request_message_header, 0, sizeof (request_message_header));
00268   request_message_header.msg_name = (caddr_t) & server_socket_address;
00269   request_message_header.msg_namelen = sizeof (server_socket_address);
00270   request_iov2[0].iov_base = request_buffer;
00271   request_iov2[0].iov_len = 20;
00272   if (min_compatible_version > 3.13 || min_compatible_version < 1e-6)
00273     {
00274       request_iov2[0].iov_len = 24;
00275     }
00276   if (min_compatible_version > 3.43 || min_compatible_version < 1e-6)
00277     {
00278       request_iov2[0].iov_len = 28;
00279     }
00280   else
00281     {
00282       total_subdivisions = 1;
00283     }
00284   request_iov2[1].iov_base = (char *) encoded_data;
00285   request_iov2[1].iov_len = max_encoded_message_size;
00286   request_message_header.msg_iov = request_iov2;
00287   request_message_header.msg_iovlen = 2;
00288   memset (&reply_message_header, 0, sizeof (reply_message_header));
00289   reply_message_header.msg_name = (caddr_t) & server_socket_address;
00290   reply_message_header.msg_namelen = sizeof (server_socket_address);
00291   reply_iov2[0].iov_base = reply_buffer;
00292   reply_iov2[0].iov_len = 20;
00293   reply_iov2[1].iov_base = (char *) encoded_data;
00294   reply_iov2[1].iov_len = max_encoded_message_size;
00295   reply_message_header.msg_iov = reply_iov2;
00296   reply_message_header.msg_iovlen = 2;
00297   char *retry_string;
00298   if ((retry_string = strstr (ProcessLine, "retry=")) != NULL)
00299     {
00300       retry_string += strlen ("retry=");
00301 #ifndef UNDER_CE
00302       retry_timeout = strtod (retry_string, (char **) NULL);
00303 #else
00304       retry_timeout = RCS_CE_ATOF (retry_string);
00305 #endif
00306     }
00307   if (retry_timeout > timeout)
00308     retry_timeout = timeout;
00309   reply_size = 0;
00310 
00311 
00312   if ((min_compatible_version > 3.13 || min_compatible_version < 1e-6)
00313       && !send_broadcast)
00314     {
00315       if (verify_bufname () < 0)
00316         {
00317           status = CMS_STATUS_NOT_SET;
00318         }
00319       if (subscription_type != CMS_NO_SUBSCRIPTION)
00320         {
00321           setup_subscription ();
00322         }
00323     }
00324 
00325   if (polling)
00326     {
00327       make_udp_socket_nonblocking (socket_fd);
00328       timeout = 0;
00329       retry_timeout = 0;
00330     }
00331   last_reply_timed_out = 0;
00332 
00333 }
00334 
00335 int
00336 UDPMEM::init_variables ()
00337 {
00338   polling = 0;
00339   get_reply = 1;
00340   last_reply_timed_out = 0;
00341   reply_size = 20;
00342   request_size = 24;
00343   retry_timeout = 0.01;
00344   serial_number = 0;
00345   returned_serial_number = 0;
00346 #ifndef VXWORKS
00347   server_host_entry = 0;
00348   broadcast_server_host_entry = 0;
00349 #endif
00350   memset (&server_socket_address, 0, sizeof (struct sockaddr_in));
00351   memset (&broadcast_server_socket_address, 0, sizeof (struct sockaddr_in));
00352   socket_fd = -1;
00353   memset (request_buffer, 0, 32);
00354   memset (reply_buffer, 0, 32);
00355   memset (&request_message_header, 0, sizeof (struct msghdr));
00356   memset (&reply_message_header, 0, sizeof (struct msghdr));
00357   send_broadcast = 0;
00358   send_broadcast_addr = 0xFF;
00359   recieve_broadcast = 0;
00360   recieve_broadcast_port = 0;
00361   subscription_type = CMS_NO_SUBSCRIPTION;
00362   poll_interval_millis = 30;
00363   subscription_id = -1;
00364   send_request = 1;
00365   memset (&cli_addr, 0, sizeof (struct sockaddr_in));
00366   memset (&request_iov2, 0, 2 * sizeof (struct iovec));
00367   memset (&reply_iov2, 0, 2 * sizeof (struct iovec));
00368   broadcast_subscriptions = 0;
00369   broadcast_clnt_port = 0;
00370   return 0;
00371 }
00372 
00373 
00374 
00375 UDPMEM::~UDPMEM ()
00376 {
00377   if (socket_fd > 0)
00378     {
00379       if (subscription_type != CMS_NO_SUBSCRIPTION)
00380         {
00381           cancel_subscription ();
00382         }
00383 
00384       if (delete_totally)
00385         {
00386           *((u_long *) request_buffer) =
00387             dl_htonl ((u_long) REMOTE_CMS_CLEAN_REQUEST_TYPE);
00388           *((u_long *) request_buffer + 1) =
00389             dl_htonl ((u_long) buffer_number);
00390           request_iov2[0].iov_len = 20;
00391           if (
00392               (min_compatible_version > 3.13
00393                || min_compatible_version < 1e-6))
00394             {
00395               request_iov2[0].iov_len = 24;
00396             }
00397           request_message_header.msg_iovlen = 1;
00398           sendmsgt (socket_fd, &request_message_header, 0, timeout);
00399         }
00400       dl_closesocket (socket_fd);
00401       socket_fd = 0;
00402     }
00403 #if defined(_Windows) && !defined(USE_PCNFS) && !defined(gnuwin32)
00404   if (number_of_udpmem_objects == 1)
00405     {
00406       free_sendmsg_collection_buffer ();
00407       free_recvmsg_collection_buffer ();
00408       unload_socket_interface ();
00409     }
00410 #endif
00411   number_of_udpmem_objects--;
00412 }
00413 
00414 CMS_STATUS
00415 UDPMEM::read ()
00416 {
00417   long message_size, id;
00418   get_reply = 1;
00419   send_request = (subscription_type == CMS_NO_SUBSCRIPTION);
00420   if (socket_fd <= 0)
00421     {
00422       rcs_print_error ("UDPMEM::read: Invalid socket descriptor. (%d)\n",
00423                        socket_fd);
00424       return (status = CMS_MISC_ERROR);
00425     }
00426   if (!last_reply_timed_out)
00427     {
00428       serial_number++;
00429     }
00430   *((u_long *) request_buffer) =
00431     dl_htonl ((u_long) REMOTE_CMS_READ_REQUEST_TYPE);
00432   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00433   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00434   *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_READ_ACCESS);
00435   *((u_long *) request_buffer + 4) = dl_htonl ((u_long) in_buffer_id);
00436   if (total_subdivisions > 1)
00437     {
00438       *((u_long *) request_buffer + 6) =
00439         dl_htonl ((u_long) current_subdivision);
00440     }
00441   request_iov2[1].iov_len = 0;
00442   request_iov2[1].iov_base = (caddr_t) NULL;
00443   request_message_header.msg_iovlen = 1;
00444   reply_iov2[1].iov_len = max_encoded_message_size;
00445   reply_iov2[1].iov_base = (caddr_t) encoded_data;
00446   reply_message_header.msg_iovlen = 2;
00447   if (call_on_server () < 0)
00448     {
00449       rcs_print_error ("UDPMEM: read failed.\n");
00450       reply_message_header.msg_name = (caddr_t) & cli_addr;
00451       return (status = CMS_MISC_ERROR);
00452     }
00453   reply_message_header.msg_name = (caddr_t) & cli_addr;
00454   if (!last_reply_timed_out)
00455     {
00456       status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
00457       message_size = dl_ntohl (*((u_long *) reply_buffer + 2));
00458       if (message_size + 20 != reply_size && !polling)
00459         {
00460           rcs_print_error ("UDPMEM: message_size+20 = %ld\n",
00461                            message_size + 20);
00462           rcs_print_error ("UDPMEM: reply_size = %ld\n", reply_size);
00463         }
00464       id = dl_ntohl (*((u_long *) reply_buffer + 3));
00465       header.was_read = dl_ntohl (*((u_long *) reply_buffer + 4));
00466       check_id (id);
00467     }
00468   else
00469     {
00470       if (polling)
00471         {
00472           status = CMS_READ_OLD;
00473         }
00474     }
00475   return (status);
00476 }
00477 
00478 CMS_STATUS
00479 UDPMEM::blocking_read (double _blocking_timeout)
00480 {
00481   long message_size, id;
00482   get_reply = 1;
00483   blocking_timeout = _blocking_timeout;
00484   double orig_timeout = timeout;
00485   timeout = blocking_timeout;
00486   int orig_polling = polling;
00487   if (polling)
00488     {
00489       polling = 0;
00490       make_udp_socket_blocking (socket_fd);
00491     }
00492   int orig_subscription_type = subscription_type;
00493   if (CMS_NO_SUBSCRIPTION == subscription_type)
00494     {
00495       subscription_type = CMS_POLLED_SUBSCRIPTION;
00496       if (blocking_timeout > 0.0)
00497         {
00498           poll_interval_millis = (int) (blocking_timeout * 100);
00499           if (poll_interval_millis < 10)
00500             {
00501               poll_interval_millis = 10;
00502             }
00503           if (poll_interval_millis > 1000)
00504             {
00505               poll_interval_millis = 1000;
00506             }
00507         }
00508       else
00509         {
00510           poll_interval_millis = 30;
00511         }
00512       setup_subscription ();
00513     }
00514   double orig_retry_timeout = retry_timeout;
00515   retry_timeout = timeout = blocking_timeout;
00516   send_request = 0;
00517   if (socket_fd <= 0)
00518     {
00519       rcs_print_error ("UDPMEM::read: Invalid socket descriptor. (%d)\n",
00520                        socket_fd);
00521       retry_timeout = orig_retry_timeout;
00522       return (status = CMS_MISC_ERROR);
00523     }
00524   if (!last_reply_timed_out)
00525     {
00526       serial_number++;
00527     }
00528   *((u_long *) request_buffer) =
00529     dl_htonl ((u_long) REMOTE_CMS_READ_REQUEST_TYPE);
00530   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00531   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00532   *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_READ_ACCESS);
00533   *((u_long *) request_buffer + 4) = dl_htonl ((u_long) in_buffer_id);
00534   if (total_subdivisions > 1)
00535     {
00536       *((u_long *) request_buffer + 6) =
00537         dl_htonl ((u_long) current_subdivision);
00538     }
00539   request_iov2[1].iov_len = 0;
00540   request_iov2[1].iov_base = (caddr_t) NULL;
00541   request_message_header.msg_iovlen = 1;
00542   reply_iov2[1].iov_len = max_encoded_message_size;
00543   reply_iov2[1].iov_base = (caddr_t) encoded_data;
00544   reply_message_header.msg_iovlen = 2;
00545   status = CMS_READ_OLD;
00546   double start_time = 0.0;
00547   if (timeout > 1e-6)
00548     {
00549       start_time = etime ();
00550     }
00551   double time_diff = 0.0;
00552   while (status == CMS_READ_OLD && (timeout < 0.0 || time_diff > timeout))
00553     {
00554       if (call_on_server () < 0)
00555         {
00556           rcs_print_error ("UDPMEM: read failed.\n");
00557           timeout = orig_timeout;
00558           polling = orig_polling;
00559           retry_timeout = orig_retry_timeout;
00560           reply_message_header.msg_name = (caddr_t) & cli_addr;
00561           if (orig_subscription_type == CMS_NO_SUBSCRIPTION)
00562             {
00563               cancel_subscription ();
00564             }
00565           subscription_type = orig_subscription_type;
00566           return (status = CMS_MISC_ERROR);
00567         }
00568       reply_message_header.msg_name = (caddr_t) & cli_addr;
00569       if (!last_reply_timed_out)
00570         {
00571           status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
00572           message_size = dl_ntohl (*((u_long *) reply_buffer + 2));
00573           if (message_size + 20 != reply_size && !polling)
00574             {
00575               rcs_print_error ("UDPMEM: message_size+20 = %ld\n",
00576                                message_size + 20);
00577               rcs_print_error ("UDPMEM: reply_size = %ld\n", reply_size);
00578             }
00579           id = dl_ntohl (*((u_long *) reply_buffer + 3));
00580           header.was_read = dl_ntohl (*((u_long *) reply_buffer + 4));
00581           check_id (id);
00582         }
00583       if (timeout > 1e-6)
00584         {
00585           time_diff = etime () - start_time;
00586         }
00587     }
00588   retry_timeout = orig_retry_timeout;
00589   timeout = orig_timeout;
00590   polling = orig_polling;
00591   if (orig_subscription_type == CMS_NO_SUBSCRIPTION)
00592     {
00593       cancel_subscription ();
00594     }
00595   subscription_type = orig_subscription_type;
00596   return (status);
00597 }
00598 
00599 CMS_STATUS
00600 UDPMEM::peek ()
00601 {
00602   long message_size, id;
00603   get_reply = 1;
00604   send_request = (subscription_type == CMS_NO_SUBSCRIPTION);
00605   if (socket_fd <= 0)
00606     {
00607       rcs_print_error ("UDPMEM::read: Invalid socket descriptor. (%d)\n",
00608                        socket_fd);
00609       return (status = CMS_MISC_ERROR);
00610     }
00611   if (!last_reply_timed_out)
00612     {
00613       serial_number++;
00614     }
00615   *((u_long *) request_buffer) =
00616     dl_htonl ((u_long) REMOTE_CMS_READ_REQUEST_TYPE);
00617   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00618   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00619   *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_PEEK_ACCESS);
00620   *((u_long *) request_buffer + 4) = dl_htonl ((u_long) in_buffer_id);
00621   if (total_subdivisions > 1)
00622     {
00623       *((u_long *) request_buffer + 6) =
00624         dl_htonl ((u_long) current_subdivision);
00625     }
00626   request_iov2[1].iov_len = 0;
00627   request_iov2[1].iov_base = (caddr_t) NULL;
00628   request_message_header.msg_iovlen = 1;
00629   reply_iov2[1].iov_len = max_encoded_message_size;
00630   reply_iov2[1].iov_base = (caddr_t) encoded_data;
00631   reply_message_header.msg_iovlen = 2;
00632   if (call_on_server () < 0)
00633     {
00634       rcs_print_error ("UDPMEM: peek failed.\n");
00635       reply_message_header.msg_name = (caddr_t) & cli_addr;
00636       return (status = CMS_MISC_ERROR);
00637     }
00638   reply_message_header.msg_name = (caddr_t) & cli_addr;
00639   if (!last_reply_timed_out)
00640     {
00641       status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
00642       message_size = dl_ntohl (*((u_long *) reply_buffer + 2));
00643       if (message_size + 20 != reply_size && !polling)
00644         {
00645           rcs_print_error ("UDPMEM: message_size+20 = %ld\n",
00646                            message_size + 20);
00647           rcs_print_error ("UDPMEM: reply_size = %ld\n", reply_size);
00648         }
00649       id = dl_ntohl (*((u_long *) reply_buffer + 3));
00650       header.was_read = dl_ntohl (*((u_long *) reply_buffer + 4));
00651       check_id (id);
00652     }
00653   else
00654     {
00655       if (polling)
00656         {
00657           status = CMS_READ_OLD;
00658         }
00659     }
00660   return (status);
00661 }
00662 
00663 
00664 int
00665 UDPMEM::setup_subscription ()
00666 {
00667   get_reply = 1;
00668   send_request = 1;
00669   double orig_retry_timeout = retry_timeout;
00670   if (retry_timeout < 0.0 || retry_timeout > (poll_interval_millis / 1000.0))
00671     {
00672       retry_timeout = (poll_interval_millis / 1000.0);
00673     }
00674   if (retry_timeout < clk_tck () * 2.0)
00675     {
00676       retry_timeout = clk_tck () * 2.0;
00677     }
00678   if (socket_fd <= 0)
00679     {
00680       rcs_print_error
00681         ("UDPMEM::setup_subscription: Invalid socket descriptor. (%d)\n",
00682          socket_fd);
00683       return (-1);
00684     }
00685   last_reply_timed_out = 0;
00686   serial_number++;
00687   *((u_long *) request_buffer) =
00688     dl_htonl ((u_long) REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE);
00689   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00690   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00691   *((u_long *) request_buffer + 3) = dl_htonl ((u_long) subscription_type);
00692   *((u_long *) request_buffer + 4) = dl_htonl ((u_long) poll_interval_millis);
00693   *((u_long *) request_buffer + 5) = dl_htonl ((u_long) in_buffer_id);
00694   if (total_subdivisions > 1)
00695     {
00696       *((u_long *) request_buffer + 6) =
00697         dl_htonl ((u_long) current_subdivision);
00698     }
00699   request_iov2[1].iov_len = 0;
00700   request_iov2[1].iov_base = (caddr_t) NULL;
00701   request_message_header.msg_iovlen = 1;
00702   reply_iov2[1].iov_len = 0;
00703   reply_iov2[1].iov_base = (caddr_t) 0;
00704   reply_message_header.msg_iovlen = 1;
00705   int orig_polling = polling;
00706   double orig_timeout = timeout;
00707   if (orig_timeout <= 0.0)
00708     {
00709       timeout = 5.0;
00710     }
00711   polling = 0;
00712   if (call_on_server () < 0)
00713     {
00714       retry_timeout = orig_retry_timeout;
00715       rcs_print_error ("UDPMEM: Can't setup subscription.\n");
00716       polling = orig_polling;
00717       timeout = orig_timeout;
00718       last_reply_timed_out = 0;
00719       subscription_type = CMS_NO_SUBSCRIPTION;
00720       subscription_id = -1;
00721       serial_number++;
00722       return (0);
00723     }
00724   retry_timeout = orig_retry_timeout;
00725   last_reply_timed_out = 0;
00726   serial_number++;
00727   polling = orig_polling;
00728   timeout = orig_timeout;
00729   if (!last_reply_timed_out)
00730     {
00731       long success = dl_htonl (*((u_long *) reply_buffer + 1));
00732       if (success == 1)
00733         {
00734           subscription_id = dl_htonl (*((u_long *) reply_buffer + 2));
00735         }
00736       else
00737         {
00738           rcs_print_error ("UDPMEM: Can't setup subscription.\n");
00739           subscription_type = CMS_NO_SUBSCRIPTION;
00740           subscription_id = -1;
00741         }
00742     }
00743   return (0);
00744 }
00745 
00746 
00747 int
00748 UDPMEM::cancel_subscription ()
00749 {
00750   get_reply = 1;
00751   send_request = 1;
00752   double orig_retry_timeout = retry_timeout;
00753   if (retry_timeout < 0.0 || retry_timeout > (poll_interval_millis / 1000.0))
00754     {
00755       retry_timeout = (poll_interval_millis / 1000.0);
00756     }
00757   if (retry_timeout < clk_tck () * 2.0)
00758     {
00759       retry_timeout = clk_tck () * 2.0;
00760     }
00761   if (socket_fd <= 0)
00762     {
00763       rcs_print_error
00764         ("UDPMEM::cancel_subscription: Invalid socket descriptor. (%d)\n",
00765          socket_fd);
00766       retry_timeout = orig_retry_timeout;
00767       return (-1);
00768     }
00769   make_udp_socket_blocking (socket_fd);
00770   last_reply_timed_out = 0;
00771   serial_number += 10000 / poll_interval_millis;
00772   serial_number += 10;
00773   *((u_long *) request_buffer) =
00774     dl_htonl ((u_long) REMOTE_CMS_CANCEL_SUBSCRIPTION_REQUEST_TYPE);
00775   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00776   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00777   *((u_long *) request_buffer + 3) = dl_htonl ((u_long) subscription_id);
00778   if (total_subdivisions > 1)
00779     {
00780       *((u_long *) request_buffer + 6) =
00781         dl_htonl ((u_long) current_subdivision);
00782     }
00783   request_iov2[1].iov_len = 0;
00784   request_iov2[1].iov_base = (caddr_t) NULL;
00785   request_message_header.msg_iovlen = 1;
00786   reply_iov2[1].iov_len = 0;
00787   reply_iov2[1].iov_base = (caddr_t) 0;
00788   reply_message_header.msg_iovlen = 1;
00789   int orig_polling = polling;
00790   double orig_timeout = timeout;
00791   if (orig_timeout <= 10.0)
00792     {
00793       timeout = 10.0;
00794     }
00795   polling = 0;
00796   if (call_on_server () < 0)
00797     {
00798       retry_timeout = orig_retry_timeout;
00799       rcs_print_error ("UDPMEM: Can't cancel subscription.\n");
00800       polling = orig_polling;
00801       timeout = orig_timeout;
00802       last_reply_timed_out = 0;
00803       subscription_type = CMS_NO_SUBSCRIPTION;
00804       subscription_id = -1;
00805       serial_number++;
00806       return (0);
00807     }
00808   retry_timeout = orig_retry_timeout;
00809   last_reply_timed_out = 0;
00810   serial_number++;
00811   polling = orig_polling;
00812   timeout = orig_timeout;
00813 
00814   if (!last_reply_timed_out)
00815     {
00816       long success = dl_htonl (*((u_long *) reply_buffer + 1));
00817       if (success == 1)
00818         {
00819           if (subscription_id !=
00820               ((long) dl_htonl (*((u_long *) reply_buffer + 2))))
00821             {
00822               rcs_print_error
00823                 ("UDPMEM: Can't cancel subscription. (Incorrect subscription id returned.)\n");
00824               subscription_type = CMS_NO_SUBSCRIPTION;
00825               subscription_id = -1;
00826             }
00827         }
00828       else
00829         {
00830           rcs_print_error ("UDPMEM: Can't cancel subscription.\n");
00831           subscription_type = CMS_NO_SUBSCRIPTION;
00832           subscription_id = -1;
00833         }
00834     }
00835   return (0);
00836 }
00837 
00838 
00839 int
00840 UDPMEM::verify_bufname ()
00841 {
00842   long message_size;
00843   get_reply = 1;
00844   send_request = 1;
00845   char bufname_from_server[80];
00846   if (socket_fd <= 0)
00847     {
00848       rcs_print_error ("UDPMEM::read: Invalid socket descriptor. (%d)\n",
00849                        socket_fd);
00850       status = CMS_MISC_ERROR;
00851       return (-1);
00852     }
00853   last_reply_timed_out = 0;
00854   serial_number++;
00855   *((u_long *) request_buffer) =
00856     dl_htonl ((u_long) REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE);
00857   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00858   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00859   *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_PEEK_ACCESS);
00860   *((u_long *) request_buffer + 4) = dl_htonl ((u_long) in_buffer_id);
00861   if (total_subdivisions > 1)
00862     {
00863       *((u_long *) request_buffer + 6) =
00864         dl_htonl ((u_long) current_subdivision);
00865     }
00866   request_iov2[1].iov_len = 0;
00867   request_iov2[1].iov_base = (caddr_t) NULL;
00868   request_message_header.msg_iovlen = 1;
00869   reply_iov2[1].iov_len = 80;
00870   reply_iov2[1].iov_base = (caddr_t) bufname_from_server;
00871   reply_message_header.msg_iovlen = 2;
00872   int orig_polling = polling;
00873   double orig_timeout = timeout;
00874   if (orig_timeout <= 0.0)
00875     {
00876       timeout = 5.0;
00877     }
00878   polling = 0;
00879   if (call_on_server () < 0)
00880     {
00881       rcs_print_error
00882         ("UDPMEM: Failed to verify BufferName with server (Server may not be running.)\n");
00883       polling = orig_polling;
00884       timeout = orig_timeout;
00885       last_reply_timed_out = 0;
00886       serial_number++;
00887       status = CMS_NO_SERVER_ERROR;
00888       return (-1);
00889     }
00890   last_reply_timed_out = 0;
00891   serial_number++;
00892   polling = orig_polling;
00893   timeout = orig_timeout;
00894   if (!last_reply_timed_out)
00895     {
00896       status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
00897       message_size = dl_ntohl (*((u_long *) reply_buffer + 2));
00898       if (message_size > 100)
00899         {
00900           rcs_print_error
00901             ("UDPMEM: message_size = %ld exceeds maximum for GET_BUF_NAME reply.\n",
00902              message_size + 20);
00903           status = CMS_MISC_ERROR;
00904           return (-1);
00905         }
00906       if (0 != strncmp (BufferName, bufname_from_server, 80))
00907         {
00908           rcs_print_error
00909             ("UDPMEM: Buffer name retrieved from server %s at IP address %s for buffer number %d was %s but %s was expected.\n",
00910              BufferHost, dl_inet_ntoa (server_socket_address.sin_addr),
00911              buffer_number,
00912              ((0 != bufname_from_server[0]) ? bufname_from_server : "blank"),
00913              BufferName);
00914           if (0 != bufname_from_server[0] && 0 != BufferName[0])
00915             {
00916               status = CMS_RESOURCE_CONFLICT_ERROR;
00917               return (-1);
00918             }
00919         }
00920     }
00921   else
00922     {
00923       status = CMS_NO_SERVER_ERROR;
00924       return (-1);
00925     }
00926   return (0);
00927 }
00928 
00929 void
00930 throwaway_extra_data_on_socket (int socket_fd)
00931 {
00932   int count = 0;
00933   if (socket_fd > 0)
00934     {
00935       struct msghdr temp_header;
00936       while (recvmsgtq (socket_fd, &temp_header, 0, 0.0) > 0 && count < 10)
00937         count++;
00938     }
00939 }
00940 
00941 int
00942 UDPMEM::call_on_server ()
00943 {
00944   double start_time, current_time;
00945   start_time = etime ();
00946   last_reply_timed_out = 0;
00947   double last_time_sent = 0.0;
00948   double time_diff = 0.0;
00949   int first_time = 1;
00950   while (1)
00951     {
00952       if (send_broadcast)
00953         {
00954           if (get_reply)
00955             {
00956               request_message_header.msg_name =
00957                 (caddr_t) & server_socket_address;
00958               request_message_header.msg_namelen =
00959                 sizeof (server_socket_address);
00960             }
00961           else
00962             {
00963               request_message_header.msg_name =
00964                 (caddr_t) & broadcast_server_socket_address;
00965               request_message_header.msg_namelen =
00966                 sizeof (broadcast_server_socket_address);
00967             }
00968         }
00969       if (send_request)
00970         {
00971           time_diff = etime () - last_time_sent;
00972           if (first_time || time_diff >= retry_timeout)
00973             {
00974               if (
00975                   (request_size =
00976                    sendmsgt (socket_fd, &request_message_header, 0,
00977                              retry_timeout)) < 0)
00978                 {
00979                   if (!sendmsgt_timed_out)
00980                     return (-1);
00981                 }
00982               last_time_sent = etime ();
00983               first_time = 0;
00984             }
00985         }
00986       if (!get_reply)
00987         {
00988           last_reply_timed_out = 0;
00989           return 0;
00990         }
00991       else
00992         {
00993           if (
00994               (reply_size =
00995                recvmsgtq (socket_fd, &reply_message_header, 0,
00996                           retry_timeout)) < 0)
00997             {
00998               if (!recvmsgt_timed_out)
00999                 return (-1);
01000             }
01001           returned_serial_number = dl_ntohl (*((u_long *) reply_buffer));
01002           if (returned_serial_number == serial_number)
01003             {
01004               last_reply_timed_out = 0;
01005               throwaway_extra_data_on_socket (socket_fd);
01006               return 0;
01007             }
01008           // If we have a subscription, force the messages to the correct order.
01009           if (!send_request &&
01010               (returned_serial_number > serial_number ||
01011                (serial_number > 1000 && returned_serial_number < 500)))
01012             {
01013               serial_number = returned_serial_number;
01014               last_reply_timed_out = 0;
01015               return 0;
01016             }
01017           if (!polling)
01018             {
01019               current_time = etime ();
01020               if (current_time - start_time > timeout && timeout >= 1e-6)
01021                 {
01022                   rcs_print_error
01023                     ("UDPMEM: time out error after %f seconds.\n",
01024                      current_time - start_time);
01025                   last_reply_timed_out = 1;
01026                   return -1;
01027                 }
01028             }
01029           else
01030             {
01031               last_reply_timed_out = 1;
01032               return 0;
01033             }
01034         }
01035     }
01036   throwaway_extra_data_on_socket (socket_fd);
01037   last_reply_timed_out = 0;
01038   return (0);
01039 }
01040 
01041 
01042 CMS_STATUS
01043 UDPMEM::write (void *user_data)
01044 {
01045   get_reply = 0;
01046   send_request = 1;
01047 
01048   if (!force_raw)
01049     {
01050       user_data = encoded_data;
01051     }
01052 
01053   if (socket_fd <= 0)
01054     {
01055       rcs_print_error ("UDPMEM::write: Invalid socket descriptor. (%d)\n",
01056                        socket_fd);
01057       return (status = CMS_MISC_ERROR);
01058     }
01059   serial_number++;
01060   *((u_long *) request_buffer) =
01061     dl_htonl ((u_long) REMOTE_CMS_WRITE_REQUEST_TYPE);
01062   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
01063   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
01064   *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_WRITE_ACCESS);
01065   *((u_long *) request_buffer + 4) =
01066     dl_htonl ((u_long) header.in_buffer_size);
01067   if (total_subdivisions > 1)
01068     {
01069       *((u_long *) request_buffer + 6) =
01070         dl_htonl ((u_long) current_subdivision);
01071     }
01072   request_iov2[1].iov_len = header.in_buffer_size;
01073   request_iov2[1].iov_base = (caddr_t) user_data;
01074   request_message_header.msg_iovlen = 2;
01075   reply_iov2[1].iov_len = 0;
01076   reply_iov2[1].iov_base = (caddr_t) NULL;
01077   reply_message_header.msg_iovlen = 1;
01078   if (call_on_server () < 0)
01079     {
01080       rcs_print_error ("UDPMEM: write failed.\n");
01081       return (status = CMS_MISC_ERROR);
01082     }
01083   status = CMS_WRITE_OK;
01084   header.was_read = 0;
01085   return (status);
01086 }
01087 
01088 CMS_STATUS
01089 UDPMEM::write_if_read (void *user_data)
01090 {
01091   get_reply = 0;
01092   send_request = 1;
01093   if (!force_raw)
01094     {
01095       user_data = encoded_data;
01096     }
01097 
01098   if (socket_fd <= 0)
01099     {
01100       rcs_print_error ("UDPMEM::write: Invalid socket descriptor. (%d)\n",
01101                        socket_fd);
01102       return (status = CMS_MISC_ERROR);
01103     }
01104   serial_number++;
01105   *((u_long *) request_buffer) =
01106     dl_htonl ((u_long) REMOTE_CMS_WRITE_REQUEST_TYPE);
01107   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
01108   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
01109   *((u_long *) request_buffer + 3) =
01110     dl_htonl ((u_long) CMS_WRITE_IF_READ_ACCESS);
01111   *((u_long *) request_buffer + 4) =
01112     dl_htonl ((u_long) header.in_buffer_size);
01113   if (total_subdivisions > 1)
01114     {
01115       *((u_long *) request_buffer + 6) =
01116         dl_htonl ((u_long) current_subdivision);
01117     }
01118   request_iov2[1].iov_len = header.in_buffer_size;
01119   request_iov2[1].iov_base = (caddr_t) user_data;
01120   request_message_header.msg_iovlen = 2;
01121   reply_iov2[1].iov_len = 0;
01122   reply_iov2[1].iov_base = (caddr_t) NULL;
01123   reply_message_header.msg_iovlen = 1;
01124   if (call_on_server () < 0)
01125     {
01126       rcs_print_error ("UDPMEM: write_if_read failed.\n");
01127       return (status = CMS_MISC_ERROR);
01128     }
01129   status = CMS_WRITE_OK;
01130   header.was_read = 0;
01131   return (status);
01132 }
01133 
01134 int
01135 UDPMEM::check_if_read ()
01136 {
01137   get_reply = 1;
01138   send_request = 1;
01139   if (socket_fd <= 0)
01140     {
01141       rcs_print_error
01142         ("UDPMEM::check_if_read: Invalid socket descriptor. (%d)\n",
01143          socket_fd);
01144       return (status = CMS_MISC_ERROR);
01145     }
01146   serial_number++;
01147   *((u_long *) request_buffer) =
01148     dl_htonl ((u_long) REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE);
01149   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
01150   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
01151   if (total_subdivisions > 1)
01152     {
01153       *((u_long *) request_buffer + 6) =
01154         dl_htonl ((u_long) current_subdivision);
01155     }
01156   request_iov2[1].iov_len = 0;
01157   request_iov2[1].iov_base = (caddr_t) NULL;
01158   request_message_header.msg_iovlen = 1;
01159   reply_iov2[1].iov_len = 0;
01160   reply_iov2[1].iov_base = (caddr_t) NULL;
01161   reply_message_header.msg_iovlen = 1;
01162   if (call_on_server () < 0)
01163     {
01164       rcs_print_error ("UDPMEM: check_if_read failed.\n");
01165       return (status = CMS_MISC_ERROR);
01166     }
01167   status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
01168   header.was_read = dl_ntohl (*((u_long *) reply_buffer + 2));
01169   if (last_reply_timed_out)
01170     {
01171       header.was_read = 0;
01172     }
01173   return (header.was_read);
01174 }
01175 
01176 
01177 CMS_STATUS
01178 UDPMEM::clear ()
01179 {
01180   send_request = 1;
01181   get_reply = 0;
01182   if (socket_fd <= 0)
01183     {
01184       rcs_print_error
01185         ("UDPMEM::check_if_read: Invalid socket descriptor. (%d)\n",
01186          socket_fd);
01187       return (status = CMS_MISC_ERROR);
01188     }
01189   *((u_long *) request_buffer) =
01190     dl_htonl ((u_long) REMOTE_CMS_CLEAR_REQUEST_TYPE);
01191   *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
01192   *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
01193   if (total_subdivisions > 1)
01194     {
01195       *((u_long *) request_buffer + 6) =
01196         dl_htonl ((u_long) current_subdivision);
01197     }
01198   request_iov2[1].iov_base = (caddr_t) NULL;
01199   request_message_header.msg_iovlen = 1;
01200   reply_iov2[1].iov_len = 0;
01201   reply_iov2[1].iov_base = (caddr_t) NULL;
01202   reply_message_header.msg_iovlen = 1;
01203   if (call_on_server () < 0)
01204     {
01205       return (status = CMS_MISC_ERROR);
01206     }
01207   status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));      /*  */
01208   return (status);
01209 }

Generated on Sun Dec 2 15:56:53 2001 for rcslib by doxygen1.2.11.1 written by Dimitri van Heesch, © 1997-2001