From 756a701466fbee35692428150c700614e9e64a1e Mon Sep 17 00:00:00 2001 From: Thomas Preud'homme Date: Mon, 11 Apr 2011 16:18:49 +0200 Subject: [PATCH] [commtech] Refactor to chain more than 2 nodes * Refactor the source to be able to chain more than 2 nodes together * Compile all binaries by default (binList must be set manually in lancement.sh to run only a subset of the binaries --- communication_techniques/Makefile | 8 +- .../include/batch_queue_common_comm.h | 32 +-- communication_techniques/include/commtech.h | 71 +---- .../include/csq_common_comm.h | 23 +- .../include/fast_forward_comm.h | 20 +- .../include/lamport_comm.h | 12 +- .../include/mcringbuffer_comm.h | 25 +- communication_techniques/include/none_comm.h | 6 +- communication_techniques/include/pipe_comm.h | 8 +- .../include/shared_mem_opt_comm.h | 16 +- communication_techniques/lancement.sh | 19 +- .../src/communication/batch_queue.c | 84 ++---- .../src/communication/csq.c | 75 ++--- .../src/communication/fast_forward.c | 84 ++---- .../src/communication/lamport.c | 72 ++--- .../src/communication/mcringbuffer.c | 107 +++---- .../src/communication/none.c | 34 +-- .../src/communication/pipe.c | 66 +---- .../src/communication/shared_mem_opt.c | 82 ++---- communication_techniques/src/main.c | 269 +++++++++++------- 20 files changed, 404 insertions(+), 709 deletions(-) diff --git a/communication_techniques/Makefile b/communication_techniques/Makefile index 55d4d68..e8e1190 100644 --- a/communication_techniques/Makefile +++ b/communication_techniques/Makefile @@ -20,8 +20,12 @@ LDFLAGS:=-L$(LIBDIR) -lpthread -ldl CC=gcc # Files -BINNAMES:=batch_queue_comm lamport_comm shared_mem_opt_comm none_comm csq_2_comm -BINNAMES+=csq_64_comm fast_forward_comm mcringbuffer_comm #pipe_comm jikes_barrier_comm asm_cache_comm +BINNAMES:=batch_queue_2_comm batch_queue_4_comm batch_queue_8_comm +BINNAMES+=batch_queue_16_comm batch_queue_32_comm batch_queue_64_comm +BINNAMES+=batch_queue_128_comm batch_queue_256_comm batch_queue_512_comm +BINNAMES+=batch_queue_1024_comm lamport_comm shared_mem_opt_comm none_comm +BINNAMES+=csq_2_comm csq_64_comm fast_forward_comm mcringbuffer_comm pipe_comm +#BINNAMES+=jikes_barrier_comm asm_cache_comm CALCLIBSNAMES:=calc_mat calc_line calc_useless_loop BINS:=$(patsubst %,$(BINDIR)/%,$(BINNAMES)) CALCLIBS:=$(patsubst %,$(LIBDIR)/$(CALCDIR)/lib%.so.1,$(CALCLIBSNAMES)) diff --git a/communication_techniques/include/batch_queue_common_comm.h b/communication_techniques/include/batch_queue_common_comm.h index 8cbe970..e1f3c4a 100644 --- a/communication_techniques/include/batch_queue_common_comm.h +++ b/communication_techniques/include/batch_queue_common_comm.h @@ -8,42 +8,26 @@ #define BUF_SIZE (32 * CACHE_LINE_SIZE) #endif -/* This is not an error, we need this two-macro system */ -#define toString(x) doStringification(x) -#define doStringification(x) #x - struct channel { void * volatile buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE))); int unused[20] __attribute__ ((aligned (CACHE_LINE_SIZE))); volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE))); - int idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + int sender_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + int receiver_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); }; -struct cons -{ - struct channel *channel; - int receiver_idx; -}; - -union comm -{ - struct channel *channel; - struct cons *cons; -}; - -extern __thread union comm comm; __BEGIN_DECLS -static inline void send(void **addr) +static inline void send(struct channel *channel, void **addr) { - comm.channel->buf[comm.channel->idx++] = addr; - comm.channel->idx %= 2 * (BUF_SIZE / sizeof(void *)); - if (!(comm.channel->idx % (BUF_SIZE / sizeof(void *)))) + channel->buf[channel->sender_idx++] = addr; + channel->sender_idx %= 2 * (BUF_SIZE / sizeof(void *)); + if (!(channel->sender_idx % (BUF_SIZE / sizeof(void *)))) { - while (comm.channel->state); - comm.channel->state = 1; + while (channel->state); + channel->state = 1; } } diff --git a/communication_techniques/include/commtech.h b/communication_techniques/include/commtech.h index 944ecb2..a73d731 100644 --- a/communication_techniques/include/commtech.h +++ b/communication_techniques/include/commtech.h @@ -8,23 +8,7 @@ __BEGIN_DECLS -/* - * @return 0 if success, -1 else - * - * Initialize communication library. - * @comment Must be run before any other function of this library - */ -int init_library(void); - - -/* - * @return 0 if success, -1 else - * - * Finalize communication library. - * @comment Must be run after any other function of this library - */ -int finalize_library(void); - +struct channel; /* * @return a pointer on the channel if success, NULL else @@ -50,53 +34,7 @@ int destroy_comm_channel(void *); /* - * @param chan Address of the communication channel to attach to - * the producer calling this function. - * @return 0 on success, -1 else - * - * Initialize the producer and attach the given communication channel to - * it. - */ -int init_producer_thread(void *); - - -/* - * @param channel Address of the communication channel to detach from - * the producer calling this function - * @return 0 on success, -1 else - * - * Finalize the producer. - * @comment Must be run by the producer after it stopped to communicate - * with the consumer. - */ -int finalize_producer_thread(void *); - - -/* - * @param channel Address of the communication channel to attach to - * the consumer calling this function. - * @return 0 on success, -1 else - * - * Initialize the consumer and attach the given communication channel to - * it. - */ -int init_consumer_thread(void *); - - -/* - * @param channel Address of the communication channel to detach from - * the consumer calling this function - * @return 0 on success, -1 else - * - * Finalize the consumer. - * - * @comment Must be run by the consumer after it stopped to communicate - * with the consumer. - */ -int finalize_consumer_thread(void *); - - -/* + * @param channel Channel from which to receive data * @return a data sent by the matching producer * * Wait until a data sent by the matching producer is available @@ -104,11 +42,12 @@ int finalize_consumer_thread(void *); * @comment recv_one_data should not be used in conjonction of * recv_some_data */ -void *recv_one_data(void); +void *recv_one_data(struct channel *); /* + * @param channel Channel from which to receive data * @param buf The buffer to write received data into * @param count The maximum number of data to copy into buf * @return Number of data copied into buf @@ -120,7 +59,7 @@ void *recv_one_data(void); * @comment count must be a multiple of BUF_SIZE / sizeof(void *) which is * equal to SUB_SLOTS */ -ssize_t recv_some_data(void **, size_t); +ssize_t recv_some_data(struct channel *, void **, size_t); __END_DECLS diff --git a/communication_techniques/include/csq_common_comm.h b/communication_techniques/include/csq_common_comm.h index 7a43bd8..0c567b9 100644 --- a/communication_techniques/include/csq_common_comm.h +++ b/communication_techniques/include/csq_common_comm.h @@ -19,38 +19,31 @@ struct lvl_2 volatile unsigned int flag : 1; } __attribute__ ((aligned (CACHE_LINE_SIZE))); -struct comm +struct channel { struct lvl_2 queue[SLOTS] __attribute__ ((aligned (CACHE_LINE_SIZE))); -}; - -union ctrl -{ - int head; - int tail; + int head __attribute__ ((aligned(CACHE_LINE_SIZE))); + int tail __attribute__ ((aligned(CACHE_LINE_SIZE))); }; __BEGIN_DECLS -extern __thread struct comm *comm; -extern __thread union ctrl ctrl __attribute__ ((aligned (CACHE_LINE_SIZE))); - // TODO: Make it send only one data -static inline void send(void **addr) +static inline void send(struct channel *channel, void **addr) { static __thread int chkidx = 0; // If all slots are full, spin if (!chkidx) - while (comm->queue[ctrl.tail].flag); + while (channel->queue[channel->tail].flag); // Enqueue a data item - comm->queue[ctrl.tail].chunk[chkidx++] = addr; + channel->queue[channel->tail].chunk[chkidx++] = addr; if (!(chkidx % SUB_SLOTS)) { chkidx = 0; - comm->queue[ctrl.tail].flag = 1; - ctrl.tail = (ctrl.tail + 1) % SLOTS; + channel->queue[channel->tail].flag = 1; + channel->tail = (channel->tail + 1) % SLOTS; } } diff --git a/communication_techniques/include/fast_forward_comm.h b/communication_techniques/include/fast_forward_comm.h index 004a537..dfa5532 100644 --- a/communication_techniques/include/fast_forward_comm.h +++ b/communication_techniques/include/fast_forward_comm.h @@ -16,35 +16,33 @@ #define GOOD (6 * BUF_SIZE / sizeof(void *)) #define ADJUST_FREQ 64 -struct comm +struct channel { void * volatile *shared_space; - int head; - int tail; + int head __attribute__ ((aligned (CACHE_LINE_SIZE))); + int tail __attribute__ ((aligned (CACHE_LINE_SIZE))); }; __BEGIN_DECLS -extern __thread struct comm *comm; +extern int adjust_slip(struct channel *channel); -extern int adjust_slip(void); - -static inline void send(void **addr) +static inline void send(struct channel *channel, void **addr) { static __thread int nb_iter = 0; assert(addr != NULL); if (nb_iter == ADJUST_FREQ) { - adjust_slip(); + adjust_slip(channel); nb_iter = 0; } while (1) { - if (comm->shared_space[comm->head] != NULL) + if (channel->shared_space[channel->head] != NULL) continue; - comm->shared_space[comm->head] = addr; - comm->head = (comm->head + 1) % SHARED_SPACE_VOIDPTR; + channel->shared_space[channel->head] = addr; + channel->head = (channel->head + 1) % SHARED_SPACE_VOIDPTR; break; } } diff --git a/communication_techniques/include/lamport_comm.h b/communication_techniques/include/lamport_comm.h index c9a09b5..2ac37cc 100644 --- a/communication_techniques/include/lamport_comm.h +++ b/communication_techniques/include/lamport_comm.h @@ -11,7 +11,7 @@ #define SHARED_SPACE_SIZE (2 * BUF_SIZE) #define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *)) -struct comm +struct channel { void * volatile *shared_space; volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); @@ -20,13 +20,11 @@ struct comm __BEGIN_DECLS -extern __thread struct comm *comm; - -static inline void send(void **addr) +static inline void send(struct channel *channel, void **addr) { - while ((comm->prod_idx + 1) % SHARED_SPACE_VOIDPTR == comm->cons_idx); - comm->shared_space[comm->prod_idx] = addr; - comm->prod_idx = (comm->prod_idx + 1) % SHARED_SPACE_VOIDPTR; + while ((channel->prod_idx + 1) % SHARED_SPACE_VOIDPTR == channel->cons_idx); + channel->shared_space[channel->prod_idx] = addr; + channel->prod_idx = (channel->prod_idx + 1) % SHARED_SPACE_VOIDPTR; } __END_DECLS diff --git a/communication_techniques/include/mcringbuffer_comm.h b/communication_techniques/include/mcringbuffer_comm.h index 3eee817..55fafde 100644 --- a/communication_techniques/include/mcringbuffer_comm.h +++ b/communication_techniques/include/mcringbuffer_comm.h @@ -28,7 +28,7 @@ struct prod }; -struct comm +struct channel { struct control ctrl __attribute__ ((aligned (CACHE_LINE_SIZE))); struct prod prod __attribute__ ((aligned (CACHE_LINE_SIZE))); @@ -38,27 +38,26 @@ struct comm __BEGIN_DECLS -extern __thread struct comm *comm; extern const int batchSize; -static inline void send(void **addr) +static inline void send(struct channel *channel, void **addr) { while (1) { - int afterNextWrite = (comm->prod.nextWrite + 1) % SHARED_SPACE_VOIDPTR; - if (afterNextWrite == comm->prod.localRead) + int afterNextWrite = (channel->prod.nextWrite + 1) % SHARED_SPACE_VOIDPTR; + if (afterNextWrite == channel->prod.localRead) { - if (afterNextWrite == comm->ctrl.read) + if (afterNextWrite == channel->ctrl.read) continue; - comm->prod.localRead = comm->ctrl.read; + channel->prod.localRead = channel->ctrl.read; } - comm->shared_space[comm->prod.nextWrite] = addr; - comm->prod.nextWrite = afterNextWrite; - comm->prod.wBatch++; - if (comm->prod.wBatch >= batchSize) + channel->shared_space[channel->prod.nextWrite] = addr; + channel->prod.nextWrite = afterNextWrite; + channel->prod.wBatch++; + if (channel->prod.wBatch >= batchSize) { - comm->ctrl.write = comm->prod.nextWrite; - comm->prod.wBatch = 0; + channel->ctrl.write = channel->prod.nextWrite; + channel->prod.wBatch = 0; } break; } diff --git a/communication_techniques/include/none_comm.h b/communication_techniques/include/none_comm.h index 3b66b41..dac20a4 100644 --- a/communication_techniques/include/none_comm.h +++ b/communication_techniques/include/none_comm.h @@ -3,7 +3,11 @@ __BEGIN_DECLS -static inline void send(void **addr) {} +struct channel +{ +}; + +static inline void send(struct channel *channel, void **addr) {} __END_DECLS diff --git a/communication_techniques/include/pipe_comm.h b/communication_techniques/include/pipe_comm.h index 69b2719..735d2f2 100644 --- a/communication_techniques/include/pipe_comm.h +++ b/communication_techniques/include/pipe_comm.h @@ -6,16 +6,14 @@ #define READ_IDX 0 #define WRITE_IDX 1 -struct comm +struct channel { int pipefd[2]; }; __BEGIN_DECLS -extern __thread struct comm *comm; - -static inline void send(void **addr) +static inline void send(struct channel *channel, void **addr) { int nb_read; void *addr_ptr; @@ -26,7 +24,7 @@ static inline void send(void **addr) { int n; - n = write(comm->pipefd[WRITE_IDX], addr_ptr, sizeof(void *) - nb_read); + n = write(channel->pipefd[WRITE_IDX], addr_ptr, sizeof(void *) - nb_read); if (n > 0) { nb_read += n; diff --git a/communication_techniques/include/shared_mem_opt_comm.h b/communication_techniques/include/shared_mem_opt_comm.h index eef3e53..b67ac75 100644 --- a/communication_techniques/include/shared_mem_opt_comm.h +++ b/communication_techniques/include/shared_mem_opt_comm.h @@ -7,7 +7,7 @@ #define SHARED_SPACE_SIZE (2 * CACHE_LINE_SIZE) #define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *)) -struct comm +struct channel { void * volatile *shared_space; volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); @@ -17,22 +17,20 @@ struct comm __BEGIN_DECLS -extern __thread struct comm *comm; - -static inline void send(void **addr) +static inline void send(struct channel *channel, void **addr) { static __thread int local_cons_idx = 0; int local_prod, next_prod; - local_prod = comm->prod_idx; + local_prod = channel->prod_idx; next_prod = (local_prod + 1) % SHARED_SPACE_VOIDPTR; if (next_prod == local_cons_idx) { - while (next_prod == comm->cons_idx); - local_cons_idx = comm->cons_idx; + while (next_prod == channel->cons_idx); + local_cons_idx = channel->cons_idx; } - comm->shared_space[local_prod] = addr; - comm->prod_idx = next_prod; + channel->shared_space[local_prod] = addr; + channel->prod_idx = next_prod; } __END_DECLS diff --git a/communication_techniques/lancement.sh b/communication_techniques/lancement.sh index 6c3cca0..3e996b3 100755 --- a/communication_techniques/lancement.sh +++ b/communication_techniques/lancement.sh @@ -11,7 +11,7 @@ calcDir="calculation" # Param binList="$(ls -1 "${binDir}"| sed '$!s/$/ /' | tr -d '\n')" -nbProdList="1" # Nombre de cores producteurs +nbNodesList="2" # Nombre de noeuds chainés dans le pipeline typeProdList="none useless_loop line matrice" # Methode pour produire les valeurs typeCacheList="L2 Memory" # Niveau de cache partage perfOpt="stat -r 10 -e cycles -e L1-dcache-loads -e L1-dcache-stores -e L1-dcache-load-misses -e L1-dcache-store-misses -e L1-dcache-prefetch-misses" @@ -21,7 +21,7 @@ nbIter="500000000" # Nb de lignes produites sizeBuf="1" # En nombre de lignes de cache # Nom generique des fichiers de log -logFileName="\$perfDirName/cache_\$typeCache-nbProd_\$nbProd-typeProd_\$typeProd-argTypeProd_\$argTypeProd-nbIter_\$nbIter-\$bin.log" +logFileName="\$perfDirName/cache_\$typeCache-nbNodes_\$nbNodes-typeProd_\$typeProd-argTypeProd_\$argTypeProd-nbIter_\$nbIter-\$bin.log" expDirName="logs" perfDirName="$expDirName/perfCommMulti-`date +'%F-%Hh%Mm%S'`" @@ -43,9 +43,17 @@ function_run () { "L2" ) optTypeCache="-s" ;; * ) exit 1 ;; esac + nbNodes=$((nbNodes)) + case $nbNodes in + "") exit 1 ;; + 0|1 ) exit 1 ;; + 2 ) optNbNodes="" ;; + [0-9]* ) optNbNodes="-t $nbNodes" ;; + *) exit 1 ;; + esac make $binDir/$bin - echo "On lance : \"perf $perfOpt $binDir/$bin $optTypeCache $optTypeProd -n $nbIter\"" + echo "On lance : \"perf $perfOpt $binDir/$bin $optNbNodes $optTypeCache $optTypeProd -n $nbIter\"" beginingDate=`date +%s` ( perf $perfOpt $binDir/$bin $optTypeCache $optTypeProd -n $nbIter 2>&1 || echo "echec experience" ) | eval tee $logFileName @@ -60,12 +68,13 @@ function_run () { echo "" } - +eval echo \"# Describe what this experiment is about: \\\"what are the parameters evaluated?\\\"\" > "$perfDirName/description" +eval vim "$perfDirName/description" echo -e "On commence les perfs\n" globalBeginingDate=`date +%s` -for nbProd in $nbProdList ; do +for nbNodes in $nbNodesList ; do for typeProd in $typeProdList; do for typeCache in $typeCacheList ; do for bin in $binList ; do diff --git a/communication_techniques/src/communication/batch_queue.c b/communication_techniques/src/communication/batch_queue.c index 8a1e0b2..3cc892f 100644 --- a/communication_techniques/src/communication/batch_queue.c +++ b/communication_techniques/src/communication/batch_queue.c @@ -7,65 +7,23 @@ #include -__thread union comm comm; - -int init_library(void) -{ - return 0; -} - -int finalize_library(void) -{ - return 0; -} - void *create_comm_channel(void) { - struct cons *cons; + struct channel *channel; - if (!posix_memalign((void **) &cons, CACHE_LINE_SIZE, sizeof(struct cons))) + if (!posix_memalign((void **) &channel, CACHE_LINE_SIZE, sizeof(struct channel))) { - cons->receiver_idx = 0; - if (!posix_memalign((void **) &cons->channel, CACHE_LINE_SIZE, sizeof(struct channel))) - { - cons->channel->state = 0; - cons->channel->idx = 0; - return cons; - } - else - free(cons); + channel->receiver_idx = 0; + channel->state = 0; + channel->sender_idx = 0; + return channel; } return NULL; } -int destroy_comm_channel(void *cons) +int destroy_comm_channel(void *channel) { - free(((struct cons *) cons)->channel); - free(cons); - return 0; -} - -int init_producer_thread(void *cons) -{ - comm.channel = ((struct cons *) cons)->channel; - return 0; -} - -int finalize_producer_thread(void *cons) -{ - comm.channel = NULL; - return 0; -} - -int init_consumer_thread(void *cons) -{ - comm.cons = (struct cons *) cons; - return 0; -} - -int finalize_consumer_thread(void *cons) -{ - comm.cons = NULL; + free(channel); return 0; } @@ -77,17 +35,17 @@ int finalize_consumer_thread(void *cons) * @warning recv_one_data should not be used in conjonction of * recv_some_data */ -void *recv_one_data(void) +void *recv_one_data(struct channel *channel) { static __thread int i; void *result; - if (unlikely(!(i % (BUF_SIZE / sizeof(void *))))) - while (!comm.cons->channel->state); - result = comm.cons->channel->buf[i++]; + if (unlikely(!(channel->receiver_idx % (BUF_SIZE / sizeof(void *))))) + while (!channel->state); + result = channel->buf[channel->receiver_idx++]; i %= (2 * BUF_SIZE) / sizeof(void *); - if (unlikely(!(i % (BUF_SIZE / sizeof(void *))))) - comm.cons->channel->state = 0; + if (unlikely(!(channel->receiver_idx % (BUF_SIZE / sizeof(void *))))) + channel->state = 0; return result; } @@ -100,12 +58,12 @@ void *recv_one_data(void) * recv_one_data * @warning count must be a multiple of BUF_SIZE */ -ssize_t recv_some_data(void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { int nb_read; nb_read = 0; - while (comm.cons->channel->state && nb_read < count) + while (channel->state && nb_read < count) { int i, n; /* @@ -115,19 +73,19 @@ ssize_t recv_some_data(void **buf, size_t count) * the second cache line we correct the pointer to point to the * first one (this is done by the modulo). */ - i = comm.cons->receiver_idx; - n = comm.cons->receiver_idx + (BUF_SIZE / sizeof(void *)); - comm.cons->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); + i = channel->receiver_idx; + n = channel->receiver_idx + (BUF_SIZE / sizeof(void *)); + channel->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); for(; i < n; i++) { /* * The behaviour of this is not documented but we know * the values inside buf won't change during this affectation */ - *buf++ = comm.cons->channel->buf[i]; + *buf++ = channel->buf[i]; } nb_read += BUF_SIZE / sizeof(void *); - comm.cons->channel->state = 0; + channel->state = 0; } return nb_read; } diff --git a/communication_techniques/src/communication/csq.c b/communication_techniques/src/communication/csq.c index 22b9060..54287a8 100644 --- a/communication_techniques/src/communication/csq.c +++ b/communication_techniques/src/communication/csq.c @@ -9,100 +9,61 @@ #include -__thread struct comm *comm; -__thread union ctrl ctrl __attribute__ ((aligned (CACHE_LINE_SIZE))); - -int init_library(void) -{ - return 0; -} - -int finalize_library(void) -{ - return 0; -} - void *create_comm_channel(void) { - struct comm *comm; + struct channel *channel; - if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + if (!posix_memalign((void *) &channel, CACHE_LINE_SIZE, sizeof(struct channel))) { int i; for (i = 0; i < SLOTS; i++) - comm->queue[i].flag = 0; - return comm; + channel->queue[i].flag = 0; + channel->head = 0; + channel->tail = 0; + return channel; } return NULL; } -int destroy_comm_channel(void *comm) +int destroy_comm_channel(void *channel) { - free(comm); + free(channel); return 0; } -int init_producer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - ctrl.tail = 0; - return 0; -} - -int finalize_producer_thread(void *unused) -{ - comm = NULL; - ctrl.tail = 0; - return 0; -} - -int init_consumer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - ctrl.head = 0; - return 0; -} - -int finalize_consumer_thread(void *unused) -{ - comm = NULL; - ctrl.head = 0; - return 0; -} - -void *recv_one_data(void) +void *recv_one_data(struct channel *channel) { static __thread int i; void *result; if (__builtin_expect(!i, 0)) - while (!comm->queue[ctrl.head].flag); - result = comm->queue[ctrl.head].chunk[i++]; + while (!channel->queue[channel->head].flag); + result = channel->queue[channel->head].chunk[i++]; if (i % SUB_SLOTS) { i = 0; - comm->queue[ctrl.head].flag = 0; - ctrl.head = (ctrl.head + 1) % SLOTS; + channel->queue[channel->head].flag = 0; + channel->head = (channel->head + 1) % SLOTS; } return result; } -ssize_t recv_some_data(void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { int n; n = 0; // If all slots are empty, spin - while (comm->queue[ctrl.head].flag) + while (channel->queue[channel->head].flag) { // Dequeue a chunk of data items memcpy(buf, (const void *) - comm->queue[ctrl.head].chunk, + channel->queue[channel->head].chunk, SUB_SLOTS * sizeof(*buf)); n += SUB_SLOTS; - comm->queue[ctrl.head].flag = 0; - ctrl.head = (ctrl.head + 1) % SLOTS; + channel->queue[channel->head].flag = 0; + channel->head = (channel->head + 1) % SLOTS; if (n == count) break; } diff --git a/communication_techniques/src/communication/fast_forward.c b/communication_techniques/src/communication/fast_forward.c index c9c16ee..d5992a2 100644 --- a/communication_techniques/src/communication/fast_forward.c +++ b/communication_techniques/src/communication/fast_forward.c @@ -8,78 +8,42 @@ #include -__thread struct comm *comm; - -int init_library(void) -{ - return 0; -} - -int finalize_library(void) -{ - return 0; -} - void *create_comm_channel(void) { - struct comm *comm; + struct channel *channel; - if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + if (!posix_memalign((void *) &channel, CACHE_LINE_SIZE, sizeof(struct channel))) { - if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) + if (!posix_memalign((void *) &channel->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) { int i; - comm->head = 0; - comm->tail = 0; + channel->head = 0; + channel->tail = 0; for (i = 0; i < SHARED_SPACE_VOIDPTR; i++) - comm->shared_space[i] = NULL; - return comm; + channel->shared_space[i] = NULL; + return channel; } else - free(comm); + free(channel); } return NULL; } -int destroy_comm_channel(void *comm) +int destroy_comm_channel(void *channel) { - free((void *) ((struct comm *) comm)->shared_space); - free(comm); + free((void *) ((struct channel *) channel)->shared_space); + free(channel); return 0; } -int init_producer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_producer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -int init_consumer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_consumer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -int adjust_slip(void) +int adjust_slip(struct channel *channel) { int dist, dist_old, unused; puts("adjust_slip is called"); /* Must be removed after calibration */ unused = 0; - dist = (comm->head + SHARED_SPACE_VOIDPTR - comm->tail) % SHARED_SPACE_VOIDPTR; + dist = (channel->head + SHARED_SPACE_VOIDPTR - channel->tail) % SHARED_SPACE_VOIDPTR; if (dist < DANGER) { dist_old = 0; @@ -95,35 +59,35 @@ int adjust_slip(void) */ for (i = 0; i < 20 * ((GOOD + 1) - dist); i++) unused++; - dist = (comm->head + SHARED_SPACE_VOIDPTR - comm->tail) % SHARED_SPACE_VOIDPTR; + dist = (channel->head + SHARED_SPACE_VOIDPTR - channel->tail) % SHARED_SPACE_VOIDPTR; } while (dist < GOOD && dist_old < dist); } return unused; } -void *recv_one_data(void) +void *recv_one_data(struct channel *channel) { void *result; static __thread int nb_iter = 0; if (nb_iter == ADJUST_FREQ) { - adjust_slip(); + adjust_slip(channel); nb_iter = 0; } while (1) { - result = comm->shared_space[comm->tail]; + result = channel->shared_space[channel->tail]; if (NULL == result) continue; - comm->shared_space[comm->tail] = NULL; - comm->tail = (comm->tail + 1) % SHARED_SPACE_VOIDPTR; + channel->shared_space[channel->tail] = NULL; + channel->tail = (channel->tail + 1) % SHARED_SPACE_VOIDPTR; break; } return result; } -ssize_t recv_some_data(void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { int n, next_adjust; static __thread int nb_iter = 0; @@ -134,19 +98,19 @@ ssize_t recv_some_data(void **buf, size_t count) /* if ((nb_iter + n) % ADJUST_FREQ == 0) */ if (n && (n % next_adjust == ADJUST_FREQ)) { - adjust_slip(); + adjust_slip(channel); nb_iter = 0; } /* * The behaviour of this is not documented but we know * the values inside buf won't change during this affectation */ - *buf = comm->shared_space[comm->tail]; + *buf = channel->shared_space[channel->tail]; if (NULL == *buf) break; buf++; - comm->shared_space[comm->tail] = NULL; - comm->tail = (comm->tail + 1) % SHARED_SPACE_VOIDPTR; + channel->shared_space[channel->tail] = NULL; + channel->tail = (channel->tail + 1) % SHARED_SPACE_VOIDPTR; } nb_iter = (nb_iter + n) % ADJUST_FREQ; return n; diff --git a/communication_techniques/src/communication/lamport.c b/communication_techniques/src/communication/lamport.c index f786261..41aa6db 100644 --- a/communication_techniques/src/communication/lamport.c +++ b/communication_techniques/src/communication/lamport.c @@ -8,92 +8,56 @@ #include -__thread struct comm *comm; - -int init_library(void) -{ - return 0; -} - -int finalize_library(void) -{ - return 0; -} - void *create_comm_channel(void) { - struct comm *comm; + struct channel *channel; - if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + if (!posix_memalign((void *) &channel, CACHE_LINE_SIZE, sizeof(struct channel))) { - if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) + if (!posix_memalign((void *) &channel->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) { - comm->cons_idx = 0; - comm->prod_idx = 0; - return comm; + channel->cons_idx = 0; + channel->prod_idx = 0; + return channel; } else - free(comm); + free(channel); } return NULL; } -int destroy_comm_channel(void *comm) +int destroy_comm_channel(void *channel) { - free((void *) ((struct comm *) comm)->shared_space); - free(comm); + free((void *) ((struct channel *) channel)->shared_space); + free(channel); return 0; } -int init_producer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_producer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -int init_consumer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_consumer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -void *recv_one_data(void) +void *recv_one_data(struct channel *channel) { int cons_idx; void *result; - cons_idx = comm->cons_idx; - while (cons_idx == comm->prod_idx); - result = comm->shared_space[cons_idx]; + cons_idx = channel->cons_idx; + while (cons_idx == channel->prod_idx); + result = channel->shared_space[cons_idx]; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR; - comm->cons_idx = cons_idx; + channel->cons_idx = cons_idx; return result; } -ssize_t recv_some_data(void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { int n, cons_idx; n = 0; - for(cons_idx = comm->cons_idx; cons_idx != comm->prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, comm->cons_idx = cons_idx) + for(cons_idx = channel->cons_idx; cons_idx != channel->prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, channel->cons_idx = cons_idx) { /* * The behaviour of this is not documented but we know * the values inside buf won't change during this affectation */ - *buf++ = comm->shared_space[cons_idx]; + *buf++ = channel->shared_space[cons_idx]; if (++n == count) break; } diff --git a/communication_techniques/src/communication/mcringbuffer.c b/communication_techniques/src/communication/mcringbuffer.c index 028148c..3e44e32 100644 --- a/communication_techniques/src/communication/mcringbuffer.c +++ b/communication_techniques/src/communication/mcringbuffer.c @@ -8,123 +8,88 @@ #include -__thread struct comm *comm; const int batchSize = 50; // Check with SHARED_SPACE_SIZE -int init_library(void) -{ - return 0; -} - -int finalize_library(void) -{ - return 0; -} - void *create_comm_channel(void) { - struct comm *comm; + struct channel *channel; - if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + if (!posix_memalign((void *) &channel, CACHE_LINE_SIZE, sizeof(struct channel))) { - if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) + if (!posix_memalign((void *) &channel->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) { - comm->ctrl.read = 0; - comm->ctrl.write = 0; - comm->cons.localWrite = 0; - comm->cons.nextRead = 0; - comm->cons.rBatch = 0; - comm->prod.localRead = 0; - comm->prod.nextWrite = 0; - comm->prod.wBatch = 0; - return comm; + channel->ctrl.read = 0; + channel->ctrl.write = 0; + channel->cons.localWrite = 0; + channel->cons.nextRead = 0; + channel->cons.rBatch = 0; + channel->prod.localRead = 0; + channel->prod.nextWrite = 0; + channel->prod.wBatch = 0; + return channel; } else - free(comm); + free(channel); } return NULL; } -int destroy_comm_channel(void *comm) +int destroy_comm_channel(void *channel) { - free((void *) ((struct comm *) comm)->shared_space); - free(comm); + free((void *) ((struct channel *) channel)->shared_space); + free(channel); return 0; } -int init_producer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_producer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -int init_consumer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_consumer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -void *recv_one_data(void) +void *recv_one_data(struct channel *channel) { void *result; while (1) { - if (comm->cons.nextRead == comm->cons.localWrite) + if (channel->cons.nextRead == channel->cons.localWrite) { - if (comm->cons.nextRead == comm->ctrl.write) + if (channel->cons.nextRead == channel->ctrl.write) continue; - comm->cons.localWrite = comm->ctrl.write; + channel->cons.localWrite = channel->ctrl.write; } - result = comm->shared_space[comm->cons.nextRead]; - comm->cons.nextRead = (comm->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR; - comm->cons.rBatch++; - if (comm->cons.rBatch >= batchSize) + result = channel->shared_space[channel->cons.nextRead]; + channel->cons.nextRead = (channel->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR; + channel->cons.rBatch++; + if (channel->cons.rBatch >= batchSize) { - comm->ctrl.read = comm->cons.nextRead; - comm->cons.rBatch = 0; + channel->ctrl.read = channel->cons.nextRead; + channel->cons.rBatch = 0; } break; } return result; } -ssize_t recv_some_data(void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { int n; for(n = 0; n < count; n++) { - if (comm->cons.nextRead == comm->cons.localWrite) + if (channel->cons.nextRead == channel->cons.localWrite) { - if (comm->cons.nextRead == comm->ctrl.write) + if (channel->cons.nextRead == channel->ctrl.write) break; - comm->cons.localWrite = comm->ctrl.write; + channel->cons.localWrite = channel->ctrl.write; } /* * The behaviour of this is not documented but we know * the values inside buf won't change during this affectation */ - *buf++ = comm->shared_space[comm->cons.nextRead]; - comm->cons.nextRead = (comm->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR; - comm->cons.rBatch++; - if (comm->cons.rBatch >= batchSize) + *buf++ = channel->shared_space[channel->cons.nextRead]; + channel->cons.nextRead = (channel->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR; + channel->cons.rBatch++; + if (channel->cons.rBatch >= batchSize) { - comm->ctrl.read = comm->cons.nextRead; - comm->cons.rBatch = 0; + channel->ctrl.read = channel->cons.nextRead; + channel->cons.rBatch = 0; } } return n; diff --git a/communication_techniques/src/communication/none.c b/communication_techniques/src/communication/none.c index 9a585c7..0f82058 100644 --- a/communication_techniques/src/communication/none.c +++ b/communication_techniques/src/communication/none.c @@ -11,16 +11,6 @@ __thread void ** volatile store_var = NULL; -int init_library(void) -{ - return 0; -} - -int finalize_library(void) -{ - return 0; -} - void *create_comm_channel(void) { return (void *) &store_var; @@ -31,26 +21,6 @@ int destroy_comm_channel(void *unused) return 0; } -int init_producer_thread(void *unused) -{ - return 0; -} - -int finalize_producer_thread(void *unused) -{ - return 0; -} - -int init_consumer_thread(void *unused) -{ - return 0; -} - -int finalize_consumer_thread(void *unused) -{ - return 0; -} - /* * Copy at max count received data into buf * @param buf The buffer in which received data must be copied into @@ -59,7 +29,7 @@ int finalize_consumer_thread(void *unused) * @warning recv_one_data should not be used in conjonction of * recv_some_data */ -void *recv_one_data(void) +void *recv_one_data(struct channel *channel) { return NULL; } @@ -73,7 +43,7 @@ void *recv_one_data(void) * recv_one_data * @warning count must be a multiple of BUF_SIZE */ -ssize_t recv_some_data(void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { return count; } diff --git a/communication_techniques/src/communication/pipe.c b/communication_techniques/src/communication/pipe.c index 50dfb02..266cc56 100644 --- a/communication_techniques/src/communication/pipe.c +++ b/communication_techniques/src/communication/pipe.c @@ -10,69 +10,33 @@ #include -__thread struct comm *comm; - -int init_library(void) -{ - return 0; -} - -int finalize_library(void) -{ - return 0; -} - void *create_comm_channel(void) { - struct comm *comm; + struct channel *channel; int flags; - comm = malloc(sizeof(comm)); - if (comm != NULL) + channel = malloc(sizeof(channel)); + if (channel != NULL) { - if (!pipe(comm->pipefd)) + if (!pipe(channel->pipefd)) { - flags = fcntl(comm->pipefd[READ_IDX], F_GETFL); - fcntl(comm->pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK); - return comm; + flags = fcntl(channel->pipefd[READ_IDX], F_GETFL); + fcntl(channel->pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK); + return channel; } else - free(comm); + free(channel); } return NULL; } -int destroy_comm_channel(void *comm) +int destroy_comm_channel(void *channel) { - free(comm); + free(channel); return 0; } -int init_producer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_producer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -int init_consumer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_consumer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -void *recv_one_data(void) +void *recv_one_data(struct channel *channel) { void *result, **res_ptr; int n, nb_read; @@ -81,7 +45,7 @@ void *recv_one_data(void) res_ptr = &result; do { - n = read(comm->pipefd[READ_IDX], res_ptr, sizeof(void *)); + n = read(channel->pipefd[READ_IDX], res_ptr, sizeof(void *)); if (n > 0) { nb_read += n; @@ -91,18 +55,18 @@ void *recv_one_data(void) return result; } -ssize_t recv_some_data(void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { int n, nb_read, nb_bytes; nb_bytes = count * sizeof(void *); - nb_read = read(comm->pipefd[READ_IDX], buf, nb_bytes); + nb_read = read(channel->pipefd[READ_IDX], buf, nb_bytes); if (nb_read <= 0) return 0; buf = (void **) ((uintptr_t) buf + nb_read); while (nb_read % sizeof(void *)) { - n = read(comm->pipefd[READ_IDX], buf, sizeof(void *) - (nb_read % sizeof(void *))); + n = read(channel->pipefd[READ_IDX], buf, sizeof(void *) - (nb_read % sizeof(void *))); if (n > 0) { nb_read += n; diff --git a/communication_techniques/src/communication/shared_mem_opt.c b/communication_techniques/src/communication/shared_mem_opt.c index debc834..4323086 100644 --- a/communication_techniques/src/communication/shared_mem_opt.c +++ b/communication_techniques/src/communication/shared_mem_opt.c @@ -8,110 +8,74 @@ #include -__thread struct comm *comm; - -int init_library(void) -{ - return 0; -} - -int finalize_library(void) -{ - return 0; -} - void *create_comm_channel(void) { - struct comm *comm; + struct channel *channel; - if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + if (!posix_memalign((void *) &channel, CACHE_LINE_SIZE, sizeof(struct channel))) { - if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) + if (!posix_memalign((void *) &channel->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) { - comm->cons_idx = 0; - comm->prod_idx = 0; - return comm; + channel->cons_idx = 0; + channel->prod_idx = 0; + return channel; } else - free(comm); + free(channel); } return NULL; } -int destroy_comm_channel(void *comm) +int destroy_comm_channel(void *channel) { - free((void *) ((struct comm *) comm)->shared_space); - free(comm); + free((void *) ((struct channel *) channel)->shared_space); + free(channel); return 0; } -int init_producer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_producer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -int init_consumer_thread(void *comm_param) -{ - comm = (struct comm *) comm_param; - return 0; -} - -int finalize_consumer_thread(void *unused) -{ - comm = NULL; - return 0; -} - -void *recv_one_data(void) +void *recv_one_data(struct channel *channel) { void *result; int cons_idx, prod_idx; - cons_idx = comm->cons_idx; - prod_idx = comm->prod_idx; + cons_idx = channel->cons_idx; + prod_idx = channel->prod_idx; if (cons_idx == prod_idx) - while(prod_idx == comm->prod_idx); + while(prod_idx == channel->prod_idx); /* * The behaviour of this is not documented but we know the * values inside shared_space won't change during this * affectation */ - result = comm->shared_space[cons_idx]; + result = channel->shared_space[cons_idx]; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR; - comm->cons_idx = cons_idx; + channel->cons_idx = cons_idx; return result; } -ssize_t recv_some_data(void **buf, size_t count) +ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { int n, cons_idx, prod_idx; n = 0; - cons_idx = comm->cons_idx; + cons_idx = channel->cons_idx; do { - prod_idx = comm->prod_idx; + prod_idx = channel->prod_idx; for(; cons_idx != prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR) { /* * The behaviour of this is not documented but we know * the values inside buf won't change during this affectation */ - *buf++ = comm->shared_space[cons_idx]; + *buf++ = channel->shared_space[cons_idx]; if (++n == count) { - comm->cons_idx = cons_idx; + channel->cons_idx = cons_idx; return n; } } - } while (prod_idx != comm->prod_idx); - comm->cons_idx = cons_idx; + } while (prod_idx != channel->prod_idx); + channel->cons_idx = cons_idx; return n; } diff --git a/communication_techniques/src/main.c b/communication_techniques/src/main.c index 15d2114..03509fc 100644 --- a/communication_techniques/src/main.c +++ b/communication_techniques/src/main.c @@ -22,6 +22,16 @@ #define toString(x) doStringification(x) #define doStringification(x) #x #define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t)) +#define PROD 1 +#define CONS 2 + +typedef struct prod_cons_thread +{ + void *prod_comm_channel; + void *cons_comm_channel; + int flags; // PROD, CONS or both + int cpu_binding; // id of the CPU to run the thread on +} prod_cons_thread_t; typedef int inc_check_t; @@ -36,18 +46,20 @@ pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER; static int init_calc_arg = 0; static int block_reception = 1; +static int nb_nodes = 2; // Nb of nodes participating to the chain of pipelines static int check_recv_match_send = 0; -static int page_size = 0; static uintptr_t single_prod_check_val; // /!\ Implies only one real producer static inc_check_t *single_prod_check_ctxt; // /!\ Implies only one real producer +static int nb_cpus = 4; // TOFIX: don't hardcode this +static int page_size = 0; void usage(char *argv[]) { char format[] = "-n -p [options]"; - char options[] = "Required options :\n" + char options[] = "Required options:\n" "-n nb_buffer_sent\t\tNumber of buffer to send to another core\n" "\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n" - "Facultative options :\n" + "Facultative options:\n" "-b\t\t\t\tReceive the biggest amount of data available (The default)\n" "-c calculation_libname arg\tLibrary to use for calculation with its argument\n" "\t\t\t\tThis library must implement functions in calc.h\n" @@ -59,7 +71,8 @@ void usage(char *argv[]) "\t\t\t\tIf level is:\n" "\t\t\t\t\t> 0, then the same L must be shared\n" "\t\t\t\t\t< 0, then different L must be used\n" - "\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n"; + "\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n" + "-t\t\t\tnb_nodes\t\tNumber of nodes in the pipeline chain\n"; printf("Usage : %s %s\n", argv[0], format); printf("Options :\n"); printf("%s\n", options); @@ -195,6 +208,22 @@ int analyse_options(int argc, char *argv[]) case 'k' : check_recv_match_send = 1; break; + case 'l' : + { + char *inval; + nb_nodes = strtol(optarg, &inval, 10); + if ((*optarg == '\0') || (*inval != '\0')) + { + fprintf(stderr, "Option '-l' needs an integer argument\n"); + return -1; + } + if ((nb_nodes < 2) || ((nb_nodes == LONG_MAX) && errno == ERANGE)) + { + fprintf(stderr, "Number of links to participate in the pipeline chain must be between 2 and %ld, both inclusive\n", LONG_MAX); + return -1; + } + } + break; case 'n' : { char *inval; @@ -314,69 +343,27 @@ int analyse_options(int argc, char *argv[]) return 0; } -void *producer(void *channel) +int producer(void *prod_channel) { int i, j; - if (init_producer_thread(channel)) - { - fprintf(stderr, "Initialization of thread has failed\n"); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ - } - if (shared) - { - pthread_t tid; - cpu_set_t cpuset; - - tid = pthread_self(); - CPU_ZERO(&cpuset); - CPU_SET(1, &cpuset); - if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) - { - perror("pthread_setaffinity_np"); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ - } - } - else - { - pthread_t tid; - cpu_set_t cpuset; - - tid = pthread_self(); - CPU_ZERO(&cpuset); - CPU_SET(2, &cpuset); - if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) - { - perror("pthread_setaffinity_np"); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ - } - } if (init_calc(init_calc_arg)) { fprintf(stderr, "Initialization of calculation has failed\n"); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + return 1; } for(i = 0; i < nb_bufs_sent; i++) { //printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE); for(j = 0; j < WORDS_PER_LINE; j++) - send(do_calc()); + send(prod_channel, do_calc()); } if (end_calc()) { fprintf(stderr, "uninitialization of calculation has failed\n"); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + return 1; } printf("[%p] Producer finished !\n", (void*) pthread_self()); - /* - * When a producer end its thread-local storage vanished. Thus, - * producers must finish only after consumer has stopped using them - */ - if (finalize_producer_thread(channel)) - { - fprintf(stderr, "Finalization of thread has failed\n"); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ - } - return NULL; + return 0; } void on_message(void *val) @@ -384,37 +371,17 @@ void on_message(void *val) //printf("Receive value: %p\n", (void *) val); } -void *consumer(void *channel) +int consumer(void *cons_channel) { int delayed_error; uintptr_t cons_check_value; inc_check_t *cons_check_context; delayed_error = 0; - if (shared) - { - pthread_t tid; - cpu_set_t cpuset; - - tid = pthread_self(); - CPU_ZERO(&cpuset); - CPU_SET(0, &cpuset); - if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) - { - perror("pthread_setaffinity_np"); - return NULL; - } - } - if (init_consumer_thread(channel)) - { - fprintf(stderr, "Initialization of thread has failed\n"); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ - } if (inc_check_init(init_calc_arg, &cons_check_context)) { fprintf(stderr, "Initialization of check has failed\n"); - finalize_consumer_thread(channel); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + return -1; /* &page_size can't be NULL, whatever NULL is bound to */ } cons_check_value = init_calc_arg; if (block_reception) @@ -427,7 +394,7 @@ void *consumer(void *channel) int i; ssize_t nb_data_received; - nb_data_received = recv_some_data(data_buf, MAX_BLOCK_ENTRIES); + nb_data_received = recv_some_data(cons_channel, data_buf, MAX_BLOCK_ENTRIES); total_data_received += nb_data_received; for (i = 0; i < nb_data_received; i++) { @@ -457,12 +424,11 @@ void *consumer(void *channel) int i, j; for(i = 0; i < nb_bufs_sent; i++) { - //printf("[%p] About to receive %d new cache line%s\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE, (BUF_SIZE / CACHE_LINE_SIZE > 1) ? "s" : ""); for(j = 0; j < WORDS_PER_LINE; j++) { void *data; - data = recv_one_data(); + data = recv_one_data(cons_channel); if (inc_check_next(cons_check_context, &cons_check_value)) { if (!delayed_error) @@ -480,31 +446,92 @@ void *consumer(void *channel) } } on_message(data); + //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : ""); } } } - if (inc_check_end(cons_check_context)) - { - fprintf(stderr, "Finalization of check has failed\n"); - finalize_consumer_thread(channel); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ - } - if (finalize_consumer_thread(channel)) - { - fprintf(stderr, "Finalization of thread has failed\n"); - return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ - } + printf("[%p] Consumer finished !\n", (void*) pthread_self()); if (delayed_error) + return -1; + return 0; +} + +int consprod(void *cons_channel, void *prod_channel) +{ + if (block_reception) + { + long long total_data_received = 0; + void *data_buf[MAX_BLOCK_ENTRIES]; + + while (total_data_received < nb_bufs_sent * WORDS_PER_LINE) + { + int i; + ssize_t nb_data_received; + + nb_data_received = recv_some_data(cons_channel, data_buf, MAX_BLOCK_ENTRIES); + total_data_received += nb_data_received; + for (i = 0; i < nb_data_received; i++) + send(prod_channel, data_buf[i]); + //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : ""); + } + } + else + { + int i, j; + + for(i = 0; i < nb_bufs_sent; i++) { + for(j = 0; j < WORDS_PER_LINE; j++) + send(prod_channel, recv_one_data(cons_channel)); + //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : ""); + } + } + printf("[%p] Producer/consumer finished !\n", (void*) pthread_self()); + return 0; +} + +void *node(prod_cons_thread_t *thread_params) +{ + int return_value; + pthread_t tid; + cpu_set_t cpuset; + + tid = pthread_self(); + CPU_ZERO(&cpuset); + CPU_SET(thread_params->cpu_binding, &cpuset); + if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) + { + perror("pthread_setaffinity_np"); + return NULL; + } + switch (thread_params->flags & (PROD | CONS)) + { + case PROD: + return_value = producer(thread_params->prod_comm_channel); + break; + + case CONS: + return_value = consumer(thread_params->cons_comm_channel); + break; + + case (PROD | CONS): + return_value = consprod(thread_params->cons_comm_channel, + thread_params->prod_comm_channel); + break; + + default: + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + } + if (return_value) return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ return NULL; } int main(int argc, char *argv[]) { - pthread_t tid[2]; - int return_value; + pthread_t *tids; + int i, return_value; void *pthread_return_value; - void *channel; + prod_cons_thread_t *thread_params; return_value = EXIT_SUCCESS; if (analyse_options(argc, argv)) @@ -512,25 +539,59 @@ int main(int argc, char *argv[]) page_size = sysconf(_SC_PAGE_SIZE); if (page_size <= 0) return EXIT_FAILURE; - if (init_library()) + thread_params = malloc(nb_nodes * sizeof(prod_cons_thread_t)); + if (thread_params == NULL) return EXIT_FAILURE; - channel = create_comm_channel(); - if (channel != NULL) + tids = malloc(nb_nodes * sizeof(pthread_t)); + if (tids == NULL) { - pthread_create(&tid[0], NULL, producer, channel); - pthread_create(&tid[1], NULL, consumer, channel); - pthread_join(tid[0], &pthread_return_value); - if (pthread_return_value != NULL) + return_value = EXIT_FAILURE; + goto error_alloc_tids; + } + for (i = 0; i < nb_nodes - 1; i++) + { + if (i) + thread_params[i].flags = PROD | CONS; + else + thread_params[i].flags = PROD; + // Should work in most cases + if (shared) + thread_params[i].cpu_binding = i % nb_cpus; + else + thread_params[i].cpu_binding = (2 * i) % nb_cpus; + thread_params[i].prod_comm_channel = create_comm_channel(); + if (thread_params[i].prod_comm_channel == NULL) + { return_value = EXIT_FAILURE; - pthread_join(tid[1], &pthread_return_value); + goto error_create_channels; + } + if (i) + thread_params[i].cons_comm_channel = + thread_params[i - 1].prod_comm_channel; + pthread_create(&tids[i], NULL, (void *(*)(void *)) node, &thread_params[i]); + } + thread_params[i].flags = CONS; + if (shared) + thread_params[i].cpu_binding = i % nb_cpus; + else + thread_params[i].cpu_binding = (2 * i) % nb_cpus; + thread_params[i].cons_comm_channel = + thread_params[i - 1].prod_comm_channel; + pthread_create(&tids[i], NULL, (void *(*)(void *)) node, &thread_params[i]); + for (i = 0; i < nb_nodes; i++) + { + pthread_join(tids[i], &pthread_return_value); if (pthread_return_value != NULL) return_value = EXIT_FAILURE; } - else - return_value = EXIT_FAILURE; - if (destroy_comm_channel(channel)) - return_value = EXIT_FAILURE; - if (finalize_library()) - return EXIT_FAILURE; + i--; +error_create_channels: + for (i-- ; i >= 0; i--) { + if (destroy_comm_channel(thread_params[i].prod_comm_channel)) + return_value = EXIT_FAILURE; + } + free(tids); +error_alloc_tids: + free(thread_params); return return_value; }