diff --git a/communication_techniques/Makefile b/communication_techniques/Makefile index e8e1190..5adeb04 100644 --- a/communication_techniques/Makefile +++ b/communication_techniques/Makefile @@ -13,7 +13,7 @@ COMMDIR:=communication # Compilation flags # I know -finline-functions and -finline-functions-called-once are enabled by # -O3 but I did this in case gcc behaviour change one day -CFLAGS:=-c -g -O3 -finline-functions -finline-functions-called-once -Wall -Werror +CFLAGS:=-c -g -O3 -finline-functions -finline-functions-called-once -Wall -Wextra -Werror LDFLAGS:=-L$(LIBDIR) -lpthread -ldl # Executables diff --git a/communication_techniques/include/batch_queue_common_comm.h b/communication_techniques/include/batch_queue_common_comm.h index e1f3c4a..e6a4b01 100644 --- a/communication_techniques/include/batch_queue_common_comm.h +++ b/communication_techniques/include/batch_queue_common_comm.h @@ -12,9 +12,9 @@ struct channel { void * volatile buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE))); int unused[20] __attribute__ ((aligned (CACHE_LINE_SIZE))); - volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE))); - int sender_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); - int receiver_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile unsigned int state:1 __attribute__ ((aligned (CACHE_LINE_SIZE))); + unsigned int sender_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + unsigned int receiver_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); }; diff --git a/communication_techniques/include/csq_common_comm.h b/communication_techniques/include/csq_common_comm.h index 0c567b9..076721c 100644 --- a/communication_techniques/include/csq_common_comm.h +++ b/communication_techniques/include/csq_common_comm.h @@ -22,8 +22,8 @@ struct lvl_2 struct channel { struct lvl_2 queue[SLOTS] __attribute__ ((aligned (CACHE_LINE_SIZE))); - int head __attribute__ ((aligned(CACHE_LINE_SIZE))); - int tail __attribute__ ((aligned(CACHE_LINE_SIZE))); + unsigned int head __attribute__ ((aligned(CACHE_LINE_SIZE))); + unsigned int tail __attribute__ ((aligned(CACHE_LINE_SIZE))); }; __BEGIN_DECLS @@ -31,7 +31,7 @@ __BEGIN_DECLS // TODO: Make it send only one data static inline void send(struct channel *channel, void **addr) { - static __thread int chkidx = 0; + static __thread unsigned int chkidx = 0; // If all slots are full, spin if (!chkidx) diff --git a/communication_techniques/include/fast_forward_comm.h b/communication_techniques/include/fast_forward_comm.h index dfa5532..2c3960d 100644 --- a/communication_techniques/include/fast_forward_comm.h +++ b/communication_techniques/include/fast_forward_comm.h @@ -19,8 +19,8 @@ struct channel { void * volatile *shared_space; - int head __attribute__ ((aligned (CACHE_LINE_SIZE))); - int tail __attribute__ ((aligned (CACHE_LINE_SIZE))); + unsigned int head __attribute__ ((aligned (CACHE_LINE_SIZE))); + unsigned int tail __attribute__ ((aligned (CACHE_LINE_SIZE))); }; __BEGIN_DECLS @@ -29,7 +29,7 @@ extern int adjust_slip(struct channel *channel); static inline void send(struct channel *channel, void **addr) { - static __thread int nb_iter = 0; + static __thread unsigned int nb_iter = 0; assert(addr != NULL); if (nb_iter == ADJUST_FREQ) diff --git a/communication_techniques/include/lamport_comm.h b/communication_techniques/include/lamport_comm.h index 2ac37cc..9c7c6ae 100644 --- a/communication_techniques/include/lamport_comm.h +++ b/communication_techniques/include/lamport_comm.h @@ -14,8 +14,8 @@ struct channel { void * volatile *shared_space; - volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); - volatile int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile unsigned int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile unsigned int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); }; __BEGIN_DECLS diff --git a/communication_techniques/include/mcringbuffer_comm.h b/communication_techniques/include/mcringbuffer_comm.h index 55fafde..3e75056 100644 --- a/communication_techniques/include/mcringbuffer_comm.h +++ b/communication_techniques/include/mcringbuffer_comm.h @@ -9,22 +9,22 @@ struct control { - volatile int read; - volatile int write; + volatile unsigned int read; + volatile unsigned int write; }; struct cons { - int localWrite; - int nextRead; - int rBatch; + unsigned int localWrite; + unsigned int nextRead; + unsigned int rBatch; }; struct prod { - int localRead; - int nextWrite; - int wBatch; + unsigned int localRead; + unsigned int nextWrite; + unsigned int wBatch; }; @@ -38,13 +38,15 @@ struct channel __BEGIN_DECLS -extern const int batchSize; +extern const unsigned int batchSize; static inline void send(struct channel *channel, void **addr) { while (1) { - int afterNextWrite = (channel->prod.nextWrite + 1) % SHARED_SPACE_VOIDPTR; + unsigned int afterNextWrite; + + afterNextWrite = (channel->prod.nextWrite + 1) % SHARED_SPACE_VOIDPTR; if (afterNextWrite == channel->prod.localRead) { if (afterNextWrite == channel->ctrl.read) diff --git a/communication_techniques/include/none_comm.h b/communication_techniques/include/none_comm.h index dac20a4..b67e996 100644 --- a/communication_techniques/include/none_comm.h +++ b/communication_techniques/include/none_comm.h @@ -7,7 +7,8 @@ struct channel { }; -static inline void send(struct channel *channel, void **addr) {} +static inline void send(struct channel *channel __attribute__ ((unused)), + void **addr __attribute__ ((unused))) {} __END_DECLS diff --git a/communication_techniques/include/pipe_comm.h b/communication_techniques/include/pipe_comm.h index 735d2f2..3f71ac9 100644 --- a/communication_techniques/include/pipe_comm.h +++ b/communication_techniques/include/pipe_comm.h @@ -15,7 +15,7 @@ __BEGIN_DECLS static inline void send(struct channel *channel, void **addr) { - int nb_read; + unsigned int nb_read; void *addr_ptr; nb_read = 0; diff --git a/communication_techniques/include/shared_mem_opt_comm.h b/communication_techniques/include/shared_mem_opt_comm.h index b67ac75..1c542dc 100644 --- a/communication_techniques/include/shared_mem_opt_comm.h +++ b/communication_techniques/include/shared_mem_opt_comm.h @@ -10,8 +10,8 @@ struct channel { void * volatile *shared_space; - volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); - volatile int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile unsigned int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile unsigned int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); }; @@ -19,8 +19,8 @@ __BEGIN_DECLS static inline void send(struct channel *channel, void **addr) { - static __thread int local_cons_idx = 0; - int local_prod, next_prod; + static __thread unsigned int local_cons_idx = 0; + unsigned int local_prod, next_prod; local_prod = channel->prod_idx; next_prod = (local_prod + 1) % SHARED_SPACE_VOIDPTR; diff --git a/communication_techniques/src/calculation/calc_line.c b/communication_techniques/src/calculation/calc_line.c index 9d79dae..cd4103f 100644 --- a/communication_techniques/src/calculation/calc_line.c +++ b/communication_techniques/src/calculation/calc_line.c @@ -17,7 +17,7 @@ static int lines_per_calc; int init_calc(int lines_per_calc_param) { - int i; + unsigned int i; assert(!(CACHE_SIZE % (lines_per_calc_param * CACHE_LINE_SIZE))); lines_per_calc = lines_per_calc_param; diff --git a/communication_techniques/src/communication/batch_queue.c b/communication_techniques/src/communication/batch_queue.c index 0605ca4..594874c 100644 --- a/communication_techniques/src/communication/batch_queue.c +++ b/communication_techniques/src/communication/batch_queue.c @@ -59,7 +59,7 @@ void *recv_one_data(struct channel *channel) */ ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { - int nb_read; + size_t nb_read; nb_read = 0; while (channel->state && nb_read < count) @@ -76,13 +76,7 @@ ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) n = channel->receiver_idx + (BUF_SIZE / sizeof(void *)); channel->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); for(; i < n; i++) - { - /* - * The behaviour of this is not documented but we know - * the values inside buf won't change during this affectation - */ *buf++ = channel->buf[i]; - } nb_read += BUF_SIZE / sizeof(void *); channel->state = 0; } diff --git a/communication_techniques/src/communication/csq.c b/communication_techniques/src/communication/csq.c index 54287a8..ff2d278 100644 --- a/communication_techniques/src/communication/csq.c +++ b/communication_techniques/src/communication/csq.c @@ -51,16 +51,17 @@ void *recv_one_data(struct channel *channel) ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { - int n; + size_t n; n = 0; // If all slots are empty, spin while (channel->queue[channel->head].flag) { + unsigned int i; + // Dequeue a chunk of data items - memcpy(buf, (const void *) - channel->queue[channel->head].chunk, - SUB_SLOTS * sizeof(*buf)); + for(i = 0; i < SUB_SLOTS; i++) + *buf++ = channel->queue[channel->head].chunk[i]; n += SUB_SLOTS; channel->queue[channel->head].flag = 0; channel->head = (channel->head + 1) % SLOTS; diff --git a/communication_techniques/src/communication/fast_forward.c b/communication_techniques/src/communication/fast_forward.c index d5992a2..089c83e 100644 --- a/communication_techniques/src/communication/fast_forward.c +++ b/communication_techniques/src/communication/fast_forward.c @@ -16,7 +16,7 @@ void *create_comm_channel(void) { if (!posix_memalign((void *) &channel->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) { - int i; + unsigned int i; channel->head = 0; channel->tail = 0; @@ -39,9 +39,9 @@ int destroy_comm_channel(void *channel) int adjust_slip(struct channel *channel) { - int dist, dist_old, unused; + unsigned int dist, dist_old, unused; - puts("adjust_slip is called"); /* Must be removed after calibration */ + //puts("adjust_slip is called"); /* Must be removed after calibration */ unused = 0; dist = (channel->head + SHARED_SPACE_VOIDPTR - channel->tail) % SHARED_SPACE_VOIDPTR; if (dist < DANGER) @@ -49,7 +49,7 @@ int adjust_slip(struct channel *channel) dist_old = 0; do { - int i; + unsigned int i; dist_old = dist; @@ -89,14 +89,12 @@ void *recv_one_data(struct channel *channel) ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { - int n, next_adjust; + unsigned int n; static __thread int nb_iter = 0; - next_adjust = ADJUST_FREQ - nb_iter; - for(n = 0; n < count; n++) + for(n = 0; n < count; n++, nb_iter++) { - /* if ((nb_iter + n) % ADJUST_FREQ == 0) */ - if (n && (n % next_adjust == ADJUST_FREQ)) + if (nb_iter % ADJUST_FREQ == 0) { adjust_slip(channel); nb_iter = 0; @@ -112,6 +110,5 @@ ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) channel->shared_space[channel->tail] = NULL; channel->tail = (channel->tail + 1) % SHARED_SPACE_VOIDPTR; } - nb_iter = (nb_iter + n) % ADJUST_FREQ; return n; } diff --git a/communication_techniques/src/communication/lamport.c b/communication_techniques/src/communication/lamport.c index 41aa6db..8529ee3 100644 --- a/communication_techniques/src/communication/lamport.c +++ b/communication_techniques/src/communication/lamport.c @@ -35,7 +35,7 @@ int destroy_comm_channel(void *channel) void *recv_one_data(struct channel *channel) { - int cons_idx; + unsigned int cons_idx; void *result; cons_idx = channel->cons_idx; @@ -48,7 +48,7 @@ void *recv_one_data(struct channel *channel) ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { - int n, cons_idx; + unsigned int n, cons_idx; n = 0; for(cons_idx = channel->cons_idx; cons_idx != channel->prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, channel->cons_idx = cons_idx) diff --git a/communication_techniques/src/communication/mcringbuffer.c b/communication_techniques/src/communication/mcringbuffer.c index 7e1fae7..ba40aa3 100644 --- a/communication_techniques/src/communication/mcringbuffer.c +++ b/communication_techniques/src/communication/mcringbuffer.c @@ -8,7 +8,7 @@ #include -const int batchSize = 64; // Check with SHARED_SPACE_SIZE +const unsigned int batchSize = 64; // Check with SHARED_SPACE_SIZE void *create_comm_channel(void) { @@ -68,7 +68,7 @@ void *recv_one_data(struct channel *channel) ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { - int n; + unsigned int n; for(n = 0; n < count; n++) { diff --git a/communication_techniques/src/communication/none.c b/communication_techniques/src/communication/none.c index 0f82058..81b5309 100644 --- a/communication_techniques/src/communication/none.c +++ b/communication_techniques/src/communication/none.c @@ -16,7 +16,7 @@ void *create_comm_channel(void) return (void *) &store_var; } -int destroy_comm_channel(void *unused) +int destroy_comm_channel(void *unused __attribute__ ((unused))) { return 0; } @@ -29,7 +29,7 @@ int destroy_comm_channel(void *unused) * @warning recv_one_data should not be used in conjonction of * recv_some_data */ -void *recv_one_data(struct channel *channel) +void *recv_one_data(struct channel *channel __attribute__ ((unused))) { return NULL; } @@ -43,7 +43,9 @@ void *recv_one_data(struct channel *channel) * recv_one_data * @warning count must be a multiple of BUF_SIZE */ -ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel __attribute__ ((unused)), + void **buf __attribute__ ((unused)), + size_t count __attribute__ ((unused))) { return count; } diff --git a/communication_techniques/src/communication/pipe.c b/communication_techniques/src/communication/pipe.c index 266cc56..ad07867 100644 --- a/communication_techniques/src/communication/pipe.c +++ b/communication_techniques/src/communication/pipe.c @@ -39,7 +39,8 @@ int destroy_comm_channel(void *channel) void *recv_one_data(struct channel *channel) { void *result, **res_ptr; - int n, nb_read; + int n; + unsigned int nb_read; nb_read = 0; res_ptr = &result; diff --git a/communication_techniques/src/communication/shared_mem_opt.c b/communication_techniques/src/communication/shared_mem_opt.c index 4323086..e3c0a04 100644 --- a/communication_techniques/src/communication/shared_mem_opt.c +++ b/communication_techniques/src/communication/shared_mem_opt.c @@ -36,7 +36,7 @@ int destroy_comm_channel(void *channel) void *recv_one_data(struct channel *channel) { void *result; - int cons_idx, prod_idx; + unsigned int cons_idx, prod_idx; cons_idx = channel->cons_idx; prod_idx = channel->prod_idx; @@ -55,7 +55,7 @@ void *recv_one_data(struct channel *channel) ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { - int n, cons_idx, prod_idx; + unsigned int n, cons_idx, prod_idx; n = 0; cons_idx = channel->cons_idx; diff --git a/communication_techniques/src/main.c b/communication_techniques/src/main.c index dc27104..911b70b 100644 --- a/communication_techniques/src/main.c +++ b/communication_techniques/src/main.c @@ -18,23 +18,33 @@ #include -//#define MAX_BLOCK_ENTRIES (page_size / sizeof(void *)) -#define MAX_BLOCK_ENTRIES (4096 * CACHE_LINE_SIZE / sizeof(void *)) // Must be a multiple of BUF_SIZE -//#define MAX_BLOCK_ENTRIES ((BUF_SIZE * 4 + page_size) & ~(page_size - 1)) // Big buffer size is not a good idea +/* Must be a multiple of BUF_SIZE */ +#define MAX_BLOCK_ENTRIES (2048 * CACHE_LINE_SIZE / sizeof(void *)) +/*#define MAX_BLOCK_ENTRIES ((BUF_SIZE * 4 + page_size) & ~(page_size - 1)) // Big buffer size is not a good idea */ #define toString(x) doStringification(x) #define doStringification(x) #x #define MIN(x,y) ((x < y) ? x : y) #define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t)) -#define PROD 1 -#define CONS 2 +#define SOURCE 0 /* Initial producer (calling do_calc()) */ +#define INTERM 1 +#define SINK 2 /* Final consumer (doing the check) */ -typedef struct prod_cons_thread +typedef struct data_xchg { - void *prod_comm_channel; - void *cons_comm_channel; - int flags; // PROD, CONS or both - int cpu_binding; // id of the CPU to run the thread on -} prod_cons_thread_t; + void ** volatile data_buf[2][MAX_BLOCK_ENTRIES]; + volatile int filled_buf_entries[2]; + int unused[20]; + volatile int buf_status:1 __attribute__((aligned (CACHE_LINE_SIZE))); +} data_xchg_t; + +typedef struct node_param +{ + void *prev_comm_channel; /* Channel with previous node */ + void *next_comm_channel; /* Channel with next mode */ + data_xchg_t *c2p_xfer; /* Consumer to producer local transfer */ + int type; /* SOURCE, INTERM or SINK */ + int thread_idx; +} node_param_t; typedef int inc_check_t; @@ -45,18 +55,16 @@ static int (*init_calc)(int) = NULL; static void **(*do_calc)(void) = NULL; static int (*end_calc)(void) = NULL; static int shared = 0; /* We are not shared by default */ -pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER; -pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER; -static int init_calc_arg = 0; +static long init_calc_arg = 0; static int block_reception = 1; -static int nb_nodes = 2; // Nb of nodes participating to the chain of pipelines +static long nb_nodes = 2; /* Nb of nodes in the chain of pipelines */ static int check_recv_match_send = 0; -static uintptr_t single_prod_check_val; // /!\ Implies only one real producer -static inc_check_t *single_prod_check_ctxt; // /!\ Implies only one real producer -static int nb_cpus = 4; // TOFIX: don't hardcode this +static uintptr_t single_prod_check_val; /* /!\ Only one real producer */ +static inc_check_t *single_prod_check_ctxt; /* /!\ Only one real producer */ +static int nb_cpus = 4; /* TOFIX: don't hardcode this */ static int page_size = 0; -void usage(char *argv[]) +static void usage(char *argv[]) { char format[] = "-n -p [options]"; char options[] = "Required options:\n" @@ -71,7 +79,7 @@ void usage(char *argv[]) "-l nb_nodes\t\t\tNumber of nodes in the pipeline chain\n" "-d\t\t\t\tReceive one piece of data\n" "-h\t\t\t\tPrint this help\n" - "-s \t\t\tShare the same L cache or not\n" + "-s\t\t\t\tShare the same L2 cache or not\n" "\t\t\t\tIf level is:\n" "\t\t\t\t\t> 0, then the same L must be shared\n" "\t\t\t\t\t< 0, then different L must be used\n" @@ -81,24 +89,24 @@ void usage(char *argv[]) printf("%s\n", options); } -int do_noinit(int unused) +static int do_noinit(int unused __attribute__ ((unused))) { return 0; } -void **do_nocalc(void) +static void **do_nocalc(void) { static int an_int, *an_int_ptr = &an_int; return (void **) &an_int_ptr; } -int do_noend(void) +static int do_noend(void) { return 0; } -int inc_check_init(int init_value, inc_check_t **context) +static int inc_check_init(int init_value, inc_check_t **context) { inc_check_t *ctxt; @@ -110,24 +118,24 @@ int inc_check_init(int init_value, inc_check_t **context) return 0; } -int inc_check_next(inc_check_t *context, uintptr_t *next_value) +static int inc_check_next(inc_check_t *context, uintptr_t *next_value) { *next_value = (*context)++; return 0; } -int inc_check_end(inc_check_t *context) +static int inc_check_end(inc_check_t *context) { free(context); return 0; } -int do_checkinit(int init_value) +static int do_checkinit(int init_value) { return inc_check_init(init_value, &single_prod_check_ctxt); } -void **do_checkcalc(void) +static void **do_checkcalc(void) { int ret; @@ -138,17 +146,17 @@ void **do_checkcalc(void) return (void **) single_prod_check_val; } -int do_checkend(void) +static int do_checkend(void) { return inc_check_end(single_prod_check_ctxt); } -int analyse_options(int argc, char *argv[]) +static int analyse_options(int argc, char *argv[]) { int opt; opterr = 0; - while ((opt = getopt(argc, argv, ":bc:dhl:kn:s::"/*p:"*/)) != -1) + while ((opt = getopt(argc, argv, ":bc:dhl:kn:s"/*:p:"*/)) != -1) { switch (opt) { @@ -262,40 +270,10 @@ int analyse_options(int argc, char *argv[]) break; #endif case 's' : - if ((optind != argc) && (*argv[optind] != '-')) - { - int share_level; - char *inval; - share_level = strtol(argv[optind], &inval, 10); - if ((*argv[optind] == '\0') || (*inval != '\0')) - { - fprintf(stderr, "Option '-p' needs an integer argument\n"); - return -1; - } - if ((share_level == LONG_MIN) || ((share_level == LONG_MAX) && errno == ERANGE)) - { - fprintf(stderr, "Shared memory level must be between %ld and %ld, both inclusive\n", LONG_MIN, LONG_MAX); - return -1; - } - /* TODO: Real management of shared memory level */ - /* TODO: -x: We want level x not to be shared; 0 do as we want, only memory is guaranteed to be shared */ - if (share_level <= 0) - shared = 0; - else - shared = 1; - optind++; - } + shared = 1; break; case '?' : fprintf(stderr, "Option inconnue\n"); - /*if (!strncmp("--check", argv[optind], strlen("--check"))) - { - check_recv_match_send = 1; - optind++; - optopt = (int) *argv[optind]; - fprintf(stderr, "--check required\n"); - break; - }*/ return -1; case ':' : fprintf(stderr, "Option %s needs an argument\n", argv[optind]); @@ -346,9 +324,9 @@ int analyse_options(int argc, char *argv[]) return 0; } -int producer(void *prod_channel) +static int initial_producer(node_param_t *node_param) { - int i, j; + unsigned int i, j; if (init_calc(init_calc_arg)) { @@ -356,9 +334,8 @@ int producer(void *prod_channel) return 1; } for(i = 0; i < nb_bufs_sent; i++) { - //printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE); for(j = 0; j < WORDS_PER_LINE; j++) - send(prod_channel, do_calc()); + send(node_param->next_comm_channel, do_calc()); } if (end_calc()) { @@ -369,184 +346,348 @@ int producer(void *prod_channel) return 0; } -void on_message(void *val) +static int intermediate_producer(node_param_t *node_param) { - //printf("Receive value: %p\n", (void *) val); + int i, j; + long long total_data_sent = 0, total_to_send; + + i = 0; + total_to_send = nb_bufs_sent * WORDS_PER_LINE; + while (total_data_sent < total_to_send) + { + while (!node_param->c2p_xfer->buf_status); + for(j = 0; j < node_param->c2p_xfer->filled_buf_entries[i]; j++) + send(node_param->next_comm_channel, node_param->c2p_xfer->data_buf[i][j]); + total_data_sent += node_param->c2p_xfer->filled_buf_entries[i]; + node_param->c2p_xfer->buf_status = 0; + i = !i; + } + printf("[%p] Producer finished !\n", (void*) pthread_self()); + return 0; } -int consumer(void *cons_channel) +static void on_message(void *val __attribute__ ((unused))) { + /*printf("Receive value: %p\n", (void *) val);*/ +} + +static void *consumer_block(node_param_t *node_param) +{ + int i = 0; + int delayed_error; + unsigned long long total_data_received = 0, total_to_receive; + uintptr_t cons_check_value; + inc_check_t *cons_check_context; + + cons_check_context = NULL; + delayed_error = 0; + if (check_recv_match_send) + { + if (inc_check_init(init_calc_arg, &cons_check_context)) + { + fprintf(stderr, "Initialization of check has failed\n"); + return &page_size; /* &page_size can't be NULL */ + } + } + cons_check_value = init_calc_arg; + total_to_receive = nb_bufs_sent * WORDS_PER_LINE; + while (total_data_received < total_to_receive) + { + int j; + ssize_t nb_data_received; + size_t to_receive; + + to_receive = MIN(MAX_BLOCK_ENTRIES, total_to_receive - total_data_received); + nb_data_received = recv_some_data(node_param->prev_comm_channel, + (void **) node_param->c2p_xfer->data_buf[i], to_receive); + node_param->c2p_xfer->filled_buf_entries[i] = nb_data_received; + if (unlikely(!(node_param->type & SINK))) + { + /* Check with nb lines sent & recv */ + while (node_param->c2p_xfer->buf_status); + node_param->c2p_xfer->buf_status = 1; + } + total_data_received += nb_data_received; + for (j = 0; j < nb_data_received; j++) + { + if (unlikely(check_recv_match_send)) + { + if (inc_check_next(cons_check_context, &cons_check_value)) + { + if (!delayed_error) + { + fprintf(stderr, "Error while checking received value match sent value\n"); + delayed_error = 1; + } + } + if (cons_check_value != (uintptr_t) node_param->c2p_xfer->data_buf[i][j]) + { + if (!delayed_error) + { + fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) node_param->c2p_xfer->data_buf[i][j]); + delayed_error = 1; + } + } + } + on_message(node_param->c2p_xfer->data_buf[i][j]); + } + i = !i; + /*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");*/ + } + if (unlikely(!(node_param->type & SINK))) + printf("[%p] Consumer finished !\n", (void*) pthread_self()); + if (delayed_error) + return &page_size; /* &page_size can't be NULL */ + return NULL; +} + +static int consumer_data(node_param_t *node_param) +{ + unsigned int i, j; int delayed_error; uintptr_t cons_check_value; inc_check_t *cons_check_context; + cons_check_context = NULL; delayed_error = 0; - if (inc_check_init(init_calc_arg, &cons_check_context)) + if (check_recv_match_send) { - fprintf(stderr, "Initialization of check has failed\n"); - return -1; /* &page_size can't be NULL, whatever NULL is bound to */ + if (inc_check_init(init_calc_arg, &cons_check_context)) + { + fprintf(stderr, "Initialization of check has failed\n"); + return -1; /* &page_size can't be NULL */ + } } cons_check_value = init_calc_arg; - if (block_reception) - { - long long total_data_received = 0, total_to_receive; - void *data_buf[MAX_BLOCK_ENTRIES]; - - total_to_receive = nb_bufs_sent * WORDS_PER_LINE; - while (total_data_received < total_to_receive) + for(i = 0; i < nb_bufs_sent; i++) { + for(j = 0; j < WORDS_PER_LINE; j++) { - int i; - ssize_t nb_data_received; - size_t to_receive; + void *data; - to_receive = MIN(MAX_BLOCK_ENTRIES, total_to_receive - total_data_received); - nb_data_received = recv_some_data(cons_channel, data_buf, to_receive); - total_data_received += nb_data_received; - for (i = 0; i < nb_data_received; i++) + data = recv_one_data(node_param->prev_comm_channel); + if (unlikely(check_recv_match_send)) { - if (check_recv_match_send) + if (inc_check_next(cons_check_context, &cons_check_value)) { - if (inc_check_next(cons_check_context, &cons_check_value)) + if (!delayed_error) { - if (!delayed_error) - { - fprintf(stderr, "Error while checking received value match sent value\n"); - delayed_error = 1; - } - } - if (cons_check_value != (uintptr_t) data_buf[i]) - { - if (!delayed_error) - { - fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data_buf[i]); - delayed_error = 1; - } + fprintf(stderr, "Error while checking received value match sent value\n"); + delayed_error = 1; } } - on_message(data_buf[i]); - } - //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : ""); - } - } - else - { - int i, j; - - for(i = 0; i < nb_bufs_sent; i++) { - for(j = 0; j < WORDS_PER_LINE; j++) - { - void *data; - - data = recv_one_data(cons_channel); - if (check_recv_match_send) + if (cons_check_value != (uintptr_t) data) { - if (inc_check_next(cons_check_context, &cons_check_value)) + if (!delayed_error) { - if (!delayed_error) - { - fprintf(stderr, "Error while checking received value match sent value\n"); - delayed_error = 1; - } - } - if (cons_check_value != (uintptr_t) data) - { - if (!delayed_error) - { - fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data); - delayed_error = 1; - } + fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data); + delayed_error = 1; } } - on_message(data); - //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : ""); } + on_message(data); + if (likely(node_param->type == INTERM)) + send(node_param->next_comm_channel, data); + /*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");*/ } } - printf("[%p] Consumer finished !\n", (void*) pthread_self()); + if (unlikely(!(node_param->type & SINK))) + printf("[%p] Consumer finished !\n", (void*) pthread_self()); if (delayed_error) return -1; return 0; } -int consprod(void *cons_channel, void *prod_channel) +static int alloc_consprod_databuf(data_xchg_t **c2p_xfer_addr) { - if (block_reception) - { - long long total_data_received = 0, total_to_receive; - void *data_buf[MAX_BLOCK_ENTRIES]; + int ret; + data_xchg_t *c2p_xfer; - total_to_receive = nb_bufs_sent * WORDS_PER_LINE; - while (total_data_received < total_to_receive) - { - int i; - ssize_t nb_data_received; - size_t to_receive; - - to_receive = MIN(MAX_BLOCK_ENTRIES, total_to_receive - total_data_received); - nb_data_received = recv_some_data(cons_channel, data_buf, to_receive); - total_data_received += nb_data_received; - for (i = 0; i < nb_data_received; i++) - send(prod_channel, data_buf[i]); - //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : ""); - } - } - else - { - int i, j; - - for(i = 0; i < nb_bufs_sent; i++) { - for(j = 0; j < WORDS_PER_LINE; j++) - send(prod_channel, recv_one_data(cons_channel)); - //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : ""); - } - } - printf("[%p] Producer/consumer finished !\n", (void*) pthread_self()); + ret = posix_memalign((void **) &c2p_xfer, page_size, sizeof(*c2p_xfer)); + c2p_xfer->buf_status = 0; + if (ret) + return EXIT_FAILURE; + *c2p_xfer_addr = c2p_xfer; return 0; } -void *node(prod_cons_thread_t *thread_params) +static int interm_block_xfer(node_param_t *node_param) { int return_value; + void *pthread_return_value; + pthread_t prod_tid; + + return_value = 0; + if (alloc_consprod_databuf(&node_param->c2p_xfer)) + return -1; + /* Create producer to next node, we are consumer of previous node */ + if (pthread_create(&prod_tid, NULL, (void *(*)(void *)) consumer_block, + node_param)) + { + perror("pthread_create cons (block_reception)"); + return_value = -1; + goto free_c2p_xfer_ressources; + } + return_value = intermediate_producer(node_param); + pthread_join(prod_tid, &pthread_return_value); + if (pthread_return_value != NULL) + return_value = -1; + +free_c2p_xfer_ressources: + free(node_param->c2p_xfer); + return return_value; +} + +static int set_cpu_binding(int thread_idx) +{ + int cpu_binding; pthread_t tid; cpu_set_t cpuset; + /* Should work in most cases */ + if (shared) + cpu_binding = thread_idx % nb_cpus; + else + cpu_binding = (2 * thread_idx) % nb_cpus; tid = pthread_self(); CPU_ZERO(&cpuset); - CPU_SET(thread_params->cpu_binding, &cpuset); + CPU_SET(cpu_binding, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); - return NULL; + return -1; } - switch (thread_params->flags & (PROD | CONS)) + return 0; +} + +static void *node(node_param_t *node_param) +{ + int return_value; + + if (set_cpu_binding(node_param->thread_idx)) + return &page_size; + switch (node_param->type) { - case PROD: - return_value = producer(thread_params->prod_comm_channel); + case SOURCE: + return_value = initial_producer(node_param); break; - case CONS: - return_value = consumer(thread_params->cons_comm_channel); + case INTERM: + if (block_reception) + return_value = interm_block_xfer(node_param); + else + return_value = consumer_data(node_param); break; - case (PROD | CONS): - return_value = consprod(thread_params->cons_comm_channel, - thread_params->prod_comm_channel); + case SINK: + if (block_reception) + { + if (alloc_consprod_databuf(&node_param->c2p_xfer)) + return &page_size; + if (consumer_block(node_param) == NULL) //TODO: allocate data_buf + return_value = 0; + else + return_value = -1; + free(node_param->c2p_xfer); + } + else + return_value = consumer_data(node_param); break; default: - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + return &page_size; /* &page_size can't be NULL */ } if (return_value) - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + return &page_size; /* &page_size can't be NULL */ return NULL; } +static int set_node_params(node_param_t *node_param, + node_param_t *prev_node_param, int thread_idx) +{ + if (thread_idx == nb_nodes - 1) + node_param->type = SINK; + else + { + if (thread_idx) + node_param->type = INTERM; + else + node_param->type = SOURCE; + node_param->next_comm_channel = create_comm_channel(); + if (node_param->next_comm_channel == NULL) + return -1; + } + if (prev_node_param != NULL) + { + node_param->prev_comm_channel = + prev_node_param->next_comm_channel; + } + node_param->thread_idx = thread_idx; + return 0; +} + +static int create_threads(int *p_nb_nodes, pthread_t *node_tids, + node_param_t *node_params) +{ + int i, nb_nodes; + + nb_nodes = *p_nb_nodes; + for (i = 0; i < nb_nodes; i++) + { + node_param_t *prev_node_param; + + prev_node_param = (i) ? &node_params[i - 1] : NULL; + if (set_node_params(&node_params[i], prev_node_param, i)) + { + *p_nb_nodes = i - 1; + return -1; + } + if (pthread_create(&node_tids[i], NULL, + (void *(*)(void *)) node, &node_params[i])) + { + perror("pthread_create node"); + destroy_comm_channel(node_params[i].next_comm_channel); + *p_nb_nodes = i - 1; + return -1; + } + } + *p_nb_nodes = i - 1; + return 0; +} + +static int join_threads(int nb_threads, pthread_t *tids) +{ + int i, return_value; + void *pthread_return_value; + + for (i = 0, return_value = 0; i < nb_threads; i++) + { + pthread_join(tids[i], &pthread_return_value); + if (pthread_return_value != NULL) + return_value = EXIT_FAILURE; + } + return return_value; +} + +static int destroy_threads(int last_allocated, node_param_t *node_params) +{ + int i, return_value; + + for (i = last_allocated, return_value = 0; i >= 0; i--) + { + if (node_params[i].type != SINK) + { + if (destroy_comm_channel(node_params[i].next_comm_channel)) + return_value = -1; + } + } + return return_value; +} + int main(int argc, char *argv[]) { + int return_value, nb_threads; pthread_t *tids; - int i, return_value; - void *pthread_return_value; - prod_cons_thread_t *thread_params; + node_param_t *node_params; return_value = EXIT_SUCCESS; if (analyse_options(argc, argv)) @@ -554,59 +695,25 @@ int main(int argc, char *argv[]) page_size = sysconf(_SC_PAGE_SIZE); if (page_size <= 0) return EXIT_FAILURE; - thread_params = malloc(nb_nodes * sizeof(prod_cons_thread_t)); - if (thread_params == NULL) + nb_threads = nb_nodes; + node_params = malloc(nb_threads * sizeof(node_param_t)); + if (node_params == NULL) return EXIT_FAILURE; - tids = malloc(nb_nodes * sizeof(pthread_t)); + tids = malloc(nb_threads * sizeof(pthread_t)); if (tids == NULL) { return_value = EXIT_FAILURE; goto error_alloc_tids; } - for (i = 0; i < nb_nodes - 1; i++) - { - if (i) - thread_params[i].flags = PROD | CONS; - else - thread_params[i].flags = PROD; - // Should work in most cases - if (shared) - thread_params[i].cpu_binding = i % nb_cpus; - else - thread_params[i].cpu_binding = (2 * i) % nb_cpus; - thread_params[i].prod_comm_channel = create_comm_channel(); - if (thread_params[i].prod_comm_channel == NULL) - { - return_value = EXIT_FAILURE; - goto error_create_channels; - } - if (i) - thread_params[i].cons_comm_channel = - thread_params[i - 1].prod_comm_channel; - pthread_create(&tids[i], NULL, (void *(*)(void *)) node, &thread_params[i]); - } - thread_params[i].flags = CONS; - if (shared) - thread_params[i].cpu_binding = i % nb_cpus; - else - thread_params[i].cpu_binding = (2 * i) % nb_cpus; - thread_params[i].cons_comm_channel = - thread_params[i - 1].prod_comm_channel; - pthread_create(&tids[i], NULL, (void *(*)(void *)) node, &thread_params[i]); - for (i = 0; i < nb_nodes; i++) - { - pthread_join(tids[i], &pthread_return_value); - if (pthread_return_value != NULL) - return_value = EXIT_FAILURE; - } - i--; + if (create_threads(&nb_threads, tids, node_params)) + goto error_create_channels; + if (join_threads(nb_threads, tids)) + return_value = EXIT_FAILURE; error_create_channels: - for (i-- ; i >= 0; i--) { - if (destroy_comm_channel(thread_params[i].prod_comm_channel)) - return_value = EXIT_FAILURE; - } + if (destroy_threads(nb_threads, node_params)) + return_value = EXIT_FAILURE; free(tids); error_alloc_tids: - free(thread_params); + free(node_params); return return_value; }