/* * This file is part of the Owl Positioning System (OwlPS) project. * It is subject to the copyright notice and license terms in the * COPYRIGHT.t2t file found in the top-level directory of this * distribution and at * https://code.lm7.fr/mcy/owlps/src/master/COPYRIGHT.t2t * No part of the OwlPS Project, including this file, may be copied, * modified, propagated, or distributed except according to the terms * contained in the COPYRIGHT.t2t file; the COPYRIGHT.t2t file must be * distributed along with this file, either separately or by replacing * this notice by the COPYRIGHT.t2t file's contents. * *********************************************************************** * * This is the main source file of OwlPS Aggregator, the program that * aggregates the requests captured by the Listeners in order to present * them to the Positioner. */ #include "owlps-aggregator.h" #include #include #include #include #include #include #include #include #include #include #include char *program_name = NULL ; cfg_t *cfg = NULL ; // Configuration structure /* Will we dump the configuration? * Note that we declare this option as a global variable because * it must not appear in Confuse's options (to avoid dumping the * dump_configuration option itself). */ bool dump_configuration = false ; request_list *requests = NULL ; // Computed data list sem_t lock_requests ; // Semaphore to get access to the requests cp_list *token_cps = NULL ; // Token ring of the CPs uint_fast16_t nb_cps = 0 ; // Number of CPs in the CPs' ring sem_t lock_cps ; // Semaphore to get access to the CPs' ring int main(const int argc, char *const *argv) { int ret = 0 ; // Program return value struct sigaction action ; // Signal handler structure pthread_t monitor_thread, // Aggregated data monitoring thread monitor_cps_thread, // CPs monitoring thread autocalibration_hello_thread ; // Hello messages reception thread uint_fast16_t listening_port ; int sockfd = -1 ; // UDP listening socket owl_run = true ; program_name = argv[0] ; ret = initialise_configuration(argc, argv) ; if (! owl_run) goto exit ; if (cfg_getbool(cfg, "daemon")) { if (VERBOSE_WARNING) fprintf(stderr, "Detaching to background...\n") ; if (daemon(0, 0)) perror("Cannot daemonize") ; } /* Set up signal handlers */ action.sa_flags = 0 ; sigemptyset(&action.sa_mask) ; action.sa_handler = owl_sigint_handler ; sigaction(SIGINT, &action, NULL) ; action.sa_handler = owl_sigterm_handler ; sigaction(SIGTERM, &action, NULL) ; /* Set up semaphores */ sem_init(&lock_requests, 0, 1) ; sem_init(&lock_cps, 0, 1) ; /* Create UDP socket */ listening_port = cfg_getint(cfg, "listening_port") ; if ((sockfd = owl_create_udp_listening_socket(listening_port)) < 0) { fprintf(stderr, "Error! Cannot listen on port %"PRIuFAST16".\n", listening_port) ; ret = OWL_ERR_SOCKET_CREATE ; goto exit ; } /* Set up threads */ ret = pthread_create(&monitor_thread, NULL, &monitor_requests, NULL) ; if (ret != 0) { perror("Cannot create monitor thread") ; ret = OWL_ERR_THREAD_CREATE ; goto exit ; } if (cfg_getbool(cfg, "autocalibration")) { ret = pthread_create(&autocalibration_hello_thread, NULL, &listen_for_cps, NULL) ; if (ret != 0) { perror("Cannot create autocalibration hello thread") ; ret = OWL_ERR_THREAD_CREATE ; goto exit ; } ret = pthread_create(&monitor_cps_thread, NULL, &monitor_cps, NULL) ; if (ret != 0) { perror("Cannot create monitor CPs thread") ; ret = OWL_ERR_THREAD_CREATE ; goto exit ; } } ret = read_loop(sockfd) ; /* Wait for the threads to terminate */ if (VERBOSE_WARNING) fprintf(stderr, "Waiting for the monitor thread...\n") ; if (pthread_join(monitor_thread, NULL) != 0) perror("Cannot join monitor thread") ; else if (VERBOSE_WARNING) fprintf(stderr, "Monitor thread done.\n") ; if (cfg_getbool(cfg, "autocalibration")) { // We must cancel this thread because it can be blocked on the // recvfrom() call: if (VERBOSE_WARNING) fprintf(stderr, "Cancelling the autocalibration hello thread...\n") ; if (pthread_cancel(autocalibration_hello_thread) != 0) perror("Cannot cancel autocalibration hello thread") ; else if (VERBOSE_WARNING) fprintf(stderr, "Autocalibration hello thread cancelled.\n") ; if (VERBOSE_WARNING) fprintf(stderr, "Waiting for the autocalibration hello thread...\n") ; if (pthread_join(autocalibration_hello_thread, NULL) != 0) perror("Cannot join autocalibration hello thread") ; else if (VERBOSE_WARNING) fprintf(stderr, "Autocalibration hello thread done.\n") ; if (VERBOSE_WARNING) fprintf(stderr, "Waiting for the monitor CPs thread...\n") ; if (pthread_join(monitor_cps_thread, NULL) != 0) perror("Cannot join monitor CPs thread") ; else if (VERBOSE_WARNING) fprintf(stderr, "Monitor CPs thread done.\n") ; } /* Last cleaning tasks */ exit: /* If called with -h, cfg won't be initialised, so we must check if * cfg is NULL */ if (cfg && VERBOSE_CHATTERBOX) fprintf(stderr, "%s: exiting.\n", program_name) ; if (sockfd >= 0) close(sockfd) ; // Close socket assert(requests == NULL) ; free_cp_list() ; cfg_free(cfg) ; // Clean configuration // Destroy semaphores: sem_destroy(&lock_requests) ; sem_destroy(&lock_cps) ; return ret ; } /* * Read the configuration from both the command line and the * configuration file. * Returns an error code, or 0 in case of success. If the program should * stop (because of a special option or a configuration error), owl_run * is set to false. */ int initialise_configuration(const int argc, char *const *argv) { int ret ; ret = parse_config_file(argc, argv) ; if (! owl_run) return ret ; ret = parse_command_line(argc, argv) ; if (! owl_run) return ret ; ret = check_configuration() ; if (! owl_run) return ret ; /* Configuration dumping */ if (dump_configuration) { cfg_print(cfg, stdout) ; owl_run = false ; return 0 ; } /* Configuration printing */ if (VERBOSE_INFO) { fprintf(stderr, "Configuration:\n") ; cfg_print(cfg, stderr) ; } return 0 ; } int parse_config_file(const int argc, char *const *argv) { // Config file options for confuse cfg_opt_t opts[] = { // Daemon mode CFG_BOOL("daemon", cfg_false, CFGF_NONE), // Verbose level CFG_INT("verbose", 0, CFGF_NONE), // Aggregation listening port CFG_INT("listening_port", OWL_DEFAULT_LISTENER_PORT, CFGF_NONE), // Port and host of the localisation server: CFG_INT("positioner_port", OWL_DEFAULT_AGGREGATION_PORT, CFGF_NONE), CFG_STR("positioner_host", POSITIONER_DEFAULT_HOST, CFGF_NONE), // Output file (default is the standard output) CFG_STR("output_file", "-", CFGF_NONE), // Flush the output file or not CFG_BOOL("flush_output", cfg_true, CFGF_NONE), // Timeouts (in milliseconds): CFG_INT("aggregate_timeout", DEFAULT_AGGREGATE_TIMEOUT, CFGF_NONE), CFG_INT("keep_timeout", DEFAULT_KEEP_TIMEOUT, CFGF_NONE), // Time between two list checks (in milliseconds): CFG_INT("check_interval", DEFAULT_CHECK_INTERVAL, CFGF_NONE), // Autocalibration activated? CFG_BOOL("autocalibration", cfg_false, CFGF_NONE), // Port on which autocalibration orders are sent to the listeners: CFG_INT("autocalibration_order_port", OWL_DEFAULT_AUTOCALIBRATION_ORDER_PORT, CFGF_NONE), // Port on which autocalibration hello are received from the // listeners: CFG_INT("autocalibration_hello_port", OWL_DEFAULT_AUTOCALIBRATION_HELLO_PORT, CFGF_NONE), // Time we keep CPs in the list (in seconds): CFG_INT("cp_keep_timeout", DEFAULT_CP_KEEP_TIMEOUT, CFGF_NONE), // Time between two checks of the CP list (in milliseconds): CFG_INT("ac_order_interval", DEFAULT_AC_ORDER_INTERVAL, CFGF_NONE), CFG_END() } ; // Configuration file name char *config_file = NULL ; // True if we are using the default configuration file, false if the // user specified a different one with -f bool default_config_file = false ; // Option -f specifies a config file, options -h and -V exit the // program, so we search for them first int opt ; while ((opt = getopt(argc, argv, OPTIONS)) != -1) { switch (opt) { case 'f' : config_file = malloc((strlen(optarg) + 1) * sizeof(char)) ; if (! config_file) { perror("Cannot allocate memory") ; owl_run = false ; return errno ; } strcpy(config_file, optarg) ; break ; case 'h' : print_usage() ; owl_run = false ; return EXIT_SUCCESS ; case 'V' : print_version() ; owl_run = false ; return EXIT_SUCCESS ; } } // If -f isn't found, we use the default config file if (config_file == NULL) { default_config_file = true ; config_file = malloc((strlen(DEFAULT_CONFIG_FILE) + 1) * sizeof(char)) ; if (! config_file) { perror("Cannot allocate memory") ; owl_run = false ; return errno ; } strcpy(config_file, DEFAULT_CONFIG_FILE) ; } /* Parse config file */ cfg = cfg_init(opts, CFGF_NONE) ; // Initialise options switch (cfg_parse(cfg, config_file)) { case CFG_FILE_ERROR : /* If we can't open the file, we display a message only if * the user used -f. In verbose mode, it would be nice to * display the message even if the user didn't use -f, but * the command-line options are not parsed yet so the verbose * level is always zero. */ if (! default_config_file) fprintf(stderr, "Error! Cannot open configuration file \"%s\": %s.\n", config_file, strerror(errno)) ; break ; case CFG_PARSE_ERROR : fprintf(stderr, "Error! Parsing of configuration file \"%s\" failed!\n", config_file) ; free(config_file) ; owl_run = false ; return OWL_ERR_CONFIG_FILE ; } free(config_file) ; return 0 ; } int parse_command_line(const int argc, char *const *argv) { int opt ; long arg_long ; // Integer value of optarg char *endptr ; // Return value of strtol() optind = 1 ; // Rewind argument parsing while ((opt = getopt(argc, argv, OPTIONS)) != -1) { switch (opt) { case 'A' : cfg_setbool(cfg, "autocalibration", cfg_true) ; break ; case 'c' : arg_long = strtol(optarg, &endptr, 10) ; if (endptr != optarg) cfg_setint(cfg, "check_interval", arg_long) ; else fprintf(stderr, "Warning! Bad check_interval:" " failing to the default value.\n") ; break ; case 'C' : arg_long = strtol(optarg, &endptr, 10) ; if (endptr != optarg) cfg_setint(cfg, "ac_order_interval", arg_long) ; else fprintf(stderr, "Warning! Bad ac_order_interval:" " failing back to the default value.\n") ; break ; case 'D' : cfg_setbool(cfg, "daemon", cfg_true) ; break ; case 'f' : // Config file break ; // (already parsed) case 'F' : cfg_setbool(cfg, "flush_output", cfg_false) ; break ; case 'G' : dump_configuration = true ; /* We must not turn owl_run false here, to avoid the default * check on its value in initialise_configuration(). */ break ; case 'H' : cfg_setint(cfg, "autocalibration_hello_port", strtol(optarg, NULL, 10)) ; break ; case 'i' : cfg_setstr(cfg, "positioner_host", optarg) ; break ; case 'k' : arg_long = strtol(optarg, &endptr, 10) ; if (endptr != optarg) cfg_setint(cfg, "keep_timeout", arg_long) ; else fprintf(stderr, "Warning! Bad keep_timeout:" " failing back to the default value.\n") ; break ; case 'K' : arg_long = strtol(optarg, &endptr, 10) ; if (endptr != optarg) cfg_setint(cfg, "cp_keep_timeout", arg_long) ; else fprintf(stderr, "Warning! Bad cp_keep_timeout:" " failing back to the default value.\n") ; break ; case 'l' : cfg_setint(cfg, "listening_port", strtol(optarg, NULL, 10)) ; break ; case 'o' : cfg_setstr(cfg, "output_file", optarg) ; break ; case 'O' : cfg_setint(cfg, "autocalibration_order_port", strtol(optarg, NULL, 10)) ; break ; case 'p' : cfg_setint(cfg, "positioner_port", strtol(optarg, NULL, 10)) ; break ; case 'q' : // Quiet mode: reset the verbose level cfg_setint(cfg, "verbose", 0) ; break ; case 't' : arg_long = strtol(optarg, &endptr, 10) ; if (endptr != optarg) cfg_setint(cfg, "aggregate_timeout", arg_long) ; else fprintf(stderr, "Warning! Bad aggregate_timeout:" " failing back to the default value.\n") ; break ; case 'v' : { // Increment the verbose level if the maximum is not reached long vlevel = cfg_getint(cfg, "verbose") ; if (vlevel < MAX_VERBOSE_LEVEL) cfg_setint(cfg, "verbose", vlevel + 1) ; } break ; default : print_usage() ; owl_run = false ; return OWL_ERR_BAD_USAGE ; } } return 0 ; } int check_configuration() { // output_file // if (cfg_getstr(cfg, "output_file")[0] == '\0') { fprintf(stderr, "Error! The empty string is not a valid output file.\n") ; print_usage() ; owl_run = false ; return OWL_ERR_BAD_USAGE ; } // listening_port // if (cfg_getint(cfg, "listening_port") < 1 || cfg_getint(cfg, "listening_port") > 65535) { if (VERBOSE_WARNING) fprintf(stderr, "Warning! Bad listening_port:" " failing back to the default value.\n") ; cfg_setint(cfg, "listening_port", OWL_DEFAULT_LISTENER_PORT) ; } // positioner_port // if (cfg_getint(cfg, "positioner_port") < 1 || cfg_getint(cfg, "positioner_port") > 65535) { if (VERBOSE_WARNING) fprintf(stderr, "Warning! Bad positioner_port:" " failing back to the default value.\n") ; cfg_setint(cfg, "positioner_port", OWL_DEFAULT_AGGREGATION_PORT) ; } // positioner_host // if (cfg_getstr(cfg, "positioner_host")[0] == '\0') { fprintf(stderr, "Error! You must specify the host name or IP" " address of the localisation server.\n") ; print_usage() ; owl_run = false ; return OWL_ERR_BAD_USAGE ; } // aggregate_timeout // if (cfg_getint(cfg, "aggregate_timeout") < 0) { if (VERBOSE_WARNING) fprintf(stderr, "Warning! aggregate_timeout cannot be negative:" " failing back to the default value.\n") ; cfg_setint(cfg, "aggregate_timeout", DEFAULT_AGGREGATE_TIMEOUT) ; } // keep_timeout // if (cfg_getint(cfg, "keep_timeout") < 0) { if (VERBOSE_WARNING) fprintf(stderr, "Warning! keep_timeout cannot be negative:" " failing back to the default value.\n") ; cfg_setint(cfg, "keep_timeout", DEFAULT_KEEP_TIMEOUT) ; } // check_interval // if (cfg_getint(cfg, "check_interval") < 0) { if (VERBOSE_WARNING) fprintf(stderr, "Warning! check_interval cannot be negative:" " failing back to the default value.\n") ; cfg_setint(cfg, "check_interval", DEFAULT_CHECK_INTERVAL) ; } // cp_keep_timeout // if (cfg_getint(cfg, "cp_keep_timeout") < 0) { if (VERBOSE_WARNING) fprintf(stderr, "Warning! cp_keep_timeout cannot be negative:" " failing back to the default value.\n") ; cfg_setint(cfg, "cp_keep_timeout", DEFAULT_CP_KEEP_TIMEOUT) ; } // ac_order_interval // if (cfg_getint(cfg, "ac_order_interval") < 0) { if (VERBOSE_WARNING) fprintf(stderr, "Warning! ac_order_interval cannot be negative:" " failing back to the default value.\n") ; cfg_setint(cfg, "ac_order_interval", DEFAULT_AC_ORDER_INTERVAL) ; } return 0 ; } /* * Reads packets while the program is not stopped. */ int read_loop(const int sockfd) { int ret = 0 ; // Return value ssize_t nread ; // recvfrom return value owl_captured_request request ; // Message read on the socket while (owl_run) { nread = recvfrom(sockfd, &request, sizeof(request), 0, NULL, NULL) ; if (nread <= 0) { if (owl_run) { perror("No request received from listener") ; ret = OWL_ERR_SOCKET_RECV ; } break ; } // Endianess conversions: owl_ntoh_timestamp(&request.request_time) ; owl_ntoh_timestamp(&request.capture_time) ; request.x_position = owl_ntohf(request.x_position) ; request.y_position = owl_ntohf(request.y_position) ; request.z_position = owl_ntohf(request.z_position) ; request.packet_id = ntohs(request.packet_id) ; request.nb_packets = ntohs(request.nb_packets) ; if (VERBOSE_REQUESTS) print_captured_request(&request) ; else if (VERBOSE_CHATTERBOX) fprintf(stderr, "Request received from CP \"%s\".\n", owl_mac_bytes_to_string(request.cp_mac_addr_bytes)) ; got_request(&request) ; } return ret ; } /* * Prints an owl_captured_request on the standard error. */ void print_captured_request(const owl_captured_request *const request) { char request_time_str[OWL_TIMESTAMP_STRLEN], capture_time_str[OWL_TIMESTAMP_STRLEN], cp_mac_addr_str[OWL_ETHER_ADDR_STRLEN], mobile_mac_addr_str[OWL_ETHER_ADDR_STRLEN], mobile_ip_str[INET_ADDRSTRLEN] ; assert(request) ; owl_timestamp_to_string(&request->request_time, request_time_str) ; owl_timestamp_to_string(&request->capture_time, capture_time_str) ; owl_mac_bytes_to_string_r(request->cp_mac_addr_bytes, cp_mac_addr_str) ; owl_mac_bytes_to_string_r(request->mobile_mac_addr_bytes, mobile_mac_addr_str) ; inet_ntop(AF_INET, &request->mobile_ip_addr_bytes, mobile_ip_str, INET_ADDRSTRLEN) ; fprintf(stderr, "\n" "*** Request received from CP ***\n" "\tType: %"PRIu8"\n" "\tCP MAC: %s\n" "\tMobile MAC: %s\n" "\tMobile IP: %s\n" "\tRequest timestamp: %s\n" "\tRequest arrival time on the CP: %s\n" "\tSignal: %"PRId8" dBm\n" "\tPosition X: %f\n" "\tPosition Y: %f\n" "\tPosition Z: %f\n" "\tDirection: %"PRIu8"\n" "\tPacket number: %"PRIu16"/%"PRIu16"\n" , request->type, cp_mac_addr_str, mobile_mac_addr_str, mobile_ip_str, request_time_str, capture_time_str, request->ss_dbm, request->x_position, request->y_position, request->z_position, request->direction, request->packet_id, request->nb_packets ) ; } /* * Treats a received packet. */ void got_request(const owl_captured_request *const request) { request_info_list *tmp_info = NULL ; owl_timestamp reception_time ; // Reception time on the aggregator assert(request) ; owl_timestamp_now(&reception_time) ; /* Create a new request */ tmp_info = malloc(sizeof(request_info_list)) ; if (! tmp_info) { perror("Cannot allocate memory") ; owl_run = false ; return ; } tmp_info->packet_id = request->packet_id ; memcpy(tmp_info->cp_mac_addr_bytes, request->cp_mac_addr_bytes, ETHER_ADDR_LEN) ; tmp_info->capture_time = request->capture_time ; tmp_info->ss_dbm = request->ss_dbm ; tmp_info->next = NULL ; /* Add it in the list */ sem_wait(&lock_requests) ; // Make sure we are still running once we've got the lock if (owl_run) add_captured_request(&reception_time, request, tmp_info) ; else free(tmp_info) ; sem_post(&lock_requests) ; } /* * Inserts a received request into the requests' list. * * Parameters: * - reception_time: the time at wich the request was received at the * Aggregator * - request: the request to insert * - request_info: the request's information */ void add_captured_request(const owl_timestamp *const reception_time, const owl_captured_request *const request, request_info_list *const request_info) { request_list *tmp_request = requests ; assert(reception_time) ; assert(request) ; assert(request_info) ; if (requests == NULL) // If the request list does not exist, { if (VERBOSE_INFO) fprintf(stderr, "Creating request list with CP \"%s\".\n", owl_mac_bytes_to_string(request->cp_mac_addr_bytes)) ; tmp_request = malloc(sizeof(request_list)) ; // create it. if (! tmp_request) { perror("Cannot allocate memory") ; owl_run = false ; return ; } tmp_request->type = request->type ; tmp_request->nb_packets = request->nb_packets ; memcpy(tmp_request->mobile_mac_addr_bytes, request->mobile_mac_addr_bytes, ETHER_ADDR_LEN) ; memcpy(tmp_request->mobile_ip_addr_bytes, request->mobile_ip_addr_bytes, 4) ; // Explicit packet: if (request->type != OWL_REQUEST_IMPLICIT) // Transmission time on the mobile: tmp_request->request_time = request->request_time ; // Implicit packet: else // Reception time on the CP: tmp_request->request_time = request->capture_time ; // Save locale time on the aggregator (not the reception time // on the CP): tmp_request->start_time = *reception_time ; tmp_request->x_position = request->x_position ; tmp_request->y_position = request->y_position ; tmp_request->z_position = request->z_position ; tmp_request->direction = request->direction ; tmp_request->next = NULL ; tmp_request->info = request_info ; requests = tmp_request ; } else // If the request list exists already { // we search the list for the request // Explicit packet: if (request->type != OWL_REQUEST_IMPLICIT) { while (tmp_request != NULL) { // Research criterion: MAC and transmission time if (owl_mac_equals(request->mobile_mac_addr_bytes, tmp_request->mobile_mac_addr_bytes) && owl_timestamp_equals(&request->request_time, &tmp_request->request_time)) break ; // If the request exists, we stop on it tmp_request = tmp_request->next ; } } // Implicit packet: else { while (tmp_request != NULL) { // Research criterion: MAC addresses equals and reception // times on the CPs less than 10 ms // TODO : define an option for the maximal difference time. if (owl_mac_equals(request->mobile_mac_addr_bytes, tmp_request->mobile_mac_addr_bytes) && owl_time_elapsed_ms(&request->capture_time, &tmp_request->request_time) <= 10) break ; // If the request exists, we stop on it tmp_request = tmp_request->next ; } } if (tmp_request == NULL) // The request does not exist in the list { if (VERBOSE_INFO) fprintf(stderr, "Create new request from CP \"%s\".\n", owl_mac_bytes_to_string(request->cp_mac_addr_bytes)) ; tmp_request = malloc(sizeof(request_list)) ; // create it if (! tmp_request) { perror("Cannot allocate memory") ; owl_run = false ; return ; } tmp_request->type = request->type ; tmp_request->nb_packets = request->nb_packets ; memcpy(tmp_request->mobile_mac_addr_bytes, request->mobile_mac_addr_bytes, ETHER_ADDR_LEN) ; memcpy(tmp_request->mobile_ip_addr_bytes, request->mobile_ip_addr_bytes, 4) ; // Explicit packet: if (request->type != OWL_REQUEST_IMPLICIT) // Transmission time on the mobile: tmp_request->request_time = request->request_time ; // Implicit packet: else // Reception time on the CP: tmp_request->request_time = request->capture_time ; // Save the local time on the aggregator (not the reception // time on the CP): tmp_request->start_time = *reception_time ; tmp_request->x_position = request->x_position ; tmp_request->y_position = request->y_position ; tmp_request->z_position = request->z_position ; tmp_request->direction = request->direction ; tmp_request->next = requests ; tmp_request->info = request_info ; requests = tmp_request ; } else // If the request was found in the list { if (tmp_request->info == NULL) { // We already sent to the server data for this request if (VERBOSE_CHATTERBOX) fprintf(stderr, "Request already treated.\n") ; free(request_info) ; } else { if (VERBOSE_CHATTERBOX) fprintf(stderr, "Add information to the request.\n") ; request_info->next = tmp_request->info ; // Add data tmp_request->info = request_info ; } } } } /* * Thread function. Monitors the list and prints/sends information to the * localisation server when the timeout is reached. */ void* monitor_requests(void *const NULL_value) { FILE *stream = NULL ; struct sockaddr serv; int sockfd; if (VERBOSE_WARNING) fprintf(stderr, "Monitor requests thread launched.\n") ; /* Open the socket to the positioning server * We don't need to check wether or not the socket was opened * successfuly because the network transmission is not mandatory * (aggregated data will be written to the output file * anyway). owl_close_fd() and the rest of the code here handle * negative file descriptors. */ sockfd = owl_create_udp_trx_socket(cfg_getstr(cfg, "positioner_host"), cfg_getint(cfg, "positioner_port"), &serv) ; pthread_cleanup_push(&owl_close_fd, &sockfd) ; /* Open output file */ if (strcmp("-", cfg_getstr(cfg, "output_file")) == 0) stream = stdout ; else { stream = fopen(cfg_getstr(cfg, "output_file"), "a") ; // add mode if (stream == NULL) // If we failed to open the file, { perror("Cannot open output file") ; fprintf(stderr, "Redirecting output to standard output.\n") ; stream = stdout ; // we fail back to stdout. } } pthread_cleanup_push(&owl_close_file, &stream) ; /* Main loop */ while (owl_run) scan_request_list(stream, sockfd, &serv) ; /* Flush the requests' list prior to return */ flush_request_list(stream, sockfd, &serv) ; /* Close output file & socket */ pthread_cleanup_pop(1) ; pthread_cleanup_pop(1) ; pthread_exit(NULL_value) ; } /* * Goes through the requests' list, output the requests that are older * than the aggregation timeout, deletes the requests that are older * than the keep timeout. */ void scan_request_list(FILE *const stream, const int sockfd, const struct sockaddr *const serv) { request_list *request_ptr ; request_list *request_prev = NULL ; owl_timestamp current_time ; uint_fast32_t sub ; // owl_time_elapsed_ms() result uint_fast32_t aggregate_timeout = cfg_getint(cfg, "aggregate_timeout") ; uint_fast32_t keep_timeout = cfg_getint(cfg, "keep_timeout") ; sem_wait(&lock_requests) ; owl_timestamp_now(¤t_time) ; request_ptr = requests ; while (request_ptr != NULL) // Go through the list { sub = owl_time_elapsed_ms(&request_ptr->start_time, ¤t_time) ; // If the request was not treated already if (request_ptr->info != NULL) { // If the timeout is reached if (sub > aggregate_timeout) { if (VERBOSE_CHATTERBOX) fprintf(stderr, "* Aggregate timeout reached:" " %"PRIuFAST32" > %"PRIuFAST32"\n", sub, aggregate_timeout) ; output_request(request_ptr, stream, sockfd, serv) ; } } // If the request was treated and keep timeout is reached else if (sub > keep_timeout) { request_list *request_tmp = request_ptr ; if (VERBOSE_CHATTERBOX) fprintf(stderr, "* Keep timeout reached:" " %"PRIuFAST32" > %"PRIuFAST32"\n", sub, keep_timeout) ; request_ptr = request_ptr->next ; // If it is the first request of the list if (request_prev == NULL) requests = request_ptr ; // we shift the head else // else we put the next of the previous on the next request_prev->next = request_ptr ; free(request_tmp) ; continue ; } // Next request request_prev = request_ptr ; request_ptr = request_ptr->next ; } sem_post(&lock_requests) ; if (cfg_getbool(cfg, "flush_output")) if (fflush(stream)) perror("Failed to flush the output file") ; // Wait to check again: owl_msleep(cfg_getint(cfg, "check_interval")) ; } /* * Prints and sends all the requests in the requests' list and frees * the list. */ void flush_request_list(FILE *const stream, const int sockfd, const struct sockaddr *const serv) { request_list *next_request ; if (VERBOSE_CHATTERBOX) fprintf(stderr, " Flushing the requests' list...\n") ; sem_wait(&lock_requests) ; while (requests) { // If the request was not treated already, we print its data if (requests->info != NULL) output_request(requests, stream, sockfd, serv) ; // Free the current request and jump to the next next_request = requests->next ; free(requests) ; requests = next_request ; } sem_post(&lock_requests) ; if (VERBOSE_CHATTERBOX) fprintf(stderr, " Requests' list flushed.\n") ; } /* * Prints a request to the output file and sends it to the aggregation * server. The request's info field is deleted. * * Parameters: * - request_ptr: the request to print * - stream: the stream to print to * - sockfd: the file descriptor of the socket to send to (can be * negative, in which case no transmission will be attempted) * - serv: the server's information */ void output_request(request_list *const request_ptr, FILE *const stream, const int sockfd, const struct sockaddr *const serv) { char mac_str[OWL_ETHER_ADDR_STRLEN] ; char request_time_str[OWL_TIMESTAMP_STRLEN] ; socklen_t serv_len = sizeof(*serv) ; owl_request request ; owl_request_info info; request_info_list *request_info_ptr = NULL ; // Print CSV format version to the output file fprintf(stream, "%d;", OWL_LATEST_AGGREGATION_CSV_FORMAT) ; // Print mobile MAC address to the output file owl_mac_bytes_to_string_r(request_ptr->mobile_mac_addr_bytes, mac_str) ; fprintf(stream, "%s;", mac_str) ; // Print request type & number of packets to the // output file fprintf(stream, "%"PRIu8";%"PRIu16";", request_ptr->type, request_ptr->nb_packets) ; // Print request mobile timestamp to the output file owl_timestamp_to_string(&request_ptr->request_time, request_time_str) ; fprintf(stream, "%s;", request_time_str) ; // Print request info to the output file fprintf(stream, "%0.2f;%0.2f;%0.2f;%hhd", request_ptr->x_position, request_ptr->y_position, request_ptr->z_position, request_ptr->direction) ; // Initialise owl_request fields: request.type = request_ptr->type ; request.nb_packets = htons(request_ptr->nb_packets) ; memcpy(request.mobile_mac_addr_bytes, request_ptr->mobile_mac_addr_bytes, ETHER_ADDR_LEN) ; memcpy(request.mobile_ip_addr_bytes, request_ptr->mobile_ip_addr_bytes, 4) ; request.request_time = request_ptr->request_time ; owl_hton_timestamp(&request.request_time) ; request.x_position = owl_htonf(request_ptr->x_position) ; request.y_position = owl_htonf(request_ptr->y_position) ; request.z_position = owl_htonf(request_ptr->z_position) ; request.direction = request_ptr->direction ; // Count the requests: request.nb_info = 0 ; request_info_ptr = request_ptr->info ; while (request_info_ptr != NULL) { ++request.nb_info ; request_info_ptr = request_info_ptr->next ; } request.nb_info = htons(request.nb_info) ; // Send the request's main data: if (sockfd >= 0) { ssize_t n; n = sendto(sockfd, &request, sizeof(request), 0, serv, serv_len); if (n != sizeof(request)) perror("Couldn't send the request's main data"); } // Send request's per-CP information to the server and delete it // from the request request_info_ptr = request_ptr->info ; while (request_info_ptr != NULL) { // Send CP info to the localisation server info.packet_id = htons(request_info_ptr->packet_id) ; memcpy(info.cp_mac_addr_bytes, request_info_ptr->cp_mac_addr_bytes, ETHER_ADDR_LEN) ; info.capture_time = request_info_ptr->capture_time ; owl_hton_timestamp(&info.capture_time) ; info.ss_dbm = request_info_ptr->ss_dbm ; if (sockfd >= 0) { ssize_t n; n = sendto(sockfd, &info, sizeof(info), 0, serv, serv_len); if (n != sizeof(info)) perror("Couldn't send the CP information"); } // Print CP info to the output file owl_mac_bytes_to_string_r(request_info_ptr->cp_mac_addr_bytes, mac_str) ; fprintf(stream, ";%s;%"PRIu16";%"PRId8, mac_str, request_info_ptr->packet_id, request_info_ptr->ss_dbm) ; // Delete the current CP info from the request request_info_ptr = request_info_ptr->next ; free(request_ptr->info) ; request_ptr->info = request_info_ptr ; } // End the line in the output file: fprintf(stream, "\n") ; } #ifndef NDEBUG /* * Prints the request list. */ void print_request_list() { request_list *request_ptr = NULL ; request_info_list *info_ptr = NULL ; char mobile_mac_str[OWL_ETHER_ADDR_STRLEN] ; char request_time_str[OWL_TIMESTAMP_STRLEN], start_time_str[OWL_TIMESTAMP_STRLEN] ; sem_wait(&lock_requests) ; if (requests == NULL) // Empty list { fprintf(stderr, "No request.\n") ; return ; } request_ptr = requests ; while (request_ptr != NULL) { info_ptr = request_ptr->info ; // Get the sub-list pointer owl_mac_bytes_to_string_r(request_ptr->mobile_mac_addr_bytes, mobile_mac_str) ; owl_timestamp_to_string(&request_ptr->request_time, request_time_str) ; owl_timestamp_to_string(&request_ptr->start_time, start_time_str) ; fprintf(stderr, "Type: %"PRIu8"\n" "Mobile MAC: %s\n" "Sequence number: %s\n" "Reception timestamp: %s\n" "\n", request_ptr->type, mobile_mac_str, request_time_str, start_time_str ) ; // Parse information relative to the current request while (info_ptr != NULL) { print_request_info(info_ptr) ; putc('\n', stderr) ; info_ptr = info_ptr->next ; } fprintf(stderr, "\n\n") ; request_ptr = request_ptr->next ; } sem_post(&lock_requests) ; } /* * Prints an element of a request_info_list. */ void print_request_info(const request_info_list *const info) { char cp_mac_str[OWL_ETHER_ADDR_STRLEN] ; if (info == NULL) return ; owl_mac_bytes_to_string_r(info->cp_mac_addr_bytes, cp_mac_str) ; fprintf(stderr, "\tCP MAC: %s\n" "\tSignal strength: %"PRId8" dBm\n", cp_mac_str, info->ss_dbm ) ; } #endif // NDEBUG /* * Thread function. Listens for hello messages from CPs. */ void* listen_for_cps(void *const NULL_value) { int listen_sockfd ; int nread ; // recvfrom return value struct sockaddr_in client; // UDP client structure socklen_t client_len = sizeof(client) ; // Size of clients owl_autocalibration_hello message ; char cp_ip_addr[INET_ADDRSTRLEN] ; if (VERBOSE_WARNING) fprintf(stderr, "Autocalibration Hello thread launched.\n") ; listen_sockfd = owl_create_udp_listening_socket(cfg_getint(cfg, "autocalibration_hello_port")) ; if (listen_sockfd < 0) { perror("Error! Cannot create UDP listening socket from the" " listeners") ; exit(OWL_ERR_SOCKET_CREATE) ; } pthread_cleanup_push(&owl_close_fd, &listen_sockfd) ; while (owl_run) { nread = recvfrom(listen_sockfd, &message, sizeof(message), 0, (struct sockaddr *) &client, &client_len) ; if (nread <= 0) { if (owl_run) perror("No message received from listener") ; continue ; } inet_ntop(AF_INET, &client.sin_addr, cp_ip_addr, INET_ADDRSTRLEN) ; if (VERBOSE_INFO) fprintf(stderr, "Got a Hello message from \"%s\"\n", cp_ip_addr) ; sem_wait(&lock_cps) ; update_cp(message.cp_mac_addr_bytes, cp_ip_addr) ; sem_post(&lock_cps) ; } /* Close the socket */ pthread_cleanup_pop(1) ; pthread_exit(NULL_value) ; } /* * Updates the timestamp of the CP with the given MAC address if it is in * the CP list, or add a new CP with this MAC address to the CP list. */ void update_cp(const uint8_t mac_addr_bytes[ETHER_ADDR_LEN], const char ip_addr[INET_ADDRSTRLEN]) { cp_list *found ; if ((found = find_cp(mac_addr_bytes)) == NULL) { cp_list *new_cp = add_cp_front(mac_addr_bytes) ; if (new_cp) update_cp_ip_addr(new_cp, ip_addr) ; } else update_cp_seen(found) ; } /* * Searches the CP list for a CP with the given MAC address and returns * it. */ cp_list* find_cp(const uint8_t mac_addr_bytes[ETHER_ADDR_LEN]) { cp_list *found ; if (token_cps == NULL) return NULL ; found = token_cps ; do { if (owl_mac_equals(found->mac_addr_bytes, mac_addr_bytes)) return found ; found = found->next ; } while (found != token_cps) ; return NULL ; } /* * Adds a new CP in front of the CP list. * Returns the new list head, or NULL in case of error. */ cp_list* add_cp_front(const uint8_t mac_addr_bytes[ETHER_ADDR_LEN]) { if (VERBOSE_INFO) { char mac_str[OWL_ETHER_ADDR_STRLEN] ; owl_mac_bytes_to_string_r(mac_addr_bytes, mac_str) ; fprintf(stderr, "Creating CP with MAC address \"%s\"...\n", mac_str) ; } cp_list *cp = malloc(sizeof(cp_list)) ; if (! cp) { perror("Cannot allocate memory") ; owl_run = false ; return NULL ; } memcpy(cp->mac_addr_bytes, mac_addr_bytes, ETHER_ADDR_LEN) ; update_cp_seen(cp) ; push_cp(cp) ; return cp ; } /* * Change the IP address of the CP 'cp' with 'ip_addr'. */ void update_cp_ip_addr(cp_list *const cp, const char ip_addr[INET_ADDRSTRLEN]) { strncpy(cp->ip_addr, ip_addr, INET_ADDRSTRLEN) ; } /* * Updates the timestamp of the given CP. */ void update_cp_seen(cp_list *const cp) { assert(cp) ; owl_timestamp_now(&cp->last_seen) ; } /* * Puts an existing CP in front of the CP list. The CP must not be in * the list yet. */ void push_cp(cp_list *const cp) { assert(cp) ; ++nb_cps ; if (token_cps == NULL) // List does not exist yet { token_cps = cp ; cp->next = cp ; cp->previous = cp ; return ; } cp->previous = token_cps->previous ; cp->previous->next = cp ; cp->next = token_cps ; token_cps->previous = cp ; token_cps = cp ; } /* * Monitors the CP list: sends orders to CPs following their order in * the list, and deletes old CPs. */ void* monitor_cps(void *const NULL_value) { if (VERBOSE_WARNING) fprintf(stderr, "Monitor CP thread launched.\n") ; while (owl_run) { sem_wait(&lock_cps) ; delete_old_cps() ; sem_post(&lock_cps) ; // Here we're not in a hurry, so we released the semaphore to // allow listen_for_cps() to process a received hello packet, // if needed. sem_wait(&lock_cps) ; if (nb_cps > 1) { order_send(token_cps) ; token_cps = token_cps->next ; } sem_post(&lock_cps) ; owl_msleep(cfg_getint(cfg, "ac_order_interval")) ; } pthread_exit(NULL_value) ; } /* * Deletes CPs that did not send any Hello packet for a while, following * the list order. Stops on the first not-to-be-deleted CP. */ void delete_old_cps() { owl_timestamp now ; owl_timestamp_now(&now) ; while (token_cps != NULL) if (owl_time_elapsed_ms(&token_cps->last_seen, &now) > (uint_fast32_t) cfg_getint(cfg, "cp_keep_timeout") * 1000) delete_cp(token_cps) ; else return ; } /* * Deletes the given CP from the CP list. */ void delete_cp(cp_list *const cp) { if (VERBOSE_INFO) { char mac_str[OWL_ETHER_ADDR_STRLEN] ; assert(cp) ; owl_mac_bytes_to_string_r(cp->mac_addr_bytes, mac_str) ; fprintf(stderr, "Deleting CP \"%s\"...\n", mac_str) ; } unlink_cp(cp) ; free(cp) ; } /* * Extracts the given CP from the CP list (it will not be linked to any * other element of the list). */ void unlink_cp(const cp_list *const cp) { cp_list *cp_previous, *cp_next ; assert(cp) ; cp_previous = cp->previous ; cp_next = cp->next ; assert(cp_previous) ; assert(cp_next) ; cp_previous->next = cp_next ; cp_next->previous = cp_previous ; if (cp == token_cps) { if (cp->next == cp) // It was the last CP in the ring token_cps = NULL ; else token_cps = cp_next ; } --nb_cps ; } /* * Sends a 'send' order to the given CP. */ void order_send(const cp_list *const cp) { owl_autocalibration_order message ; struct sockaddr serv; socklen_t serv_len = sizeof(serv); int sockfd ; ssize_t nsent ; if (VERBOSE_INFO) fprintf(stderr, "Sending an order to %s...\n", cp->ip_addr) ; sockfd = owl_create_udp_trx_socket(cp->ip_addr, cfg_getint(cfg, "autocalibration_order_port"), &serv) ; if (sockfd < 0) { fprintf(stderr, "Can't transmit autocalibration order.\n"); return; } message.order = AUTOCALIBRATION_ORDER_SEND ; nsent = sendto(sockfd, &message, sizeof(message), 0, &serv, serv_len) ; if (nsent != (ssize_t) sizeof(message)) { perror("Error sending order to the listener") ; exit(OWL_ERR_SOCKET_SEND) ; } close(sockfd) ; } /* * Empties the CP list. * Note that this function does not use lock_cps, so it should not * be called in a concurrent context. */ void free_cp_list() { cp_list *cp_ptr ; if (token_cps == NULL) return ; cp_ptr = token_cps->next ; assert(cp_ptr) ; while (cp_ptr != token_cps) { cp_list *cp_tmp = cp_ptr ; cp_ptr = cp_ptr->next ; free(cp_tmp) ; } free(token_cps) ; token_cps = NULL ; } void print_usage() { printf("Usage:\n" "\t%s" " [-f config_file]" " [-G]" " [-D]" " [-v[v[v[v]]] | -q]" "\n\t" " [-o out_file]" " [-F]" " [-i positionner_host]" " [-p positioner_port]" "\n\t" " [-l listening_port]" " [-t aggregate_timeout]" " [-k keep_timeout]" "\n\t" " [-c check_interval]" " [-A]" " [-O autocalibration_order_port]" "\n\t" " [-H hello_port]" " [-K cp_keep_timeout]" " [-C ac_order_interval]" "\n" "\t%s -h\n" "\t%s -V\n" "Main options:\n" "\t-h\t\tPrint this help message and exit.\n" "\t-V\t\tShow version information and exit.\n" "\t-f config_file\tUse 'config_file' instead of the default" " configuration\n\t\t\tfile (%s).\n" "\t-G\t\tDump the configuration on the standard output and exit" "\n\t\t\t(useful to generate a configuration file from the" "\n\t\t\tcurrent set of options).\n" "\t-D\t\tDaemon mode.\n" "\t-v\t\tBe verbose. You can use this option up to 4 times to" "\n\t\t\tincrease the level of verbosity (1 = warnings,\n\t\t\t" "2 = useful information, 3 = too much information,\n\t\t\t4 = " "displays the detail of each and every received\n\t\t\t" "request).\n" "\t-q\t\tQuiet mode (default): sets the verbose level to 0.\n" "Output options:\n" "\t-o out_file\t\tAggregated requests will be appended to" " this CSV\n\t\t\t\tfile (default is \"-\", i.e. the standard" " output).\n" "\t-F\t\t\tDo not flush the output file after each analyse" "\n\t\t\t\tof the requests' list.\n" "\t-i positionner_host\tHost name or IP address of the" " positioning\n\t\t\t\tserver (default: %s).\n" "\t-p positioner_port\tAggregated requests are transmitted to" " the\n\t\t\t\tpositioning server on this port (default:" "\n\t\t\t\t%d).\n" "Aggregation options:\n" "\t-l listening_port\tListen for requests from the capture" " points on\n\t\t\t\tthis port (default: %d).\n" "\t-t aggregate_timeout\tRequests are stored during" " 'aggregate_timeout'\n\t\t\t\tmilliseconds before to be" " grouped (default:\n\t\t\t\t%d ms).\n" "\t-k keep_timeout\t\tAggregated requests are kept during" "\n\t\t\t\t'keep_timeout' milliseconds (default: %d ms).\n" "\t-c check_interval\tTime between two checks of the stored" " requests,\n\t\t\t\tin milliseconds (default: %d ms).\n" "Autocalibration options:\n" "\t-A\t\t\tEnable autocalibration (default: disabled).\n" "\t-O ac_order_port\tPort on which autocalibration orders" " are sent to\n\t\t\t\tthe listeners (default: %d).\n" "\t-H hello_port\t\tPort on which autocalibration hello" " messages are\n\t\t\t\treceived from the listeners" " (default: %d).\n" "\t-K cp_keep_timeout\tInactive CPs are kept during" " 'cp_keep_timeout'\n\t\t\t\tseconds (default: %d s).\n" "\t-C ac_order_interval\tTime (in milliseconds) between two" " transmissions\n\t\t\t\tof autocalibration orders to the" " stored CPs\n\t\t\t\t(default: %d ms).\n" , program_name, program_name, program_name, DEFAULT_CONFIG_FILE, POSITIONER_DEFAULT_HOST, OWL_DEFAULT_AGGREGATION_PORT, OWL_DEFAULT_LISTENER_PORT, DEFAULT_AGGREGATE_TIMEOUT, DEFAULT_KEEP_TIMEOUT, DEFAULT_CHECK_INTERVAL, OWL_DEFAULT_AUTOCALIBRATION_ORDER_PORT, OWL_DEFAULT_AUTOCALIBRATION_HELLO_PORT, DEFAULT_CP_KEEP_TIMEOUT, DEFAULT_AC_ORDER_INTERVAL ) ; } void print_version() { printf("This is OwlPS Aggregator, part of the Owl Positioning System" " project.\n" "Version: %s.\n", #ifdef OWLPS_VERSION OWLPS_VERSION #else // OWLPS_VERSION "unknown version" #endif // OWLPS_VERSION ) ; }