Simplify and rewrite comm API.

This commit is contained in:
Thomas Preud'homme 2010-09-22 18:15:57 +02:00
parent c5507590aa
commit 006b1b1d94
27 changed files with 1517 additions and 588 deletions

View File

@ -7,37 +7,25 @@ LIBDIR:=lib
LOGDIR:=logs
LOCALDIR:=$(HOME)/local
# PHL stands for PAPI High Level
PHL_LIB_INSTALLDIR:=lib
PHL_INC_INSTALLDIR:=lib
PHL_LIBDIR:=lib
PHL_INCDIR:=include
PHLDIR:=../papihighlevel
PAPI_LIB_INSTALLDIR:=lib
PAPIDIR:=$(PHLDIR)/papi
PAPI_LIBDIR:=src
PAPI_PFMDIR:=$(PAPI_LIBDIR)/libpfm-3.y/lib
CALCDIR:=calculation
COMMDIR:=communication
# Compilation flags
# I know -finline-functions and -finline-functions-called-once are enabled by
# -O3 but I did this in case gcc behaviour change one day
CFLAGS:=-c -O3 -finline-functions -finline-functions-called-once -Wall -Werror
LDFLAGS:=-L$(LIBDIR) -L$(LOCALDIR)/$(PHL_LIB_INSTALLDIR) -L$(PHLDIR)/$(PHL_LIBDIR)
LDFLAGS:=$(LDFLAGS) -Wl,-rpath-link,$(LOCALDIR)/$(PAPI_LIB_INSTALLDIR):$(PAPIDIR)/$(PAPI_LIBDIR):$(PAPIDIR)/$(PAPI_PFMDIR)
LDFLAGS:=$(LDFLAGS) -lpthread -lpapihighlevel -ldl
CFLAGS:=-c -g -O3 -finline-functions -finline-functions-called-once -Wall -Werror
LDFLAGS:=-L$(LIBDIR) -lpthread -ldl
# Executables
CC=gcc
# Files
BINNAMES:=asm_cache_comm c_cache_comm pipe_comm shared_mem_comm shared_mem_opt_comm jikes_barrier_comm fake_comm
BINNAMES:=c_cache_comm pipe_comm lamport_comm shared_mem_opt_comm fake_comm csq_comm fast_forward_comm mcringbuffer_comm #jikes_barrier_comm asm_cache_comm
CALCLIBSNAMES:=calc_mat calc_useless_loop
BINS:=$(patsubst %,$(BINDIR)/%,$(BINNAMES))
CALCLIBS:=$(patsubst %,$(LIBDIR)/$(CALCDIR)/lib%.so.1,$(CALCLIBSNAMES))
MAIN_OBJS:=main.o
COMMON_LIB_OBJS:=common.o
COMMON_LIB_OBJS:=
.PHONY: all tidy clean distclean
.SECONDARY:
@ -77,7 +65,7 @@ $(OBJDIR)/$(COMMDIR)/%.o: $(SRCDIR)/$(COMMDIR)/%.c $(INCDIR)/%_comm.h $(INCDIR)/
$(OBJDIR)/$(CALCDIR)/%.o: $(SRCDIR)/$(CALCDIR)/%.c
if [ ! -d $(OBJDIR) ] ; then mkdir $(OBJDIR) ; fi
if [ ! -d $(OBJDIR)/$(CALCDIR) ] ; then mkdir $(OBJDIR)/$(CALCDIR) ; fi
$(CC) $(CFLAGS) -I$(INCDIR) $< -o $@
$(CC) $(CFLAGS) -fPIC -I$(INCDIR) $< -o $@
#.%.d: %.c
# gcc $(CFLAGS) -MM $^ | sed -e 's/\([^:]*\):\(.*\)/\1 $@: \2 Makefile/' > $@
@ -119,4 +107,4 @@ $(OBJDIR)/%.o: $(SRCDIR)/$$(*F).c $(INCDIR)/$$(*D)_comm.h $(INCDIR)/commtech.h
if [ ! -d $(OBJDIR) ] ; then mkdir $(OBJDIR) ; fi
if [ ! -d $(OBJDIR)/$(*D) ] ; then mkdir $(OBJDIR)/$(*D) ; fi
cd $(INCDIR) ; ln -sfT $(*D)_comm.h specific_comm.h
$(CC) $(CFLAGS) -I$(INCDIR) -I$(LOCALDIR)/$(PHL_INCDIR) -I../$(PHLDIR)/$(PHL_INCDIR) $< -o $@
$(CC) $(CFLAGS) -I$(INCDIR) $< -o $@

View File

@ -10,32 +10,36 @@
__BEGIN_DECLS
struct comm_channel
struct channel
{
volatile void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE)));
volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE)));
int idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
};
struct thread_comm
struct cons
{
struct comm_channel *channel;
struct channel *channel;
int receiver_idx;
int unused;
};
extern struct thread_comm *tcomms;
union comm
{
struct channel *channel;
struct cons *cons;
};
extern __thread union comm comm;
extern int swap_buffer;
int init_thread_comm(struct thread_comm *);
int end_thread_comm(void);
static inline void send(void **addr)
{
asm volatile("mov %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) " + " toString(CACHE_LINE_SIZE) ", %%eax\n\t"
"mov %0, %%gs:channel@NTPOFF(%%eax)\n\t"
"mov %0, %%gs:cons.channel@NTPOFF(%%eax)\n\t"
"addl $4, %%eax\n\t"
"andl $(2*" toString(BUF_SIZE) "-1), %%eax\n\t"
"mov %%eax, %%gs:channel@NTPOFF + 2 * " toString(BUF_SIZE)" + " toString(CACHE_LINE_SIZE) "\n\t"
"mov %%eax, %%gs:cons.channel@NTPOFF + 2 * " toString(BUF_SIZE)" + " toString(CACHE_LINE_SIZE) "\n\t"
"test $(" toString(BUF_SIZE) " - 1), %%eax\n\t"
"mov $2f, %%eax\n\t"
"jz swap_buffer\n\t"

View File

@ -8,34 +8,37 @@
#define toString(x) doStringification(x)
#define doStringification(x) #x
struct comm_channel
struct channel
{
volatile void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE)));
volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE)));
int idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
};
struct thread_comm
struct cons
{
struct comm_channel *channel;
struct channel *channel;
int receiver_idx;
};
extern struct thread_comm *tcomms;
extern __thread struct comm_channel channel;
union comm
{
struct channel *channel;
struct cons *cons;
};
extern __thread union comm comm;
__BEGIN_DECLS
int init_thread_comm(struct thread_comm *);
int end_thread_comm(void);
static inline void send(void **addr)
{
channel.buf[channel.idx++] = addr;
channel.idx %= 2 * BUF_SIZE / sizeof(void *);
if (!(channel.idx % (BUF_SIZE / sizeof(void *))))
comm.channel->buf[comm.channel->idx++] = addr;
comm.channel->idx %= 2 * (BUF_SIZE / sizeof(void *));
if (!(comm.channel->idx % (BUF_SIZE / sizeof(void *))))
{
while (channel.state);
channel.state = 1;
while (comm.channel->state);
comm.channel->state = 1;
}
}

View File

