diff --git a/communication_techniques/Makefile b/communication_techniques/Makefile index d14a4b1..b66efa7 100644 --- a/communication_techniques/Makefile +++ b/communication_techniques/Makefile @@ -7,37 +7,25 @@ LIBDIR:=lib LOGDIR:=logs LOCALDIR:=$(HOME)/local -# PHL stands for PAPI High Level -PHL_LIB_INSTALLDIR:=lib -PHL_INC_INSTALLDIR:=lib -PHL_LIBDIR:=lib -PHL_INCDIR:=include -PHLDIR:=../papihighlevel -PAPI_LIB_INSTALLDIR:=lib -PAPIDIR:=$(PHLDIR)/papi -PAPI_LIBDIR:=src -PAPI_PFMDIR:=$(PAPI_LIBDIR)/libpfm-3.y/lib CALCDIR:=calculation COMMDIR:=communication # Compilation flags # I know -finline-functions and -finline-functions-called-once are enabled by # -O3 but I did this in case gcc behaviour change one day -CFLAGS:=-c -O3 -finline-functions -finline-functions-called-once -Wall -Werror -LDFLAGS:=-L$(LIBDIR) -L$(LOCALDIR)/$(PHL_LIB_INSTALLDIR) -L$(PHLDIR)/$(PHL_LIBDIR) -LDFLAGS:=$(LDFLAGS) -Wl,-rpath-link,$(LOCALDIR)/$(PAPI_LIB_INSTALLDIR):$(PAPIDIR)/$(PAPI_LIBDIR):$(PAPIDIR)/$(PAPI_PFMDIR) -LDFLAGS:=$(LDFLAGS) -lpthread -lpapihighlevel -ldl +CFLAGS:=-c -g -O3 -finline-functions -finline-functions-called-once -Wall -Werror +LDFLAGS:=-L$(LIBDIR) -lpthread -ldl # Executables CC=gcc # Files -BINNAMES:=asm_cache_comm c_cache_comm pipe_comm shared_mem_comm shared_mem_opt_comm jikes_barrier_comm fake_comm +BINNAMES:=c_cache_comm pipe_comm lamport_comm shared_mem_opt_comm fake_comm csq_comm fast_forward_comm mcringbuffer_comm #jikes_barrier_comm asm_cache_comm CALCLIBSNAMES:=calc_mat calc_useless_loop BINS:=$(patsubst %,$(BINDIR)/%,$(BINNAMES)) CALCLIBS:=$(patsubst %,$(LIBDIR)/$(CALCDIR)/lib%.so.1,$(CALCLIBSNAMES)) MAIN_OBJS:=main.o -COMMON_LIB_OBJS:=common.o +COMMON_LIB_OBJS:= .PHONY: all tidy clean distclean .SECONDARY: @@ -77,7 +65,7 @@ $(OBJDIR)/$(COMMDIR)/%.o: $(SRCDIR)/$(COMMDIR)/%.c $(INCDIR)/%_comm.h $(INCDIR)/ $(OBJDIR)/$(CALCDIR)/%.o: $(SRCDIR)/$(CALCDIR)/%.c if [ ! -d $(OBJDIR) ] ; then mkdir $(OBJDIR) ; fi if [ ! -d $(OBJDIR)/$(CALCDIR) ] ; then mkdir $(OBJDIR)/$(CALCDIR) ; fi - $(CC) $(CFLAGS) -I$(INCDIR) $< -o $@ + $(CC) $(CFLAGS) -fPIC -I$(INCDIR) $< -o $@ #.%.d: %.c # gcc $(CFLAGS) -MM $^ | sed -e 's/\([^:]*\):\(.*\)/\1 $@: \2 Makefile/' > $@ @@ -119,4 +107,4 @@ $(OBJDIR)/%.o: $(SRCDIR)/$$(*F).c $(INCDIR)/$$(*D)_comm.h $(INCDIR)/commtech.h if [ ! -d $(OBJDIR) ] ; then mkdir $(OBJDIR) ; fi if [ ! -d $(OBJDIR)/$(*D) ] ; then mkdir $(OBJDIR)/$(*D) ; fi cd $(INCDIR) ; ln -sfT $(*D)_comm.h specific_comm.h - $(CC) $(CFLAGS) -I$(INCDIR) -I$(LOCALDIR)/$(PHL_INCDIR) -I../$(PHLDIR)/$(PHL_INCDIR) $< -o $@ + $(CC) $(CFLAGS) -I$(INCDIR) $< -o $@ diff --git a/communication_techniques/include/asm_cache_comm.h b/communication_techniques/include/asm_cache_comm.h index 19a75ca..53c4aa1 100644 --- a/communication_techniques/include/asm_cache_comm.h +++ b/communication_techniques/include/asm_cache_comm.h @@ -10,32 +10,36 @@ __BEGIN_DECLS -struct comm_channel +struct channel { volatile void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE))); volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE))); int idx __attribute__ ((aligned (CACHE_LINE_SIZE))); }; -struct thread_comm +struct cons { - struct comm_channel *channel; + struct channel *channel; int receiver_idx; - int unused; }; -extern struct thread_comm *tcomms; +union comm +{ + struct channel *channel; + struct cons *cons; +}; + +extern __thread union comm comm; + extern int swap_buffer; -int init_thread_comm(struct thread_comm *); -int end_thread_comm(void); static inline void send(void **addr) { asm volatile("mov %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) " + " toString(CACHE_LINE_SIZE) ", %%eax\n\t" - "mov %0, %%gs:channel@NTPOFF(%%eax)\n\t" + "mov %0, %%gs:cons.channel@NTPOFF(%%eax)\n\t" "addl $4, %%eax\n\t" "andl $(2*" toString(BUF_SIZE) "-1), %%eax\n\t" - "mov %%eax, %%gs:channel@NTPOFF + 2 * " toString(BUF_SIZE)" + " toString(CACHE_LINE_SIZE) "\n\t" + "mov %%eax, %%gs:cons.channel@NTPOFF + 2 * " toString(BUF_SIZE)" + " toString(CACHE_LINE_SIZE) "\n\t" "test $(" toString(BUF_SIZE) " - 1), %%eax\n\t" "mov $2f, %%eax\n\t" "jz swap_buffer\n\t" diff --git a/communication_techniques/include/c_cache_comm.h b/communication_techniques/include/c_cache_comm.h index c11ce07..1efba1c 100644 --- a/communication_techniques/include/c_cache_comm.h +++ b/communication_techniques/include/c_cache_comm.h @@ -8,34 +8,37 @@ #define toString(x) doStringification(x) #define doStringification(x) #x -struct comm_channel +struct channel { volatile void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE))); volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE))); int idx __attribute__ ((aligned (CACHE_LINE_SIZE))); }; -struct thread_comm +struct cons { - struct comm_channel *channel; + struct channel *channel; int receiver_idx; }; -extern struct thread_comm *tcomms; -extern __thread struct comm_channel channel; +union comm +{ + struct channel *channel; + struct cons *cons; +}; + +extern __thread union comm comm; __BEGIN_DECLS -int init_thread_comm(struct thread_comm *); -int end_thread_comm(void); static inline void send(void **addr) { - channel.buf[channel.idx++] = addr; - channel.idx %= 2 * BUF_SIZE / sizeof(void *); - if (!(channel.idx % (BUF_SIZE / sizeof(void *)))) + comm.channel->buf[comm.channel->idx++] = addr; + comm.channel->idx %= 2 * (BUF_SIZE / sizeof(void *)); + if (!(comm.channel->idx % (BUF_SIZE / sizeof(void *)))) { - while (channel.state); - channel.state = 1; + while (comm.channel->state); + comm.channel->state = 1; } } diff --git a/communication_techniques/include/commtech.h b/communication_techniques/include/commtech.h index f0deb87..7c11ba1 100644 --- a/communication_techniques/include/commtech.h +++ b/communication_techniques/include/commtech.h @@ -7,16 +7,121 @@ #define likely(x) __builtin_expect(!!(x), 1) #define unlikely(x) __builtin_expect(!!(x), 0) -extern volatile int cont; -extern long nb_prod; - __BEGIN_DECLS +/* + * @return 0 if success, -1 else + * + * Initialize communication library. + * @comment Must be run before any other function of this library + */ int init_library(void); -int end_library(void); -int init_producer_thread(void); -int end_producer_thread(void); -void reception(void (*)(void *)); + + +/* + * @return 0 if success, -1 else + * + * Finalize communication library. + * @comment Must be run after any other function of this library + */ +int finalize_library(void); + + +/* + * @return a pointer on the channel if success, NULL else + * + * Create a communication channel. The return value is the address of + * the channel that has to be passed to init_producer_thread and + * init_consumer_thread respectively by the producer and consumer thread + * which want to communicate. + */ +void *create_comm_channel(void); + + +/* + * @param channel The channel to destroy + * @return 0 if success, -1 else + * + * Destroy a communication channel. + * + * @comment Must be called after producer and consumer stopped to + * communicate. + */ +int destroy_comm_channel(void *); + + +/* + * @param chan Address of the communication channel to attach to + * the producer calling this function. + * @return 0 on success, -1 else + * + * Initialize the producer and attach the given communication channel to + * it. + */ +int init_producer_thread(void *); + + +/* + * @param channel Address of the communication channel to detach from + * the producer calling this function + * @return 0 on success, -1 else + * + * Finalize the producer. + * @comment Must be run by the producer after it stopped to communicate + * with the consumer. + */ +int finalize_producer_thread(void *); + + +/* + * @param channel Address of the communication channel to attach to + * the consumer calling this function. + * @return 0 on success, -1 else + * + * Initialize the consumer and attach the given communication channel to + * it. + */ +int init_consumer_thread(void *); + + +/* + * @param channel Address of the communication channel to detach from + * the consumer calling this function + * @return 0 on success, -1 else + * + * Finalize the consumer. + * + * @comment Must be run by the consumer after it stopped to communicate + * with the consumer. + */ +int finalize_consumer_thread(void *); + + +/* + * @return a data sent by the matching producer + * + * Wait until a data sent by the matching producer is available + * + * @comment recv_one_data should not be used in conjonction of + * recv_some_data + */ +void *recv_one_data(void); + + + +/* + * @param buf The buffer to write received data into + * @param count The maximum number of data to copy into buf + * @return Number of data copied into buf + * + * Wait until some data sent by the matching producer is available and + * copy as much data as possible into buf, with a maximum of count. + * + * @commentrecv_some_data should not be used in conjonction of recv_one_data + * @comment count must be a multiple of BUF_SIZE / sizeof(void *) which is + * equal to SUB_SLOTS + */ +ssize_t recv_some_data(void **, size_t); __END_DECLS diff --git a/communication_techniques/include/csq_comm.h b/communication_techniques/include/csq_comm.h new file mode 100644 index 0000000..b37ef91 --- /dev/null +++ b/communication_techniques/include/csq_comm.h @@ -0,0 +1,53 @@ +#ifndef _SPECIFIC_COMM_H_ +#define _SPECIFIC_COMM_H_ 1 + +/* Non standard include */ +#include + +#define SLOTS 64 /* Value used in the article, section V.A */ +#define SUB_SLOTS (BUF_SIZE / sizeof(void *)) + +struct lvl_2 +{ + volatile void *chunk[SUB_SLOTS]; + volatile unsigned int flag : 1; +} __attribute__ ((aligned (CACHE_LINE_SIZE))); + +struct comm +{ + struct lvl_2 queue[SLOTS] __attribute__ ((aligned (CACHE_LINE_SIZE))); +}; + +union ctrl +{ + int head; + int tail; +}; + +__BEGIN_DECLS + +extern __thread struct comm *comm; +extern __thread union ctrl ctrl __attribute__ ((aligned (CACHE_LINE_SIZE))); + +// TODO: Make it send only one data +static inline void send(void **addr) +{ + static __thread int chkidx = 0; + + // If all slots are full, spin + if (!chkidx) + while (comm->queue[ctrl.tail].flag); + + // Enqueue a data item + comm->queue[ctrl.tail].chunk[chkidx++] = addr; + if (!(chkidx % SUB_SLOTS)) + { + chkidx = 0; + comm->queue[ctrl.tail].flag = 1; + ctrl.tail = (ctrl.tail + 1) % SLOTS; + } +} + +__END_DECLS + +#endif diff --git a/communication_techniques/include/fake_comm.h b/communication_techniques/include/fake_comm.h index ac9f3c2..21dd1a3 100644 --- a/communication_techniques/include/fake_comm.h +++ b/communication_techniques/include/fake_comm.h @@ -3,19 +3,12 @@ #define FAKE_NURSERY_START 1 -struct thread_comm -{ - int unused; -}; - -extern struct thread_comm *tcomms; extern __thread void ** volatile store_var; __BEGIN_DECLS -int init_thread_comm(struct thread_comm *); -int end_thread_comm(void); -static inline void send(void **addr) { +static inline void send(void **addr) +{ store_var = addr; } diff --git a/communication_techniques/include/fast_forward_comm.h b/communication_techniques/include/fast_forward_comm.h new file mode 100644 index 0000000..cd5095a --- /dev/null +++ b/communication_techniques/include/fast_forward_comm.h @@ -0,0 +1,50 @@ +#ifndef _SPECIFIC_COMM_H_ +#define _SPECIFIC_COMM_H_ 1 + +/* Non standard include */ +#include +#include + +#define SHARED_SPACE_SIZE (16 * BUF_SIZE) +#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *)) + +#define DANGER (2 * BUF_SIZE / sizeof(void *)) +#define GOOD (6 * BUF_SIZE / sizeof(void *)) +#define ADJUST_FREQ 64 + +struct comm +{ + void * volatile *shared_space; + int head; + int tail; +}; + +__BEGIN_DECLS + +extern __thread struct comm *comm; + +extern int adjust_slip(void); + +static inline void send(void **addr) +{ + static __thread int nb_iter = 0; + + assert(addr != NULL); + if (nb_iter == ADJUST_FREQ) + { + adjust_slip(); + nb_iter = 0; + } + while (1) + { + if (comm->shared_space[comm->head] != NULL) + continue; + comm->shared_space[comm->head] = addr; + comm->head = (comm->head + 1) % SHARED_SPACE_VOIDPTR; + break; + } +} + +__END_DECLS + +#endif diff --git a/communication_techniques/include/lamport_comm.h b/communication_techniques/include/lamport_comm.h new file mode 100644 index 0000000..7fdc277 --- /dev/null +++ b/communication_techniques/include/lamport_comm.h @@ -0,0 +1,30 @@ +#ifndef _SPECIFIC_COMM_H_ +#define _SPECIFIC_COMM_H_ 1 + +/* Non standard include */ +#include + +#define SHARED_SPACE_SIZE (2 * BUF_SIZE) +#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *)) + +struct comm +{ + void * volatile *shared_space; + volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); +}; + +__BEGIN_DECLS + +extern __thread struct comm *comm; + +static inline void send(void **addr) +{ + while ((comm->prod_idx + 1) % SHARED_SPACE_VOIDPTR == comm->cons_idx); + comm->shared_space[comm->prod_idx] = addr; + comm->prod_idx = (comm->prod_idx + 1) % SHARED_SPACE_VOIDPTR; +} + +__END_DECLS + +#endif diff --git a/communication_techniques/include/mcringbuffer_comm.h b/communication_techniques/include/mcringbuffer_comm.h new file mode 100644 index 0000000..70cd60e --- /dev/null +++ b/communication_techniques/include/mcringbuffer_comm.h @@ -0,0 +1,69 @@ +#ifndef _SPECIFIC_COMM_H_ +#define _SPECIFIC_COMM_H_ 1 + +/* Non standard include */ +#include + +#define SHARED_SPACE_SIZE (2 * BUF_SIZE) +#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *)) + +struct control +{ + volatile int read; + volatile int write; +}; + +struct cons +{ + int localWrite; + int nextRead; + int rBatch; +}; + +struct prod +{ + int localRead; + int nextWrite; + int wBatch; +}; + + +struct comm +{ + struct control ctrl __attribute__ ((aligned (CACHE_LINE_SIZE))); + struct prod prod __attribute__ ((aligned (CACHE_LINE_SIZE))); + struct cons cons __attribute__ ((aligned (CACHE_LINE_SIZE))); + void * volatile *shared_space __attribute__ ((aligned (CACHE_LINE_SIZE))); // Align only to isolate cons on its cache line +}; + +__BEGIN_DECLS + +extern __thread struct comm *comm; +extern const int batchSize; + +static inline void send(void **addr) +{ + while (1) + { + int afterNextWrite = (comm->prod.nextWrite + 1) % SHARED_SPACE_VOIDPTR; + if (afterNextWrite == comm->prod.localRead) + { + if (afterNextWrite == comm->ctrl.read) + continue; + comm->prod.localRead = comm->ctrl.read; + } + comm->shared_space[comm->prod.nextWrite] = addr; + comm->prod.nextWrite = afterNextWrite; + comm->prod.wBatch++; + if (comm->prod.wBatch >= batchSize) + { + comm->ctrl.write = comm->prod.nextWrite; + comm->prod.wBatch = 0; + } + break; + } +} + +__END_DECLS + +#endif diff --git a/communication_techniques/include/pipe_comm.h b/communication_techniques/include/pipe_comm.h index f5e4150..69b2719 100644 --- a/communication_techniques/include/pipe_comm.h +++ b/communication_techniques/include/pipe_comm.h @@ -6,20 +6,33 @@ #define READ_IDX 0 #define WRITE_IDX 1 -struct thread_comm +struct comm { - int *pipefd; + int pipefd[2]; }; __BEGIN_DECLS -extern __thread int pipefd[]; -extern struct thread_comm *tcomms; +extern __thread struct comm *comm; -int init_thread_comm(struct thread_comm *); -int end_thread_comm(void); -static inline void send(void **addr) { - write(pipefd[WRITE_IDX], &addr, sizeof(void *)); +static inline void send(void **addr) +{ + int nb_read; + void *addr_ptr; + + nb_read = 0; + addr_ptr = &addr; + do + { + int n; + + n = write(comm->pipefd[WRITE_IDX], addr_ptr, sizeof(void *) - nb_read); + if (n > 0) + { + nb_read += n; + addr_ptr = (void *) ((uintptr_t) addr_ptr + n); + } + } while (nb_read < sizeof(void *)); } __END_DECLS diff --git a/communication_techniques/include/private_common.h b/communication_techniques/include/private_common.h deleted file mode 100644 index ab27b64..0000000 --- a/communication_techniques/include/private_common.h +++ /dev/null @@ -1,10 +0,0 @@ -#ifndef _PRIVATE_COMMON_H_ -#define _PRIVATE_COMMON_H_ 1 - -__BEGIN_DECLS - -void wait_initialization(void); - -__END_DECLS - -#endif diff --git a/communication_techniques/include/shared_mem_comm.h b/communication_techniques/include/shared_mem_comm.h deleted file mode 100644 index 0bee7d7..0000000 --- a/communication_techniques/include/shared_mem_comm.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef _SPECIFIC_COMM_H_ -#define _SPECIFIC_COMM_H_ 1 - -/* Non standard include */ -#include - -#define SHARED_SPACE_SIZE (2 * BUF_SIZE) -#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *)) - -struct thread_comm -{ - volatile void **shared_space; - volatile int *cons_idx; - volatile int *prod_idx; -}; - -extern struct thread_comm *tcomms; - -__BEGIN_DECLS - -extern __thread volatile void **shared_space; -extern __thread volatile int prod_idx; -extern __thread volatile int cons_idx; - -int init_thread_comm(struct thread_comm *); -int end_thread_comm(void); -static inline void send(void **addr) { - while ((prod_idx + 1) % SHARED_SPACE_VOIDPTR == cons_idx); - shared_space[prod_idx] = addr; - prod_idx = (prod_idx + 1) % SHARED_SPACE_VOIDPTR; -} - -__END_DECLS - -#endif diff --git a/communication_techniques/include/shared_mem_opt_comm.h b/communication_techniques/include/shared_mem_opt_comm.h index 5184c73..ac10518 100644 --- a/communication_techniques/include/shared_mem_opt_comm.h +++ b/communication_techniques/include/shared_mem_opt_comm.h @@ -7,33 +7,32 @@ #define SHARED_SPACE_SIZE (2 * BUF_SIZE) #define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *)) -struct thread_comm +struct comm { - volatile void **shared_space; - volatile int *cons_idx; - volatile int *prod_idx; + void * volatile *shared_space; + volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); }; -extern struct thread_comm *tcomms; __BEGIN_DECLS -extern __thread volatile void **shared_space; -extern __thread volatile int prod_idx; -extern __thread volatile int cons_idx; +extern __thread struct comm *comm; -int init_thread_comm(struct thread_comm *); -int end_thread_comm(void); -static inline void send(void **addr) { +static inline void send(void **addr) +{ static __thread int local_cons_idx = 0; + int local_prod, next_prod; - if (likely(((prod_idx + 1) % SHARED_SPACE_VOIDPTR) == local_cons_idx)) + local_prod = comm->prod_idx; + next_prod = (local_prod + 1) % SHARED_SPACE_VOIDPTR; + if (next_prod == local_cons_idx) { - while (((prod_idx + 1) % SHARED_SPACE_VOIDPTR) == cons_idx); - local_cons_idx = cons_idx; + while (next_prod == comm->cons_idx); + local_cons_idx = comm->cons_idx; } - shared_space[prod_idx] = addr; - prod_idx = (prod_idx + 1) % SHARED_SPACE_VOIDPTR; + comm->shared_space[local_prod] = addr; + comm->prod_idx = next_prod; } __END_DECLS diff --git a/communication_techniques/lancement.sh b/communication_techniques/lancement.sh index 5954686..8dbcfd5 100755 --- a/communication_techniques/lancement.sh +++ b/communication_techniques/lancement.sh @@ -1,21 +1,18 @@ #! /bin/bash set -u +set -v # Files and directories binDir="bin" calcDir="calculation" -PHLDir="../papihighlevel" -PHLLibDir="lib" -PAPIDir="${PHLDir}/papi" -PAPILibDir="src" -PAPIPFMDir="libpfm-3.y/lib" # Param -binList="asm_cache_comm c_cache_comm pipe_comm shared_mem_comm shared_mem_opt_comm jikes_barrier_comm fake_comm" # Type de communication +binList="$(ls -1 "${binDir}"| sed '$!s/$/ /' | tr -d '\n')" nbProdList="1" # Nombre de cores producteurs typeProdList="none matrice useless_loop" # Methode pour produire les valeurs typeCacheList="L2 Memory" # Niveau de cache partage +perfOpt="stat -e cycles -e L1-dcache-loads -e L1-dcache-stores -e LLC-load-misses -e LLC-store-misses" # Const nbIter="5000000" # Nb de lignes produites @@ -26,42 +23,6 @@ logFileName="\$perfDirName/cache_\$typeCache-nbProd_\$nbProd-typeProd_\$typeProd expDirName="logs" perfDirName="$expDirName/perfCommMulti-`date +'%F-%Hh%Mm%S'`" -LD_LIBRARY_PATH="${LD_LIBRARY_PATH:-}${LD_LIBRARY_PATH:+:}${PHLDir}/${PHLLibDir}:${PAPIDir}/${PAPILibDir}:${PAPIDir}/${PAPILibDir}/${PAPIPFMDir}" - -PAPILibPresent="" -PFMLibPresent="" -papiHighLevelLibPresent="" -LD_LIBRARY_PATH_LEFT="${LD_LIBRARY_PATH}:" -while [ -n "${LD_LIBRARY_PATH_LEFT}" ] -do - aLibDir="${LD_LIBRARY_PATH_LEFT%%:*}" - if [ -x ${aLibDir}/libpapi.so.3 ] - then - PAPILibPresent="1"; - fi - if [ -x ${aLibDir}/libpfm.so.3 ] - then - PHLLibPresent="1"; - fi - if [ -x ${aLibDir}/libpapihighlevel.so ] - then - PHLLibPresent="1"; - fi - if [ -n "${PAPILibPresent}" -a -n "${PHLLibPresent}" -a -n "${PFMLibPresent}" ] - then - break - fi - LD_LIBRARY_PATH_LEFT="${LD_LIBRARY_PATH_LEFT#*:}" -done -if [ -z "${PAPILibPresent}" -o -z "${PHLLibPresent}" ] -then - echo "Libraries needed for this bench not accessible by \$LD_LIBRARY_PATH" > /dev/stderr # Is there a better way to display something on stderr ? - exit 1 -fi - -unset PAPILibPresent -unset PHLLibPresent -unset LD_LIBRARY_PATH_LEFT export LD_LIBRARY_PATH [ -d "$expDirName" ] || mkdir "$expDirName" @@ -81,10 +42,10 @@ function_run () { esac make $binDir/$bin - echo "On lance : \"${bin##*/} $optTypeCache $optTypeProd -p $nbProd -n $nbIter\"" + echo "On lance : \"perf $perfOpt $binDir/$bin $optTypeCache $optTypeProd -n $nbIter\"" beginingDate=`date +%s` - ( $binDir/$bin $optTypeCache $optTypeProd -p $nbProd -n $nbIter || echo "echec experience" ) | eval tee $logFileName + ( perf $perfOpt $binDir/$bin $optTypeCache $optTypeProd -n $nbIter 2>&1 || echo "echec experience" ) | eval tee $logFileName endDate=`date +%s` duration_sec=`expr \( $endDate - $beginingDate \) % 60` duration_min=`expr \( $endDate - $beginingDate \) / 60` diff --git a/communication_techniques/parsing.sh b/communication_techniques/parsing.sh index bd5e618..320a12e 100755 --- a/communication_techniques/parsing.sh +++ b/communication_techniques/parsing.sh @@ -4,23 +4,48 @@ set -u init_bench_vars () { - local - + local ourBenchs onlyWBBenchs comList com - - #barriereList="jikes_barrier asm_cache c_cache fake" - barriereList="jikes_barrier asm_cache c_cache" - communicationList="asm_cache c_cache shared_mem shared_mem_opt pipe" + ourBenchs="asm_cache|c_cache" + onlyWBBenchs="jikes_barrier" + comList=`ls *.log | perl -ni -e '/-([^-]+)_comm/; $a{$1}=""; END { foreach ( sort keys %a ) {print "$_ "}}'` + if [ -z "${barriereList:-}" ] + then + for com in $comList + do + if echo $ourBenchs | grep "$com" > /dev/null 2>&1 + then + barriereList="${barriereList:-}${barriereList:+ }$com" + fi + if echo $onlyWBBenchs | grep "$com" > /dev/null 2>&1 + then + barriereList="${barriereList:-}${barriereList:+ }$com" + fi + done + fi + if [ -z "${communicationList:-}" ] + then + for com in $comList + do + if echo $onlyWBBenchs | grep -v "$com" > /dev/null 2>&1 + then + communicationList="${communicationList:-}${communicationList:+ }$com" + fi + done + fi } init_log_vars () { local firstBarriereComm - - comList=`ls *.log | perl -ni -e '/-([^-]+)_comm/; $a{$1}=""; END { foreach ( sort keys %a ) {print "$_ "}}'` cacheList=`ls *log | perl -ni -e '/cache_([^-]+)-/; $a{$1}=""; END { foreach ( sort keys %a ) {print "$_ "}}'` prodList=`ls *.log | perl -ni -e '/typeProd_([^-]+)-/; $a{$1}=""; END { foreach ( sort keys %a ) {print "$_ "}}'` firstBarriereComm=`echo $barriereList | sed -r "s/^[^ ]* .*$//"` argTypeProdList=`eval ls *typeProd_useless_loop*${firstBarriereComm}_comm.log | perl -ni -e '/argTypeProd_([\d]+)-/; $a{$1}=""; END { foreach ( sort { $a <=> $b } keys %a ) {print "$_ "}}'` metriqueList="cache_hits cache_miss cycles total_time" + nbIter=`ls -1 *.log | head -1 | sed -r "s/^.*nbIter_([[:digit:]]+).*$/\1/"` + valuePerCacheLine=$(grep "^buf size: " $(ls -1 *.log | head -1) | sed -r "s/^buf size: ([[:digit:]]+)$/\1/") } parse_args () @@ -78,7 +103,7 @@ contains_zero() iMax=$((lineNum + 1 + nbCommTechs * 3)) for i in $(seq $iMin 3 $iMax) do - if head -$i $datFile | tail -1 | grep "0\.00" > /dev/null + if head -$i $datFile | tail -1 | egrep "\.0+([^[:digit:]])?" > /dev/null then return 0 fi @@ -90,7 +115,7 @@ contains_zero() iMax=$((iMin + nbArg * 8 - 4)) for i in $(seq $iMin 4 $iMax) do - if head -$i $datFile | tail -1 | grep "0\.00" > /dev/null + if head -$i $datFile | tail -1 | egrep "\.0+([^[:digit:]])?" > /dev/null then return 0 fi @@ -147,7 +172,7 @@ create_dat_header () get_metric_values () { - local prod com cache argTypeProd logFile - + local prod com cache argTypeProd logFile metricValues totalValue metricValue loopValue stepValue - prod="$1" com="$2" cache="$3" @@ -155,7 +180,17 @@ get_metric_values () metriquePattern="$5" logFile=cache_$cache-*-typeProd_$prod-argTypeProd_$argTypeProd-*-${com}_comm.log - perl -n -e "print \"\$1 \$2 \$3 \" if /$metriquePattern.* (\S+) \/ (\S+) \/ (\S+)/" $logFile + metricValues=$(grep -E "$metriquePattern" $logFile | sed -r "s/^(.*[^[:alnum:].])?($metriquePattern)([^[:alnum:]].*)?$/\2/;s/^(.*[^[:alnum:].])?([[:digit:].]+)([^[:alnum:]].*)?$/\2/") + totalValue=0 + for metricValue in $metricValues + do + totalValue=$(echo "$totalValue + $metricValue" | bc) + done + echo -n "$totalValue" + loopValue="$(echo "$totalValue / $nbIter" | bc -l)" + echo -n " $loopValue" + stepValue=$(echo "$totalValue / $nbIter / $valuePerCacheLine" | bc -l) + echo -n " $stepValue" } create_simple_dat_body () @@ -359,9 +394,9 @@ main() { local logDir - - init_bench_vars parse_args "$@" cd $logDir + init_bench_vars init_log_vars for prod in $prodList ; do @@ -374,16 +409,16 @@ main() for metrique in $metriqueList ; do case "$metrique" in cache_hits) - metriquePattern="cache hits" + metriquePattern="[[:digit:]]+ +(L1-dcache-loads|L1-dcache-stores|L1-dcache-prefetches|LLC-loads|LLC-stores|LLC-prefetches) " ylabel="Nb cache hit" ;; cache_miss) - metriquePattern="cache miss" + metriquePattern="[[:digit:]]+ +(L1-dcache-load-misses|L1-dcache-store-misses|L1-dcache-prefetch-misses|LLC-load-misses|LLC-store-misses|LLC-prefetch-misses) " ylabel="Nb cache miss" ;; cycles) - metriquePattern="cycles" + metriquePattern="[[:digit:]]+ +cycles " ylabel="Nb cycles" ;; total_time) - metriquePattern="total_time" + metriquePattern="total_time: [[:digit:].]+" ylabel="Secondes" ;; *) echo "Pas de pattern pour cette métrique : $metrique" diff --git a/communication_techniques/src/communication/asm_cache.c b/communication_techniques/src/communication/asm_cache.c index 4a49dad..7a881df 100644 --- a/communication_techniques/src/communication/asm_cache.c +++ b/communication_techniques/src/communication/asm_cache.c @@ -4,27 +4,70 @@ /* Non standard include */ #include -#include #include -__thread struct comm_channel channel; +__thread union comm comm; -int init_thread_comm(struct thread_comm *comm) -{ - comm->receiver_idx = 0; - comm->channel = &channel; - comm->channel->state = 0; - comm->channel->idx = 0; - return 0; -} - -int end_thread_comm(void) +int init_library(void) { return 0; } -char *dstr="buffer transition\n"; +int finalize_library(void) +{ + return 0; +} + +void *create_comm_channel(void) +{ + struct cons *cons; + + if (!posix_memalign((void *) &cons, CACHE_LINE_SIZE, sizeof(struct cons))) + { + cons->receiver_idx = 0; + if (!posix_memalign((void *) &cons->channel, CACHE_LINE_SIZE, sizeof(struct channel))) + { + cons->channel->state = 0; + cons->channel->idx = 0; + return cons; + } + else + free(cons); + } + return NULL; +} + +int destroy_comm_channel(void *cons) +{ + free(cons->channel); + free(cons); + return 0; +} + +int init_producer_thread(void *cons) +{ + comm.channel = cons->channel; + return 0; +} + +int finalize_producer_thread(void *cons) +{ + comm.channel = NULL; + return 0; +} + +int init_consumer_thread(void *cons) +{ + comm.cons = cons; + return 0; +} + +int finalize_consumer_thread(void *cons) +{ + comm.cons = NULL; + return 0; +} void _swap_buffer() { @@ -32,48 +75,76 @@ void _swap_buffer() "swap_buffer:\n\t" "1:\n\t" - "testl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t" + "testl $1, %%gs:comm@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t" "jnz 1b\n\t" - "movl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t" + "movl $1, %%gs:comm@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t" "jmp *%%eax\n\t" : : "m"(dstr)); } -void reception(void (*on_receive)(void *)) +/* + * Copy at max count received data into buf + * @param buf The buffer in which received data must be copied into + * @return Number of data received and copied into buf + * + * @warning recv_one_data should not be used in conjonction of + * recv_some_data + */ +void *recv_one_data(void) { - wait_initialization(); - /* printf("Activate the consumer...\n"); */ - while (cont) - { - int i; + static __thread i; + void *result; - for (i = 0; i < nb_prod; i++) - { - if(tcomms[i].channel->state) - { - int j, n; - /* - * cur->receiver_idx point to the last cache - * line we have read. We go to the next cache - * line "+ (CACHE_LINE_SIZE >> 2)" (because - * the line is full of integer (2^2 octets) - * and then if we are after the second cache - * line we correct the pointer to point to - * the first one (this is done by the modulo) - */ - j = tcomms[i].receiver_idx; - n = tcomms[i].receiver_idx + (BUF_SIZE / sizeof(void *)); - tcomms[i].receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); - for(; jbuf[j]); - } - tcomms[i].channel->state = 0; - } - } - } + if (unlikely(i % (BUF_SIZE / sizeof(void *)))) + while (!comm.cons->channel->state); + result = (void *) comm.cons->channel->buf[i++]; + i %= (2 * BUF_SIZE) / sizeof(void *); + if (unlikely(i % (BUF_SIZE / sizeof(void *)))) + comm.cons->channel->state = 0; + return result; +} + + +/* + * Copy at max count received data into buf + * @param buf The buffer in which received data must be copied into + * @return Number of data received and copied into buf + * + * @warning recv_some_data should not be used in conjonction of + * recv_one_data + * @warning count must be a multiple of BUF_SIZE + */ +ssize_t recv_some_data(void **buf, size_t count) +{ + int i, n; + void **buf_ptr, **buf_end; + + n = 0; + buf_ptr = buf; + buf_end = buf + count + 1; + while (comm.cons->channel->state && end != buf) + { + int j, n; + /* + * cur->receiver_idx point to the last buffer we have read. + * We go to the next cache line "+ (BUF_SIZE / sizeof(void *))" + * (because the line is full of void * and then if we are after + * the second cache line we correct the pointer to point to the + * first one (this is done by the modulo). + */ + j = comm.cons->receiver_idx; + n = comm.cons->receiver_idx + (BUF_SIZE / sizeof(void *)); + comm.cons->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); + for(; jchannel->buf[j]; + n++; + } + comm.cons->channel->state = 0; + } + return n; } diff --git a/communication_techniques/src/communication/c_cache.c b/communication_techniques/src/communication/c_cache.c index b310595..e0da7c9 100644 --- a/communication_techniques/src/communication/c_cache.c +++ b/communication_techniques/src/communication/c_cache.c @@ -4,63 +4,130 @@ /* Non standard include */ #include -#include #include -__thread struct comm_channel channel; +__thread union comm comm; -int init_thread_comm(struct thread_comm *comm) -{ - comm->receiver_idx = 0; - comm->channel = &channel; - comm->channel->state = 0; - comm->channel->idx = 0; - return 0; -} - -int end_thread_comm(void) +int init_library(void) { return 0; } -char *dstr="buffer transition\n"; - -void reception(void (*on_receive)(void *)) +int finalize_library(void) { - wait_initialization(); - /* printf("Activate the consumer...\n"); */ - while (cont) + return 0; +} + +void *create_comm_channel(void) +{ + struct cons *cons; + + if (!posix_memalign((void **) &cons, CACHE_LINE_SIZE, sizeof(struct cons))) { - int i; - - for (i = 0; i < nb_prod; i++) + cons->receiver_idx = 0; + if (!posix_memalign((void **) &cons->channel, CACHE_LINE_SIZE, sizeof(struct channel))) { - if(tcomms[i].channel->state) - { - int j, n; - /* - * cur->receiver_idx point to the last cache - * line we have read. We go to the next cache - * line "+ (CACHE_LINE_SIZE >> 2)" (because - * the line is full of integer (2^2 octets) - * and then if we are after the second cache - * line we correct the pointer to point to - * the first one (this is done by the modulo) - */ - j = tcomms[i].receiver_idx; - n = tcomms[i].receiver_idx + (BUF_SIZE / sizeof(void *)); - tcomms[i].receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); - for(; jbuf[j]); - } - tcomms[i].channel->state = 0; - } + cons->channel->state = 0; + cons->channel->idx = 0; + return cons; } + else + free(cons); } + return NULL; +} + +int destroy_comm_channel(void *cons) +{ + free(((struct cons *) cons)->channel); + free(cons); + return 0; +} + +int init_producer_thread(void *cons) +{ + comm.channel = ((struct cons *) cons)->channel; + return 0; +} + +int finalize_producer_thread(void *cons) +{ + comm.channel = NULL; + return 0; +} + +int init_consumer_thread(void *cons) +{ + comm.cons = (struct cons *) cons; + return 0; +} + +int finalize_consumer_thread(void *cons) +{ + comm.cons = NULL; + return 0; +} + +/* + * Copy at max count received data into buf + * @param buf The buffer in which received data must be copied into + * @return Number of data received and copied into buf + * + * @warning recv_one_data should not be used in conjonction of + * recv_some_data + */ +void *recv_one_data(void) +{ + static __thread int i; + void *result; + + if (unlikely(i % (BUF_SIZE / sizeof(void *)))) + while (!comm.cons->channel->state); + result = (void *) comm.cons->channel->buf[i++]; + i %= (2 * BUF_SIZE) / sizeof(void *); + if (unlikely(i % (BUF_SIZE / sizeof(void *)))) + comm.cons->channel->state = 0; + return result; +} + +/* + * Copy at max count received data into buf + * @param buf The buffer in which received data must be copied into + * @return Number of data received and copied into buf + * + * @warning recv_some_data should not be used in conjonction of + * recv_one_data + * @warning count must be a multiple of BUF_SIZE + */ +ssize_t recv_some_data(void **buf, size_t count) +{ + int nb_read; + + nb_read = 0; + while (comm.cons->channel->state && nb_read < count) + { + int i, n; + /* + * cur->receiver_idx point to the last buffer we have read. + * We go to the next cache line "+ (BUF_SIZE / sizeof(void *))" + * (because the line is full of void * and then if we are after + * the second cache line we correct the pointer to point to the + * first one (this is done by the modulo). + */ + i = comm.cons->receiver_idx; + n = comm.cons->receiver_idx + (BUF_SIZE / sizeof(void *)); + comm.cons->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); + for(; i < n; i++) + { + /* + * The behaviour of this is not documented but we know + * the values inside buf won't change during this affectation + */ + *buf++ = (void *) comm.cons->channel->buf[i]; + } + nb_read += BUF_SIZE / sizeof(void *); + comm.cons->channel->state = 0; + } + return nb_read; } diff --git a/communication_techniques/src/communication/common.c b/communication_techniques/src/communication/common.c deleted file mode 100644 index 03f02dd..0000000 --- a/communication_techniques/src/communication/common.c +++ /dev/null @@ -1,85 +0,0 @@ -#include -#include -#include -#include - -/* Non standard include */ -#include -#include - - -struct thread_comm *tcomms; -volatile int cont = 1; -static int init = 0; -static int error = 0; -static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER; - -int init_library(void) -{ - tcomms = (struct thread_comm *) malloc(nb_prod * sizeof(struct thread_comm)); - if (tcomms == NULL) - { - fprintf(stderr, "Failed to allocate %lu bytes needed by the library to work\n", nb_prod * sizeof(struct thread_comm)); - return -1; - } - return 0; -} - -int end_library(void) -{ - free(tcomms); - return 0; -} - -int get_thread_number(void) -{ - static int i = 0; - static pthread_mutex_t i_lock = PTHREAD_MUTEX_INITIALIZER; - int i_local; - - pthread_mutex_lock(&i_lock); - i_local = i; - i++; - pthread_mutex_unlock(&i_lock); - return i_local; -} - -int init_producer_thread(void) -{ - int thread_num; - - thread_num = get_thread_number(); - if (init_thread_comm(&tcomms[thread_num])) - { - pthread_mutex_lock(&init_lock); - error = 1; - pthread_cond_signal(&init_cond); - pthread_mutex_unlock(&init_lock); - return -1; - } - if (thread_num == nb_prod - 1) - { - pthread_mutex_lock(&init_lock); - init = 1; - pthread_cond_signal(&init_cond); - pthread_mutex_unlock(&init_lock); - } - return 0; -} - -int end_producer_thread(void) -{ - return end_thread_comm(); -} - -int wait_initialization(void) -{ - pthread_mutex_lock(&init_lock); - if (!init && !error) - pthread_cond_wait(&init_cond, &init_lock); - pthread_mutex_unlock(&init_lock); - if (error) - return -1; - return 0; -} diff --git a/communication_techniques/src/communication/csq.c b/communication_techniques/src/communication/csq.c new file mode 100644 index 0000000..c9d5ab5 --- /dev/null +++ b/communication_techniques/src/communication/csq.c @@ -0,0 +1,114 @@ +#include +#include +#include +#include + +/* Non standard include */ +#include +#include + + +__thread struct comm *comm; +__thread union ctrl ctrl __attribute__ ((aligned (CACHE_LINE_SIZE))); + +int init_library(void) +{ + return 0; +} + +int finalize_library(void) +{ + return 0; +} + +void *create_comm_channel(void) +{ + struct comm *comm; + + if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + { + int i; + + for (i = 0; i < SLOTS; i++) + comm->queue[i].flag = 0; + return comm; + } + return NULL; +} + +int destroy_comm_channel(void *comm) +{ + free(comm); + return 0; +} + +int init_producer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + ctrl.tail = 0; + return 0; +} + +int finalize_producer_thread(void *unused) +{ + comm = NULL; + ctrl.tail = 0; + return 0; +} + +int init_consumer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + ctrl.head = 0; + return 0; +} + +int finalize_consumer_thread(void *unused) +{ + comm = NULL; + ctrl.head = 0; + return 0; +} + +void *recv_one_data(void) +{ + static __thread int i; + void *result; + + if (__builtin_expect(!i, 0)) + while (!comm->queue[ctrl.head].flag); + result = (void *) comm->queue[ctrl.head].chunk[i++]; + if (i % SUB_SLOTS) + { + i = 0; + comm->queue[ctrl.head].flag = 0; + ctrl.head = (ctrl.head + 1) % SLOTS; + } + return result; +} + +ssize_t recv_some_data(void **buf, size_t count) +{ + int i, n; + + n = 0; + // If all slots are empty, spin + while (comm->queue[ctrl.head].flag) + { + // Dequeue a chunk of data items + for (i = 0; i < SUB_SLOTS; i++) + { + /* + * The behaviour of this is not documented but we know + * the values inside buf won't change during this affectation + */ + *buf++ = (void *) comm->queue[ctrl.head].chunk[i]; + } + n += SUB_SLOTS; + comm->queue[ctrl.head].flag = 0; + ctrl.head = (ctrl.head + 1) % SLOTS; + if (n == count) + break; + } + return n; +} diff --git a/communication_techniques/src/communication/fake.c b/communication_techniques/src/communication/fake.c index ffd72c3..9a585c7 100644 --- a/communication_techniques/src/communication/fake.c +++ b/communication_techniques/src/communication/fake.c @@ -5,26 +5,75 @@ /* Non standard include */ #include -#include #include __thread void ** volatile store_var = NULL; -int init_thread_comm(struct thread_comm *comm) +int init_library(void) { return 0; } -int end_thread_comm(void) +int finalize_library(void) { return 0; } -void reception(void (*on_receive)(void *)) +void *create_comm_channel(void) { - wait_initialization(); /* Not needed but here for equity with others techniques */ - /* printf("Activate the consumer...\n"); */ - while (cont); + return (void *) &store_var; +} + +int destroy_comm_channel(void *unused) +{ + return 0; +} + +int init_producer_thread(void *unused) +{ + return 0; +} + +int finalize_producer_thread(void *unused) +{ + return 0; +} + +int init_consumer_thread(void *unused) +{ + return 0; +} + +int finalize_consumer_thread(void *unused) +{ + return 0; +} + +/* + * Copy at max count received data into buf + * @param buf The buffer in which received data must be copied into + * @return Number of data received and copied into buf + * + * @warning recv_one_data should not be used in conjonction of + * recv_some_data + */ +void *recv_one_data(void) +{ + return NULL; +} + +/* + * Copy at max count received data into buf + * @param buf The buffer in which received data must be copied into + * @return Number of data received and copied into buf + * + * @warning recv_some_data should not be used in conjonction of + * recv_one_data + * @warning count must be a multiple of BUF_SIZE + */ +ssize_t recv_some_data(void **buf, size_t count) +{ + return count; } diff --git a/communication_techniques/src/communication/fast_forward.c b/communication_techniques/src/communication/fast_forward.c new file mode 100644 index 0000000..d820fdb --- /dev/null +++ b/communication_techniques/src/communication/fast_forward.c @@ -0,0 +1,153 @@ +#include +#include +#include +#include + +/* Non standard include */ +#include +#include + + +__thread struct comm *comm; + +int init_library(void) +{ + return 0; +} + +int finalize_library(void) +{ + return 0; +} + +void *create_comm_channel(void) +{ + struct comm *comm; + + if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + { + if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) + { + int i; + + comm->head = 0; + comm->tail = 0; + for (i = 0; i < SHARED_SPACE_VOIDPTR; i++) + comm->shared_space[i] = NULL; + return comm; + } + else + free(comm); + } + return NULL; +} + +int destroy_comm_channel(void *comm) +{ + free((void *) ((struct comm *) comm)->shared_space); + free(comm); + return 0; +} + +int init_producer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_producer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +int init_consumer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_consumer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +int adjust_slip(void) +{ + int dist, dist_old, unused; + + puts("adjust_slip is called"); /* Must be removed after calibration */ + unused = 0; + dist = (comm->head + SHARED_SPACE_VOIDPTR - comm->tail) % SHARED_SPACE_VOIDPTR; + if (dist < DANGER) + { + dist_old = 0; + do + { + int i; + + dist_old = dist; + + /* + * I consider 20 as being the number of ++ which could + * be done while one data is sent by the producer + */ + for (i = 0; i < 20 * ((GOOD + 1) - dist); i++) + unused++; + dist = (comm->head + SHARED_SPACE_VOIDPTR - comm->tail) % SHARED_SPACE_VOIDPTR; + } while (dist < GOOD && dist_old < dist); + } + return unused; +} + +void *recv_one_data(void) +{ + void *result; + static __thread int nb_iter = 0; + + if (nb_iter == ADJUST_FREQ) + { + adjust_slip(); + nb_iter = 0; + } + while (1) + { + result = (void *) comm->shared_space[comm->tail]; + if (NULL == result) + continue; + comm->shared_space[comm->tail] = NULL; + comm->tail = (comm->tail + 1) % SHARED_SPACE_VOIDPTR; + break; + } + return result; +} + +ssize_t recv_some_data(void **buf, size_t count) +{ + int n, next_adjust; + static __thread int nb_iter = 0; + + next_adjust = ADJUST_FREQ - nb_iter; + for(n = 0; n < count; n++) + { + /* if ((nb_iter + n) % ADJUST_FREQ == 0) */ + if (n && (n % next_adjust == ADJUST_FREQ)) + { + adjust_slip(); + nb_iter = 0; + } + /* + * The behaviour of this is not documented but we know + * the values inside buf won't change during this affectation + */ + *buf = (void *) comm->shared_space[comm->tail]; + if (NULL == *buf) + break; + buf++; + comm->shared_space[comm->tail] = NULL; + comm->tail = (comm->tail + 1) % SHARED_SPACE_VOIDPTR; + } + nb_iter = (nb_iter + n) % ADJUST_FREQ; + return n; +} diff --git a/communication_techniques/src/communication/lamport.c b/communication_techniques/src/communication/lamport.c new file mode 100644 index 0000000..0d45e07 --- /dev/null +++ b/communication_techniques/src/communication/lamport.c @@ -0,0 +1,101 @@ +#include +#include +#include +#include + +/* Non standard include */ +#include +#include + + +__thread struct comm *comm; + +int init_library(void) +{ + return 0; +} + +int finalize_library(void) +{ + return 0; +} + +void *create_comm_channel(void) +{ + struct comm *comm; + + if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + { + if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) + { + comm->cons_idx = 0; + comm->prod_idx = 0; + return comm; + } + else + free(comm); + } + return NULL; +} + +int destroy_comm_channel(void *comm) +{ + free((void *) ((struct comm *) comm)->shared_space); + free(comm); + return 0; +} + +int init_producer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_producer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +int init_consumer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_consumer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +void *recv_one_data(void) +{ + int cons_idx; + void *result; + + cons_idx = comm->cons_idx; + while (cons_idx == comm->prod_idx); + result = (void *) comm->shared_space[cons_idx]; + cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR; + comm->cons_idx = cons_idx; + return result; +} + +ssize_t recv_some_data(void **buf, size_t count) +{ + int n, cons_idx; + + n = 0; + for(cons_idx = comm->cons_idx; cons_idx != comm->prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, comm->cons_idx = cons_idx) + { + /* + * The behaviour of this is not documented but we know + * the values inside buf won't change during this affectation + */ + *buf++ = (void *) comm->shared_space[cons_idx]; + if (++n == count) + break; + } + return n; +} diff --git a/communication_techniques/src/communication/mcringbuffer.c b/communication_techniques/src/communication/mcringbuffer.c new file mode 100644 index 0000000..1ab88b6 --- /dev/null +++ b/communication_techniques/src/communication/mcringbuffer.c @@ -0,0 +1,131 @@ +#include +#include +#include +#include + +/* Non standard include */ +#include +#include + + +__thread struct comm *comm; +const int batchSize = BUF_SIZE / sizeof(void *); + +int init_library(void) +{ + return 0; +} + +int finalize_library(void) +{ + return 0; +} + +void *create_comm_channel(void) +{ + struct comm *comm; + + if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + { + if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) + { + comm->ctrl.read = 0; + comm->ctrl.write = 0; + comm->cons.localWrite = 0; + comm->cons.nextRead = 0; + comm->cons.rBatch = 0; + comm->prod.localRead = 0; + comm->prod.nextWrite = 0; + comm->prod.wBatch = 0; + return comm; + } + else + free(comm); + } + return NULL; +} + +int destroy_comm_channel(void *comm) +{ + free((void *) ((struct comm *) comm)->shared_space); + free(comm); + return 0; +} + +int init_producer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_producer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +int init_consumer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_consumer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +void *recv_one_data(void) +{ + void *result; + + while (1) + { + if (comm->cons.nextRead == comm->cons.localWrite) + { + if (comm->cons.nextRead == comm->ctrl.write) + continue; + comm->cons.localWrite = comm->ctrl.write; + } + result = (void *) comm->shared_space[comm->cons.nextRead]; + comm->cons.nextRead = (comm->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR; + comm->cons.rBatch++; + if (comm->cons.rBatch >= batchSize) + { + comm->ctrl.read = comm->cons.nextRead; + comm->cons.rBatch = 0; + } + break; + } + return result; +} + +ssize_t recv_some_data(void **buf, size_t count) +{ + int n; + + for(n = 0; n < count; n++) + { + if (comm->cons.nextRead == comm->cons.localWrite) + { + if (comm->cons.nextRead == comm->ctrl.write) + break; + comm->cons.localWrite = comm->ctrl.write; + } + + /* + * The behaviour of this is not documented but we know + * the values inside buf won't change during this affectation + */ + *buf++ = (void *) comm->shared_space[comm->cons.nextRead]; + comm->cons.nextRead = (comm->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR; + comm->cons.rBatch++; + if (comm->cons.rBatch >= batchSize) + { + comm->ctrl.read = comm->cons.nextRead; + comm->cons.rBatch = 0; + } + } + return n; +} diff --git a/communication_techniques/src/communication/pipe.c b/communication_techniques/src/communication/pipe.c index e903954..50dfb02 100644 --- a/communication_techniques/src/communication/pipe.c +++ b/communication_techniques/src/communication/pipe.c @@ -7,58 +7,107 @@ /* Non standard include */ #include -#include #include -__thread int pipefd[2]; +__thread struct comm *comm; -int init_thread_comm(struct thread_comm *comm) +int init_library(void) { + return 0; +} + +int finalize_library(void) +{ + return 0; +} + +void *create_comm_channel(void) +{ + struct comm *comm; int flags; - if (pipe(pipefd)) + comm = malloc(sizeof(comm)); + if (comm != NULL) { - fprintf(stderr, "Unable to create a pipe for pipe communication\n"); - return -1; - } - flags = fcntl(pipefd[READ_IDX], F_GETFL); - fcntl(pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK); - comm->pipefd = pipefd; - return 0; -} - -int end_thread_comm(void) -{ - return 0; -} - -void reception(void (*on_receive)(void *)) -{ - wait_initialization(); /* Not needed but here for equity with others techniques */ - /* printf("Activate the consumer...\n"); */ - while(cont) - { - int i; - - for (i = 0; i < nb_prod; i++) + if (!pipe(comm->pipefd)) { - int nb_read; + flags = fcntl(comm->pipefd[READ_IDX], F_GETFL); + fcntl(comm->pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK); + return comm; + } + else + free(comm); + } + return NULL; +} - for(nb_read = 0; nb_read < BUF_SIZE / sizeof(void *); nb_read++) - { - int j, n; - void *tmp_buf[BUF_SIZE / sizeof(void *)]; +int destroy_comm_channel(void *comm) +{ + free(comm); + return 0; +} - j = nb_read / sizeof(void *); - n = read(tcomms[i].pipefd[READ_IDX], (void *) ((uintptr_t) tmp_buf + nb_read), BUF_SIZE - nb_read); - if (n > 0) - { - nb_read += n; - for (; j + sizeof(void *) <= nb_read / sizeof(void *); j += sizeof(void *)) - on_receive(tmp_buf[j]); - } - } +int init_producer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_producer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +int init_consumer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_consumer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +void *recv_one_data(void) +{ + void *result, **res_ptr; + int n, nb_read; + + nb_read = 0; + res_ptr = &result; + do + { + n = read(comm->pipefd[READ_IDX], res_ptr, sizeof(void *)); + if (n > 0) + { + nb_read += n; + res_ptr = (void **) ((uintptr_t) res_ptr + n); + } + } while (nb_read < sizeof(void *)); + return result; +} + +ssize_t recv_some_data(void **buf, size_t count) +{ + int n, nb_read, nb_bytes; + + nb_bytes = count * sizeof(void *); + nb_read = read(comm->pipefd[READ_IDX], buf, nb_bytes); + if (nb_read <= 0) + return 0; + buf = (void **) ((uintptr_t) buf + nb_read); + while (nb_read % sizeof(void *)) + { + n = read(comm->pipefd[READ_IDX], buf, sizeof(void *) - (nb_read % sizeof(void *))); + if (n > 0) + { + nb_read += n; + buf = (void **) ((uintptr_t) buf + n); } } + return nb_read / sizeof(void *); } diff --git a/communication_techniques/src/communication/shared_mem.c b/communication_techniques/src/communication/shared_mem.c deleted file mode 100644 index 0ca39d1..0000000 --- a/communication_techniques/src/communication/shared_mem.c +++ /dev/null @@ -1,56 +0,0 @@ -#include -#include -#include -#include - -/* Non standard include */ -#include -#include -#include - - -__thread volatile void **shared_space; -__thread volatile int cons_idx = 0; -__thread volatile int prod_idx = 0; - -int init_thread_comm(struct thread_comm *comm) -{ - if (posix_memalign((void *) &shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) - { - fprintf(stderr, "Unable to allocate space for shared mem communication\n"); - return -1; - } - comm->shared_space = shared_space; - comm->cons_idx = &cons_idx; - comm->prod_idx = &prod_idx; - return 0; -} - -int end_thread_comm(void) -{ - return 0; -} - -void reception(void (*on_receive)(void *)) -{ - wait_initialization(); - /* printf("Activate the consumer...\n"); */ - while(cont) - { - int i; - - for (i = 0; i < nb_prod; i++) - { - int cons_idx; - - for(cons_idx = *tcomms[i].cons_idx; cons_idx != *tcomms[i].prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, *tcomms[i].cons_idx = cons_idx) - { - /* - * The behaviour of this is not documented but we know - * the values inside buf won't change during this affectation - */ - on_receive((void *) tcomms[i].shared_space[cons_idx]); - } - } - } -} diff --git a/communication_techniques/src/communication/shared_mem_opt.c b/communication_techniques/src/communication/shared_mem_opt.c index 64e58ad..7a95389 100644 --- a/communication_techniques/src/communication/shared_mem_opt.c +++ b/communication_techniques/src/communication/shared_mem_opt.c @@ -5,58 +5,113 @@ /* Non standard include */ #include -#include #include -__thread volatile void **shared_space; -__thread volatile int cons_idx = 0; -__thread volatile int prod_idx = 0; +__thread struct comm *comm; -int init_thread_comm(struct thread_comm *comm) -{ - if (posix_memalign((void *) &shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) - { - fprintf(stderr, "Unable to allocate space for shared mem communication\n"); - return -1; - } - comm->shared_space = shared_space; - comm->cons_idx = &cons_idx; - comm->prod_idx = &prod_idx; - return 0; -} - -int end_thread_comm(void) +int init_library(void) { return 0; } -void reception(void (*on_receive)(void *)) +int finalize_library(void) { - wait_initialization(); - /* printf("Activate the consumer...\n"); */ - while(cont) - { - int i; + return 0; +} - for( i = 0; i < nb_prod; i++) +void *create_comm_channel(void) +{ + struct comm *comm; + + if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm))) + { + if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE)) { - int cons_idx, prod_idx; - - cons_idx = *tcomms[i].cons_idx; - do - { - prod_idx = *tcomms[i].prod_idx; - for(; cons_idx != prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR) - { - /* - * The behaviour of this is not documented but we know - * the values inside buf won't change during this affectation - */ - on_receive((void *) tcomms[i].shared_space[cons_idx]); - } - } while (prod_idx != *tcomms[i].prod_idx); - *tcomms[i].cons_idx = cons_idx; + comm->cons_idx = 0; + comm->prod_idx = 0; + return comm; } + else + free(comm); } + return NULL; +} + +int destroy_comm_channel(void *comm) +{ + free((void *) ((struct comm *) comm)->shared_space); + free(comm); + return 0; +} + +int init_producer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_producer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +int init_consumer_thread(void *comm_param) +{ + comm = (struct comm *) comm_param; + return 0; +} + +int finalize_consumer_thread(void *unused) +{ + comm = NULL; + return 0; +} + +void *recv_one_data(void) +{ + void *result; + int cons_idx, prod_idx; + + cons_idx = comm->cons_idx; + prod_idx = comm->prod_idx; + if (cons_idx == prod_idx) + while(prod_idx == comm->prod_idx); + /* + * The behaviour of this is not documented but we know the + * values inside shared_space won't change during this + * affectation + */ + result = (void *) comm->shared_space[cons_idx]; + cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR; + comm->cons_idx = cons_idx; + return result; +} + +ssize_t recv_some_data(void **buf, size_t count) +{ + int n, cons_idx, prod_idx; + + n = 0; + cons_idx = comm->cons_idx; + do + { + prod_idx = comm->prod_idx; + for(; cons_idx != prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR) + { + /* + * The behaviour of this is not documented but we know + * the values inside buf won't change during this affectation + */ + *buf++ = (void *) comm->shared_space[cons_idx]; + if (++n == count) + { + comm->cons_idx = cons_idx; + return n; + } + } + } while (prod_idx != comm->prod_idx); + comm->cons_idx = cons_idx; + return n; } diff --git a/communication_techniques/src/main.c b/communication_techniques/src/main.c index f080331..98827d7 100644 --- a/communication_techniques/src/main.c +++ b/communication_techniques/src/main.c @@ -1,4 +1,5 @@ #define _GNU_SOURCE +#define _POSIX_SOURCE 1 #include #include @@ -13,17 +14,14 @@ #include /* Non standards includes */ -#include #include #include +#define MAX_BLOCK_ENTRIES (page_size / sizeof(void *)) #define toString(x) doStringification(x) #define doStringification(x) #x #define WORDS_PER_BUF (BUF_SIZE / sizeof(uintptr_t)) -#define DIV_SEC(secs, div) ((unsigned ) (((unsigned) secs) / (unsigned long) (div))) -#define DIV_USEC(nsecs, nusecs, div) ((unsigned) (((unsigned) (nusecs) + 1000000 * \ - ((unsigned ) (nsecs) % (div))) / (unsigned long) (div))) static long nb_bufs_sent = 0; @@ -34,26 +32,28 @@ static int (*end_calc)(void) = NULL; static int shared = 0; /* We are not shared by default */ pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER; -static int consumer_has_finished = 0; -static int producers_ended = 0; static int init_calc_arg = 0; +static int block_reception = 1; +static int page_size = 0; void usage(char *argv[]) { - char format[] = "-n [options]"; + char format[] = "-n -p [options]"; char options[] = "Required options :\n" "-n nb_buffer_sent\t\tNumber of buffer to send to another core\n" "\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n" - "-p nb_producers\t\t\tNumber of producers which send data to another core\n" "Facultative options :\n" + "b\t\t\t\tReceive the biggest amount of data available (The default)\n" + "-c calculation_libname arg\tLibrary to use for calculation with its argument\n" + "\t\t\t\tThis library must implement functions in calc.h\n" + "\t\t\t\t(default to none)\n" + "d\t\t\t\tReceive one piece of data\n" "-h\t\t\t\tPrint this help\n" "-s \t\t\tShare the same L cache or not\n" "\t\t\t\tIf level is:\n" "\t\t\t\t\t> 0, then the same L must be shared\n" "\t\t\t\t\t< 0, then different L must be used\n" - "\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n" - "-c calculation_libname arg\tLibrary to use for calculation with its argument\n" - "\t\t\t\tThis library must implement functions in calc.h\n"; + "\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n"; printf("Usage : %s %s\n", argv[0], format); printf("Options :\n"); printf("%s\n", options); @@ -85,6 +85,9 @@ int analyse_options(int argc, char *argv[]) { switch (opt) { + case 'b' : + block_reception = 1; + break; case 'c' : { struct stat file_stat; @@ -132,6 +135,9 @@ int analyse_options(int argc, char *argv[]) optind++; } break; + case 'd' : + block_reception = 0; + break; case 'h' : usage(argv); exit(EXIT_SUCCESS); @@ -151,6 +157,7 @@ int analyse_options(int argc, char *argv[]) } } break; +#if 0 case 'p' : { char *inval; @@ -167,6 +174,7 @@ int analyse_options(int argc, char *argv[]) } } break; +#endif case 's' : if ((optind != argc) && (*argv[optind] != '-')) { @@ -207,11 +215,13 @@ int analyse_options(int argc, char *argv[]) fprintf(stderr, "You must give the number of cache lines to be sent\n"); return -1; } +#if 0 if (!nb_prod) { fprintf(stderr, "You must give the number of producers\n"); return -1; } +#endif if (shared && (nb_prod > 1)) { fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n"); @@ -223,29 +233,19 @@ int analyse_options(int argc, char *argv[]) do_calc = do_nocalc; end_calc = do_noend; } + printf("buf size: %lu\n", WORDS_PER_BUF); return 0; } -void wait_consumer(void) -{ - pthread_mutex_lock(&mutex_cons_has_finished); - if (++producers_ended == nb_prod) - cont = 0; - if (!consumer_has_finished) - pthread_cond_wait(&cond_cons_has_finished, &mutex_cons_has_finished); - pthread_mutex_unlock(&mutex_cons_has_finished); -} - -void *producer(void *unused) +void *producer(void *channel) { int i, j; struct timeval tv1, tv2, tv_result; - if (init_producer_thread()) + if (init_producer_thread(channel)) { fprintf(stderr, "Initialization of thread has failed\n"); - wait_consumer(); - return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */ + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } if (shared) { @@ -258,8 +258,7 @@ void *producer(void *unused) if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); - wait_consumer(); - return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */ + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } } else @@ -273,26 +272,21 @@ void *producer(void *unused) if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); - wait_consumer(); - return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */ + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } } if (init_calc(init_calc_arg)) { fprintf(stderr, "Initialization of calculation has failed\n"); - wait_consumer(); - return &nb_prod; /* nb_prod can't be NULL, whatever NULL is bound to */ + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } gettimeofday(&tv1, NULL); - if (initialize_papi() != -1) - { - for(i = 0; i < nb_bufs_sent; i++) { - //printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE); - for(j = 0; j < WORDS_PER_BUF; j++) - send(do_calc()); - } - print_results(WORDS_PER_BUF, nb_bufs_sent); + for(i = 0; i < nb_bufs_sent; i++) { + //printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE); + for(j = 0; j < WORDS_PER_BUF; j++) + send(do_calc()); } + //print_results(WORDS_PER_BUF, nb_bufs_sent); gettimeofday(&tv2, NULL); tv_result.tv_sec = tv2.tv_sec - tv1.tv_sec; if (tv2.tv_usec < tv1.tv_usec) @@ -302,38 +296,32 @@ void *producer(void *unused) } else tv_result.tv_usec = tv2.tv_usec - tv1.tv_usec; - printf("total_time: %u.%06u / %u.%06u / %u.%06u\n", (unsigned) tv_result.tv_sec, - (unsigned) tv_result.tv_usec, - DIV_SEC(tv_result.tv_sec, nb_bufs_sent), - DIV_USEC(tv_result.tv_sec, tv_result.tv_usec, nb_bufs_sent), - DIV_SEC(tv_result.tv_sec, nb_bufs_sent * WORDS_PER_BUF), - DIV_USEC(tv_result.tv_sec, tv_result.tv_usec, nb_bufs_sent * WORDS_PER_BUF)); + printf("total_time: %u.%06u\n", (unsigned) tv_result.tv_sec, + (unsigned) tv_result.tv_usec); if (end_calc()) { fprintf(stderr, "uninitialization of calculation has failed\n"); - wait_consumer(); - return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */ + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } printf("[%p] Producer finished !\n", (void*) pthread_self()); /* * When a producer end its thread-local storage vanished. Thus, * producers must finish only after consumer has stopped using them */ - wait_consumer(); - if (end_producer_thread()) + if (finalize_producer_thread(channel)) { - fprintf(stderr, "Uninitialization of thread has failed\n"); - return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */ + fprintf(stderr, "Finalization of thread has failed\n"); + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } return NULL; } -void onMessage(void *val) +void on_message(void *val) { //printf("Receive value: %p\n", (void *) val); } -void *receptor(void *a) +void *consumer(void *channel) { if (shared) { @@ -349,44 +337,78 @@ void *receptor(void *a) return NULL; } } - reception(onMessage); - pthread_mutex_lock(&mutex_cons_has_finished); - consumer_has_finished = 1; - pthread_cond_broadcast(&cond_cons_has_finished); - pthread_mutex_unlock(&mutex_cons_has_finished); + if (init_consumer_thread(channel)) + { + fprintf(stderr, "Initialization of thread has failed\n"); + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + } + if (block_reception) + { + long long total_data_received = 0; + void *data_buf[MAX_BLOCK_ENTRIES]; + + while (total_data_received < nb_bufs_sent * WORDS_PER_BUF) + { + int i; + ssize_t nb_data_received; + + nb_data_received = recv_some_data(data_buf, MAX_BLOCK_ENTRIES); + total_data_received += nb_data_received; + for (i = 0; i < nb_data_received; i++) + on_message(data_buf[i]); + //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : ""); + } + } + else + { + int i, j; + + for(i = 0; i < nb_bufs_sent; i++) { + //printf("[%p] About to receive %d new cache line%s\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE, (BUF_SIZE / CACHE_LINE_SIZE > 1) ? "s" : ""); + for(j = 0; j < WORDS_PER_BUF; j++) + on_message(recv_one_data()); + } + } + if (finalize_consumer_thread(channel)) + { + fprintf(stderr, "Finalization of thread has failed\n"); + return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ + } return NULL; } int main(int argc, char *argv[]) { - int i, global_return_value = EXIT_SUCCESS; - void *return_value; - pthread_t *tid; + pthread_t tid[2]; + int return_value; + void *pthread_return_value; + void *channel; + return_value = EXIT_SUCCESS; if (analyse_options(argc, argv)) return EXIT_FAILURE; + page_size = sysconf(_SC_PAGE_SIZE); + if (page_size <= 0) + return EXIT_FAILURE; if (init_library()) return EXIT_FAILURE; - tid = (pthread_t *) malloc((nb_prod + 1) * sizeof(pthread_t)); - if (tid == NULL) + channel = create_comm_channel(); + if (channel != NULL) { - fprintf(stderr, "Failed to allocate %lu bytes needed for thread creation\n", (nb_prod + 1) * sizeof(pthread_t)); - return EXIT_FAILURE; + pthread_create(&tid[0], NULL, producer, channel); + pthread_create(&tid[1], NULL, consumer, channel); + pthread_join(tid[0], &pthread_return_value); + if (pthread_return_value != NULL) + return_value = EXIT_FAILURE; + pthread_join(tid[1], &pthread_return_value); + if (pthread_return_value != NULL) + return_value = EXIT_FAILURE; } - for(i = 0; i < nb_prod; i++) - pthread_create(&tid[i], NULL, producer, NULL); - pthread_create(&tid[i], NULL, receptor, NULL); - for(i = 0; i < nb_prod; i++) - { - pthread_join(tid[i], &return_value); - if (return_value != NULL) - global_return_value = EXIT_FAILURE; - } - pthread_join(tid[i], &return_value); - if (return_value != NULL) - global_return_value = EXIT_FAILURE; - free(tid); - if (end_library()) + else + return_value = EXIT_FAILURE; + if (destroy_comm_channel(channel)) + return_value = EXIT_FAILURE; + if (finalize_library()) return EXIT_FAILURE; - return global_return_value; + return return_value; }