From 0e913a2938051ff9bf8a69860e2044d2e33cbf70 Mon Sep 17 00:00:00 2001 From: Matteo Cypriani Date: Wed, 12 Jun 2013 17:52:11 -0400 Subject: [PATCH] [Aggregator] More monitor_requests() refactoring Create scan_request_list() out of monitor_requests(). --- TODO.t2t | 4 +- owlps-aggregator/owlps-aggregator.h | 3 + owlps-aggregator/owlps-aggregatord.c | 159 ++++++++++++++------------- 3 files changed, 89 insertions(+), 77 deletions(-) diff --git a/TODO.t2t b/TODO.t2t index 4659eca..1ed27f7 100644 --- a/TODO.t2t +++ b/TODO.t2t @@ -96,9 +96,7 @@ Work to do in OwlPS - Allow a different aggregation timeout for each request type. -- Refactoring: - - monitor_requests() - - got_request() +- Refactor got_request(). - got_request(): option for the maximal difference time diff --git a/owlps-aggregator/owlps-aggregator.h b/owlps-aggregator/owlps-aggregator.h index b1cf3fe..8ae2d70 100644 --- a/owlps-aggregator/owlps-aggregator.h +++ b/owlps-aggregator/owlps-aggregator.h @@ -111,6 +111,9 @@ int read_loop(int sockfd) ; void got_request(owl_captured_request request) ; void* monitor_requests(void *NULL_value) ; +void scan_request_list(FILE *const stream, + const int sockfd, + const struct sockaddr_in *const serv) ; void flush_request_list(FILE *const stream, const int sockfd, const struct sockaddr_in *const serv) ; diff --git a/owlps-aggregator/owlps-aggregatord.c b/owlps-aggregator/owlps-aggregatord.c index c610868..8e653de 100644 --- a/owlps-aggregator/owlps-aggregatord.c +++ b/owlps-aggregator/owlps-aggregatord.c @@ -788,20 +788,12 @@ void got_request(owl_captured_request request) /* - * Thread function. Monitors the list and sends information to the + * Thread function. Monitors the list and prints/sends information to the * localisation server when the timeout is reached. */ void* monitor_requests(void *NULL_value) { - request_list *request_ptr, *request_prev ; - owl_timestamp current_time ; FILE *stream = NULL ; - uint_fast32_t sub ; // owl_time_elapsed_ms() result - - uint_fast32_t aggregate_timeout = - cfg_getint(cfg, "aggregate_timeout") ; - uint_fast32_t keep_timeout = cfg_getint(cfg, "keep_timeout") ; - struct sockaddr_in serv; struct sockaddr_in client ; int sockfd; @@ -832,71 +824,7 @@ void* monitor_requests(void *NULL_value) /* Main loop */ while (owl_run) - { - request_prev = NULL ; - - sem_wait(&lock_requests) ; - - owl_timestamp_now(¤t_time) ; - request_ptr = requests ; - - while (request_ptr != NULL) // Parsing list - { - sub = owl_time_elapsed_ms(&request_ptr->start_time, - ¤t_time) ; - - // If the request was not treated already - if (request_ptr->info != NULL) - { - // If the timeout is reached - if (sub > aggregate_timeout) - { - if (VERBOSE_CHATTERBOX) - fprintf(stderr, "* Aggregate timeout reached:" - " %"PRIuFAST32" > %"PRIuFAST32"\n", - sub, aggregate_timeout) ; - - output_request(request_ptr, stream, sockfd, &serv) ; - } - } - - // If the request was treated and keep timeout is reached - else if (sub > keep_timeout) - { - request_list *request_tmp = request_ptr ; - - if (VERBOSE_CHATTERBOX) - fprintf(stderr, "* Keep timeout reached:" - " %"PRIuFAST32" > %"PRIuFAST32"\n", - sub, keep_timeout) ; - - request_ptr = request_ptr->next ; - - // If it is the first request of the list - if (request_prev == NULL) - requests = request_ptr ; // we shift the head - else // else we put the next of the previous on the next - request_prev->next = request_ptr ; - - free(request_tmp) ; - - continue ; - } - - // Next request - request_prev = request_ptr ; - request_ptr = request_ptr->next ; - } - - sem_post(&lock_requests) ; - - if (cfg_getbool(cfg, "flush_output")) - if (fflush(stream)) - perror("Failed to flush the output file") ; - - // Wait to check again: - owl_msleep(cfg_getint(cfg, "check_interval")) ; - } + scan_request_list(stream, sockfd, &serv) ; /* Flush the requests' list prior to return */ flush_request_list(stream, sockfd, &serv) ; @@ -909,6 +837,89 @@ void* monitor_requests(void *NULL_value) } +/* + * Goes through the requests' list, output the requests that are older + * than the aggregation timeout, deletes the requests that are older + * than the keep timeout. + */ +void scan_request_list(FILE *const stream, + const int sockfd, + const struct sockaddr_in *const serv) +{ + request_list *request_ptr ; + request_list *request_prev = NULL ; + + owl_timestamp current_time ; + uint_fast32_t sub ; // owl_time_elapsed_ms() result + + uint_fast32_t aggregate_timeout = + cfg_getint(cfg, "aggregate_timeout") ; + uint_fast32_t keep_timeout = cfg_getint(cfg, "keep_timeout") ; + + sem_wait(&lock_requests) ; + + owl_timestamp_now(¤t_time) ; + request_ptr = requests ; + + while (request_ptr != NULL) // Go through the list + { + sub = owl_time_elapsed_ms(&request_ptr->start_time, + ¤t_time) ; + + // If the request was not treated already + if (request_ptr->info != NULL) + { + // If the timeout is reached + if (sub > aggregate_timeout) + { + if (VERBOSE_CHATTERBOX) + fprintf(stderr, "* Aggregate timeout reached:" + " %"PRIuFAST32" > %"PRIuFAST32"\n", + sub, aggregate_timeout) ; + + output_request(request_ptr, stream, sockfd, serv) ; + } + } + + // If the request was treated and keep timeout is reached + else if (sub > keep_timeout) + { + request_list *request_tmp = request_ptr ; + + if (VERBOSE_CHATTERBOX) + fprintf(stderr, "* Keep timeout reached:" + " %"PRIuFAST32" > %"PRIuFAST32"\n", + sub, keep_timeout) ; + + request_ptr = request_ptr->next ; + + // If it is the first request of the list + if (request_prev == NULL) + requests = request_ptr ; // we shift the head + else // else we put the next of the previous on the next + request_prev->next = request_ptr ; + + free(request_tmp) ; + + continue ; + } + + // Next request + request_prev = request_ptr ; + request_ptr = request_ptr->next ; + } + + sem_post(&lock_requests) ; + + if (cfg_getbool(cfg, "flush_output")) + if (fflush(stream)) + perror("Failed to flush the output file") ; + + // Wait to check again: + owl_msleep(cfg_getint(cfg, "check_interval")) ; +} + + /* * Prints and sends all the requests in the requests' list and frees * the list.