@ -7,16 +7,121 @@
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
extern volatile int cont;
extern long nb_prod;
__BEGIN_DECLS
/*
* @return 0 if success, -1 else
*
* Initialize communication library.
* @comment Must be run before any other function of this library
*/
int init_library(void);
int end_library(void);
int init_producer_thread(void);
int end_producer_thread(void);
void reception(void (*)(void *));
/*
* @return 0 if success, -1 else
*
* Finalize communication library.
* @comment Must be run after any other function of this library
*/
int finalize_library(void);
/*
* @return a pointer on the channel if success, NULL else
*
* Create a communication channel. The return value is the address of
* the channel that has to be passed to init_producer_thread and
* init_consumer_thread respectively by the producer and consumer thread
* which want to communicate.
*/
void *create_comm_channel(void);
/*
* @param channel The channel to destroy
* @return 0 if success, -1 else
*
* Destroy a communication channel.
*
* @comment Must be called after producer and consumer stopped to
* communicate.
*/
int destroy_comm_channel(void *);
/*
* @param chan Address of the communication channel to attach to
* the producer calling this function.
* @return 0 on success, -1 else
*
* Initialize the producer and attach the given communication channel to
* it.
*/
int init_producer_thread(void *);
/*
* @param channel Address of the communication channel to detach from
* the producer calling this function
* @return 0 on success, -1 else
*
* Finalize the producer.
* @comment Must be run by the producer after it stopped to communicate
* with the consumer.
*/
int finalize_producer_thread(void *);
/*
* @param channel Address of the communication channel to attach to
* the consumer calling this function.
* @return 0 on success, -1 else
*
* Initialize the consumer and attach the given communication channel to
* it.
*/
int init_consumer_thread(void *);
/*
* @param channel Address of the communication channel to detach from
* the consumer calling this function
* @return 0 on success, -1 else
*
* Finalize the consumer.
*
* @comment Must be run by the consumer after it stopped to communicate
* with the consumer.
*/
int finalize_consumer_thread(void *);
/*
* @return a data sent by the matching producer
*
* Wait until a data sent by the matching producer is available
*
* @comment recv_one_data should not be used in conjonction of
* recv_some_data
*/
void *recv_one_data(void);
/*
* @param buf The buffer to write received data into
* @param count The maximum number of data to copy into buf
* @return Number of data copied into buf
*
* Wait until some data sent by the matching producer is available and
* copy as much data as possible into buf, with a maximum of count.
*
* @commentrecv_some_data should not be used in conjonction of recv_one_data
* @comment count must be a multiple of BUF_SIZE / sizeof(void *) which is
* equal to SUB_SLOTS
*/
ssize_t recv_some_data(void **, size_t);
__END_DECLS

View File

@ -0,0 +1,53 @@
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
/* Non standard include */
#include <commtech.h>
#define SLOTS 64 /* Value used in the article, section V.A */
#define SUB_SLOTS (BUF_SIZE / sizeof(void *))
struct lvl_2
{
volatile void *chunk[SUB_SLOTS];
volatile unsigned int flag : 1;
} __attribute__ ((aligned (CACHE_LINE_SIZE)));
struct comm
{
struct lvl_2 queue[SLOTS] __attribute__ ((aligned (CACHE_LINE_SIZE)));
};
union ctrl
{
int head;
int tail;
};
__BEGIN_DECLS
extern __thread struct comm *comm;
extern __thread union ctrl ctrl __attribute__ ((aligned (CACHE_LINE_SIZE)));
// TODO: Make it send only one data
static inline void send(void **addr)
{
static __thread int chkidx = 0;
// If all slots are full, spin
if (!chkidx)
while (comm->queue[ctrl.tail].flag);
// Enqueue a data item
comm->queue[ctrl.tail].chunk[chkidx++] = addr;
if (!(chkidx % SUB_SLOTS))
{
chkidx = 0;
comm->queue[ctrl.tail].flag = 1;
ctrl.tail = (ctrl.tail + 1) % SLOTS;
}
}
__END_DECLS
#endif

View File

