[Aggregator] Refactor monitor_requests()

Create output_request() out of monitor_requests().
This commit is contained in:
Matteo Cypriani 2013-06-12 16:21:03 -04:00
parent 45d4eeb791
commit 47103f3360
2 changed files with 118 additions and 99 deletions

View File

@ -23,6 +23,8 @@
#include <owlps.h>
#include <stdio.h>
/* Arguments & program configuration */
#define OPTIONS "Ac:C:Df:FGhH:i:k:K:l:o:O:p:qt:vV" // getopt string
@ -109,6 +111,11 @@ int read_loop(int sockfd) ;
void got_request(owl_captured_request request) ;
void* monitor_requests(void *NULL_value) ;
void output_request(request_list *const request_ptr,
FILE *const stream,
const int sockfd,
const struct sockaddr_in *const serv) ;
void free_request_list(void) ;
#ifndef NDEBUG
void print_request_list(void) ;

View File

@ -787,12 +787,9 @@ void got_request(owl_captured_request request)
void* monitor_requests(void *NULL_value)
{
request_list *request_ptr, *request_prev ;
request_info_list *request_info_ptr ;
owl_timestamp current_time ;
FILE *fd = NULL ;
char mac_str[OWL_ETHER_ADDR_STRLEN] ;
FILE *stream = NULL ;
uint_fast32_t sub ; // owl_time_elapsed_ms() result
char request_time_str[OWL_TIMESTAMP_STRLEN] ;
uint_fast32_t aggregate_timeout =
cfg_getint(cfg, "aggregate_timeout") ;
@ -800,9 +797,6 @@ void* monitor_requests(void *NULL_value)
struct sockaddr_in serv;
struct sockaddr_in client ;
socklen_t serv_len = sizeof(serv);
owl_request request ;
owl_request_info info;
int sockfd;
if (VERBOSE_WARNING)
@ -816,23 +810,22 @@ void* monitor_requests(void *NULL_value)
/* Open output file */
if (strcmp("-", cfg_getstr(cfg, "output_file")) == 0)
fd = stdout ;
stream = stdout ;
else
{
fd = fopen(cfg_getstr(cfg, "output_file"), "a") ; // add mode
if (fd == NULL) // If we failed to open the file,
stream = fopen(cfg_getstr(cfg, "output_file"), "a") ; // add mode
if (stream == NULL) // If we failed to open the file,
{
perror("Cannot open output file") ;
fprintf(stderr, "Redirecting output to standard output.\n") ;
fd = stdout ; // we fail back to stdout.
stream = stdout ; // we fail back to stdout.
}
}
pthread_cleanup_push(&owl_close_file, &fd) ;
pthread_cleanup_push(&owl_close_file, &stream) ;
while (owl_run)
{
request_prev = NULL ;
request_info_ptr = NULL ;
sem_wait(&lock_requests) ;
@ -855,91 +848,7 @@ void* monitor_requests(void *NULL_value)
" %"PRIuFAST32" > %"PRIuFAST32"\n",
sub, aggregate_timeout) ;
// Print mobile MAC address to the output file
owl_mac_bytes_to_string_r(request_ptr->
mobile_mac_addr_bytes,
mac_str) ;
fprintf(fd, "%s;", mac_str) ;
// Print request type & number of packets to the
// output file
fprintf(fd, "%"PRIu8";%"PRIu16";",
request_ptr->type, request_ptr->nb_packets) ;
// Print request mobile timestamp to the output file
owl_timestamp_to_string(&request_ptr->request_time,
request_time_str) ;
fprintf(fd, "%s;", request_time_str) ;
// Print request info to the output file
fprintf(fd, "%0.2f;%0.2f;%0.2f;%hhd",
request_ptr->x_position,
request_ptr->y_position,
request_ptr->z_position,
request_ptr->direction) ;
// Initialise owl_request fields:
request.type = request_ptr->type ;
request.nb_packets = htons(request_ptr->nb_packets) ;
memcpy(request.mobile_mac_addr_bytes,
request_ptr->mobile_mac_addr_bytes,
ETHER_ADDR_LEN) ;
memcpy(request.mobile_ip_addr_bytes,
request_ptr->mobile_ip_addr_bytes, 4) ;
request.request_time = request_ptr->request_time ;
owl_hton_timestamp(&request.request_time) ;
request.x_position =
owl_htonf(request_ptr->x_position) ;
request.y_position =
owl_htonf(request_ptr->y_position) ;
request.z_position =
owl_htonf(request_ptr->z_position) ;
request.direction = request_ptr->direction ;
// Count the requests:
request.nb_info = 0 ;
request_info_ptr = request_ptr->info ;
while (request_info_ptr != NULL)
{
++request.nb_info ;
request_info_ptr = request_info_ptr->next ;
}
request.nb_info = htons(request.nb_info) ;
// Send the request:
sendto(sockfd, &request, sizeof(request), 0,
(struct sockaddr *)&serv, serv_len) ;
// Send requests to the server and empty the list
request_info_ptr = request_ptr->info ;
while (request_info_ptr != NULL)
{
// Send AP info to the localisation server
info.packet_id =
htons(request_info_ptr->packet_id) ;
memcpy(info.ap_mac_addr_bytes,
request_info_ptr->ap_mac_addr_bytes,
ETHER_ADDR_LEN) ;
info.capture_time = request_info_ptr->capture_time ;
owl_hton_timestamp(&info.capture_time) ;
info.ss_dbm = request_info_ptr->ss_dbm ;
sendto(sockfd, &info, sizeof(info),
0, (struct sockaddr *)&serv, serv_len) ;
// Print AP info to the output file
owl_mac_bytes_to_string_r(request_info_ptr->
ap_mac_addr_bytes,
mac_str) ;
fprintf(fd, ";%s;%"PRIu16";%"PRId8,
mac_str,
request_info_ptr->packet_id,
request_info_ptr->ss_dbm) ;
// Delete request
request_info_ptr = request_info_ptr->next ;
free(request_ptr->info) ;
request_ptr->info = request_info_ptr ;
}
fprintf(fd, "\n") ;
output_request(request_ptr, stream, sockfd, &serv) ;
}
}
@ -974,7 +883,7 @@ void* monitor_requests(void *NULL_value)
sem_post(&lock_requests) ;
if (cfg_getbool(cfg, "flush_output"))
if (fflush(fd))
if (fflush(stream))
perror("Failed to flush the output file") ;
// Wait to check again:
@ -989,6 +898,109 @@ void* monitor_requests(void *NULL_value)
}
/*
* Prints a request to the output file and sends it to the aggregation
* server. The request's info field is deleted.
*
* Parameters:
* - request_ptr: the request to print
* - stream: the stream to print to
* - sockfd: the file descriptor of the socket to send to
* - serv: the server's information
*/
void output_request(request_list *const request_ptr,
FILE *const stream,
const int sockfd,
const struct sockaddr_in *const serv)
{
char mac_str[OWL_ETHER_ADDR_STRLEN] ;
char request_time_str[OWL_TIMESTAMP_STRLEN] ;
socklen_t serv_len = sizeof(*serv) ;
owl_request request ;
owl_request_info info;
request_info_list *request_info_ptr = NULL ;
// Print mobile MAC address to the output file
owl_mac_bytes_to_string_r(request_ptr->mobile_mac_addr_bytes,
mac_str) ;
fprintf(stream, "%s;", mac_str) ;
// Print request type & number of packets to the
// output file
fprintf(stream, "%"PRIu8";%"PRIu16";",
request_ptr->type, request_ptr->nb_packets) ;
// Print request mobile timestamp to the output file
owl_timestamp_to_string(&request_ptr->request_time, request_time_str) ;
fprintf(stream, "%s;", request_time_str) ;
// Print request info to the output file
fprintf(stream, "%0.2f;%0.2f;%0.2f;%hhd",
request_ptr->x_position,
request_ptr->y_position,
request_ptr->z_position,
request_ptr->direction) ;
// Initialise owl_request fields:
request.type = request_ptr->type ;
request.nb_packets = htons(request_ptr->nb_packets) ;
memcpy(request.mobile_mac_addr_bytes,
request_ptr->mobile_mac_addr_bytes, ETHER_ADDR_LEN) ;
memcpy(request.mobile_ip_addr_bytes,
request_ptr->mobile_ip_addr_bytes, 4) ;
request.request_time = request_ptr->request_time ;
owl_hton_timestamp(&request.request_time) ;
request.x_position = owl_htonf(request_ptr->x_position) ;
request.y_position = owl_htonf(request_ptr->y_position) ;
request.z_position = owl_htonf(request_ptr->z_position) ;
request.direction = request_ptr->direction ;
// Count the requests:
request.nb_info = 0 ;
request_info_ptr = request_ptr->info ;
while (request_info_ptr != NULL)
{
++request.nb_info ;
request_info_ptr = request_info_ptr->next ;
}
request.nb_info = htons(request.nb_info) ;
// Send the request's main data:
sendto(sockfd, &request, sizeof(request), 0,
(const struct sockaddr *const)serv, serv_len) ;
// Send request's per-CP information to the server and delete it
// from the request
request_info_ptr = request_ptr->info ;
while (request_info_ptr != NULL)
{
// Send CP info to the localisation server
info.packet_id = htons(request_info_ptr->packet_id) ;
memcpy(info.ap_mac_addr_bytes,
request_info_ptr->ap_mac_addr_bytes, ETHER_ADDR_LEN) ;
info.capture_time = request_info_ptr->capture_time ;
owl_hton_timestamp(&info.capture_time) ;
info.ss_dbm = request_info_ptr->ss_dbm ;
sendto(sockfd, &info, sizeof(info),
0, (const struct sockaddr *const)serv, serv_len) ;
// Print CP info to the output file
owl_mac_bytes_to_string_r(request_info_ptr->ap_mac_addr_bytes,
mac_str) ;
fprintf(stream, ";%s;%"PRIu16";%"PRId8,
mac_str,
request_info_ptr->packet_id,
request_info_ptr->ss_dbm) ;
// Delete the current CP info from the request
request_info_ptr = request_info_ptr->next ;
free(request_ptr->info) ;
request_ptr->info = request_info_ptr ;
}
// End the line in the output file:
fprintf(stream, "\n") ;
}
/*
* Empties the request list.