00001
00002
00003 #ifdef USE_PCNFS
00004 #undef USE_PCNFS
00005 #endif
00006
00007
00008 #include "rcs_defs.hh"
00009
00010
00011 #ifdef EXTERN_C_STD_HEADERS
00012 extern "C"
00013 {
00014 #endif
00015
00016 #include <stdlib.h>
00017 #ifndef UNDER_CE
00018 #include <errno.h>
00019 #endif
00020 #include <string.h>
00021 #include <math.h>
00022 #include <ctype.h>
00023
00024 #ifdef EXTERN_C_STD_HEADERS
00025 }
00026 #endif
00027
00028 #include "udpmem.hh"
00029
00030 #include "udp_opts.hh"
00031 #include "timer.hh"
00032
00033 #include "rem_msg.hh"
00034 #include "rcs_prnt.hh"
00035 #include "sendmsgt.h"
00036 #include "recvmsgt.h"
00037 #include "sokintrf.h"
00038
00039
00040
00041
00042 #ifdef UNDER_CE
00043 #include "rcs_ce.h"
00044 #endif
00045
/* Count of UDPMEM objects in this translation unit.  The constructor
   increments it once the socket interface has loaded, and the destructor
   decrements it; on Windows builds the destructor unloads the socket
   interface when the count is 1 at destruction time. */