@ -3,19 +3,12 @@
#define FAKE_NURSERY_START 1
struct thread_comm
{
int unused;
};
extern struct thread_comm *tcomms;
extern __thread void ** volatile store_var;
__BEGIN_DECLS
int init_thread_comm(struct thread_comm *);
int end_thread_comm(void);
static inline void send(void **addr) {
static inline void send(void **addr)
{
store_var = addr;
}

View File

@ -0,0 +1,50 @@
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
/* Non standard include */
#include <commtech.h>
#include <assert.h>
#define SHARED_SPACE_SIZE (16 * BUF_SIZE)
#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *))
#define DANGER (2 * BUF_SIZE / sizeof(void *))
#define GOOD (6 * BUF_SIZE / sizeof(void *))
#define ADJUST_FREQ 64
struct comm
{
void * volatile *shared_space;
int head;
int tail;
};
__BEGIN_DECLS
extern __thread struct comm *comm;
extern int adjust_slip(void);
static inline void send(void **addr)
{
static __thread int nb_iter = 0;
assert(addr != NULL);
if (nb_iter == ADJUST_FREQ)
{
adjust_slip();
nb_iter = 0;
}
while (1)
{
if (comm->shared_space[comm->head] != NULL)
continue;
comm->shared_space[comm->head] = addr;
comm->head = (comm->head + 1) % SHARED_SPACE_VOIDPTR;
break;
}
}
__END_DECLS
#endif

View File

@ -0,0 +1,30 @@
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
/* Non standard include */
#include <commtech.h>
#define SHARED_SPACE_SIZE (2 * BUF_SIZE)
#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *))
struct comm
{
void * volatile *shared_space;
volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
volatile int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
};
__BEGIN_DECLS
extern __thread struct comm *comm;
static inline void send(void **addr)
{
while ((comm->prod_idx + 1) % SHARED_SPACE_VOIDPTR == comm->cons_idx);
comm->shared_space[comm->prod_idx] = addr;
comm->prod_idx = (comm->prod_idx + 1) % SHARED_SPACE_VOIDPTR;
}
__END_DECLS
#endif

View File

@ -0,0 +1,69 @@
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
/* Non standard include */
#include <commtech.h>
#define SHARED_SPACE_SIZE (2 * BUF_SIZE)
#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *))
struct control
{
volatile int read;
volatile int write;
};
struct cons
{
int localWrite;
int nextRead;
int rBatch;
};
struct prod
{
int localRead;
int nextWrite;
int wBatch;
};
struct comm
{
struct control ctrl __attribute__ ((aligned (CACHE_LINE_SIZE)));
struct prod prod __attribute__ ((aligned (CACHE_LINE_SIZE)));
struct cons cons __attribute__ ((aligned (CACHE_LINE_SIZE)));
void * volatile *shared_space __attribute__ ((aligned (CACHE_LINE_SIZE))); // Align only to isolate cons on its cache line
};
__BEGIN_DECLS
extern __thread struct comm *comm;
extern const int batchSize;
static inline void send(void **addr)
{
while (1)
{
int afterNextWrite = (comm->prod.nextWrite + 1) % SHARED_SPACE_VOIDPTR;
if (afterNextWrite == comm->prod.localRead)
{
if (afterNextWrite == comm->ctrl.read)
continue;
comm->prod.localRead = comm->ctrl.read;
}
comm->shared_space[comm->prod.nextWrite] = addr;
comm->prod.nextWrite = afterNextWrite;
comm->prod.wBatch++;
if (comm->prod.wBatch >= batchSize)
{
comm->ctrl.write = comm->prod.nextWrite;
comm->prod.wBatch = 0;
}
break;
}
}
__END_DECLS
#endif

View File

@ -6,20 +6,33 @@
#define READ_IDX 0
#define WRITE_IDX 1
struct thread_comm
struct comm
{
int *pipefd;
int pipefd[2];
};
__BEGIN_DECLS
extern __thread int pipefd[];
extern struct thread_comm *tcomms;
extern __thread struct comm *comm;
int init_thread_comm(struct thread_comm *);
int end_thread_comm(void);
static inline void send(void **addr) {
write(pipefd[WRITE_IDX], &addr, sizeof(void *));
static inline void send(void **addr)
{
int nb_read;
void *addr_ptr;
nb_read = 0;
addr_ptr = &addr;
do
{
int n;
n = write(comm->pipefd[WRITE_IDX], addr_ptr, sizeof(void *) - nb_read);
if (n > 0)
{
nb_read += n;
addr_ptr = (void *) ((uintptr_t) addr_ptr + n);
}
} while (nb_read < sizeof(void *));
}
__END_DECLS

View File

@ -1,10 +0,0 @@
#ifndef _PRIVATE_COMMON_H_
#define _PRIVATE_COMMON_H_ 1
__BEGIN_DECLS
void wait_initialization(void);
__END_DECLS
#endif

View File

@ -1,35 +0,0 @@
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
/* Non standard include */
#include <commtech.h>
#define SHARED_SPACE_SIZE (2 * BUF_SIZE)
#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *))
struct thread_comm
{
volatile void **shared_space;
volatile int *cons_idx;
volatile int *prod_idx;
};
extern struct thread_comm *tcomms;
__BEGIN_DECLS
extern __thread volatile void **shared_space;
extern __thread volatile int prod_idx;
extern __thread volatile int cons_idx;
int init_thread_comm(struct thread_comm *);
int end_thread_comm(void);
static inline void send(void **addr) {
while ((prod_idx + 1) % SHARED_SPACE_VOIDPTR == cons_idx);
shared_space[prod_idx] = addr;
prod_idx = (prod_idx + 1) % SHARED_SPACE_VOIDPTR;
}
__END_DECLS
#endif

View File

@ -7,33 +7,32 @@
#define SHARED_SPACE_SIZE (2 * BUF_SIZE)
#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *))
struct thread_comm
struct comm
{
volatile void **shared_space;
volatile int *cons_idx;
volatile int *prod_idx;
void * volatile *shared_space;
volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
volatile int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
};
extern struct thread_comm *tcomms;
__BEGIN_DECLS
extern __thread volatile void **shared_space;
extern __thread volatile int prod_idx;
extern __thread volatile int cons_idx;
extern __thread struct comm *comm;
int init_thread_comm(struct thread_comm *);
int end_thread_comm(void);
static inline void send(void **addr) {
static inline void send(void **addr)
{
static __thread int local_cons_idx = 0;
int local_prod, next_prod;
if (likely(((prod_idx + 1) % SHARED_SPACE_VOIDPTR) == local_cons_idx))
local_prod = comm->prod_idx;
next_prod = (local_prod + 1) % SHARED_SPACE_VOIDPTR;
if (next_prod == local_cons_idx)
{
while (((prod_idx + 1) % SHARED_SPACE_VOIDPTR) == cons_idx);
local_cons_idx = cons_idx;
while (next_prod == comm->cons_idx);
local_cons_idx = comm->cons_idx;
}
shared_space[prod_idx] = addr;
prod_idx = (prod_idx + 1) % SHARED_SPACE_VOIDPTR;
comm->shared_space[local_prod] = addr;
comm->prod_idx = next_prod;
}
__END_DECLS

View File

@ -1,21 +1,18 @@
#! /bin/bash
set -u
set -v
# Files and directories
binDir="bin"
calcDir="calculation"
PHLDir="../papihighlevel"
PHLLibDir="lib"
PAPIDir="${PHLDir}/papi"
PAPILibDir="src"
PAPIPFMDir="libpfm-3.y/lib"
# Param
binList="asm_cache_comm c_cache_comm pipe_comm shared_mem_comm shared_mem_opt_comm jikes_barrier_comm fake_comm" # Type de communication
binList="$(ls -1 "${binDir}"| sed '$!s/$/ /' | tr -d '\n')"
nbProdList="1" # Nombre de cores producteurs
typeProdList="none matrice useless_loop" # Methode pour produire les valeurs
typeCacheList="L2 Memory" # Niveau de cache partage
perfOpt="stat -e cycles -e L1-dcache-loads -e L1-dcache-stores -e LLC-load-misses -e LLC-store-misses"
# Const
nbIter="5000000" # Nb de lignes produites
@ -26,42 +23,6 @@ logFileName="\$perfDirName/cache_\$typeCache-nbProd_\$nbProd-typeProd_\$typeProd
expDirName="logs"
perfDirName="$expDirName/perfCommMulti-`date +'%F-%Hh%Mm%S'`"
LD_LIBRARY_PATH="${LD_LIBRARY_PATH:-}${LD_LIBRARY_PATH:+:}${PHLDir}/${PHLLibDir}:${PAPIDir}/${PAPILibDir}:${PAPIDir}/${PAPILibDir}/${PAPIPFMDir}"
PAPILibPresent=""
PFMLibPresent=""
papiHighLevelLibPresent=""
LD_LIBRARY_PATH_LEFT="${LD_LIBRARY_PATH}:"
while [ -n "${LD_LIBRARY_PATH_LEFT}" ]
do
aLibDir="${LD_LIBRARY_PATH_LEFT%%:*}"
if [ -x ${aLibDir}/libpapi.so.3 ]
then
PAPILibPresent="1";
fi
if [ -x ${aLibDir}/libpfm.so.3 ]
then
PHLLibPresent="1";
fi
if [ -x ${aLibDir}/libpapihighlevel.so ]
then
PHLLibPresent="1";
fi
if [ -n "${PAPILibPresent}" -a -n "${PHLLibPresent}" -a -n "${PFMLibPresent}" ]
then
break
fi
LD_LIBRARY_PATH_LEFT="${LD_LIBRARY_PATH_LEFT#*:}"
done
if [ -z "${PAPILibPresent}" -o -z "${PHLLibPresent}" ]
then
echo "Libraries needed for this bench not accessible by \$LD_LIBRARY_PATH" > /dev/stderr # Is there a better way to display something on stderr ?
exit 1
fi
unset PAPILibPresent
unset PHLLibPresent
unset LD_LIBRARY_PATH_LEFT
export LD_LIBRARY_PATH
[ -d "$expDirName" ] || mkdir "$expDirName"
@ -81,10 +42,10 @@ function_run () {
esac
make $binDir/$bin
echo "On lance : \"${bin##*/} $optTypeCache $optTypeProd -p $nbProd -n $nbIter\""
echo "On lance : \"perf $perfOpt $binDir/$bin $optTypeCache $optTypeProd -n $nbIter\""
beginingDate=`date +%s`
( $binDir/$bin $optTypeCache $optTypeProd -p $nbProd -n $nbIter || echo "echec experience" ) | eval tee $logFileName
( perf $perfOpt $binDir/$bin $optTypeCache $optTypeProd -n $nbIter 2>&1 || echo "echec experience" ) | eval tee $logFileName
endDate=`date +%s`
duration_sec=`expr \( $endDate - $beginingDate \) % 60`
duration_min=`expr \( $endDate - $beginingDate \) / 60`

View File

@ -4,23 +4,48 @@ set -u
init_bench_vars ()
{
local -
local ourBenchs onlyWBBenchs comList com -
#barriereList="jikes_barrier asm_cache c_cache fake"
barriereList="jikes_barrier asm_cache c_cache"
communicationList="asm_cache c_cache shared_mem shared_mem_opt pipe"
ourBenchs="asm_cache|c_cache"
onlyWBBenchs="jikes_barrier"
comList=`ls *.log | perl -ni -e '/-([^-]+)_comm/; $a{$1}=""; END { foreach ( sort keys %a ) {print "$_ "}}'`
if [ -z "${barriereList:-}" ]
then
for com in $comList
do
if echo $ourBenchs | grep "$com" > /dev/null 2>&1
then
barriereList="${barriereList:-}${barriereList:+ }$com"
fi
if echo $onlyWBBenchs | grep "$com" > /dev/null 2>&1
then
barriereList="${barriereList:-}${barriereList:+ }$com"
fi
done
fi
if [ -z "${communicationList:-}" ]
then
for com in $comList
do
if echo $onlyWBBenchs | grep -v "$com" > /dev/null 2>&1
then
communicationList="${communicationList:-}${communicationList:+ }$com"
fi
done
fi
}
init_log_vars ()
{
local firstBarriereComm -
comList=`ls *.log | perl -ni -e '/-([^-]+)_comm/; $a{$1}=""; END { foreach ( sort keys %a ) {print "$_ "}}'`
cacheList=`ls *log | perl -ni -e '/cache_([^-]+)-/; $a{$1}=""; END { foreach ( sort keys %a ) {print "$_ "}}'`
prodList=`ls *.log | perl -ni -e '/typeProd_([^-]+)-/; $a{$1}=""; END { foreach ( sort keys %a ) {print "$_ "}}'`
firstBarriereComm=`echo $barriereList | sed -r "s/^[^ ]* .*$//"`
argTypeProdList=`eval ls *typeProd_useless_loop*${firstBarriereComm}_comm.log | perl -ni -e '/argTypeProd_([\d]+)-/; $a{$1}=""; END { foreach ( sort { $a <=> $b } keys %a ) {print "$_ "}}'`
metriqueList="cache_hits cache_miss cycles total_time"
nbIter=`ls -1 *.log | head -1 | sed -r "s/^.*nbIter_([[:digit:]]+).*$/\1/"`
valuePerCacheLine=$(grep "^buf size: " $(ls -1 *.log | head -1) | sed -r "s/^buf size: ([[:digit:]]+)$/\1/")
}
parse_args ()
@ -78,7 +103,7 @@ contains_zero()
iMax=$((lineNum + 1 + nbCommTechs * 3))
for i in $(seq $iMin 3 $iMax)
do
if head -$i $datFile | tail -1 | grep "0\.00" > /dev/null
if head -$i $datFile | tail -1 | egrep "\.0+([^[:digit:]])?" > /dev/null
then
return 0
fi
@ -90,7 +115,7 @@ contains_zero()
iMax=$((iMin + nbArg * 8 - 4))
for i in $(seq $iMin 4 $iMax)
do
if head -$i $datFile | tail -1 | grep "0\.00" > /dev/null
if head -$i $datFile | tail -1 | egrep "\.0+([^[:digit:]])?" > /dev/null
then
return 0
fi
@ -147,7 +172,7 @@ create_dat_header ()
get_metric_values ()
{
local prod com cache argTypeProd logFile -
local prod com cache argTypeProd logFile metricValues totalValue metricValue loopValue stepValue -
prod="$1"
com="$2"
cache="$3"
@ -155,7 +180,17 @@ get_metric_values ()
metriquePattern="$5"
logFile=cache_$cache-*-typeProd_$prod-argTypeProd_$argTypeProd-*-${com}_comm.log
perl -n -e "print \"\$1 \$2 \$3 \" if /$metriquePattern.* (\S+) \/ (\S+) \/ (\S+)/" $logFile
metricValues=$(grep -E "$metriquePattern" $logFile | sed -r "s/^(.*[^[:alnum:].])?($metriquePattern)([^[:alnum:]].*)?$/\2/;s/^(.*[^[:alnum:].])?([[:digit:].]+)([^[:alnum:]].*)?$/\2/")
totalValue=0
for metricValue in $metricValues
do
totalValue=$(echo "$totalValue + $metricValue" | bc)
done
echo -n "$totalValue"
loopValue="$(echo "$totalValue / $nbIter" | bc -l)"
echo -n " $loopValue"
stepValue=$(echo "$totalValue / $nbIter / $valuePerCacheLine" | bc -l)
echo -n " $stepValue"
}
create_simple_dat_body ()
@ -359,9 +394,9 @@ main()
{
local logDir -
init_bench_vars
parse_args "$@"
cd $logDir
init_bench_vars
init_log_vars
for prod in $prodList ; do
@ -374,16 +409,16 @@ main()
for metrique in $metriqueList ; do
case "$metrique" in
cache_hits)
metriquePattern="cache hits"
metriquePattern="[[:digit:]]+ +(L1-dcache-loads|L1-dcache-stores|L1-dcache-prefetches|LLC-loads|LLC-stores|LLC-prefetches) "
ylabel="Nb cache hit" ;;
cache_miss)
metriquePattern="cache miss"
metriquePattern="[[:digit:]]+ +(L1-dcache-load-misses|L1-dcache-store-misses|L1-dcache-prefetch-misses|LLC-load-misses|LLC-store-misses|LLC-prefetch-misses) "
ylabel="Nb cache miss" ;;
cycles)
metriquePattern="cycles"
metriquePattern="[[:digit:]]+ +cycles "
ylabel="Nb cycles" ;;
total_time)
metriquePattern="total_time"
metriquePattern="total_time: [[:digit:].]+"
ylabel="Secondes" ;;
*)
echo "Pas de pattern pour cette métrique : $metrique"

