From 9c835d4c46ac80b59301edfea26e0fd559dc857e Mon Sep 17 00:00:00 2001 From: Thomas Preud'homme Date: Wed, 4 May 2011 19:32:24 +0200 Subject: [PATCH] Add a "sent words == received words" check --- communication_techniques/src/main.c | 152 ++++++++++++++++++++++++++-- 1 file changed, 144 insertions(+), 8 deletions(-) diff --git a/communication_techniques/src/main.c b/communication_techniques/src/main.c index 8da250a..15d2114 100644 --- a/communication_techniques/src/main.c +++ b/communication_techniques/src/main.c @@ -23,6 +23,8 @@ #define doStringification(x) #x #define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t)) +typedef int inc_check_t; + static long nb_bufs_sent = 0; long nb_prod = 0; @@ -34,7 +36,10 @@ pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER; static int init_calc_arg = 0; static int block_reception = 1; +static int check_recv_match_send = 0; static int page_size = 0; +static uintptr_t single_prod_check_val; // /!\ Implies only one real producer +static inc_check_t *single_prod_check_ctxt; // /!\ Implies only one real producer void usage(char *argv[]) { @@ -43,11 +48,12 @@ void usage(char *argv[]) "-n nb_buffer_sent\t\tNumber of buffer to send to another core\n" "\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n" "Facultative options :\n" - "b\t\t\t\tReceive the biggest amount of data available (The default)\n" + "-b\t\t\t\tReceive the biggest amount of data available (The default)\n" "-c calculation_libname arg\tLibrary to use for calculation with its argument\n" "\t\t\t\tThis library must implement functions in calc.h\n" "\t\t\t\t(default to none)\n" - "d\t\t\t\tReceive one piece of data\n" + "--check\t\t\t\tCheck we receive what is sent\n" + "-d\t\t\t\tReceive one piece of data\n" "-h\t\t\t\tPrint this help\n" "-s \t\t\tShare the same L cache or not\n" "\t\t\t\tIf level is:\n" @@ -76,12 +82,57 @@ int do_noend(void) return 0; } +int inc_check_init(int init_value, inc_check_t **context) +{ + inc_check_t *ctxt; + + ctxt = malloc(sizeof(*ctxt)); + if (ctxt == NULL) + return -1; + *ctxt = init_value; + *context = ctxt; + return 0; +} + +int inc_check_next(inc_check_t *context, uintptr_t *next_value) +{ + *next_value = (*context)++; + return 0; +} + +int inc_check_end(inc_check_t *context) +{ + free(context); + return 0; +} + +int do_checkinit(int init_value) +{ + return inc_check_init(init_value, &single_prod_check_ctxt); +} + +void **do_checkcalc(void) +{ + int ret; + + ret = inc_check_next(single_prod_check_ctxt, &single_prod_check_val); + if (ret) + return NULL; + else + return (void **) single_prod_check_val; +} + +int do_checkend(void) +{ + return inc_check_end(single_prod_check_ctxt); +} + int analyse_options(int argc, char *argv[]) { int opt; opterr = 0; - while ((opt = getopt(argc, argv, ":hs::c:n:p:")) != -1) + while ((opt = getopt(argc, argv, ":bc:dhkn:s::"/*p:"*/)) != -1) { switch (opt) { @@ -141,6 +192,9 @@ int analyse_options(int argc, char *argv[]) case 'h' : usage(argv); exit(EXIT_SUCCESS); + case 'k' : + check_recv_match_send = 1; + break; case 'n' : { char *inval; @@ -192,7 +246,8 @@ int analyse_options(int argc, char *argv[]) return -1; } /* TODO: Real management of shared memory level */ - if (share_level <= 0) /* -x: We wan't level x not to be shared; 0 do as we want, only memory is guaranteed to be shared */ + /* TODO: -x: We want level x not to be shared; 0 do as we want, only memory is guaranteed to be shared */ + if (share_level <= 0) shared = 0; else shared = 1; @@ -201,6 +256,14 @@ int analyse_options(int argc, char *argv[]) break; case '?' : fprintf(stderr, "Option inconnue\n"); + /*if (!strncmp("--check", argv[optind], strlen("--check"))) + { + check_recv_match_send = 1; + optind++; + optopt = (int) *argv[optind]; + fprintf(stderr, "--check required\n"); + break; + }*/ return -1; case ':' : fprintf(stderr, "Option %s needs an argument\n", argv[optind]); @@ -227,11 +290,25 @@ int analyse_options(int argc, char *argv[]) fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n"); return -1; } + if (check_recv_match_send && do_calc) + { + fprintf(stderr, "Can't specifying a computation library with check activated\n"); + return -1; + } if (do_calc == NULL) { - init_calc = do_noinit; - do_calc = do_nocalc; - end_calc = do_noend; + if (check_recv_match_send) + { + init_calc = do_checkinit; + do_calc = do_checkcalc; + end_calc = do_checkend; + } + else + { + init_calc = do_noinit; + do_calc = do_nocalc; + end_calc = do_noend; + } } printf("buf size: %lu\n", WORDS_PER_LINE); return 0; @@ -309,6 +386,11 @@ void on_message(void *val) void *consumer(void *channel) { + int delayed_error; + uintptr_t cons_check_value; + inc_check_t *cons_check_context; + + delayed_error = 0; if (shared) { pthread_t tid; @@ -328,6 +410,13 @@ void *consumer(void *channel) fprintf(stderr, "Initialization of thread has failed\n"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } + if (inc_check_init(init_calc_arg, &cons_check_context)) + { + fprintf(stderr, "Initialization of check has failed\n"); + finalize_consumer_thread(channel); + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + } + cons_check_value = init_calc_arg; if (block_reception) { long long total_data_received = 0; @@ -341,7 +430,25 @@ void *consumer(void *channel) nb_data_received = recv_some_data(data_buf, MAX_BLOCK_ENTRIES); total_data_received += nb_data_received; for (i = 0; i < nb_data_received; i++) + { + if (inc_check_next(cons_check_context, &cons_check_value)) + { + if (!delayed_error) + { + fprintf(stderr, "Error while checking received value match sent value\n"); + delayed_error = 1; + } + } + if (cons_check_value != (uintptr_t) data_buf[i]) + { + if (!delayed_error) + { + fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data_buf[i]); + delayed_error = 1; + } + } on_message(data_buf[i]); + } //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : ""); } } @@ -352,14 +459,43 @@ void *consumer(void *channel) for(i = 0; i < nb_bufs_sent; i++) { //printf("[%p] About to receive %d new cache line%s\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE, (BUF_SIZE / CACHE_LINE_SIZE > 1) ? "s" : ""); for(j = 0; j < WORDS_PER_LINE; j++) - on_message(recv_one_data()); + { + void *data; + + data = recv_one_data(); + if (inc_check_next(cons_check_context, &cons_check_value)) + { + if (!delayed_error) + { + fprintf(stderr, "Error while checking received value match sent value\n"); + delayed_error = 1; + } + } + if (cons_check_value != (uintptr_t) data) + { + if (!delayed_error) + { + fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data); + delayed_error = 1; + } + } + on_message(data); + } } } + if (inc_check_end(cons_check_context)) + { + fprintf(stderr, "Finalization of check has failed\n"); + finalize_consumer_thread(channel); + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + } if (finalize_consumer_thread(channel)) { fprintf(stderr, "Finalization of thread has failed\n"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } + if (delayed_error) + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ return NULL; }