owlps/owlps-aggregator/owlps-aggregatord.c

1494 lines
44 KiB
C

/*
* This file is part of the Owl Positioning System (OwlPS).
* OwlPS is a project of the University of Franche-Comte
* (Université de Franche-Comté), France.
*
* Copyright © Université de Franche-Comté 2007-2012.
*
* Corresponding author: Matteo Cypriani <mcy@lm7.fr>
*
***********************************************************************
*
* This software is governed by the CeCILL license under French law and
* abiding by the rules of distribution of free software. You can use,
* modify and/or redistribute the software under the terms of the CeCILL
* license as circulated by CEA, CNRS and INRIA at the following URL:
* http://www.cecill.info
*
* As a counterpart to the access to the source code and rights to copy,
* modify and redistribute granted by the license, users are provided
* only with a limited warranty and the software's authors, the holder
* of the economic rights, and the successive licensors have only
* limited liability.
*
* In this respect, the user's attention is drawn to the risks
* associated with loading, using, modifying and/or developing or
* reproducing the software by the user in light of its specific status
* of free software, that may mean that it is complicated to manipulate,
* and that also therefore means that it is reserved for developers and
* experienced professionals having in-depth computer knowledge. Users
* are therefore encouraged to load and test the software's suitability
* as regards their requirements in conditions enabling the security of
* their systems and/or data to be ensured and, more generally, to use
* and operate it in the same conditions as regards security.
*
* The fact that you are presently reading this means that you have had
* knowledge of the CeCILL license and that you accept its terms.
*
***********************************************************************
*
* This is the main source file of OwlPS Aggregator, the program that
* aggregates the requests captured by the Listeners in order to present
* them to the Positioner.
*/
#include "owlps-aggregator.h"
#include <stdlib.h>
#include <unistd.h>
#include <inttypes.h>
#include <string.h>
#include <errno.h>
#include <signal.h>
#include <pthread.h>
#include <semaphore.h>
#include <arpa/inet.h>
#include <confuse.h>
#include <assert.h>
char *program_name = NULL ;
cfg_t *cfg = NULL ; // Configuration structure
request_list *requests = NULL ; // Computed data list
sem_t lock_requests ; // Semaphore to get access to the requests
ap_list *token_aps = NULL ; // Token ring of the APs
uint_fast16_t nb_aps = 0 ; // Number of APs in the APs' ring
sem_t lock_aps ; // Semaphore to get access to the APs' ring
int main(int argc, char **argv)
{
int ret = 0 ; // Program return value
struct sigaction action ; // Signal handler structure
pthread_t
monitor_thread, // Aggregated data monitoring thread
monitor_aps_thread, // APs monitoring thread
autocalibration_hello_thread ; // Hello messages reception thread
uint_fast16_t listening_port ;
int sockfd = -1 ; // UDP listening socket
owl_run = owl_true ;
program_name = argv[0] ;
ret = initialise_configuration(argc, argv) ;
if (! owl_run)
goto exit ;
if (cfg_getbool(cfg, "daemon"))
{
if (VERBOSE_WARNING)
fprintf(stderr, "Detaching to background...\n") ;
if (daemon(0, 0))
perror("Cannot daemonize") ;
}
/* Set up signal handlers */
action.sa_flags = 0 ;
sigemptyset(&action.sa_mask) ;
action.sa_handler = owl_sigint_handler ;
sigaction(SIGINT, &action, NULL) ;
action.sa_handler = owl_sigterm_handler ;
sigaction(SIGTERM, &action, NULL) ;
/* Set up semaphores */
sem_init(&lock_requests, 0, 1) ;
sem_init(&lock_aps, 0, 1) ;
/* Create UDP socket */
listening_port = cfg_getint(cfg, "listening_port") ;
if ((sockfd = owl_create_udp_listening_socket(listening_port)) < 0)
{
fprintf(stderr,
"Error! Cannot listen on port %"PRIuFAST16".\n",
listening_port) ;
ret = OWL_ERR_SOCKET_CREATE ;
goto exit ;
}
/* Set up threads */
ret = pthread_create(&monitor_thread, NULL, &monitor_requests, NULL) ;
if (ret != 0)
{
perror("Cannot create monitor thread") ;
ret = OWL_ERR_THREAD_CREATE ;
goto exit ;
}
if (cfg_getbool(cfg, "autocalibration"))
{
ret = pthread_create(&autocalibration_hello_thread, NULL,
&listen_for_aps, NULL) ;
if (ret != 0)
{
perror("Cannot create autocalibration hello thread") ;
ret = OWL_ERR_THREAD_CREATE ;
goto exit ;
}
ret = pthread_create(&monitor_aps_thread, NULL,
&monitor_aps, NULL) ;
if (ret != 0)
{
perror("Cannot create monitor APs thread") ;
ret = OWL_ERR_THREAD_CREATE ;
goto exit ;
}
}
ret = read_loop(sockfd) ;
/* Wait for the threads to terminate */
if (VERBOSE_WARNING)
fprintf(stderr, "Waiting for the monitor thread... ") ;
if (pthread_join(monitor_thread, NULL) != 0)
perror("Cannot join monitor thread") ;
else if (VERBOSE_WARNING)
fprintf(stderr, "OK.\n") ;
if (cfg_getbool(cfg, "autocalibration"))
{
// We must cancel this thread because it can be blocked on the
// recvfrom() call:
if (VERBOSE_WARNING)
fprintf(stderr,
"Cancelling the autocalibration hello thread... ") ;
if (pthread_cancel(autocalibration_hello_thread) != 0)
perror("Cannot cancel autocalibration hello thread") ;
else if (VERBOSE_WARNING)
fprintf(stderr, "OK.\n") ;
if (VERBOSE_WARNING)
fprintf(stderr,
"Waiting for the autocalibration hello thread... ") ;
if (pthread_join(autocalibration_hello_thread, NULL) != 0)
perror("Cannot join autocalibration hello thread") ;
else if (VERBOSE_WARNING)
fprintf(stderr, "OK.\n") ;
if (VERBOSE_WARNING)
fprintf(stderr, "Waiting for the monitor APs thread... ") ;
if (pthread_join(monitor_aps_thread, NULL) != 0)
perror("Cannot join monitor APs thread") ;
else if (VERBOSE_WARNING)
fprintf(stderr, "OK.\n") ;
}
/* Last cleaning tasks */
exit:
if (sockfd >= 0)
close(sockfd) ; // Close socket
free_request_list() ;
free_ap_list() ;
cfg_free(cfg) ; // Clean configuration
// Destroy semaphores:
sem_destroy(&lock_requests) ;
sem_destroy(&lock_aps) ;
fprintf(stderr, "%s: end.\n", program_name) ;
return ret ;
}
/*
* Read the configuration from both the command line and the
* configuration file.
* Returns an error code, or 0 in case of success. If the program should
* stop (because of a special option or a configuration error), owl_run
* is set to false.
*/
int initialise_configuration(int argc, char **argv)
{
int ret ;
ret = parse_config_file(argc, argv) ;
if (! owl_run)
return ret ;
ret = parse_command_line(argc, argv) ;
if (! owl_run)
return ret ;
ret = check_configuration() ;
if (! owl_run)
return ret ;
/* Configuration printing */
if (VERBOSE_INFO)
{
fprintf(stderr, "Configuration:\n") ;
cfg_print(cfg, stderr) ;
}
return 0 ;
}
int parse_config_file(int argc, char **argv)
{
// Config file options for confuse
cfg_opt_t opts[] =
{
// Daemon mode
CFG_BOOL("daemon", cfg_false, CFGF_NONE),
// Verbose level
CFG_INT("verbose", 0, CFGF_NONE),
// Aggregation listening port
CFG_INT("listening_port", OWL_DEFAULT_LISTENER_PORT, CFGF_NONE),
// Port and IP address of the localisation server:
CFG_INT("positioner_port", OWL_DEFAULT_AGGREGATION_PORT, CFGF_NONE),
CFG_STR("positioner_ip", POSITIONER_DEFAULT_IP, CFGF_NONE),
CFG_STR("output_file", "", CFGF_NONE),
// Timeouts (in milliseconds):
CFG_INT("aggregate_timeout", DEFAULT_AGGREGATE_TIMEOUT, CFGF_NONE),
CFG_INT("keep_timeout", DEFAULT_KEEP_TIMEOUT, CFGF_NONE),
// Time between two list checks (in milliseconds):
CFG_INT("check_interval", DEFAULT_CHECK_INTERVAL, CFGF_NONE),
// Autocalibration activated?
CFG_BOOL("autocalibration", cfg_false, CFGF_NONE),
// Port on which autocalibration orders are sent to the listeners:
CFG_INT("autocalibration_order_port",
OWL_DEFAULT_AUTOCALIBRATION_ORDER_PORT, CFGF_NONE),
// Port on which autocalibration hello are received from the
// listeners:
CFG_INT("autocalibration_hello_port",
OWL_DEFAULT_AUTOCALIBRATION_HELLO_PORT, CFGF_NONE),
// Time we keep APs in the list (in seconds):
CFG_INT("ap_keep_timeout", DEFAULT_AP_KEEP_TIMEOUT, CFGF_NONE),
// Time between two checks of the AP list (in milliseconds):
CFG_INT("ap_check_interval", DEFAULT_AP_CHECK_INTERVAL, CFGF_NONE),
CFG_END()
} ;
char *config_file = NULL ; // Configuration file name
// Option -f specifies a config file, options -h and -V exit the
// program, so we search for them first
int opt ;
while ((opt = getopt(argc, argv, OPTIONS)) != -1)
{
switch (opt)
{
case 'f' :
config_file = malloc((strlen(optarg) + 1) * sizeof(char)) ;
strcpy(config_file, optarg) ;
break ;
case 'h' :
print_usage() ;
owl_run = owl_false ;
return EXIT_SUCCESS ;
case 'V' :
print_version() ;
owl_run = owl_false ;
return EXIT_SUCCESS ;
}
}
// If -f isn't found, we use the default config file
if (config_file == NULL)
{
config_file =
malloc((strlen(DEFAULT_CONFIG_FILE) + 1) * sizeof(char)) ;
strcpy(config_file, DEFAULT_CONFIG_FILE) ;
}
/* Parse config file */
cfg = cfg_init(opts, CFGF_NONE) ; // Initialise options
switch (cfg_parse(cfg, config_file))
{
case CFG_FILE_ERROR :
fprintf(stderr,
"Error! Cannot open configuration file \"%s\": %s.\n",
config_file, strerror(errno)) ;
break ;
case CFG_PARSE_ERROR :
fprintf(stderr,
"Error! Parsing of configuration file \"%s\" failed!\n",
config_file) ;
free(config_file) ;
owl_run = owl_false ;
return OWL_ERR_CONFIG_FILE ;
}
free(config_file) ;
return 0 ;
}
int parse_command_line(int argc, char **argv)
{
int opt ;
optind = 1 ; // Rewind argument parsing
while ((opt = getopt(argc, argv, OPTIONS)) != -1)
{
switch (opt)
{
case 'A' :
cfg_setbool(cfg, "autocalibration", cfg_true) ;
break ;
case 'c' :
cfg_setint(cfg, "check_interval", strtol(optarg, NULL, 0)) ;
break ;
case 'C' :
cfg_setint(cfg, "ap_check_interval", strtol(optarg, NULL, 0)) ;
break ;
case 'D' :
cfg_setbool(cfg, "daemon", cfg_true) ;
break ;
case 'f' : // Config file
break ; // (already parsed)
case 'H' :
cfg_setint(cfg, "autocalibration_hello_port",
strtol(optarg, NULL, 0)) ;
break ;
case 'i' :
cfg_setstr(cfg, "positioner_ip", optarg) ;
break ;
case 'k' :
cfg_setint(cfg, "keep_timeout", strtol(optarg, NULL, 0)) ;
break ;
case 'K' :
cfg_setint(cfg, "ap_keep_timeout", strtol(optarg, NULL, 0)) ;
break ;
case 'l' :
cfg_setint(cfg, "listening_port", strtol(optarg, NULL, 0)) ;
break ;
case 'o' :
cfg_setstr(cfg, "output_file", optarg) ;
break ;
case 'O' :
cfg_setint(cfg, "autocalibration_order_port",
strtol(optarg, NULL, 0)) ;
break ;
case 'p' :
cfg_setint(cfg, "positioner_port", strtol(optarg, NULL, 0)) ;
break ;
case 'q' :
// Quiet mode: reset the verbose level
cfg_setint(cfg, "verbose", 0) ;
break ;
case 't' :
cfg_setint(cfg, "aggregate_timeout", strtol(optarg, NULL, 0)) ;
break ;
case 'v' :
// Increment the verbose level
cfg_setint(cfg, "verbose",
cfg_getint(cfg, "verbose") + 1) ;
break ;
default :
print_usage() ;
owl_run = owl_false ;
return OWL_ERR_BAD_USAGE ;
}
}
return 0 ;
}
int check_configuration()
{
// output_file //
if (cfg_getstr(cfg, "output_file")[0] == '\0')
{
fprintf(stderr, "Error! You must specify an output file.\n") ;
print_usage() ;
owl_run = owl_false ;
return OWL_ERR_BAD_USAGE ;
}
// listening_port //
if (cfg_getint(cfg, "listening_port") < 1 ||
cfg_getint(cfg, "listening_port") > 65535)
{
if (VERBOSE_WARNING)
fprintf(stderr, "Warning! Bad listening_port:"
" failing back to the default value.\n") ;
cfg_setint(cfg, "listening_port", OWL_DEFAULT_LISTENER_PORT) ;
}
// positioner_port //
if (cfg_getint(cfg, "positioner_port") < 1 ||
cfg_getint(cfg, "positioner_port") > 65535)
{
if (VERBOSE_WARNING)
fprintf(stderr, "Warning! Bad positioner_port:"
" failing back to the default value.\n") ;
cfg_setint(cfg, "positioner_port", OWL_DEFAULT_AGGREGATION_PORT) ;
}
// positioner_ip //
if (cfg_getstr(cfg, "positioner_ip")[0] == '\0')
{
fprintf(stderr, "Error! You must specify the IP address of the"
" localisation server.\n") ;
print_usage() ;
owl_run = owl_false ;
return OWL_ERR_BAD_USAGE ;
}
// aggregate_timeout //
if (cfg_getint(cfg, "aggregate_timeout") < 0)
{
if (VERBOSE_WARNING)
fprintf(stderr, "Warning! aggregate_timeout cannot be negative:"
" failing back to the default value.\n") ;
cfg_setint(cfg, "aggregate_timeout", DEFAULT_AGGREGATE_TIMEOUT) ;
}
// keep_timeout //
if (cfg_getint(cfg, "keep_timeout") < 0)
{
if (VERBOSE_WARNING)
fprintf(stderr, "Warning! keep_timeout cannot be negative:"
" failing back to the default value.\n") ;
cfg_setint(cfg, "keep_timeout", DEFAULT_KEEP_TIMEOUT) ;
}
// check_interval //
if (cfg_getint(cfg, "check_interval") < 0)
{
if (VERBOSE_WARNING)
fprintf(stderr, "Warning! check_interval cannot be negative:"
" failing back to the default value.\n") ;
cfg_setint(cfg, "check_interval", DEFAULT_CHECK_INTERVAL) ;
}
// ap_keep_timeout //
if (cfg_getint(cfg, "ap_keep_timeout") < 0)
{
if (VERBOSE_WARNING)
fprintf(stderr, "Warning! ap_keep_timeout cannot be negative:"
" failing back to the default value.\n") ;
cfg_setint(cfg, "ap_keep_timeout", DEFAULT_AP_KEEP_TIMEOUT) ;
}
// ap_check_interval //
if (cfg_getint(cfg, "ap_check_interval") < 0)
{
if (VERBOSE_WARNING)
fprintf(stderr, "Warning! ap_check_interval cannot be negative:"
" failing back to the default value.\n") ;
cfg_setint(cfg, "ap_check_interval", DEFAULT_AP_CHECK_INTERVAL) ;
}
return 0 ;
}
/*
* Reads packets while the program is not stopped.
*/
int read_loop(int sockfd)
{
int ret = 0 ; // Return value
ssize_t nread ; // recvfrom return value
struct sockaddr_in client; // UDP client structure
socklen_t client_len = sizeof(client) ; // Size of clients
owl_captured_request request ; // Message read on the socket
while (owl_run)
{
nread = recvfrom(sockfd, &request, sizeof(request), 0,
(struct sockaddr *) &client, &client_len) ;
if (nread <= 0)
{
if (owl_run)
{
perror("No request received from listener") ;
ret = OWL_ERR_SOCKET_RECV ;
}
break ;
}
// Endianess conversions:
owl_ntoh_timestamp(&request.request_time) ;
owl_ntoh_timestamp(&request.capture_time) ;
request.x_position = owl_ntohf(request.x_position) ;
request.y_position = owl_ntohf(request.y_position) ;
request.z_position = owl_ntohf(request.z_position) ;
request.packet_id = ntohs(request.packet_id) ;
request.nb_packets = ntohs(request.nb_packets) ;
if (VERBOSE_REQUESTS)
{
char
request_time_str[OWL_TIMESTAMP_STRLEN],
capture_time_str[OWL_TIMESTAMP_STRLEN],
ap_mac_addr_str[OWL_ETHER_ADDR_STRLEN],
mobile_mac_addr_str[OWL_ETHER_ADDR_STRLEN],
mobile_ip_str[INET_ADDRSTRLEN] ;
owl_timestamp_to_string(&request.request_time,
request_time_str) ;
owl_timestamp_to_string(&request.capture_time,
capture_time_str) ;
owl_mac_bytes_to_string_r(request.ap_mac_addr_bytes,
ap_mac_addr_str) ;
owl_mac_bytes_to_string_r(request.mobile_mac_addr_bytes,
mobile_mac_addr_str) ;
inet_ntop(AF_INET, &request.mobile_ip_addr_bytes,
mobile_ip_str, INET_ADDRSTRLEN) ;
fprintf(stderr,
"\n"
"*** Request received from AP ***\n"
"\tType: %"PRIu8"\n"
"\tAP MAC: %s\n"
"\tMobile MAC: %s\n"
"\tMobile IP: %s\n"
"\tRequest timestamp: %s\n"
"\tRequest arrival time on the AP: %s\n"
"\tSignal: %d dBm\n"
"\tPosition X: %f\n"
"\tPosition Y: %f\n"
"\tPosition Z: %f\n"
"\tDirection: %"PRIu8"\n"
"\tPacket number: %"PRIu16"/%"PRIu16"\n"
,
request.type,
ap_mac_addr_str,
mobile_mac_addr_str,
mobile_ip_str,
request_time_str,
capture_time_str,
request.ss_dbm - 256,
request.x_position,
request.y_position,
request.z_position,
request.direction,
request.packet_id,
request.nb_packets
) ;
}
else if (VERBOSE_CHATTERBOX)
fprintf(stderr, "Request received from AP \"%s\".\n",
owl_mac_bytes_to_string(request.ap_mac_addr_bytes)) ;
got_request(request) ;
}
return ret ;
}
/*
* Thread function. Monitors the list and sends information to the
* localisation server when the timeout is reached.
*/
void* monitor_requests(void *NULL_value)
{
request_list *request_ptr, *request_prev ;
request_info_list *request_info_ptr ;
owl_timestamp current_time ;
FILE *fd = NULL ;
char mac_str[OWL_ETHER_ADDR_STRLEN] ;
uint_fast32_t sub ; // owl_time_elapsed_ms() result
char request_time_str[OWL_TIMESTAMP_STRLEN] ;
uint_fast32_t aggregate_timeout =
cfg_getint(cfg, "aggregate_timeout") ;
uint_fast32_t keep_timeout = cfg_getint(cfg, "keep_timeout") ;
struct sockaddr_in serv;
struct sockaddr_in client ;
socklen_t serv_len = sizeof(serv);
owl_request request ;
owl_request_info info;
int sockfd;
if (VERBOSE_WARNING)
fprintf(stderr, "Monitor requests thread launched.\n") ;
sockfd =
owl_create_udp_trx_socket(cfg_getstr(cfg, "positioner_ip"),
cfg_getint(cfg, "positioner_port"),
&serv, &client) ;
pthread_cleanup_push(&owl_close_fd, &sockfd) ;
/* Open output file */
if (strcmp("-", cfg_getstr(cfg, "output_file")) == 0)
fd = stdout ;
else
{
fd = fopen(cfg_getstr(cfg, "output_file"), "a") ; // add mode
if (fd == NULL) // If we failed to open the file,
{
perror("Cannot open output file") ;
fprintf(stderr, "Redirecting output to standard output.\n") ;
fd = stdout ; // we fail back to stdout.
}
}
pthread_cleanup_push(&owl_close_file, &fd) ;
while (owl_run)
{
request_prev = NULL ;
request_info_ptr = NULL ;
sem_wait(&lock_requests) ;
owl_timestamp_now(&current_time) ;
request_ptr = requests ;
while (request_ptr != NULL) // Parsing list
{
sub = owl_time_elapsed_ms(&request_ptr->start_time,
&current_time) ;
// If the request was not treated already
if (request_ptr->info != NULL)
{
// If the timeout is reached
if (sub > aggregate_timeout)
{
if (VERBOSE_CHATTERBOX)
fprintf(stderr, "* Aggregate timeout reached:"
" %"PRIuFAST32" > %"PRIuFAST32"\n",
sub, aggregate_timeout) ;
// Print mobile MAC address to the output file
owl_mac_bytes_to_string_r(request_ptr->
mobile_mac_addr_bytes,
mac_str) ;
fprintf(fd, "%s;", mac_str) ;
// Print request type & number of packets to the
// output file
fprintf(fd, "%"PRIu8";%"PRIu16";",
request_ptr->type, request_ptr->nb_packets) ;
// Print request mobile timestamp to the output file
owl_timestamp_to_string(&request_ptr->request_time,
request_time_str) ;
fprintf(fd, "%s;", request_time_str) ;
// Print request info to the output file
fprintf(fd, "%0.2f;%0.2f;%0.2f;%hhd",
request_ptr->x_position,
request_ptr->y_position,
request_ptr->z_position,
request_ptr->direction) ;
// Initialise owl_request fields:
request.type = request_ptr->type ;
request.nb_packets = htons(request_ptr->nb_packets) ;
memcpy(request.mobile_mac_addr_bytes,
request_ptr->mobile_mac_addr_bytes,
ETHER_ADDR_LEN) ;
memcpy(request.mobile_ip_addr_bytes,
request_ptr->mobile_ip_addr_bytes, 4) ;
request.request_time = request_ptr->request_time ;
owl_hton_timestamp(&request.request_time) ;
request.x_position =
owl_htonf(request_ptr->x_position) ;
request.y_position =
owl_htonf(request_ptr->y_position) ;
request.z_position =
owl_htonf(request_ptr->z_position) ;
request.direction = request_ptr->direction ;
// Count the requests:
request.nb_info = 0 ;
request_info_ptr = request_ptr->info ;
while (request_info_ptr != NULL)
{
++request.nb_info ;
request_info_ptr = request_info_ptr->next ;
}
request.nb_info = htons(request.nb_info) ;
// Send the request:
sendto(sockfd, &request, sizeof(request), 0,
(struct sockaddr *)&serv, serv_len) ;
// Send requests to the server and empty the list
request_info_ptr = request_ptr->info ;
while (request_info_ptr != NULL)
{
// Send AP info to the localisation server
info.packet_id =
htons(request_info_ptr->packet_id) ;
memcpy(info.ap_mac_addr_bytes,
request_info_ptr->ap_mac_addr_bytes,
ETHER_ADDR_LEN) ;
info.capture_time = request_info_ptr->capture_time ;
owl_hton_timestamp(&info.capture_time) ;
info.ss_dbm = request_info_ptr->ss_dbm ;
sendto(sockfd, &info, sizeof(info),
0, (struct sockaddr *)&serv, serv_len) ;
// Print AP info to the output file
owl_mac_bytes_to_string_r(request_info_ptr->
ap_mac_addr_bytes,
mac_str) ;
fprintf(fd, ";%s;%"PRIu16";%d",
mac_str,
request_info_ptr->packet_id,
request_info_ptr->ss_dbm - 256) ;
// Delete request
request_info_ptr = request_info_ptr->next ;
free(request_ptr->info) ;
request_ptr->info = request_info_ptr ;
}
fprintf(fd, "\n") ;
}
}
// If the request was treated and keep timeout is reached
else if (sub > keep_timeout)
{
request_list *request_tmp = request_ptr ;
if (VERBOSE_CHATTERBOX)
fprintf(stderr, "* Keep timeout reached:"
" %"PRIuFAST32" > %"PRIuFAST32"\n",
sub, keep_timeout) ;
request_ptr = request_ptr->next ;
// If it is the first request of the list
if (request_prev == NULL)
requests = request_ptr ; // we shift the head
else // else we put the next of the previous on the next
request_prev->next = request_ptr ;
free(request_tmp) ;
continue ;
}
// Next request
request_prev = request_ptr ;
request_ptr = request_ptr->next ;
}
sem_post(&lock_requests) ;
fflush(NULL) ;
// Wait to check again:
owl_msleep(cfg_getint(cfg, "check_interval")) ;
}
/* Close output file & socket */
pthread_cleanup_pop(1) ;
pthread_cleanup_pop(1) ;
pthread_exit(NULL_value) ;
}
/*
* Treats a received packet.
*/
void got_request(owl_captured_request request)
{
request_list *tmp_request = NULL ;
request_info_list *tmp_info = NULL ;
owl_timestamp start_time ; // Reception time on the aggregator
owl_timestamp_now(&start_time) ;
/* Create a new request */
tmp_info = malloc(sizeof(request_info_list)) ;
tmp_info->packet_id = request.packet_id ;
memcpy(tmp_info->ap_mac_addr_bytes, request.ap_mac_addr_bytes,
ETHER_ADDR_LEN) ;
tmp_info->capture_time = request.capture_time ;
tmp_info->ss_dbm = request.ss_dbm ;
tmp_info->next = NULL ;
/* Add it in the list */
sem_wait(&lock_requests) ;
tmp_request = requests ;
if (requests == NULL) // If the request list does not exist,
{
if (VERBOSE_INFO)
fprintf(stderr, "Creating request list with AP \"%s\".\n",
owl_mac_bytes_to_string(request.ap_mac_addr_bytes)) ;
tmp_request = malloc(sizeof(request_list)) ; // create it.
tmp_request->type = request.type ;
tmp_request->nb_packets = request.nb_packets ;
memcpy(tmp_request->mobile_mac_addr_bytes,
request.mobile_mac_addr_bytes, ETHER_ADDR_LEN) ;
memcpy(tmp_request->mobile_ip_addr_bytes,
request.mobile_ip_addr_bytes, 4) ;
// Explicit packet:
if (request.type != OWL_REQUEST_IMPLICIT)
// Transmission time on the mobile:
tmp_request->request_time = request.request_time ;
// Implicit packet:
else
// Reception time on the AP:
tmp_request->request_time = request.capture_time ;
// Save locale time on the aggregator (not the reception time
// on the AP):
tmp_request->start_time = start_time ;
tmp_request->x_position = request.x_position ;
tmp_request->y_position = request.y_position ;
tmp_request->z_position = request.z_position ;
tmp_request->direction = request.direction ;
tmp_request->next = NULL ;
tmp_request->info = tmp_info ;
requests = tmp_request ;
}
else // If the request list exists already
{ // we search the list for the request
// Explicit packet:
if (request.type != OWL_REQUEST_IMPLICIT)
{
while (tmp_request != NULL)
{ // Research criterion: MAC and transmission time
if (owl_mac_equals(request.mobile_mac_addr_bytes,
tmp_request->mobile_mac_addr_bytes)
&& owl_timestamp_equals(&request.request_time,
&tmp_request->request_time))
break ; // If the request exists, we stop on it
tmp_request = tmp_request->next ;
}
}
// Implicit packet:
else
{
while (tmp_request != NULL)
{ // Research criterion: MAC addresses equals and reception
// times on the APs less than 10 ms
// TODO : define an option for the maximal difference time.
if (owl_mac_equals(request.mobile_mac_addr_bytes,
tmp_request->mobile_mac_addr_bytes) &&
owl_time_elapsed_ms(&request.capture_time,
&tmp_request->request_time) <= 10)
break ; // If the request exists, we stop on it
tmp_request = tmp_request->next ;
}
}
if (tmp_request == NULL) // The request does not exist in the list
{
if (VERBOSE_INFO)
fprintf(stderr, "Create new request from AP \"%s\".\n",
owl_mac_bytes_to_string(request.ap_mac_addr_bytes)) ;
tmp_request = malloc(sizeof(request_list)) ; // create it
tmp_request->type = request.type ;
tmp_request->nb_packets = request.nb_packets ;
memcpy(tmp_request->mobile_mac_addr_bytes,
request.mobile_mac_addr_bytes, ETHER_ADDR_LEN) ;
memcpy(tmp_request->mobile_ip_addr_bytes,
request.mobile_ip_addr_bytes, 4) ;
// Explicit packet:
if (request.type != OWL_REQUEST_IMPLICIT)
// Transmission time on the mobile:
tmp_request->request_time = request.request_time ;
// Implicit packet:
else
// Reception time on the AP:
tmp_request->request_time = request.capture_time ;
// Save the local time on the aggregator (not the reception
// time on the AP):
tmp_request->start_time = start_time ;
tmp_request->x_position = request.x_position ;
tmp_request->y_position = request.y_position ;
tmp_request->z_position = request.z_position ;
tmp_request->direction = request.direction ;
tmp_request->next = requests ;
tmp_request->info = tmp_info ;
requests = tmp_request ;
}
else // If the request was found in the list
{
if (tmp_request->info == NULL)
{ // We already sent to the server data for this request
if (VERBOSE_CHATTERBOX)
fprintf(stderr, "Request already treated.\n") ;
free(tmp_info) ;
}
else
{
if (VERBOSE_CHATTERBOX)
fprintf(stderr, "Add information to the request.\n") ;
tmp_info->next = tmp_request->info ; // Add data
tmp_request->info = tmp_info ;
}
}
}
sem_post(&lock_requests) ;
}
/*
* Empties the request list.
* Note that this function does not use lock_requests, so it should not
* be called in a concurrent context.
*/
void free_request_list()
{
request_list *next_request ;
request_info_list *next_request_info ;
while (requests != NULL)
{
while (requests->info != NULL)
{
next_request_info = requests->info->next ;
free(requests->info) ;
requests->info = next_request_info ;
}
next_request = requests->next ;
free(requests) ;
requests = next_request ;
}
}
/*
* Thread function. Listens for hello messages from APs.
*/
void* listen_for_aps(void *NULL_value)
{
int listen_sockfd ;
int nread ; // recvfrom return value
struct sockaddr_in client; // UDP client structure
socklen_t client_len = sizeof(client) ; // Size of clients
owl_autocalibration_hello message ;
char ap_ip_addr[INET_ADDRSTRLEN] ;
if (VERBOSE_WARNING)
fprintf(stderr, "Autocalibration Hello thread launched.\n") ;
listen_sockfd =
owl_create_udp_listening_socket(cfg_getint(cfg,
"autocalibration_hello_port")) ;
if (listen_sockfd < 0)
{
perror("Error! Cannot create UDP listening socket from the"
" listeners") ;
exit(OWL_ERR_SOCKET_CREATE) ;
}
pthread_cleanup_push(&owl_close_fd, &listen_sockfd) ;
while (owl_run)
{
nread = recvfrom(listen_sockfd, &message, sizeof(message), 0,
(struct sockaddr *) &client, &client_len) ;
if (nread <= 0)
{
if (owl_run)
perror("No message received from listener") ;
continue ;
}
inet_ntop(AF_INET, &client.sin_addr, ap_ip_addr, INET_ADDRSTRLEN) ;
if (VERBOSE_INFO)
fprintf(stderr,
"Got a Hello message from \"%s\"\n", ap_ip_addr) ;
sem_wait(&lock_aps) ;
update_ap(message.ap_mac_addr_bytes, ap_ip_addr) ;
sem_post(&lock_aps) ;
}
/* Close the socket */
pthread_cleanup_pop(1) ;
pthread_exit(NULL_value) ;
}
/*
* Updates the timestamp of the AP with the given MAC address if it is in
* the AP list, or add a new AP with this MAC address to the AP list.
*/
void update_ap(uint8_t mac_addr_bytes[ETHER_ADDR_LEN],
char ip_addr[INET_ADDRSTRLEN])
{
ap_list *found ;
if ((found = find_ap(mac_addr_bytes)) == NULL)
{
ap_list *new_ap = add_ap_front(mac_addr_bytes) ;
update_ap_ip_addr(new_ap, ip_addr) ;
}
else
update_ap_seen(found) ;
}
/*
* Searches the AP list for an AP with the given MAC address and returns
* it.
*/
ap_list* find_ap(uint8_t mac_addr_bytes[ETHER_ADDR_LEN])
{
ap_list *found ;
if (token_aps == NULL)
return NULL ;
found = token_aps ;
do
{
if (owl_mac_equals(found->mac_addr_bytes, mac_addr_bytes))
return found ;
found = found->next ;
}
while (found != token_aps) ;
return NULL ;
}
/*
* Adds a new AP in front of the AP list.
*/
ap_list* add_ap_front(uint8_t mac_addr_bytes[ETHER_ADDR_LEN])
{
if (VERBOSE_INFO)
{
char mac_str[OWL_ETHER_ADDR_STRLEN] ;
owl_mac_bytes_to_string_r(mac_addr_bytes, mac_str) ;
fprintf(stderr,
"Creating AP with MAC address \"%s\"...\n", mac_str) ;
}
ap_list *ap = malloc(sizeof(ap_list)) ;
memcpy(ap->mac_addr_bytes, mac_addr_bytes, ETHER_ADDR_LEN) ;
update_ap_seen(ap) ;
push_ap(ap) ;
return ap ;
}
/*
* Change the IP address of the AP 'ap' with 'ip_addr'.
*/
void update_ap_ip_addr(ap_list *ap, char ip_addr[INET_ADDRSTRLEN])
{
strncpy(ap->ip_addr, ip_addr, INET_ADDRSTRLEN) ;
}
/*
* Updates the timestamp of the given AP.
*/
void update_ap_seen(ap_list *ap)
{
assert(ap) ;
owl_timestamp_now(&ap->last_seen) ;
}
/*
* Puts an existing AP in front of the AP list. The AP must not be in
* the list yet.
*/
void push_ap(ap_list *ap)
{
assert(ap) ;
++nb_aps ;
if (token_aps == NULL) // List does not exist yet
{
token_aps = ap ;
ap->next = ap ;
ap->previous = ap ;
return ;
}
ap->previous = token_aps->previous ;
ap->previous->next = ap ;
ap->next = token_aps ;
token_aps->previous = ap ;
token_aps = ap ;
}
/*
* Monitors the AP list: sends orders to APs following their order in
* the list, and deletes old APs.
*/
void* monitor_aps(void *NULL_value)
{
if (VERBOSE_WARNING)
fprintf(stderr, "Monitor AP thread launched.\n") ;
while (owl_run)
{
sem_wait(&lock_aps) ;
delete_old_aps() ;
sem_post(&lock_aps) ;
// Here we're not in a hurry, so we released the semaphore to
// allow listen_for_aps() to process a received hello packet,
// if needed.
sem_wait(&lock_aps) ;
if (nb_aps > 1)
{
order_send(token_aps) ;
token_aps = token_aps->next ;
}
sem_post(&lock_aps) ;
owl_msleep(cfg_getint(cfg, "ap_check_interval")) ;
}
pthread_exit(NULL_value) ;
}
/*
* Deletes APs that did not send any Hello packet for a while, following
* the list order. Stops on the first not-to-be-deleted AP.
*/
void delete_old_aps()
{
owl_timestamp now ;
owl_timestamp_now(&now) ;
while (token_aps != NULL)
if (owl_time_elapsed_ms(&token_aps->last_seen, &now) >
(uint_fast32_t) cfg_getint(cfg, "ap_keep_timeout") * 1000)
delete_ap(token_aps) ;
else
return ;
}
/*
* Deletes the given AP from the AP list.
*/
void delete_ap(ap_list *ap)
{
if (VERBOSE_INFO)
{
char mac_str[OWL_ETHER_ADDR_STRLEN] ;
assert(ap) ;
owl_mac_bytes_to_string_r(ap->mac_addr_bytes, mac_str) ;
fprintf(stderr, "Deleting AP \"%s\"...\n", mac_str) ;
}
unlink_ap(ap) ;
free(ap) ;
}
/*
* Extracts the given AP from the AP list (it will not be linked to any
* other element of the list).
*/
void unlink_ap(ap_list *ap)
{
ap_list
*ap_previous,
*ap_next ;
assert(ap) ;
ap_previous = ap->previous ;
ap_next = ap->next ;
assert(ap_previous) ;
assert(ap_next) ;
ap_previous->next = ap_next ;
ap_next->previous = ap_previous ;
if (ap == token_aps)
{
if (ap->next == ap) // It was the last AP in the ring
token_aps = NULL ;
else
token_aps = ap_next ;
}
--nb_aps ;
}
/*
* Sends a 'send' order to the given AP.
*/
void order_send(ap_list *ap)
{
owl_autocalibration_order message ;
struct sockaddr_in serv;
struct sockaddr_in client ;
socklen_t serv_len = sizeof(serv);
int sockfd ;
ssize_t nsent ;
if (VERBOSE_INFO)
fprintf(stderr, "Sending an order to %s...\n", ap->ip_addr) ;
sockfd =
owl_create_udp_trx_socket(ap->ip_addr,
cfg_getint(cfg,
"autocalibration_order_port"),
&serv, &client) ;
message.order = AUTOCALIBRATION_ORDER_SEND ;
nsent = sendto(sockfd, &message, sizeof(message), 0,
(struct sockaddr *)&serv, serv_len) ;
if (nsent != (ssize_t) sizeof(message))
{
perror("Error sending order to the listener") ;
exit(OWL_ERR_SOCKET_SEND) ;
}
close(sockfd) ;
}
/*
* Empties the AP list.
* Note that this function does not use lock_aps, so it should not
* be called in a concurrent context.
*/
void free_ap_list()
{
ap_list *ap_ptr ;
if (token_aps == NULL)
return ;
ap_ptr = token_aps->next ;
assert(ap_ptr) ;
while (ap_ptr != token_aps)
{
ap_list *ap_tmp = ap_ptr ;
ap_ptr = ap_ptr->next ;
free(ap_tmp) ;
}
free(token_aps) ;
token_aps = NULL ;
}
#ifndef NDEBUG
/*
* Prints the request list.
*/
void print_request_list()
{
request_list *request_ptr = NULL ;
request_info_list *info_ptr = NULL ;
char mobile_mac_str[OWL_ETHER_ADDR_STRLEN] ;
char
request_time_str[OWL_TIMESTAMP_STRLEN],
start_time_str[OWL_TIMESTAMP_STRLEN] ;
sem_wait(&lock_requests) ;
if (requests == NULL) // Empty list
{
fprintf(stderr, "No request.\n") ;
return ;
}
request_ptr = requests ;
while (request_ptr != NULL)
{
info_ptr = request_ptr->info ; // Get the sub-list pointer
owl_mac_bytes_to_string_r(request_ptr->mobile_mac_addr_bytes,
mobile_mac_str) ;
owl_timestamp_to_string(&request_ptr->request_time,
request_time_str) ;
owl_timestamp_to_string(&request_ptr->start_time,
start_time_str) ;
fprintf(stderr,
"Type: %"PRIu8"\n"
"Mobile MAC: %s\n"
"Sequence number: %s\n"
"Reception timestamp: %s\n"
"\n",
request_ptr->type,
mobile_mac_str,
request_time_str,
start_time_str
) ;
// Parse information relative to the current request
while (info_ptr != NULL)
{
print_request_info(info_ptr) ;
putc('\n', stderr) ;
info_ptr = info_ptr->next ;
}
fprintf(stderr, "\n\n") ;
request_ptr = request_ptr->next ;
}
sem_post(&lock_requests) ;
}
/*
* Prints an element of a request_info_list.
*/
void print_request_info(request_info_list *info)
{
char ap_mac_str[OWL_ETHER_ADDR_STRLEN] ;
if (info == NULL)
return ;
owl_mac_bytes_to_string_r(info->ap_mac_addr_bytes, ap_mac_str) ;
fprintf(stderr,
"\tAP MAC: %s\n"
"\tSignal strength: %d dBm\n",
ap_mac_str,
info->ss_dbm - 256
) ;
}
#endif // NDEBUG
void print_usage()
{
printf("Usage:\n"
"\t%s"
" [-f config_file]"
" [-D]"
" [-v[v[v[v]]] | -q]"
" -o out_file"
"\n\t"
" [-i positionner_ip]"
" [-p positioner_port]"
" [-l listening_port]"
"\n\t"
" [-t aggregate_timeout]"
" [-k keep_timeout]"
" [-c check_interval]"
"\n\t"
" [-A]"
" [-a autocalibration_port]"
" [-K ap_keep_timeout]"
"\n\t"
" [-C ap_check_interval]"
"\n"
"\t%s -h\n"
"\t%s -V\n"
"Main options:\n"
"\t-h\t\tPrint this help.\n"
"\t-V\t\tShow version information.\n"
"\t-f config_file\tUse 'config_file' instead of the default"
" configuration\n\t\t\tfile (%s).\n"
"\t-D\t\tDaemon mode.\n"
"\t-v\t\tBe verbose. You can use this option up to 4 times to"
"\n\t\t\tincrease the level of verbosity (1 = warnings,\n\t\t\t"
"2 = useful information, 3 = too much information,\n\t\t\t4 = "
"displays the detail of each and every received\n\t\t\t"
"request).\n"
"\t-q\t\tQuiet mode (default): sets the verbose level to 0.\n"
"Output options:\n"
"\t-o out_file\t\tAggregated requests will be appended to"
" this CSV\n\t\t\t\tfile ('-' designates the standard"
" output).\n"
"\t-i positionner_ip\tIP address of the localisation server"
" (default:\n\t\t\t\t%s).\n"
"\t-p positioner_port\tAggregated requests are transmitted to"
" the\n\t\t\t\tlocalisation server on this port (default:"
"\n\t\t\t\t%d).\n"
"Aggregation options:\n"
"\t-l listening_port\tOnly requests sent on this port will be"
" treated\n\t\t\t\t(default: %d).\n"
"\t-t aggregate_timeout\tRequests are stored during"
" 'aggregate_timeout'\n\t\t\t\tmilliseconds before to be"
" grouped (default:\n\t\t\t\t%d ms).\n"
"\t-k keep_timeout\t\tAggregated requests are kept during"
"\n\t\t\t\t'keep_timeout' milliseconds (default: %d ms).\n"
"\t-c check_interval\tTime between two checks of the stored"
" requests,\n\t\t\t\tin milliseconds (default: %d ms).\n"
"Autocalibration options:\n"
"\t-A\t\t\tEnable autocalibration (default: disabled).\n"
"\t-O port\t\t\tPort on which autocalibration orders are"
" sent to\n\t\t\t\tthe listeners (default: %d).\n"
"\t-H port\t\t\tPort on which autocalibration hello are"
" received\n\t\t\t\tfrom the listeners (default: %d).\n"
"\t-K ap_keep_timeout\tInactive APs are kept during"
" 'ap_keep_timeout'\n\t\t\t\tseconds (default: %d s).\n"
"\t-C ap_check_interval\tTime (in milliseconds) between two"
" checks of the\n\t\t\t\tstored APs (default: %d ms).\n"
,
program_name,
program_name,
program_name,
DEFAULT_CONFIG_FILE,
POSITIONER_DEFAULT_IP,
OWL_DEFAULT_AGGREGATION_PORT,
OWL_DEFAULT_LISTENER_PORT,
DEFAULT_AGGREGATE_TIMEOUT,
DEFAULT_KEEP_TIMEOUT,
DEFAULT_CHECK_INTERVAL,
OWL_DEFAULT_AUTOCALIBRATION_ORDER_PORT,
OWL_DEFAULT_AUTOCALIBRATION_HELLO_PORT,
DEFAULT_AP_KEEP_TIMEOUT,
DEFAULT_AP_CHECK_INTERVAL
) ;
}
void print_version()
{
printf("This is OwlPS Aggregator, part of the Owl"
" Positioning System project.\n"
"Version: %s.\n",
#ifdef OWLPS_VERSION
OWLPS_VERSION
#else // OWLPS_VERSION
"unknown version"
#endif // OWLPS_VERSION
) ;
}