View File

@ -4,27 +4,70 @@
/* Non standard include */
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread struct comm_channel channel;
__thread union comm comm;
int init_thread_comm(struct thread_comm *comm)
{
comm->receiver_idx = 0;
comm->channel = &channel;
comm->channel->state = 0;
comm->channel->idx = 0;
return 0;
}
int end_thread_comm(void)
int init_library(void)
{
return 0;
}
char *dstr="buffer transition\n";
int finalize_library(void)
{
return 0;
}
void *create_comm_channel(void)
{
struct cons *cons;
if (!posix_memalign((void *) &cons, CACHE_LINE_SIZE, sizeof(struct cons)))
{
cons->receiver_idx = 0;
if (!posix_memalign((void *) &cons->channel, CACHE_LINE_SIZE, sizeof(struct channel)))
{
cons->channel->state = 0;
cons->channel->idx = 0;
return cons;
}
else
free(cons);
}
return NULL;
}
int destroy_comm_channel(void *cons)
{
free(cons->channel);
free(cons);
return 0;
}
int init_producer_thread(void *cons)
{
comm.channel = cons->channel;
return 0;
}
int finalize_producer_thread(void *cons)
{
comm.channel = NULL;
return 0;
}
int init_consumer_thread(void *cons)
{
comm.cons = cons;
return 0;
}
int finalize_consumer_thread(void *cons)
{
comm.cons = NULL;
return 0;
}
void _swap_buffer()
{
@ -32,48 +75,76 @@ void _swap_buffer()
"swap_buffer:\n\t"
"1:\n\t"
"testl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t"
"testl $1, %%gs:comm@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t"
"jnz 1b\n\t"
"movl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t"
"movl $1, %%gs:comm@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t"
"jmp *%%eax\n\t"
: : "m"(dstr));
}
void reception(void (*on_receive)(void *))
/*
* Copy at max count received data into buf
* @param buf The buffer in which received data must be copied into
* @return Number of data received and copied into buf
*
* @warning recv_one_data should not be used in conjonction of
* recv_some_data
*/
void *recv_one_data(void)
{
wait_initialization();
/* printf("Activate the consumer...\n"); */
while (cont)
{
int i;
static __thread i;
void *result;
for (i = 0; i < nb_prod; i++)
{
if(tcomms[i].channel->state)
{
int j, n;
/*
* cur->receiver_idx point to the last cache
* line we have read. We go to the next cache
* line "+ (CACHE_LINE_SIZE >> 2)" (because
* the line is full of integer (2^2 octets)
* and then if we are after the second cache
* line we correct the pointer to point to
* the first one (this is done by the modulo)
*/
j = tcomms[i].receiver_idx;
n = tcomms[i].receiver_idx + (BUF_SIZE / sizeof(void *));
tcomms[i].receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
for(; j<n; j++)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
on_receive((void *) tcomms[i].channel->buf[j]);
}
tcomms[i].channel->state = 0;
}
}
}
if (unlikely(i % (BUF_SIZE / sizeof(void *))))
while (!comm.cons->channel->state);
result = (void *) comm.cons->channel->buf[i++];
i %= (2 * BUF_SIZE) / sizeof(void *);
if (unlikely(i % (BUF_SIZE / sizeof(void *))))
comm.cons->channel->state = 0;
return result;
}
/*
* Copy at max count received data into buf
* @param buf The buffer in which received data must be copied into
* @return Number of data received and copied into buf
*
* @warning recv_some_data should not be used in conjonction of
* recv_one_data
* @warning count must be a multiple of BUF_SIZE
*/
ssize_t recv_some_data(void **buf, size_t count)
{
int i, n;
void **buf_ptr, **buf_end;
n = 0;
buf_ptr = buf;
buf_end = buf + count + 1;
while (comm.cons->channel->state && end != buf)
{
int j, n;
/*
* cur->receiver_idx point to the last buffer we have read.
* We go to the next cache line "+ (BUF_SIZE / sizeof(void *))"
* (because the line is full of void * and then if we are after
* the second cache line we correct the pointer to point to the
* first one (this is done by the modulo).
*/
j = comm.cons->receiver_idx;
n = comm.cons->receiver_idx + (BUF_SIZE / sizeof(void *));
comm.cons->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
for(; j<n; j++)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
*buf_ptr++ = (void *) comm.cons->channel->buf[j];
n++;
}
comm.cons->channel->state = 0;
}
return n;
}

View File

