[Aggregator] More monitor_requests() refactoring

Create scan_request_list() out of monitor_requests().
This commit is contained in:
Matteo Cypriani 2013-06-12 17:52:11 -04:00
parent 5b6f40f59d
commit 0e913a2938
3 changed files with 89 additions and 77 deletions

View File

@ -96,9 +96,7 @@ Work to do in OwlPS
- Allow a different aggregation timeout for each request type.
- Refactoring:
- monitor_requests()
- got_request()
- Refactor got_request().
- got_request(): option for the maximal difference time

View File

@ -111,6 +111,9 @@ int read_loop(int sockfd) ;
void got_request(owl_captured_request request) ;
void* monitor_requests(void *NULL_value) ;
void scan_request_list(FILE *const stream,
const int sockfd,
const struct sockaddr_in *const serv) ;
void flush_request_list(FILE *const stream,
const int sockfd,
const struct sockaddr_in *const serv) ;

View File

@ -788,20 +788,12 @@ void got_request(owl_captured_request request)
/*
* Thread function. Monitors the list and sends information to the
* Thread function. Monitors the list and prints/sends information to the
* localisation server when the timeout is reached.
*/
void* monitor_requests(void *NULL_value)
{
request_list *request_ptr, *request_prev ;
owl_timestamp current_time ;
FILE *stream = NULL ;
uint_fast32_t sub ; // owl_time_elapsed_ms() result
uint_fast32_t aggregate_timeout =
cfg_getint(cfg, "aggregate_timeout") ;
uint_fast32_t keep_timeout = cfg_getint(cfg, "keep_timeout") ;
struct sockaddr_in serv;
struct sockaddr_in client ;
int sockfd;
@ -832,71 +824,7 @@ void* monitor_requests(void *NULL_value)
/* Main loop */
while (owl_run)
{
request_prev = NULL ;
sem_wait(&lock_requests) ;
owl_timestamp_now(&current_time) ;
request_ptr = requests ;
while (request_ptr != NULL) // Parsing list
{
sub = owl_time_elapsed_ms(&request_ptr->start_time,
&current_time) ;
// If the request was not treated already
if (request_ptr->info != NULL)
{
// If the timeout is reached
if (sub > aggregate_timeout)
{
if (VERBOSE_CHATTERBOX)
fprintf(stderr, "* Aggregate timeout reached:"
" %"PRIuFAST32" > %"PRIuFAST32"\n",
sub, aggregate_timeout) ;
output_request(request_ptr, stream, sockfd, &serv) ;
}
}
// If the request was treated and keep timeout is reached
else if (sub > keep_timeout)
{
request_list *request_tmp = request_ptr ;
if (VERBOSE_CHATTERBOX)
fprintf(stderr, "* Keep timeout reached:"
" %"PRIuFAST32" > %"PRIuFAST32"\n",
sub, keep_timeout) ;
request_ptr = request_ptr->next ;
// If it is the first request of the list
if (request_prev == NULL)
requests = request_ptr ; // we shift the head
else // else we put the next of the previous on the next
request_prev->next = request_ptr ;
free(request_tmp) ;
continue ;
}
// Next request
request_prev = request_ptr ;
request_ptr = request_ptr->next ;
}
sem_post(&lock_requests) ;
if (cfg_getbool(cfg, "flush_output"))
if (fflush(stream))
perror("Failed to flush the output file") ;
// Wait to check again:
owl_msleep(cfg_getint(cfg, "check_interval")) ;
}
scan_request_list(stream, sockfd, &serv) ;
/* Flush the requests' list prior to return */
flush_request_list(stream, sockfd, &serv) ;
@ -909,6 +837,89 @@ void* monitor_requests(void *NULL_value)
}
/*
* Goes through the requests' list, output the requests that are older
* than the aggregation timeout, deletes the requests that are older
* than the keep timeout.
*/
void scan_request_list(FILE *const stream,
const int sockfd,
const struct sockaddr_in *const serv)
{
request_list *request_ptr ;
request_list *request_prev = NULL ;
owl_timestamp current_time ;
uint_fast32_t sub ; // owl_time_elapsed_ms() result
uint_fast32_t aggregate_timeout =
cfg_getint(cfg, "aggregate_timeout") ;
uint_fast32_t keep_timeout = cfg_getint(cfg, "keep_timeout") ;
sem_wait(&lock_requests) ;
owl_timestamp_now(&current_time) ;
request_ptr = requests ;
while (request_ptr != NULL) // Go through the list
{
sub = owl_time_elapsed_ms(&request_ptr->start_time,
&current_time) ;
// If the request was not treated already
if (request_ptr->info != NULL)
{
// If the timeout is reached
if (sub > aggregate_timeout)
{
if (VERBOSE_CHATTERBOX)
fprintf(stderr, "* Aggregate timeout reached:"
" %"PRIuFAST32" > %"PRIuFAST32"\n",
sub, aggregate_timeout) ;
output_request(request_ptr, stream, sockfd, serv) ;
}
}
// If the request was treated and keep timeout is reached
else if (sub > keep_timeout)
{
request_list *request_tmp = request_ptr ;
if (VERBOSE_CHATTERBOX)
fprintf(stderr, "* Keep timeout reached:"
" %"PRIuFAST32" > %"PRIuFAST32"\n",
sub, keep_timeout) ;
request_ptr = request_ptr->next ;
// If it is the first request of the list
if (request_prev == NULL)
requests = request_ptr ; // we shift the head
else // else we put the next of the previous on the next
request_prev->next = request_ptr ;
free(request_tmp) ;
continue ;
}
// Next request
request_prev = request_ptr ;
request_ptr = request_ptr->next ;
}
sem_post(&lock_requests) ;
if (cfg_getbool(cfg, "flush_output"))
if (fflush(stream))
perror("Failed to flush the output file") ;
// Wait to check again:
owl_msleep(cfg_getint(cfg, "check_interval")) ;
}
/*
* Prints and sends all the requests in the requests' list and frees
* the list.