00046 static int number_of_udpmem_objects;
00047
00048 UDPMEM::UDPMEM (char *_bufline, char *_procline):
00049 CMS (_bufline, _procline)
00050 {
00051 if (load_socket_interface () < 0)
00052 {
00053 rcs_print_error ("UDPMEM: Can not load socket interface.\n");
00054 status = CMS_LIBRARY_UNAVAILABLE_ERROR;
00055 return;
00056 }
00057 init_variables ();
00058 send_request = 1;
00059 send_broadcast = 0;
00060 number_of_udpmem_objects++;
00061 polling = (((char *) NULL) != strstr (ProcessLine, "poll"));
00062 memset (reply_buffer, 0, 32);
00063 memset (request_buffer, 0, 32);
00064 char *broadcast_eq = 0;
00065 broadcast_eq = strstr (ProcessLine, "broadcast_to_svr=");
00066 if (NULL != broadcast_eq)
00067 {
00068 send_broadcast_addr = dl_inet_addr (broadcast_eq + 10);
00069 send_broadcast = 1;
00070 }
00071 else if (0 != strstr (ProcessLine, "broadcast_to_svr"))
00072 {
00073 char localhostname[80];
00074 if (dl_gethostname (localhostname, 80) < 0)
00075 {
00076 if (strcmp (BufferHost, "localhost") != 0)
00077 {
00078 strncpy (localhostname, BufferHost, 80);
00079 }
00080 else
00081 {
00082 strncpy (localhostname, ProcessHost, 80);
00083 }
00084 }
00085 #ifndef VXWORKS
00086 dl_gethostbyname (localhostname, &broadcast_server_host_entry);
00087 if (NULL == broadcast_server_host_entry)
00088 {
00089 status = CMS_CONFIG_ERROR;
00090 rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n",
00091 localhostname);
00092 return;
00093 }
00094 #ifdef __MSDOS__
00095 broadcast_server_socket_address.sin_addr.s_addr =
00096 *((u_long *) broadcast_server_host_entry->h_addr_list[0]);
00097 #else
00098 broadcast_server_socket_address.sin_addr.s_addr =
00099 *((int *) broadcast_server_host_entry->h_addr_list[0]);
00100 #endif
00101 broadcast_server_socket_address.sin_family =
00102 broadcast_server_host_entry->h_addrtype;
00103 #else
00104 broadcast_server_socket_address.sin_addr.s_addr =
00105 hostGetByName (localhostname);
00106 if (broadcast_server_socket_address.sin_addr.s_addr == ERROR)
00107 {
00108 rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n",
00109 localhostname);
00110 status = CMS_CONFIG_ERROR;
00111 }
00112 #endif
00113 broadcast_server_socket_address.sin_addr.s_addr |= dl_htonl (0xff);
00114 send_broadcast_addr = broadcast_server_socket_address.sin_addr.s_addr;
00115 send_broadcast = 1;
00116 rcs_print_debug (PRINT_SOCKET_CONNECT,
00117 "Broadcasting to IP address %s.\n",
00118 dl_inet_ntoa
00119 (broadcast_server_socket_address.sin_addr));
00120 }
00121
00122 socket_fd = 0;
00123 #ifndef VXWORKS
00124 server_host_entry = NULL;
00125 #endif
00126 serial_number = 0;
00127
00128
00129 memset (&server_socket_address, 0, sizeof (server_socket_address));
00130 server_socket_address.sin_family = AF_INET;
00131 server_socket_address.sin_port = dl_htons (((u_short) udp_port_number));
00132
00133
00134 if (send_broadcast)
00135 {
00136 broadcast_server_socket_address.sin_family = AF_INET;
00137 broadcast_server_socket_address.sin_port =
00138 dl_htons (((u_short) udp_port_number));
00139 broadcast_server_socket_address.sin_addr.s_addr = send_broadcast_addr;;
00140 }
00141
00142 int hostname_was_address = 0;
00143 if (BufferHost[0] >= '0' && BufferHost[0] <= '9')
00144 {
00145 server_socket_address.sin_addr.s_addr = dl_inet_addr (BufferHost);
00146 if (server_socket_address.sin_addr.s_addr != 0 &&
00147 ((long) server_socket_address.sin_addr.s_addr) != -1)
00148 {
00149 hostname_was_address = 1;
00150 }
00151 }
00152 if (!hostname_was_address)
00153 {
00154 #ifndef VXWORKS
00155 dl_gethostbyname (BufferHost, &server_host_entry);
00156 if (NULL == server_host_entry)
00157 {
00158 status = CMS_CONFIG_ERROR;
00159 rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n",
00160 BufferHost);
00161 return;
00162 }
00163 #ifdef __MSDOS__
00164 server_socket_address.sin_addr.s_addr =
00165 *((u_long *) server_host_entry->h_addr_list[0]);
00166 #else
00167 server_socket_address.sin_addr.s_addr =
00168 *((int *) server_host_entry->h_addr_list[0]);
00169 #endif
00170 server_socket_address.sin_family = server_host_entry->h_addrtype;
00171 #else
00172 server_socket_address.sin_addr.s_addr = hostGetByName (BufferHost);
00173 if (server_socket_address.sin_addr.s_addr == ERROR)
00174 {
00175 rcs_print_error ("UDPMEM: Couldn't get host address for (%s).\n",
00176 BufferHost);
00177 status = CMS_CONFIG_ERROR;
00178 }
00179 #endif
00180 }
00181 rcs_print_debug (PRINT_SOCKET_CONNECT,
00182 "Using server on %s with IP address %s.\n", BufferHost,
00183 dl_inet_ntoa (server_socket_address.sin_addr));
00184 socket_fd = dl_socket (AF_INET, SOCK_DGRAM, 0);
00185 if (socket_fd < 0)
00186 {
00187 #ifndef UNDER_CE
00188 rcs_print_error ("UDPMEM: Error from socket() (errno = %d:%s)\n",
00189 errno, strerror (errno));
00190 #endif
00191 status = CMS_CREATE_ERROR;
00192 return;
00193 }
00194 if (set_udp_socket_options (socket_fd) < 0)
00195 {
00196 status = CMS_MISC_ERROR;
00197 return;
00198 }
00199 char *sub_info_string = NULL;
00200 poll_interval_millis = 30000;
00201 subscription_type = CMS_NO_SUBSCRIPTION;
00202 cli_addr.sin_port = dl_htons (0);
00203
00204 if ((min_compatible_version > 3.13 || min_compatible_version < 1e-6)
00205 && !send_broadcast)
00206 {
00207 sub_info_string = strstr (ProcessLine, "sub=");
00208 if (NULL != sub_info_string)
00209 {
00210 if (!strncmp (sub_info_string + 4, "none", 4))
00211 {
00212 subscription_type = CMS_NO_SUBSCRIPTION;
00213 }
00214 else if (!strncmp (sub_info_string + 4, "var", 3))
00215 {
00216 subscription_type = CMS_VARIABLE_SUBSCRIPTION;
00217 }
00218 else
00219 {
00220 #ifndef UNDER_CE
00221 poll_interval_millis =
00222 ((int) (atof (sub_info_string + 4) * 1000.0));
00223 #else
00224 poll_interval_millis =
00225 ((int) (RCS_CE_ATOF (sub_info_string + 4) * 1000.0));
00226 #endif
00227 subscription_type = CMS_POLLED_SUBSCRIPTION;
00228 }
00229 }
00230 if (poll_interval_millis < ceil (clk_tck () * 1000.0))
00231 {
00232 poll_interval_millis = (int) ceil (clk_tck () * 1000.0);
00233 }
00234 char *broadcast_clnt_port_eq = strstr (buflineupper, "BROADCAST_PORT=");
00235 if (broadcast_clnt_port_eq != NULL)
00236 {
00237 broadcast_subscriptions = 1;
00238 #ifndef UNDER_CE
00239 broadcast_clnt_port = strtol (broadcast_clnt_port_eq + 15, 0, 0);
00240 #else
00241 broadcast_clnt_port = atol (broadcast_clnt_port_eq + 15);
00242 #endif
00243 cli_addr.sin_port = dl_htons (broadcast_clnt_port);
00244 }
00245 }
00246
00247 if (send_broadcast)
00248 {
00249 if (make_udp_socket_broadcast (socket_fd) < 0)
00250 {
00251 return;
00252 }
00253 }
00254
00255 cli_addr.sin_family = AF_INET;
00256 cli_addr.sin_addr.s_addr = dl_htonl (INADDR_ANY);
00257 if (dl_bind (socket_fd, (struct sockaddr *) &cli_addr, sizeof (cli_addr)) <
00258 0)
00259 {
00260 #if defined(_Windows) && !defined(USE_PCNFS) && !defined(gnuwin32)
00261 rcs_print_error ("UDPMEM: bind error %d\n", dl_WSAGetLastError ());
00262 #else
00263 rcs_print_error ("UDPMEM: bind error %d = %s\n", errno,
00264 strerror (errno));
00265 #endif
00266 }
00267 memset (&request_message_header, 0, sizeof (request_message_header));
00268 request_message_header.msg_name = (caddr_t) & server_socket_address;
00269 request_message_header.msg_namelen = sizeof (server_socket_address);
00270 request_iov2[0].iov_base = request_buffer;
00271 request_iov2[0].iov_len = 20;
00272 if (min_compatible_version > 3.13 || min_compatible_version < 1e-6)
00273 {
00274 request_iov2[0].iov_len = 24;
00275 }
00276 if (min_compatible_version > 3.43 || min_compatible_version < 1e-6)
00277 {
00278 request_iov2[0].iov_len = 28;
00279 }
00280 else
00281 {
00282 total_subdivisions = 1;
00283 }
00284 request_iov2[1].iov_base = (char *) encoded_data;
00285 request_iov2[1].iov_len = max_encoded_message_size;
00286 request_message_header.msg_iov = request_iov2;
00287 request_message_header.msg_iovlen = 2;
00288 memset (&reply_message_header, 0, sizeof (reply_message_header));
00289 reply_message_header.msg_name = (caddr_t) & server_socket_address;
00290 reply_message_header.msg_namelen = sizeof (server_socket_address);
00291 reply_iov2[0].iov_base = reply_buffer;
00292 reply_iov2[0].iov_len = 20;
00293 reply_iov2[1].iov_base = (char *) encoded_data;
00294 reply_iov2[1].iov_len = max_encoded_message_size;
00295 reply_message_header.msg_iov = reply_iov2;
00296 reply_message_header.msg_iovlen = 2;
00297 char *retry_string;
00298 if ((retry_string = strstr (ProcessLine, "retry=")) != NULL)
00299 {
00300 retry_string += strlen ("retry=");
00301 #ifndef UNDER_CE
00302 retry_timeout = strtod (retry_string, (char **) NULL);
00303 #else
00304 retry_timeout = RCS_CE_ATOF (retry_string);
00305 #endif
00306 }
00307 if (retry_timeout > timeout)
00308 retry_timeout = timeout;
00309 reply_size = 0;
00310
00311
00312 if ((min_compatible_version > 3.13 || min_compatible_version < 1e-6)
00313 && !send_broadcast)
00314 {
00315 if (verify_bufname () < 0)
00316 {
00317 status = CMS_STATUS_NOT_SET;
00318 }
00319 if (subscription_type != CMS_NO_SUBSCRIPTION)
00320 {
00321 setup_subscription ();
00322 }
00323 }
00324
00325 if (polling)
00326 {
00327 make_udp_socket_nonblocking (socket_fd);
00328 timeout = 0;
00329 retry_timeout = 0;
00330 }
00331 last_reply_timed_out = 0;
00332
00333 }
00334
00335 int
00336 UDPMEM::init_variables ()
00337 {
00338 polling = 0;
00339 get_reply = 1;
00340 last_reply_timed_out = 0;
00341 reply_size = 20;
00342 request_size = 24;
00343 retry_timeout = 0.01;
00344 serial_number = 0;
00345 returned_serial_number = 0;
00346 #ifndef VXWORKS
00347 server_host_entry = 0;
00348 broadcast_server_host_entry = 0;
00349 #endif
00350 memset (&server_socket_address, 0, sizeof (struct sockaddr_in));
00351 memset (&broadcast_server_socket_address, 0, sizeof (struct sockaddr_in));
00352 socket_fd = -1;
00353 memset (request_buffer, 0, 32);
00354 memset (reply_buffer, 0, 32);
00355 memset (&request_message_header, 0, sizeof (struct msghdr));
00356 memset (&reply_message_header, 0, sizeof (struct msghdr));
00357 send_broadcast = 0;
00358 send_broadcast_addr = 0xFF;
00359 recieve_broadcast = 0;
00360 recieve_broadcast_port = 0;
00361 subscription_type = CMS_NO_SUBSCRIPTION;
00362 poll_interval_millis = 30;
00363 subscription_id = -1;
00364 send_request = 1;
00365 memset (&cli_addr, 0, sizeof (struct sockaddr_in));
00366 memset (&request_iov2, 0, 2 * sizeof (struct iovec));
00367 memset (&reply_iov2, 0, 2 * sizeof (struct iovec));
00368 broadcast_subscriptions = 0;
00369 broadcast_clnt_port = 0;
00370 return 0;
00371 }
00372
00373
00374
00375 UDPMEM::~UDPMEM ()
00376 {
00377 if (socket_fd > 0)
00378 {
00379 if (subscription_type != CMS_NO_SUBSCRIPTION)
00380 {
00381 cancel_subscription ();
00382 }
00383
00384 if (delete_totally)
00385 {
00386 *((u_long *) request_buffer) =
00387 dl_htonl ((u_long) REMOTE_CMS_CLEAN_REQUEST_TYPE);
00388 *((u_long *) request_buffer + 1) =
00389 dl_htonl ((u_long) buffer_number);
00390 request_iov2[0].iov_len = 20;
00391 if (
00392 (min_compatible_version > 3.13
00393 || min_compatible_version < 1e-6))
00394 {
00395 request_iov2[0].iov_len = 24;
00396 }
00397 request_message_header.msg_iovlen = 1;
00398 sendmsgt (socket_fd, &request_message_header, 0, timeout);
00399 }
00400 dl_closesocket (socket_fd);
00401 socket_fd = 0;
00402 }
00403 #if defined(_Windows) && !defined(USE_PCNFS) && !defined(gnuwin32)
00404 if (number_of_udpmem_objects == 1)
00405 {
00406 free_sendmsg_collection_buffer ();
00407 free_recvmsg_collection_buffer ();
00408 unload_socket_interface ();
00409 }
00410 #endif
00411 number_of_udpmem_objects--;
00412 }
00413
00414 CMS_STATUS
00415 UDPMEM::read ()
00416 {
00417 long message_size, id;
00418 get_reply = 1;
00419 send_request = (subscription_type == CMS_NO_SUBSCRIPTION);
00420 if (socket_fd <= 0)
00421 {
00422 rcs_print_error ("UDPMEM::read: Invalid socket descriptor. (%d)\n",
00423 socket_fd);
00424 return (status = CMS_MISC_ERROR);
00425 }
00426 if (!last_reply_timed_out)
00427 {
00428 serial_number++;
00429 }
00430 *((u_long *) request_buffer) =
00431 dl_htonl ((u_long) REMOTE_CMS_READ_REQUEST_TYPE);
00432 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00433 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00434 *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_READ_ACCESS);
00435 *((u_long *) request_buffer + 4) = dl_htonl ((u_long) in_buffer_id);
00436 if (total_subdivisions > 1)
00437 {
00438 *((u_long *) request_buffer + 6) =
00439 dl_htonl ((u_long) current_subdivision);
00440 }
00441 request_iov2[1].iov_len = 0;
00442 request_iov2[1].iov_base = (caddr_t) NULL;
00443 request_message_header.msg_iovlen = 1;
00444 reply_iov2[1].iov_len = max_encoded_message_size;
00445 reply_iov2[1].iov_base = (caddr_t) encoded_data;
00446 reply_message_header.msg_iovlen = 2;
00447 if (call_on_server () < 0)
00448 {
00449 rcs_print_error ("UDPMEM: read failed.\n");
00450 reply_message_header.msg_name = (caddr_t) & cli_addr;
00451 return (status = CMS_MISC_ERROR);
00452 }
00453 reply_message_header.msg_name = (caddr_t) & cli_addr;
00454 if (!last_reply_timed_out)
00455 {
00456 status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
00457 message_size = dl_ntohl (*((u_long *) reply_buffer + 2));
00458 if (message_size + 20 != reply_size && !polling)
00459 {
00460 rcs_print_error ("UDPMEM: message_size+20 = %ld\n",
00461 message_size + 20);
00462 rcs_print_error ("UDPMEM: reply_size = %ld\n", reply_size);
00463 }
00464 id = dl_ntohl (*((u_long *) reply_buffer + 3));
00465 header.was_read = dl_ntohl (*((u_long *) reply_buffer + 4));
00466 check_id (id);
00467 }
00468 else
00469 {
00470 if (polling)
00471 {
00472 status = CMS_READ_OLD;
00473 }
00474 }
00475 return (status);
00476 }
00477
00478 CMS_STATUS
00479 UDPMEM::blocking_read (double _blocking_timeout)
00480 {
00481 long message_size, id;
00482 get_reply = 1;
00483 blocking_timeout = _blocking_timeout;
00484 double orig_timeout = timeout;
00485 timeout = blocking_timeout;
00486 int orig_polling = polling;
00487 if (polling)
00488 {
00489 polling = 0;
00490 make_udp_socket_blocking (socket_fd);
00491 }
00492 int orig_subscription_type = subscription_type;
00493 if (CMS_NO_SUBSCRIPTION == subscription_type)
00494 {
00495 subscription_type = CMS_POLLED_SUBSCRIPTION;
00496 if (blocking_timeout > 0.0)
00497 {
00498 poll_interval_millis = (int) (blocking_timeout * 100);
00499 if (poll_interval_millis < 10)
00500 {
00501 poll_interval_millis = 10;
00502 }
00503 if (poll_interval_millis > 1000)
00504 {
00505 poll_interval_millis = 1000;
00506 }
00507 }
00508 else
00509 {
00510 poll_interval_millis = 30;
00511 }
00512 setup_subscription ();
00513 }
00514 double orig_retry_timeout = retry_timeout;
00515 retry_timeout = timeout = blocking_timeout;
00516 send_request = 0;
00517 if (socket_fd <= 0)
00518 {
00519 rcs_print_error ("UDPMEM::read: Invalid socket descriptor. (%d)\n",
00520 socket_fd);
00521 retry_timeout = orig_retry_timeout;
00522 return (status = CMS_MISC_ERROR);
00523 }
00524 if (!last_reply_timed_out)
00525 {
00526 serial_number++;
00527 }
00528 *((u_long *) request_buffer) =
00529 dl_htonl ((u_long) REMOTE_CMS_READ_REQUEST_TYPE);
00530 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00531 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00532 *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_READ_ACCESS);
00533 *((u_long *) request_buffer + 4) = dl_htonl ((u_long) in_buffer_id);
00534 if (total_subdivisions > 1)
00535 {
00536 *((u_long *) request_buffer + 6) =
00537 dl_htonl ((u_long) current_subdivision);
00538 }
00539 request_iov2[1].iov_len = 0;
00540 request_iov2[1].iov_base = (caddr_t) NULL;
00541 request_message_header.msg_iovlen = 1;
00542 reply_iov2[1].iov_len = max_encoded_message_size;
00543 reply_iov2[1].iov_base = (caddr_t) encoded_data;
00544 reply_message_header.msg_iovlen = 2;
00545 status = CMS_READ_OLD;
00546 double start_time = 0.0;
00547 if (timeout > 1e-6)
00548 {
00549 start_time = etime ();
00550 }
00551 double time_diff = 0.0;
00552 while (status == CMS_READ_OLD && (timeout < 0.0 || time_diff > timeout))
00553 {
00554 if (call_on_server () < 0)
00555 {
00556 rcs_print_error ("UDPMEM: read failed.\n");
00557 timeout = orig_timeout;
00558 polling = orig_polling;
00559 retry_timeout = orig_retry_timeout;
00560 reply_message_header.msg_name = (caddr_t) & cli_addr;
00561 if (orig_subscription_type == CMS_NO_SUBSCRIPTION)
00562 {
00563 cancel_subscription ();
00564 }
00565 subscription_type = orig_subscription_type;
00566 return (status = CMS_MISC_ERROR);
00567 }
00568 reply_message_header.msg_name = (caddr_t) & cli_addr;
00569 if (!last_reply_timed_out)
00570 {
00571 status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
00572 message_size = dl_ntohl (*((u_long *) reply_buffer + 2));
00573 if (message_size + 20 != reply_size && !polling)
00574 {
00575 rcs_print_error ("UDPMEM: message_size+20 = %ld\n",
00576 message_size + 20);
00577 rcs_print_error ("UDPMEM: reply_size = %ld\n", reply_size);
00578 }
00579 id = dl_ntohl (*((u_long *) reply_buffer + 3));
00580 header.was_read = dl_ntohl (*((u_long *) reply_buffer + 4));
00581 check_id (id);
00582 }
00583 if (timeout > 1e-6)
00584 {
00585 time_diff = etime () - start_time;
00586 }
00587 }
00588 retry_timeout = orig_retry_timeout;
00589 timeout = orig_timeout;
00590 polling = orig_polling;
00591 if (orig_subscription_type == CMS_NO_SUBSCRIPTION)
00592 {
00593 cancel_subscription ();
00594 }
00595 subscription_type = orig_subscription_type;
00596 return (status);
00597 }
00598
00599 CMS_STATUS
00600 UDPMEM::peek ()
00601 {
00602 long message_size, id;
00603 get_reply = 1;
00604 send_request = (subscription_type == CMS_NO_SUBSCRIPTION);
00605 if (socket_fd <= 0)
00606 {
00607 rcs_print_error ("UDPMEM::read: Invalid socket descriptor. (%d)\n",
00608 socket_fd);
00609 return (status = CMS_MISC_ERROR);
00610 }
00611 if (!last_reply_timed_out)
00612 {
00613 serial_number++;
00614 }
00615 *((u_long *) request_buffer) =
00616 dl_htonl ((u_long) REMOTE_CMS_READ_REQUEST_TYPE);
00617 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00618 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00619 *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_PEEK_ACCESS);
00620 *((u_long *) request_buffer + 4) = dl_htonl ((u_long) in_buffer_id);
00621 if (total_subdivisions > 1)
00622 {
00623 *((u_long *) request_buffer + 6) =
00624 dl_htonl ((u_long) current_subdivision);
00625 }
00626 request_iov2[1].iov_len = 0;
00627 request_iov2[1].iov_base = (caddr_t) NULL;
00628 request_message_header.msg_iovlen = 1;
00629 reply_iov2[1].iov_len = max_encoded_message_size;
00630 reply_iov2[1].iov_base = (caddr_t) encoded_data;
00631 reply_message_header.msg_iovlen = 2;
00632 if (call_on_server () < 0)
00633 {
00634 rcs_print_error ("UDPMEM: peek failed.\n");
00635 reply_message_header.msg_name = (caddr_t) & cli_addr;
00636 return (status = CMS_MISC_ERROR);
00637 }
00638 reply_message_header.msg_name = (caddr_t) & cli_addr;
00639 if (!last_reply_timed_out)
00640 {
00641 status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
00642 message_size = dl_ntohl (*((u_long *) reply_buffer + 2));
00643 if (message_size + 20 != reply_size && !polling)
00644 {
00645 rcs_print_error ("UDPMEM: message_size+20 = %ld\n",
00646 message_size + 20);
00647 rcs_print_error ("UDPMEM: reply_size = %ld\n", reply_size);
00648 }
00649 id = dl_ntohl (*((u_long *) reply_buffer + 3));
00650 header.was_read = dl_ntohl (*((u_long *) reply_buffer + 4));
00651 check_id (id);
00652 }
00653 else
00654 {
00655 if (polling)
00656 {
00657 status = CMS_READ_OLD;
00658 }
00659 }
00660 return (status);
00661 }
00662
00663
00664 int
00665 UDPMEM::setup_subscription ()
00666 {
00667 get_reply = 1;
00668 send_request = 1;
00669 double orig_retry_timeout = retry_timeout;
00670 if (retry_timeout < 0.0 || retry_timeout > (poll_interval_millis / 1000.0))
00671 {
00672 retry_timeout = (poll_interval_millis / 1000.0);
00673 }
00674 if (retry_timeout < clk_tck () * 2.0)
00675 {
00676 retry_timeout = clk_tck () * 2.0;
00677 }
00678 if (socket_fd <= 0)
00679 {
00680 rcs_print_error
00681 ("UDPMEM::setup_subscription: Invalid socket descriptor. (%d)\n",
00682 socket_fd);
00683 return (-1);
00684 }
00685 last_reply_timed_out = 0;
00686 serial_number++;
00687 *((u_long *) request_buffer) =
00688 dl_htonl ((u_long) REMOTE_CMS_SET_SUBSCRIPTION_REQUEST_TYPE);
00689 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00690 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00691 *((u_long *) request_buffer + 3) = dl_htonl ((u_long) subscription_type);
00692 *((u_long *) request_buffer + 4) = dl_htonl ((u_long) poll_interval_millis);
00693 *((u_long *) request_buffer + 5) = dl_htonl ((u_long) in_buffer_id);
00694 if (total_subdivisions > 1)
00695 {
00696 *((u_long *) request_buffer + 6) =
00697 dl_htonl ((u_long) current_subdivision);
00698 }
00699 request_iov2[1].iov_len = 0;
00700 request_iov2[1].iov_base = (caddr_t) NULL;
00701 request_message_header.msg_iovlen = 1;
00702 reply_iov2[1].iov_len = 0;
00703 reply_iov2[1].iov_base = (caddr_t) 0;
00704 reply_message_header.msg_iovlen = 1;
00705 int orig_polling = polling;
00706 double orig_timeout = timeout;
00707 if (orig_timeout <= 0.0)
00708 {
00709 timeout = 5.0;
00710 }
00711 polling = 0;
00712 if (call_on_server () < 0)
00713 {
00714 retry_timeout = orig_retry_timeout;
00715 rcs_print_error ("UDPMEM: Can't setup subscription.\n");
00716 polling = orig_polling;
00717 timeout = orig_timeout;
00718 last_reply_timed_out = 0;
00719 subscription_type = CMS_NO_SUBSCRIPTION;
00720 subscription_id = -1;
00721 serial_number++;
00722 return (0);
00723 }
00724 retry_timeout = orig_retry_timeout;
00725 last_reply_timed_out = 0;
00726 serial_number++;
00727 polling = orig_polling;
00728 timeout = orig_timeout;
00729 if (!last_reply_timed_out)
00730 {
00731 long success = dl_htonl (*((u_long *) reply_buffer + 1));
00732 if (success == 1)
00733 {
00734 subscription_id = dl_htonl (*((u_long *) reply_buffer + 2));
00735 }
00736 else
00737 {
00738 rcs_print_error ("UDPMEM: Can't setup subscription.\n");
00739 subscription_type = CMS_NO_SUBSCRIPTION;
00740 subscription_id = -1;
00741 }
00742 }
00743 return (0);
00744 }
00745
00746
00747 int
00748 UDPMEM::cancel_subscription ()
00749 {
00750 get_reply = 1;
00751 send_request = 1;
00752 double orig_retry_timeout = retry_timeout;
00753 if (retry_timeout < 0.0 || retry_timeout > (poll_interval_millis / 1000.0))
00754 {
00755 retry_timeout = (poll_interval_millis / 1000.0);
00756 }
00757 if (retry_timeout < clk_tck () * 2.0)
00758 {
00759 retry_timeout = clk_tck () * 2.0;
00760 }
00761 if (socket_fd <= 0)
00762 {
00763 rcs_print_error
00764 ("UDPMEM::cancel_subscription: Invalid socket descriptor. (%d)\n",
00765 socket_fd);
00766 retry_timeout = orig_retry_timeout;
00767 return (-1);
00768 }
00769 make_udp_socket_blocking (socket_fd);
00770 last_reply_timed_out = 0;
00771 serial_number += 10000 / poll_interval_millis;
00772 serial_number += 10;
00773 *((u_long *) request_buffer) =
00774 dl_htonl ((u_long) REMOTE_CMS_CANCEL_SUBSCRIPTION_REQUEST_TYPE);
00775 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00776 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00777 *((u_long *) request_buffer + 3) = dl_htonl ((u_long) subscription_id);
00778 if (total_subdivisions > 1)
00779 {
00780 *((u_long *) request_buffer + 6) =
00781 dl_htonl ((u_long) current_subdivision);
00782 }
00783 request_iov2[1].iov_len = 0;
00784 request_iov2[1].iov_base = (caddr_t) NULL;
00785 request_message_header.msg_iovlen = 1;
00786 reply_iov2[1].iov_len = 0;
00787 reply_iov2[1].iov_base = (caddr_t) 0;
00788 reply_message_header.msg_iovlen = 1;
00789 int orig_polling = polling;
00790 double orig_timeout = timeout;
00791 if (orig_timeout <= 10.0)
00792 {
00793 timeout = 10.0;
00794 }
00795 polling = 0;
00796 if (call_on_server () < 0)
00797 {
00798 retry_timeout = orig_retry_timeout;
00799 rcs_print_error ("UDPMEM: Can't cancel subscription.\n");
00800 polling = orig_polling;
00801 timeout = orig_timeout;
00802 last_reply_timed_out = 0;
00803 subscription_type = CMS_NO_SUBSCRIPTION;
00804 subscription_id = -1;
00805 serial_number++;
00806 return (0);
00807 }
00808 retry_timeout = orig_retry_timeout;
00809 last_reply_timed_out = 0;
00810 serial_number++;
00811 polling = orig_polling;
00812 timeout = orig_timeout;
00813
00814 if (!last_reply_timed_out)
00815 {
00816 long success = dl_htonl (*((u_long *) reply_buffer + 1));
00817 if (success == 1)
00818 {
00819 if (subscription_id !=
00820 ((long) dl_htonl (*((u_long *) reply_buffer + 2))))
00821 {
00822 rcs_print_error
00823 ("UDPMEM: Can't cancel subscription. (Incorrect subscription id returned.)\n");
00824 subscription_type = CMS_NO_SUBSCRIPTION;
00825 subscription_id = -1;
00826 }
00827 }
00828 else
00829 {
00830 rcs_print_error ("UDPMEM: Can't cancel subscription.\n");
00831 subscription_type = CMS_NO_SUBSCRIPTION;
00832 subscription_id = -1;
00833 }
00834 }
00835 return (0);
00836 }
00837
00838
00839 int
00840 UDPMEM::verify_bufname ()
00841 {
00842 long message_size;
00843 get_reply = 1;
00844 send_request = 1;
00845 char bufname_from_server[80];
00846 if (socket_fd <= 0)
00847 {
00848 rcs_print_error ("UDPMEM::read: Invalid socket descriptor. (%d)\n",
00849 socket_fd);
00850 status = CMS_MISC_ERROR;
00851 return (-1);
00852 }
00853 last_reply_timed_out = 0;
00854 serial_number++;
00855 *((u_long *) request_buffer) =
00856 dl_htonl ((u_long) REMOTE_CMS_GET_BUF_NAME_REQUEST_TYPE);
00857 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
00858 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
00859 *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_PEEK_ACCESS);
00860 *((u_long *) request_buffer + 4) = dl_htonl ((u_long) in_buffer_id);
00861 if (total_subdivisions > 1)
00862 {
00863 *((u_long *) request_buffer + 6) =
00864 dl_htonl ((u_long) current_subdivision);
00865 }
00866 request_iov2[1].iov_len = 0;
00867 request_iov2[1].iov_base = (caddr_t) NULL;
00868 request_message_header.msg_iovlen = 1;
00869 reply_iov2[1].iov_len = 80;
00870 reply_iov2[1].iov_base = (caddr_t) bufname_from_server;
00871 reply_message_header.msg_iovlen = 2;
00872 int orig_polling = polling;
00873 double orig_timeout = timeout;
00874 if (orig_timeout <= 0.0)
00875 {
00876 timeout = 5.0;
00877 }
00878 polling = 0;
00879 if (call_on_server () < 0)
00880 {
00881 rcs_print_error
00882 ("UDPMEM: Failed to verify BufferName with server (Server may not be running.)\n");
00883 polling = orig_polling;
00884 timeout = orig_timeout;
00885 last_reply_timed_out = 0;
00886 serial_number++;
00887 status = CMS_NO_SERVER_ERROR;
00888 return (-1);
00889 }
00890 last_reply_timed_out = 0;
00891 serial_number++;
00892 polling = orig_polling;
00893 timeout = orig_timeout;
00894 if (!last_reply_timed_out)
00895 {
00896 status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
00897 message_size = dl_ntohl (*((u_long *) reply_buffer + 2));
00898 if (message_size > 100)
00899 {
00900 rcs_print_error
00901 ("UDPMEM: message_size = %ld exceeds maximum for GET_BUF_NAME reply.\n",
00902 message_size + 20);
00903 status = CMS_MISC_ERROR;
00904 return (-1);
00905 }
00906 if (0 != strncmp (BufferName, bufname_from_server, 80))
00907 {
00908 rcs_print_error
00909 ("UDPMEM: Buffer name retrieved from server %s at IP address %s for buffer number %d was %s but %s was expected.\n",
00910 BufferHost, dl_inet_ntoa (server_socket_address.sin_addr),
00911 buffer_number,
00912 ((0 != bufname_from_server[0]) ? bufname_from_server : "blank"),
00913 BufferName);
00914 if (0 != bufname_from_server[0] && 0 != BufferName[0])
00915 {
00916 status = CMS_RESOURCE_CONFLICT_ERROR;
00917 return (-1);
00918 }
00919 }
00920 }
00921 else
00922 {
00923 status = CMS_NO_SERVER_ERROR;
00924 return (-1);
00925 }
00926 return (0);
00927 }
00928
00929 void
00930 throwaway_extra_data_on_socket (int socket_fd)
00931 {
00932 int count = 0;
00933 if (socket_fd > 0)
00934 {
00935 struct msghdr temp_header;
00936 while (recvmsgtq (socket_fd, &temp_header, 0, 0.0) > 0 && count < 10)
00937 count++;
00938 }
00939 }
00940
00941 int
00942 UDPMEM::call_on_server ()
00943 {
00944 double start_time, current_time;
00945 start_time = etime ();
00946 last_reply_timed_out = 0;
00947 double last_time_sent = 0.0;
00948 double time_diff = 0.0;
00949 int first_time = 1;
00950 while (1)
00951 {
00952 if (send_broadcast)
00953 {
00954 if (get_reply)
00955 {
00956 request_message_header.msg_name =
00957 (caddr_t) & server_socket_address;
00958 request_message_header.msg_namelen =
00959 sizeof (server_socket_address);
00960 }
00961 else
00962 {
00963 request_message_header.msg_name =
00964 (caddr_t) & broadcast_server_socket_address;
00965 request_message_header.msg_namelen =
00966 sizeof (broadcast_server_socket_address);
00967 }
00968 }
00969 if (send_request)
00970 {
00971 time_diff = etime () - last_time_sent;
00972 if (first_time || time_diff >= retry_timeout)
00973 {
00974 if (
00975 (request_size =
00976 sendmsgt (socket_fd, &request_message_header, 0,
00977 retry_timeout)) < 0)
00978 {
00979 if (!sendmsgt_timed_out)
00980 return (-1);
00981 }
00982 last_time_sent = etime ();
00983 first_time = 0;
00984 }
00985 }
00986 if (!get_reply)
00987 {
00988 last_reply_timed_out = 0;
00989 return 0;
00990 }
00991 else
00992 {
00993 if (
00994 (reply_size =
00995 recvmsgtq (socket_fd, &reply_message_header, 0,
00996 retry_timeout)) < 0)
00997 {
00998 if (!recvmsgt_timed_out)
00999 return (-1);
01000 }
01001 returned_serial_number = dl_ntohl (*((u_long *) reply_buffer));
01002 if (returned_serial_number == serial_number)
01003 {
01004 last_reply_timed_out = 0;
01005 throwaway_extra_data_on_socket (socket_fd);
01006 return 0;
01007 }
01008
01009 if (!send_request &&
01010 (returned_serial_number > serial_number ||
01011 (serial_number > 1000 && returned_serial_number < 500)))
01012 {
01013 serial_number = returned_serial_number;
01014 last_reply_timed_out = 0;
01015 return 0;
01016 }
01017 if (!polling)
01018 {
01019 current_time = etime ();
01020 if (current_time - start_time > timeout && timeout >= 1e-6)
01021 {
01022 rcs_print_error
01023 ("UDPMEM: time out error after %f seconds.\n",
01024 current_time - start_time);
01025 last_reply_timed_out = 1;
01026 return -1;
01027 }
01028 }
01029 else
01030 {
01031 last_reply_timed_out = 1;
01032 return 0;
01033 }
01034 }
01035 }
01036 throwaway_extra_data_on_socket (socket_fd);
01037 last_reply_timed_out = 0;
01038 return (0);
01039 }
01040
01041
01042 CMS_STATUS
01043 UDPMEM::write (void *user_data)
01044 {
01045 get_reply = 0;
01046 send_request = 1;
01047
01048 if (!force_raw)
01049 {
01050 user_data = encoded_data;
01051 }
01052
01053 if (socket_fd <= 0)
01054 {
01055 rcs_print_error ("UDPMEM::write: Invalid socket descriptor. (%d)\n",
01056 socket_fd);
01057 return (status = CMS_MISC_ERROR);
01058 }
01059 serial_number++;
01060 *((u_long *) request_buffer) =
01061 dl_htonl ((u_long) REMOTE_CMS_WRITE_REQUEST_TYPE);
01062 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
01063 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
01064 *((u_long *) request_buffer + 3) = dl_htonl ((u_long) CMS_WRITE_ACCESS);
01065 *((u_long *) request_buffer + 4) =
01066 dl_htonl ((u_long) header.in_buffer_size);
01067 if (total_subdivisions > 1)
01068 {
01069 *((u_long *) request_buffer + 6) =
01070 dl_htonl ((u_long) current_subdivision);
01071 }
01072 request_iov2[1].iov_len = header.in_buffer_size;
01073 request_iov2[1].iov_base = (caddr_t) user_data;
01074 request_message_header.msg_iovlen = 2;
01075 reply_iov2[1].iov_len = 0;
01076 reply_iov2[1].iov_base = (caddr_t) NULL;
01077 reply_message_header.msg_iovlen = 1;
01078 if (call_on_server () < 0)
01079 {
01080 rcs_print_error ("UDPMEM: write failed.\n");
01081 return (status = CMS_MISC_ERROR);
01082 }
01083 status = CMS_WRITE_OK;
01084 header.was_read = 0;
01085 return (status);
01086 }
01087
01088 CMS_STATUS
01089 UDPMEM::write_if_read (void *user_data)
01090 {
01091 get_reply = 0;
01092 send_request = 1;
01093 if (!force_raw)
01094 {
01095 user_data = encoded_data;
01096 }
01097
01098 if (socket_fd <= 0)
01099 {
01100 rcs_print_error ("UDPMEM::write: Invalid socket descriptor. (%d)\n",
01101 socket_fd);
01102 return (status = CMS_MISC_ERROR);
01103 }
01104 serial_number++;
01105 *((u_long *) request_buffer) =
01106 dl_htonl ((u_long) REMOTE_CMS_WRITE_REQUEST_TYPE);
01107 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
01108 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
01109 *((u_long *) request_buffer + 3) =
01110 dl_htonl ((u_long) CMS_WRITE_IF_READ_ACCESS);
01111 *((u_long *) request_buffer + 4) =
01112 dl_htonl ((u_long) header.in_buffer_size);
01113 if (total_subdivisions > 1)
01114 {
01115 *((u_long *) request_buffer + 6) =
01116 dl_htonl ((u_long) current_subdivision);
01117 }
01118 request_iov2[1].iov_len = header.in_buffer_size;
01119 request_iov2[1].iov_base = (caddr_t) user_data;
01120 request_message_header.msg_iovlen = 2;
01121 reply_iov2[1].iov_len = 0;
01122 reply_iov2[1].iov_base = (caddr_t) NULL;
01123 reply_message_header.msg_iovlen = 1;
01124 if (call_on_server () < 0)
01125 {
01126 rcs_print_error ("UDPMEM: write_if_read failed.\n");
01127 return (status = CMS_MISC_ERROR);
01128 }
01129 status = CMS_WRITE_OK;
01130 header.was_read = 0;
01131 return (status);
01132 }
01133
01134 int
01135 UDPMEM::check_if_read ()
01136 {
01137 get_reply = 1;
01138 send_request = 1;
01139 if (socket_fd <= 0)
01140 {
01141 rcs_print_error
01142 ("UDPMEM::check_if_read: Invalid socket descriptor. (%d)\n",
01143 socket_fd);
01144 return (status = CMS_MISC_ERROR);
01145 }
01146 serial_number++;
01147 *((u_long *) request_buffer) =
01148 dl_htonl ((u_long) REMOTE_CMS_CHECK_IF_READ_REQUEST_TYPE);
01149 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
01150 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
01151 if (total_subdivisions > 1)
01152 {
01153 *((u_long *) request_buffer + 6) =
01154 dl_htonl ((u_long) current_subdivision);
01155 }
01156 request_iov2[1].iov_len = 0;
01157 request_iov2[1].iov_base = (caddr_t) NULL;
01158 request_message_header.msg_iovlen = 1;
01159 reply_iov2[1].iov_len = 0;
01160 reply_iov2[1].iov_base = (caddr_t) NULL;
01161 reply_message_header.msg_iovlen = 1;
01162 if (call_on_server () < 0)
01163 {
01164 rcs_print_error ("UDPMEM: check_if_read failed.\n");
01165 return (status = CMS_MISC_ERROR);
01166 }
01167 status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
01168 header.was_read = dl_ntohl (*((u_long *) reply_buffer + 2));
01169 if (last_reply_timed_out)
01170 {
01171 header.was_read = 0;
01172 }
01173 return (header.was_read);
01174 }
01175
01176
01177 CMS_STATUS
01178 UDPMEM::clear ()
01179 {
01180 send_request = 1;
01181 get_reply = 0;
01182 if (socket_fd <= 0)
01183 {
01184 rcs_print_error
01185 ("UDPMEM::check_if_read: Invalid socket descriptor. (%d)\n",
01186 socket_fd);
01187 return (status = CMS_MISC_ERROR);
01188 }
01189 *((u_long *) request_buffer) =
01190 dl_htonl ((u_long) REMOTE_CMS_CLEAR_REQUEST_TYPE);
01191 *((u_long *) request_buffer + 1) = dl_htonl ((u_long) buffer_number);
01192 *((u_long *) request_buffer + 2) = dl_htonl ((u_long) serial_number);
01193 if (total_subdivisions > 1)
01194 {
01195 *((u_long *) request_buffer + 6) =
01196 dl_htonl ((u_long) current_subdivision);
01197 }
01198 request_iov2[1].iov_base = (caddr_t) NULL;
01199 request_message_header.msg_iovlen = 1;
01200 reply_iov2[1].iov_len = 0;
01201 reply_iov2[1].iov_base = (caddr_t) NULL;
01202 reply_message_header.msg_iovlen = 1;
01203 if (call_on_server () < 0)
01204 {
01205 return (status = CMS_MISC_ERROR);
01206 }
01207 status = (CMS_STATUS) dl_ntohl (*((u_long *) reply_buffer + 1));
01208 return (status);
01209 }