@ -4,63 +4,130 @@
/* Non standard include */
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread struct comm_channel channel;
__thread union comm comm;
int init_thread_comm(struct thread_comm *comm)
{
comm->receiver_idx = 0;
comm->channel = &channel;
comm->channel->state = 0;
comm->channel->idx = 0;
return 0;
}
int end_thread_comm(void)
int init_library(void)
{
return 0;
}
char *dstr="buffer transition\n";
void reception(void (*on_receive)(void *))
int finalize_library(void)
{
wait_initialization();
/* printf("Activate the consumer...\n"); */
while (cont)
return 0;
}
void *create_comm_channel(void)
{
struct cons *cons;
if (!posix_memalign((void **) &cons, CACHE_LINE_SIZE, sizeof(struct cons)))
{
int i;
for (i = 0; i < nb_prod; i++)
cons->receiver_idx = 0;
if (!posix_memalign((void **) &cons->channel, CACHE_LINE_SIZE, sizeof(struct channel)))
{
if(tcomms[i].channel->state)
{
int j, n;
/*
* cur->receiver_idx point to the last cache
* line we have read. We go to the next cache
* line "+ (CACHE_LINE_SIZE >> 2)" (because
* the line is full of integer (2^2 octets)
* and then if we are after the second cache
* line we correct the pointer to point to
* the first one (this is done by the modulo)
*/
j = tcomms[i].receiver_idx;
n = tcomms[i].receiver_idx + (BUF_SIZE / sizeof(void *));
tcomms[i].receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
for(; j<n; j++)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
on_receive((void *) tcomms[i].channel->buf[j]);
}
tcomms[i].channel->state = 0;
}
cons->channel->state = 0;
cons->channel->idx = 0;
return cons;
}
else
free(cons);
}
return NULL;
}
int destroy_comm_channel(void *cons)
{
free(((struct cons *) cons)->channel);
free(cons);
return 0;
}
int init_producer_thread(void *cons)
{
comm.channel = ((struct cons *) cons)->channel;
return 0;
}
int finalize_producer_thread(void *cons)
{
comm.channel = NULL;
return 0;
}
int init_consumer_thread(void *cons)
{
comm.cons = (struct cons *) cons;
return 0;
}
int finalize_consumer_thread(void *cons)
{
comm.cons = NULL;
return 0;
}
/*
* Copy at max count received data into buf
* @param buf The buffer in which received data must be copied into
* @return Number of data received and copied into buf
*
* @warning recv_one_data should not be used in conjonction of
* recv_some_data
*/
void *recv_one_data(void)
{
static __thread int i;
void *result;
if (unlikely(i % (BUF_SIZE / sizeof(void *))))
while (!comm.cons->channel->state);
result = (void *) comm.cons->channel->buf[i++];
i %= (2 * BUF_SIZE) / sizeof(void *);
if (unlikely(i % (BUF_SIZE / sizeof(void *))))
comm.cons->channel->state = 0;
return result;
}
/*
* Copy at max count received data into buf
* @param buf The buffer in which received data must be copied into
* @return Number of data received and copied into buf
*
* @warning recv_some_data should not be used in conjonction of
* recv_one_data
* @warning count must be a multiple of BUF_SIZE
*/
ssize_t recv_some_data(void **buf, size_t count)
{
int nb_read;
nb_read = 0;
while (comm.cons->channel->state && nb_read < count)
{
int i, n;
/*
* cur->receiver_idx point to the last buffer we have read.
* We go to the next cache line "+ (BUF_SIZE / sizeof(void *))"
* (because the line is full of void * and then if we are after
* the second cache line we correct the pointer to point to the
* first one (this is done by the modulo).
*/
i = comm.cons->receiver_idx;
n = comm.cons->receiver_idx + (BUF_SIZE / sizeof(void *));
comm.cons->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
for(; i < n; i++)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
*buf++ = (void *) comm.cons->channel->buf[i];
}
nb_read += BUF_SIZE / sizeof(void *);
comm.cons->channel->state = 0;
}
return nb_read;
}

View File

@ -1,85 +0,0 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
/* Non standard include */
#include <specific_comm.h>
#include <commtech.h>
struct thread_comm *tcomms;
volatile int cont = 1;
static int init = 0;
static int error = 0;
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
int init_library(void)
{
tcomms = (struct thread_comm *) malloc(nb_prod * sizeof(struct thread_comm));
if (tcomms == NULL)
{
fprintf(stderr, "Failed to allocate %lu bytes needed by the library to work\n", nb_prod * sizeof(struct thread_comm));
return -1;
}
return 0;
}
int end_library(void)
{
free(tcomms);
return 0;
}
int get_thread_number(void)
{
static int i = 0;
static pthread_mutex_t i_lock = PTHREAD_MUTEX_INITIALIZER;
int i_local;
pthread_mutex_lock(&i_lock);
i_local = i;
i++;
pthread_mutex_unlock(&i_lock);
return i_local;
}
int init_producer_thread(void)
{
int thread_num;
thread_num = get_thread_number();
if (init_thread_comm(&tcomms[thread_num]))
{
pthread_mutex_lock(&init_lock);
error = 1;
pthread_cond_signal(&init_cond);
pthread_mutex_unlock(&init_lock);
return -1;
}
if (thread_num == nb_prod - 1)
{
pthread_mutex_lock(&init_lock);
init = 1;
pthread_cond_signal(&init_cond);
pthread_mutex_unlock(&init_lock);
}
return 0;
}
int end_producer_thread(void)
{
return end_thread_comm();
}
int wait_initialization(void)
{
pthread_mutex_lock(&init_lock);
if (!init && !error)
pthread_cond_wait(&init_cond, &init_lock);
pthread_mutex_unlock(&init_lock);
if (error)
return -1;
return 0;
}

View File

@ -0,0 +1,114 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
/* Non standard include */
#include <commtech.h>
#include <specific_comm.h>
__thread struct comm *comm;
__thread union ctrl ctrl __attribute__ ((aligned (CACHE_LINE_SIZE)));
int init_library(void)
{
return 0;
}
int finalize_library(void)
{
return 0;
}
void *create_comm_channel(void)
{
struct comm *comm;
if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm)))
{
int i;
for (i = 0; i < SLOTS; i++)
comm->queue[i].flag = 0;
return comm;
}
return NULL;
}
int destroy_comm_channel(void *comm)
{
free(comm);
return 0;
}
int init_producer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
ctrl.tail = 0;
return 0;
}
int finalize_producer_thread(void *unused)
{
comm = NULL;
ctrl.tail = 0;
return 0;
}
int init_consumer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
ctrl.head = 0;
return 0;
}
int finalize_consumer_thread(void *unused)
{
comm = NULL;
ctrl.head = 0;
return 0;
}
void *recv_one_data(void)
{
static __thread int i;
void *result;
if (__builtin_expect(!i, 0))
while (!comm->queue[ctrl.head].flag);
result = (void *) comm->queue[ctrl.head].chunk[i++];
if (i % SUB_SLOTS)
{
i = 0;
comm->queue[ctrl.head].flag = 0;
ctrl.head = (ctrl.head + 1) % SLOTS;
}
return result;
}
ssize_t recv_some_data(void **buf, size_t count)
{
int i, n;
n = 0;
// If all slots are empty, spin
while (comm->queue[ctrl.head].flag)
{
// Dequeue a chunk of data items
for (i = 0; i < SUB_SLOTS; i++)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
*buf++ = (void *) comm->queue[ctrl.head].chunk[i];
}
n += SUB_SLOTS;
comm->queue[ctrl.head].flag = 0;
ctrl.head = (ctrl.head + 1) % SLOTS;
if (n == count)
break;
}
return n;
}

View File

@ -5,26 +5,75 @@
/* Non standard include */
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread void ** volatile store_var = NULL;
int init_thread_comm(struct thread_comm *comm)
int init_library(void)
{
return 0;
}
int end_thread_comm(void)
int finalize_library(void)
{
return 0;
}
void reception(void (*on_receive)(void *))
void *create_comm_channel(void)
{
wait_initialization(); /* Not needed but here for equity with others techniques */
/* printf("Activate the consumer...\n"); */
while (cont);
return (void *) &store_var;
}
int destroy_comm_channel(void *unused)
{
return 0;
}
int init_producer_thread(void *unused)
{
return 0;
}
int finalize_producer_thread(void *unused)
{
return 0;
}
int init_consumer_thread(void *unused)
{
return 0;
}
int finalize_consumer_thread(void *unused)
{
return 0;
}
/*
* Copy at max count received data into buf
* @param buf The buffer in which received data must be copied into
* @return Number of data received and copied into buf
*
* @warning recv_one_data should not be used in conjonction of
* recv_some_data
*/
void *recv_one_data(void)
{
return NULL;
}
/*
* Copy at max count received data into buf
* @param buf The buffer in which received data must be copied into
* @return Number of data received and copied into buf
*
* @warning recv_some_data should not be used in conjonction of
* recv_one_data
* @warning count must be a multiple of BUF_SIZE
*/
ssize_t recv_some_data(void **buf, size_t count)
{
return count;
}

View File

@ -0,0 +1,153 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
/* Non standard include */
#include <commtech.h>
#include <specific_comm.h>
__thread struct comm *comm;
int init_library(void)
{
return 0;
}
int finalize_library(void)
{
return 0;
}
void *create_comm_channel(void)
{
struct comm *comm;
if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm)))
{
if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE))
{
int i;
comm->head = 0;
comm->tail = 0;
for (i = 0; i < SHARED_SPACE_VOIDPTR; i++)
comm->shared_space[i] = NULL;
return comm;
}
else
free(comm);
}
return NULL;
}
int destroy_comm_channel(void *comm)
{
free((void *) ((struct comm *) comm)->shared_space);
free(comm);
return 0;
}
int init_producer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_producer_thread(void *unused)
{
comm = NULL;
return 0;
}
int init_consumer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_consumer_thread(void *unused)
{
comm = NULL;
return 0;
}
int adjust_slip(void)
{
int dist, dist_old, unused;
puts("adjust_slip is called"); /* Must be removed after calibration */
unused = 0;
dist = (comm->head + SHARED_SPACE_VOIDPTR - comm->tail) % SHARED_SPACE_VOIDPTR;
if (dist < DANGER)
{
dist_old = 0;
do
{
int i;
dist_old = dist;
/*
* I consider 20 as being the number of ++ which could
* be done while one data is sent by the producer
*/
for (i = 0; i < 20 * ((GOOD + 1) - dist); i++)
unused++;
dist = (comm->head + SHARED_SPACE_VOIDPTR - comm->tail) % SHARED_SPACE_VOIDPTR;
} while (dist < GOOD && dist_old < dist);
}
return unused;
}
void *recv_one_data(void)
{
void *result;
static __thread int nb_iter = 0;
if (nb_iter == ADJUST_FREQ)
{
adjust_slip();
nb_iter = 0;
}
while (1)
{
result = (void *) comm->shared_space[comm->tail];
if (NULL == result)
continue;
comm->shared_space[comm->tail] = NULL;
comm->tail = (comm->tail + 1) % SHARED_SPACE_VOIDPTR;
break;
}
return result;
}
ssize_t recv_some_data(void **buf, size_t count)
{
int n, next_adjust;
static __thread int nb_iter = 0;
next_adjust = ADJUST_FREQ - nb_iter;
for(n = 0; n < count; n++)
{
/* if ((nb_iter + n) % ADJUST_FREQ == 0) */
if (n && (n % next_adjust == ADJUST_FREQ))
{
adjust_slip();
nb_iter = 0;
}
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
*buf = (void *) comm->shared_space[comm->tail];
if (NULL == *buf)
break;
buf++;
comm->shared_space[comm->tail] = NULL;
comm->tail = (comm->tail + 1) % SHARED_SPACE_VOIDPTR;
}
nb_iter = (nb_iter + n) % ADJUST_FREQ;
return n;
}

View File

@ -0,0 +1,101 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
/* Non standard include */
#include <commtech.h>
#include <specific_comm.h>
__thread struct comm *comm;
int init_library(void)
{
return 0;
}
int finalize_library(void)
{
return 0;
}
void *create_comm_channel(void)
{
struct comm *comm;
if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm)))
{
if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE))
{
comm->cons_idx = 0;
comm->prod_idx = 0;
return comm;
}
else
free(comm);
}
return NULL;
}
int destroy_comm_channel(void *comm)
{
free((void *) ((struct comm *) comm)->shared_space);
free(comm);
return 0;
}
int init_producer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_producer_thread(void *unused)
{
comm = NULL;
return 0;
}
int init_consumer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_consumer_thread(void *unused)
{
comm = NULL;
return 0;
}
void *recv_one_data(void)
{
int cons_idx;
void *result;
cons_idx = comm->cons_idx;
while (cons_idx == comm->prod_idx);
result = (void *) comm->shared_space[cons_idx];
cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR;
comm->cons_idx = cons_idx;
return result;
}
ssize_t recv_some_data(void **buf, size_t count)
{
int n, cons_idx;
n = 0;
for(cons_idx = comm->cons_idx; cons_idx != comm->prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, comm->cons_idx = cons_idx)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
*buf++ = (void *) comm->shared_space[cons_idx];
if (++n == count)
break;
}
return n;
}

View File

@ -0,0 +1,131 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
/* Non standard include */
#include <commtech.h>
#include <specific_comm.h>
__thread struct comm *comm;
const int batchSize = BUF_SIZE / sizeof(void *);
int init_library(void)
{
return 0;
}
int finalize_library(void)
{
return 0;
}
void *create_comm_channel(void)
{
struct comm *comm;
if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm)))
{
if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE))
{
comm->ctrl.read = 0;
comm->ctrl.write = 0;
comm->cons.localWrite = 0;
comm->cons.nextRead = 0;
comm->cons.rBatch = 0;
comm->prod.localRead = 0;
comm->prod.nextWrite = 0;
comm->prod.wBatch = 0;
return comm;
}
else
free(comm);
}
return NULL;
}
int destroy_comm_channel(void *comm)
{
free((void *) ((struct comm *) comm)->shared_space);
free(comm);
return 0;
}
int init_producer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_producer_thread(void *unused)
{
comm = NULL;
return 0;
}
int init_consumer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_consumer_thread(void *unused)
{
comm = NULL;
return 0;
}
void *recv_one_data(void)
{
void *result;
while (1)
{
if (comm->cons.nextRead == comm->cons.localWrite)
{
if (comm->cons.nextRead == comm->ctrl.write)
continue;
comm->cons.localWrite = comm->ctrl.write;
}
result = (void *) comm->shared_space[comm->cons.nextRead];
comm->cons.nextRead = (comm->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR;
comm->cons.rBatch++;
if (comm->cons.rBatch >= batchSize)
{
comm->ctrl.read = comm->cons.nextRead;
comm->cons.rBatch = 0;
}
break;
}
return result;
}
ssize_t recv_some_data(void **buf, size_t count)
{
int n;
for(n = 0; n < count; n++)
{
if (comm->cons.nextRead == comm->cons.localWrite)
{
if (comm->cons.nextRead == comm->ctrl.write)
break;
comm->cons.localWrite = comm->ctrl.write;
}
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
*buf++ = (void *) comm->shared_space[comm->cons.nextRead];
comm->cons.nextRead = (comm->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR;
comm->cons.rBatch++;
if (comm->cons.rBatch >= batchSize)
{
comm->ctrl.read = comm->cons.nextRead;
comm->cons.rBatch = 0;
}
}
return n;
}

View File

@ -7,58 +7,107 @@
/* Non standard include */
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread int pipefd[2];
__thread struct comm *comm;
int init_thread_comm(struct thread_comm *comm)
int init_library(void)
{
return 0;
}
int finalize_library(void)
{
return 0;
}
void *create_comm_channel(void)
{
struct comm *comm;
int flags;
if (pipe(pipefd))
comm = malloc(sizeof(comm));
if (comm != NULL)
{
fprintf(stderr, "Unable to create a pipe for pipe communication\n");
return -1;
}
flags = fcntl(pipefd[READ_IDX], F_GETFL);
fcntl(pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK);
comm->pipefd = pipefd;
return 0;
}
int end_thread_comm(void)
{
return 0;
}
void reception(void (*on_receive)(void *))
{
wait_initialization(); /* Not needed but here for equity with others techniques */
/* printf("Activate the consumer...\n"); */
while(cont)
{
int i;
for (i = 0; i < nb_prod; i++)
if (!pipe(comm->pipefd))
{
int nb_read;
flags = fcntl(comm->pipefd[READ_IDX], F_GETFL);
fcntl(comm->pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK);
return comm;
}
else
free(comm);
}
return NULL;
}
for(nb_read = 0; nb_read < BUF_SIZE / sizeof(void *); nb_read++)
{
int j, n;
void *tmp_buf[BUF_SIZE / sizeof(void *)];
int destroy_comm_channel(void *comm)
{
free(comm);
return 0;
}
j = nb_read / sizeof(void *);
n = read(tcomms[i].pipefd[READ_IDX], (void *) ((uintptr_t) tmp_buf + nb_read), BUF_SIZE - nb_read);
if (n > 0)
{
nb_read += n;
for (; j + sizeof(void *) <= nb_read / sizeof(void *); j += sizeof(void *))
on_receive(tmp_buf[j]);
}
}
int init_producer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_producer_thread(void *unused)
{
comm = NULL;
return 0;
}
int init_consumer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_consumer_thread(void *unused)
{
comm = NULL;
return 0;
}
void *recv_one_data(void)
{
void *result, **res_ptr;
int n, nb_read;
nb_read = 0;
res_ptr = &result;
do
{
n = read(comm->pipefd[READ_IDX], res_ptr, sizeof(void *));
if (n > 0)
{
nb_read += n;
res_ptr = (void **) ((uintptr_t) res_ptr + n);
}
} while (nb_read < sizeof(void *));
return result;
}
ssize_t recv_some_data(void **buf, size_t count)
{
int n, nb_read, nb_bytes;
nb_bytes = count * sizeof(void *);
nb_read = read(comm->pipefd[READ_IDX], buf, nb_bytes);
if (nb_read <= 0)
return 0;
buf = (void **) ((uintptr_t) buf + nb_read);
while (nb_read % sizeof(void *))
{
n = read(comm->pipefd[READ_IDX], buf, sizeof(void *) - (nb_read % sizeof(void *)));
if (n > 0)
{
nb_read += n;
buf = (void **) ((uintptr_t) buf + n);
}
}
return nb_read / sizeof(void *);
}

View File

@ -1,56 +0,0 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
/* Non standard include */
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread volatile void **shared_space;
__thread volatile int cons_idx = 0;
__thread volatile int prod_idx = 0;
int init_thread_comm(struct thread_comm *comm)
{
if (posix_memalign((void *) &shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE))
{
fprintf(stderr, "Unable to allocate space for shared mem communication\n");
return -1;
}
comm->shared_space = shared_space;
comm->cons_idx = &cons_idx;
comm->prod_idx = &prod_idx;
return 0;
}
int end_thread_comm(void)
{
return 0;
}
void reception(void (*on_receive)(void *))
{
wait_initialization();
/* printf("Activate the consumer...\n"); */
while(cont)
{
int i;
for (i = 0; i < nb_prod; i++)
{
int cons_idx;
for(cons_idx = *tcomms[i].cons_idx; cons_idx != *tcomms[i].prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, *tcomms[i].cons_idx = cons_idx)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
on_receive((void *) tcomms[i].shared_space[cons_idx]);
}
}
}
}

View File

@ -5,58 +5,113 @@
/* Non standard include */
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread volatile void **shared_space;
__thread volatile int cons_idx = 0;
__thread volatile int prod_idx = 0;
__thread struct comm *comm;
int init_thread_comm(struct thread_comm *comm)
{
if (posix_memalign((void *) &shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE))
{
fprintf(stderr, "Unable to allocate space for shared mem communication\n");
return -1;
}
comm->shared_space = shared_space;
comm->cons_idx = &cons_idx;
comm->prod_idx = &prod_idx;
return 0;
}
int end_thread_comm(void)
int init_library(void)
{
return 0;
}
void reception(void (*on_receive)(void *))
int finalize_library(void)
{
wait_initialization();
/* printf("Activate the consumer...\n"); */
while(cont)
{
int i;
return 0;
}
for( i = 0; i < nb_prod; i++)
void *create_comm_channel(void)
{
struct comm *comm;
if (!posix_memalign((void *) &comm, CACHE_LINE_SIZE, sizeof(struct comm)))
{
if (!posix_memalign((void *) &comm->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE))
{
int cons_idx, prod_idx;
cons_idx = *tcomms[i].cons_idx;
do
{
prod_idx = *tcomms[i].prod_idx;
for(; cons_idx != prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
on_receive((void *) tcomms[i].shared_space[cons_idx]);
}
} while (prod_idx != *tcomms[i].prod_idx);
*tcomms[i].cons_idx = cons_idx;
comm->cons_idx = 0;
comm->prod_idx = 0;
return comm;
}
else
free(comm);
}
return NULL;
}
int destroy_comm_channel(void *comm)
{
free((void *) ((struct comm *) comm)->shared_space);
free(comm);
return 0;
}
int init_producer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_producer_thread(void *unused)
{
comm = NULL;
return 0;
}
int init_consumer_thread(void *comm_param)
{
comm = (struct comm *) comm_param;
return 0;
}
int finalize_consumer_thread(void *unused)
{
comm = NULL;
return 0;
}
void *recv_one_data(void)
{
void *result;
int cons_idx, prod_idx;
cons_idx = comm->cons_idx;
prod_idx = comm->prod_idx;
if (cons_idx == prod_idx)
while(prod_idx == comm->prod_idx);
/*
* The behaviour of this is not documented but we know the
* values inside shared_space won't change during this
* affectation
*/
result = (void *) comm->shared_space[cons_idx];
cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR;
comm->cons_idx = cons_idx;
return result;
}
ssize_t recv_some_data(void **buf, size_t count)
{
int n, cons_idx, prod_idx;
n = 0;
cons_idx = comm->cons_idx;
do
{
prod_idx = comm->prod_idx;
for(; cons_idx != prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR)
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
*buf++ = (void *) comm->shared_space[cons_idx];
if (++n == count)
{
comm->cons_idx = cons_idx;
return n;
}
}
} while (prod_idx != comm->prod_idx);
comm->cons_idx = cons_idx;
return n;
}

View File

@ -1,4 +1,5 @@
#define _GNU_SOURCE
#define _POSIX_SOURCE 1
#include <stdio.h>
#include <stdlib.h>
@ -13,17 +14,14 @@
#include <sys/time.h>
/* Non standards includes */
#include <papihighlevel.h>
#include <commtech.h>
#include <specific_comm.h>
#define MAX_BLOCK_ENTRIES (page_size / sizeof(void *))
#define toString(x) doStringification(x)
#define doStringification(x) #x
#define WORDS_PER_BUF (BUF_SIZE / sizeof(uintptr_t))
#define DIV_SEC(secs, div) ((unsigned ) (((unsigned) secs) / (unsigned long) (div)))
#define DIV_USEC(nsecs, nusecs, div) ((unsigned) (((unsigned) (nusecs) + 1000000 * \
((unsigned ) (nsecs) % (div))) / (unsigned long) (div)))
static long nb_bufs_sent = 0;
@ -34,26 +32,28 @@ static int (*end_calc)(void) = NULL;
static int shared = 0; /* We are not shared by default */
pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER;
static int consumer_has_finished = 0;
static int producers_ended = 0;
static int init_calc_arg = 0;
static int block_reception = 1;
static int page_size = 0;
void usage(char *argv[])
{
char format[] = "-n [options]";
char format[] = "-n <num_buf> -p <num_prod> [options]";
char options[] = "Required options :\n"
"-n nb_buffer_sent\t\tNumber of buffer to send to another core\n"
"\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n"
"-p nb_producers\t\t\tNumber of producers which send data to another core\n"
"Facultative options :\n"
"b\t\t\t\tReceive the biggest amount of data available (The default)\n"
"-c calculation_libname arg\tLibrary to use for calculation with its argument\n"
"\t\t\t\tThis library must implement functions in calc.h\n"
"\t\t\t\t(default to none)\n"
"d\t\t\t\tReceive one piece of data\n"
"-h\t\t\t\tPrint this help\n"
"-s <level>\t\t\tShare the same L<level> cache or not\n"
"\t\t\t\tIf level is:\n"
"\t\t\t\t\t> 0, then the same L<level> must be shared\n"
"\t\t\t\t\t< 0, then different L<level> must be used\n"
"\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n"
"-c calculation_libname arg\tLibrary to use for calculation with its argument\n"
"\t\t\t\tThis library must implement functions in calc.h\n";
"\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n";
printf("Usage : %s %s\n", argv[0], format);
printf("Options :\n");
printf("%s\n", options);
@ -85,6 +85,9 @@ int analyse_options(int argc, char *argv[])
{
switch (opt)
{
case 'b' :
block_reception = 1;
break;
case 'c' :
{
struct stat file_stat;
@ -132,6 +135,9 @@ int analyse_options(int argc, char *argv[])
optind++;
}
break;
case 'd' :
block_reception = 0;
break;
case 'h' :
usage(argv);
exit(EXIT_SUCCESS);
@ -151,6 +157,7 @@ int analyse_options(int argc, char *argv[])
}
}
break;
#if 0
case 'p' :
{
char *inval;
@ -167,6 +174,7 @@ int analyse_options(int argc, char *argv[])
}
}
break;
#endif
case 's' :
if ((optind != argc) && (*argv[optind] != '-'))
{
@ -207,11 +215,13 @@ int analyse_options(int argc, char *argv[])
fprintf(stderr, "You must give the number of cache lines to be sent\n");
return -1;
}
#if 0
if (!nb_prod)
{
fprintf(stderr, "You must give the number of producers\n");
return -1;
}
#endif
if (shared && (nb_prod > 1))
{
fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n");
@ -223,29 +233,19 @@ int analyse_options(int argc, char *argv[])
do_calc = do_nocalc;
end_calc = do_noend;
}
printf("buf size: %lu\n", WORDS_PER_BUF);
return 0;
}
void wait_consumer(void)
{
pthread_mutex_lock(&mutex_cons_has_finished);
if (++producers_ended == nb_prod)
cont = 0;
if (!consumer_has_finished)
pthread_cond_wait(&cond_cons_has_finished, &mutex_cons_has_finished);
pthread_mutex_unlock(&mutex_cons_has_finished);
}
void *producer(void *unused)
void *producer(void *channel)
{
int i, j;
struct timeval tv1, tv2, tv_result;
if (init_producer_thread())
if (init_producer_thread(channel))
{
fprintf(stderr, "Initialization of thread has failed\n");
wait_consumer();
return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
}
if (shared)
{
@ -258,8 +258,7 @@ void *producer(void *unused)
if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset))
{
perror("pthread_setaffinity_np");
wait_consumer();
return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
}
}
else
@ -273,26 +272,21 @@ void *producer(void *unused)
if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset))
{
perror("pthread_setaffinity_np");
wait_consumer();
return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
}
}
if (init_calc(init_calc_arg))
{
fprintf(stderr, "Initialization of calculation has failed\n");
wait_consumer();
return &nb_prod; /* nb_prod can't be NULL, whatever NULL is bound to */
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
}
gettimeofday(&tv1, NULL);
if (initialize_papi() != -1)
{
for(i = 0; i < nb_bufs_sent; i++) {
//printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE);
for(j = 0; j < WORDS_PER_BUF; j++)
send(do_calc());
}
print_results(WORDS_PER_BUF, nb_bufs_sent);
for(i = 0; i < nb_bufs_sent; i++) {
//printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE);
for(j = 0; j < WORDS_PER_BUF; j++)
send(do_calc());
}
//print_results(WORDS_PER_BUF, nb_bufs_sent);
gettimeofday(&tv2, NULL);
tv_result.tv_sec = tv2.tv_sec - tv1.tv_sec;
if (tv2.tv_usec < tv1.tv_usec)
@ -302,38 +296,32 @@ void *producer(void *unused)
}
else
tv_result.tv_usec = tv2.tv_usec - tv1.tv_usec;
printf("total_time: %u.%06u / %u.%06u / %u.%06u\n", (unsigned) tv_result.tv_sec,
(unsigned) tv_result.tv_usec,
DIV_SEC(tv_result.tv_sec, nb_bufs_sent),
DIV_USEC(tv_result.tv_sec, tv_result.tv_usec, nb_bufs_sent),
DIV_SEC(tv_result.tv_sec, nb_bufs_sent * WORDS_PER_BUF),
DIV_USEC(tv_result.tv_sec, tv_result.tv_usec, nb_bufs_sent * WORDS_PER_BUF));
printf("total_time: %u.%06u\n", (unsigned) tv_result.tv_sec,
(unsigned) tv_result.tv_usec);
if (end_calc())
{
fprintf(stderr, "uninitialization of calculation has failed\n");
wait_consumer();
return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
}
printf("[%p] Producer finished !\n", (void*) pthread_self());
/*
* When a producer end its thread-local storage vanished. Thus,
* producers must finish only after consumer has stopped using them
*/
wait_consumer();
if (end_producer_thread())
if (finalize_producer_thread(channel))
{
fprintf(stderr, "Uninitialization of thread has failed\n");
return &nb_prod; /* &nb_prod can't be NULL, whatever NULL is bound to */
fprintf(stderr, "Finalization of thread has failed\n");
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
}
return NULL;
}
void onMessage(void *val)
void on_message(void *val)
{
//printf("Receive value: %p\n", (void *) val);
}
void *receptor(void *a)
void *consumer(void *channel)
{
if (shared)
{
@ -349,44 +337,78 @@ void *receptor(void *a)
return NULL;
}
}
reception(onMessage);
pthread_mutex_lock(&mutex_cons_has_finished);
consumer_has_finished = 1;
pthread_cond_broadcast(&cond_cons_has_finished);
pthread_mutex_unlock(&mutex_cons_has_finished);
if (init_consumer_thread(channel))
{
fprintf(stderr, "Initialization of thread has failed\n");
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
}
if (block_reception)
{
long long total_data_received = 0;
void *data_buf[MAX_BLOCK_ENTRIES];
while (total_data_received < nb_bufs_sent * WORDS_PER_BUF)
{
int i;
ssize_t nb_data_received;
nb_data_received = recv_some_data(data_buf, MAX_BLOCK_ENTRIES);
total_data_received += nb_data_received;
for (i = 0; i < nb_data_received; i++)
on_message(data_buf[i]);
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");
}
}
else
{
int i, j;
for(i = 0; i < nb_bufs_sent; i++) {
//printf("[%p] About to receive %d new cache line%s\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE, (BUF_SIZE / CACHE_LINE_SIZE > 1) ? "s" : "");
for(j = 0; j < WORDS_PER_BUF; j++)
on_message(recv_one_data());
}
}
if (finalize_consumer_thread(channel))
{
fprintf(stderr, "Finalization of thread has failed\n");
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
}
return NULL;
}
int main(int argc, char *argv[])
{
int i, global_return_value = EXIT_SUCCESS;
void *return_value;
pthread_t *tid;
pthread_t tid[2];
int return_value;
void *pthread_return_value;
void *channel;
return_value = EXIT_SUCCESS;
if (analyse_options(argc, argv))
return EXIT_FAILURE;
page_size = sysconf(_SC_PAGE_SIZE);
if (page_size <= 0)
return EXIT_FAILURE;
if (init_library())
return EXIT_FAILURE;
tid = (pthread_t *) malloc((nb_prod + 1) * sizeof(pthread_t));
if (tid == NULL)
channel = create_comm_channel();
if (channel != NULL)
{
fprintf(stderr, "Failed to allocate %lu bytes needed for thread creation\n", (nb_prod + 1) * sizeof(pthread_t));
return EXIT_FAILURE;
pthread_create(&tid[0], NULL, producer, channel);
pthread_create(&tid[1], NULL, consumer, channel);
pthread_join(tid[0], &pthread_return_value);
if (pthread_return_value != NULL)
return_value = EXIT_FAILURE;
pthread_join(tid[1], &pthread_return_value);
if (pthread_return_value != NULL)
return_value = EXIT_FAILURE;
}
for(i = 0; i < nb_prod; i++)
pthread_create(&tid[i], NULL, producer, NULL);
pthread_create(&tid[i], NULL, receptor, NULL);
for(i = 0; i < nb_prod; i++)
{
pthread_join(tid[i], &return_value);
if (return_value != NULL)
global_return_value = EXIT_FAILURE;
}
pthread_join(tid[i], &return_value);
if (return_value != NULL)
global_return_value = EXIT_FAILURE;
free(tid);
if (end_library())
else
return_value = EXIT_FAILURE;
if (destroy_comm_channel(channel))
return_value = EXIT_FAILURE;
if (finalize_library())
return EXIT_FAILURE;
return global_return_value;
return return_value;
}