From 9b4388a964a038d449d991ccc8d61520fe16922f Mon Sep 17 00:00:00 2001
From: Thomas Preud'homme
Date: Wed, 10 Jun 2009 09:58:50 +0200
Subject: [PATCH] Add benchs to compare communication techniques

---
 communication_techniques/.channel.d | 1 +
 communication_techniques/.main.d | 2 +
 communication_techniques/Makefile | 77 +++++++
 .../include/asm_cache_comm.h | 51 +++++
 .../include/common_comm.h | 23 +++
 communication_techniques/include/pipe_comm.h | 35 ++++
 .../include/shared_mem_comm.h | 36 ++++
 communication_techniques/src/asm_cache.c | 72 +++++++
 communication_techniques/src/common.c | 81 ++++++++
 communication_techniques/src/main.c | 188 ++++++++++++++++++
 communication_techniques/src/pipe.c | 47 +++++
 communication_techniques/src/shared_mem.c | 57 ++++++
 12 files changed, 670 insertions(+)
 create mode 100644 communication_techniques/.channel.d
 create mode 100644 communication_techniques/.main.d
 create mode 100644 communication_techniques/Makefile
 create mode 100644 communication_techniques/include/asm_cache_comm.h
 create mode 100644 communication_techniques/include/common_comm.h
 create mode 100644 communication_techniques/include/pipe_comm.h
 create mode 100644 communication_techniques/include/shared_mem_comm.h
 create mode 100644 communication_techniques/src/asm_cache.c
 create mode 100644 communication_techniques/src/common.c
 create mode 100644 communication_techniques/src/main.c
 create mode 100644 communication_techniques/src/pipe.c
 create mode 100644 communication_techniques/src/shared_mem.c

diff --git a/communication_techniques/.channel.d b/communication_techniques/.channel.d
new file mode 100644
index 0000000..ba813ca
--- /dev/null
+++ b/communication_techniques/.channel.d
@@ -0,0 +1 @@
+channel.o .channel.d: channel.c channel.h Makefile
diff --git a/communication_techniques/.main.d b/communication_techniques/.main.d
new file mode 100644
index 0000000..b8f3af5
--- /dev/null
+++ b/communication_techniques/.main.d
@@ -0,0 +1,2 @@
+main.o .main.d: main.c channel.h Makefile
+channel.o .main.d: channel.h Makefile
diff --git a/communication_techniques/Makefile b/communication_techniques/Makefile
new file mode 100644
index 0000000..2b40d5d
--- /dev/null
+++ b/communication_techniques/Makefile
@@ -0,0 +1,80 @@
+# Directories
+OBJDIR=obj
+BINDIR=bin
+SRCDIR=src
+INCDIR=include
+LIBDIR=lib
+
+# Compilation flags
+# I know -finline-functions and -finline-functions-called-once are enabled by
+# -O3 but I did this in case gcc behaviour change one day
+CFLAGS=-g -O3 -finline-functions -finline-functions-called-once -Wall -Werror
+LDFLAGS=-L$(LIBDIR) -L$(HOME)/local/lib -Wl,-rpath-link,$(HOME)/local/lib -lpthread -lpapihighlevel
+
+# Executables
+CC=gcc
+
+# Files
+BINNAMES=asm_cache_comm c_cache_comm pipe_comm shared_mem_comm
+BINS=$(patsubst %, $(BINDIR)/%, $(BINNAMES))
+MAIN_OBJS=main.o common.o
+COMMON_LIB_OBJS=common.o
+
+.PHONY: all default tidy clean distclean symlink
+.SECONDARY:
+.SUFFIXES: .c .o
+
+default: $(BINS)
+
+# The per-technique rules below only exist when the goal given on the command
+# line is one of $(BINS), e.g. "make bin/pipe_comm": BASE_TARGET is derived
+# from that goal and selects both the sources and the specific_comm.h symlink.
+ifneq (,$(findstring $(MAKECMDGOALS),$(BINS)))
+BASE_TARGET=$(patsubst $(BINDIR)/%_comm,%,$(MAKECMDGOALS))
+
+# Compilation of binary
+$(BINDIR)/$(BASE_TARGET)_comm: $(patsubst %,$(OBJDIR)/$(BASE_TARGET)_%,$(MAIN_OBJS)) $(LIBDIR)/lib$(BASE_TARGET).a
+	mkdir -p $(BINDIR)
+	$(CC) -o $@ $^ $(LDFLAGS)
+
+# Compilation of library
+$(LIBDIR)/lib$(BASE_TARGET).a: $(OBJDIR)/$(BASE_TARGET).o $(patsubst %,$(OBJDIR)/$(BASE_TARGET)_%,$(COMMON_LIB_OBJS))
+	mkdir -p $(LIBDIR)
+	$(AR) -rcus $@ $^
+
+# Compile common source files
+$(OBJDIR)/$(BASE_TARGET)_%.o: $(SRCDIR)/%.c symlink
+	mkdir -p $(OBJDIR)
+	$(CC) $(CFLAGS) -I$(INCDIR) -I$(HOME)/local/include -c $< -o $@
+endif
+
+# Compile non common source files
+$(OBJDIR)/%.o: $(SRCDIR)/%.c symlink
+	mkdir -p $(OBJDIR)
+	$(CC) $(CFLAGS) -I$(INCDIR) -c $< -o $@
+
+symlink:
+	ln -sfT $(BASE_TARGET)_comm.h $(INCDIR)/specific_comm.h
+
+#.%.d: %.c
+#	gcc $(CFLAGS) -MM $^ | sed -e 's/\([^:]*\):\(.*\)/\1 $@: \2 Makefile/' > $@
+
+tidy:
+	rm -f $(SRCDIR)/*~ \#*
+
+clean:
+	rm -f $(INCDIR)/specific_comm.h
+	rm -rf $(OBJDIR)
+
+distclean: clean
+	rm -rf $(BINDIR) $(LIBDIR)
+
+#ifneq ($(MAKECMDGOALS),tidy)
+#ifneq ($(MAKECMDGOALS),clean)
+#ifneq ($(MAKECMDGOALS),distclean)
+# If the rules called is not a phony rules, then include the %.d makefile
+# corresponding to all objects
+#include $(patsubst %.o, .%.d, $(OBJ))
+#endif
+#endif
+#endif
diff --git a/communication_techniques/include/asm_cache_comm.h b/communication_techniques/include/asm_cache_comm.h
new file mode 100644
index 0000000..c2027c4
--- /dev/null
+++ b/communication_techniques/include/asm_cache_comm.h
@@ -0,0 +1,70 @@
+#ifndef SPECIFIC_COMM_H_
+#define SPECIFIC_COMM_H_ 1
+
+#include <pthread.h>
+#include <stdint.h>
+
+/* Non standard include */
+#include <common_comm.h>
+
+/* This is not an error, we need this two-macro system */
+#define toString(x) doStringification(x)
+#define doStringification(x) #x
+
+/*
+ * Per-producer channel: buf holds two BUF_SIZE-byte halves, then state and
+ * idx each start on their own CACHE_LINE_SIZE boundary. The inline assembly
+ * that accesses the channel hard-codes byte offsets (idx is addressed at
+ * 2 * BUF_SIZE + CACHE_LINE_SIZE in send()), so keep order and alignment.
+ */
+struct communication_channel
+{
+	uintptr_t buf[2 * BUF_SIZE / sizeof(uintptr_t)] __attribute__ ((aligned (CACHE_LINE_SIZE)));
+	int state __attribute__ ((aligned (CACHE_LINE_SIZE)));
+	int idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
+};
+
+/* Node of the doubly linked list of registered producers */
+struct communication_assoc
+{
+	struct communication_assoc *next;
+	struct communication_assoc *prev;
+	pthread_t tid;
+	struct communication_channel *channel;
+	int receiver_idx;
+};
+
+extern struct communication_assoc assoc_root;
+
+__BEGIN_DECLS
+
+struct communication_assoc *create_comm_assoc(void);
+
+/*
+ * Store value at the current byte offset (idx) of the calling thread's
+ * channel, then advance idx by 4 and wrap it at 2 * BUF_SIZE. When the new
+ * idx is a multiple of BUF_SIZE, control jumps to the swap_buffer label with
+ * the address of local label 2 in %eax.
+ * The "cc" and "memory" clobbers tell the compiler that the flags and the
+ * channel memory are modified behind its back.
+ * NOTE(review): the %gs-relative @NTPOFF addressing and the 4-byte stride
+ * look specific to 32-bit x86 - confirm before building for another target.
+ */
+static inline void send(uintptr_t value) {
+	asm volatile("mov %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) " + " toString(CACHE_LINE_SIZE) ", %%eax\n\t"
+		"mov %0, %%gs:channel@NTPOFF(%%eax)\n\t"
+		"addl $4, %%eax\n\t"
+		"andl $(2*" toString(BUF_SIZE) "-1), %%eax\n\t"
+		"mov %%eax, %%gs:channel@NTPOFF + 2 * " toString(BUF_SIZE)" + " toString(CACHE_LINE_SIZE) "\n\t"
+		"test $(" toString(BUF_SIZE) " - 1), %%eax\n\t"
+		"mov $2f, %%eax\n\t"
+		"jz swap_buffer\n\t"
+		"2:"
+		:
+		: "r"(value)
+		: "%eax", "cc", "memory");
+}
+
+__END_DECLS
+
+#endif
diff --git a/communication_techniques/include/common_comm.h b/communication_techniques/include/common_comm.h
new file mode 100644
index 0000000..027d6e2
--- /dev/null
+++ b/communication_techniques/include/common_comm.h
@@ -0,0 +1,25 @@
+#ifndef COMMON_COMM_H_
+#define COMMON_COMM_H_ 1
+
+#include <stdint.h>
+
+/* Sizes in bytes: one cache line, and one half of a channel buffer */
+#define CACHE_LINE_SIZE 128
+#define BUF_SIZE CACHE_LINE_SIZE
+
+/* The reception() loops run for as long as this flag is non-zero */
+extern volatile int cont;
+
+__BEGIN_DECLS
+
+void add_sender(void);
+void remove_sender(void);
+volatile int *init_comm(void);
+void reception(void (*)(uintptr_t));
+extern int swap_buffer;
+void wait_initialization(void);
+void discover_new_producers(void);
+
+__END_DECLS
+
+#endif
diff --git a/communication_techniques/include/pipe_comm.h b/communication_techniques/include/pipe_comm.h
new file mode 100644
index 0000000..5873fb6
--- /dev/null
+++ b/communication_techniques/include/pipe_comm.h
@@ -0,0 +1,51 @@
+#ifndef SPECIFIC_COMM_H_
+#define SPECIFIC_COMM_H_ 1
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+
+/* Non standard include */
+#include <common_comm.h>
+
+#define READ_IDX 0
+#define WRITE_IDX 1
+
+/* Node of the doubly linked list of registered producers */
+struct communication_assoc
+{
+	struct communication_assoc *next;
+	struct communication_assoc *prev;
+	pthread_t tid;
+	int *pipefd;
+	struct communication_channel *channel;
+	int receiver_idx;
+};
+
+__BEGIN_DECLS
+
+extern __thread int pipefd[];
+extern struct communication_assoc assoc_root;
+
+struct communication_assoc *create_comm_assoc(void);
+
+/*
+ * Write value to the write end of the calling thread's pipe. The call is
+ * retried when interrupted by a signal; any other failure is reported on
+ * stderr because the function has no way to return it.
+ */
+static inline void send(uintptr_t value) {
+	ssize_t written;
+
+	do
+		written = write(pipefd[WRITE_IDX], &value, sizeof(value));
+	while (written == -1 && errno == EINTR);
+	if (written != (ssize_t) sizeof(value))
+		perror("send: write");
+}
+
+__END_DECLS
+
+#endif
diff --git a/communication_techniques/include/shared_mem_comm.h b/communication_techniques/include/shared_mem_comm.h
new file mode 100644
index 0000000..b50b5f2
--- /dev/null
+++ b/communication_techniques/include/shared_mem_comm.h
@@ -0,0
+1,45 @@
+#ifndef SPECIFIC_COMM_H_
+#define SPECIFIC_COMM_H_ 1
+
+#include <pthread.h>
+#include <stdint.h>
+
+/* Non standard include */
+#include <common_comm.h>
+
+/* Number of uintptr_t slots in each producer's ring buffer */
+#define SHARED_SPACE_SIZE (2 * CACHE_LINE_SIZE)
+
+/* Node of the doubly linked list of registered producers */
+struct communication_assoc
+{
+	struct communication_assoc *next;
+	struct communication_assoc *prev;
+	pthread_t tid;
+	uintptr_t *shared_space;
+	int *cons_idx;
+	int *prod_idx;
+};
+
+extern struct communication_assoc assoc_root;
+
+__BEGIN_DECLS
+
+extern __thread uintptr_t *shared_space;
+extern __thread int prod_idx;
+
+struct communication_assoc *create_comm_assoc(void);
+
+/*
+ * Append value to the calling thread's ring buffer and advance the producer
+ * index, wrapping at SHARED_SPACE_SIZE. The consumer index is not consulted,
+ * so unread slots are overwritten when the producer gets a full lap ahead.
+ */
+static inline void send(uintptr_t value) {
+	shared_space[prod_idx] = value;
+	prod_idx = (prod_idx + 1) % SHARED_SPACE_SIZE;
+}
+
+__END_DECLS
+
+#endif
diff --git a/communication_techniques/src/asm_cache.c b/communication_techniques/src/asm_cache.c
new file mode 100644
index 0000000..2f48c2e
--- /dev/null
+++ b/communication_techniques/src/asm_cache.c
@@ -0,0 +1,88 @@
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+/* Non standard include */
+#include <common_comm.h>
+#include <specific_comm.h>
+
+
+__thread struct communication_channel channel;
+
+/*
+ * Allocate the list node describing the calling thread's channel.
+ * Returns NULL when the allocation fails.
+ */
+struct communication_assoc *create_comm_assoc(void)
+{
+	struct communication_assoc *assoc;
+
+	assoc = malloc(sizeof *assoc);
+	if (assoc == NULL)
+		return NULL;
+	assoc->tid = pthread_self();
+	assoc->receiver_idx = 0;
+	assoc->channel = &channel;
+	return assoc;
+}
+
+char *dstr="buffer transition\n";
+
+/*
+ * Carrier for the global swap_buffer label that send() jumps to. The code
+ * spins while bit 0 of the calling thread's channel state is set, then sets
+ * state to 1 and jumps to the address found in %eax.
+ * NOTE(review): dstr is only passed as a dummy "m" operand - confirm it can
+ * be dropped.
+ */
+void _swap_buffer()
+{
+	asm volatile(".globl swap_buffer\n\t"
+		"swap_buffer:\n\t"
+
+		"1:\n\t"
+		"testl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t"
+		"jnz 1b\n\t"
+		"movl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t"
+		"jmp *%%eax\n\t"
+		: : "m"(dstr));
+}
+
+/*
+ * Consumer loop: until cont is cleared, pick up newly registered producers
+ * and, for every channel whose state is set, pass one BUF_SIZE-byte half of
+ * its buffer to on_receive before clearing state.
+ */
+void reception(void (*on_receive)(uintptr_t))
+{
+	wait_initialization();
+	/* printf("Activate the consumer...\n"); */
+	while(cont)
+	{
+		struct communication_assoc *cur;
+
+		discover_new_producers();
+		cur = assoc_root.next;
+		while(cur != &assoc_root)
+		{
+			struct communication_channel *channel = cur->channel;
+			if(channel->state)
+			{
+				/*
+				 * cur->receiver_idx is the first element of
+				 * the half to read now. The next half starts
+				 * BUF_SIZE / sizeof(uintptr_t) elements
+				 * later; the modulo wraps back to the first
+				 * half after the second one has been read.
+				 */
+				int i = cur->receiver_idx;
+				int n = cur->receiver_idx + (BUF_SIZE / sizeof(uintptr_t));
+				cur->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(uintptr_t));
+				for(; i < n; i++)
+					on_receive(channel->buf[i]);
+				channel->state = 0;
+			}
+			cur = cur->next;
+		}
+	}
+}
diff --git a/communication_techniques/src/common.c b/communication_techniques/src/common.c
new file mode 100644
index 0000000..febfa5d
--- /dev/null
+++ b/communication_techniques/src/common.c
@@ -0,0 +1,100 @@
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Non standard include */
+#include <specific_comm.h>
+
+
+struct communication_assoc assoc_root;
+static pthread_mutex_t assoc_lock = PTHREAD_MUTEX_INITIALIZER;
+static struct communication_assoc assoc_tmp;
+static volatile int init = 0;
+volatile int cont = 1;
+
+/* Make assoc_tmp and assoc_root empty lists (each node linked to itself) */
+void initialize_library(void)
+{
+	assoc_tmp.prev = &assoc_tmp;
+	assoc_tmp.next = &assoc_tmp;
+	assoc_root.prev = &assoc_root;
+	assoc_root.next = &assoc_root;
+}
+
+/* Return the address of the flag that keeps the reception() loop running */
+volatile int *init_comm(void)
+{
+	return &cont;
+}
+
+/* Busy-wait until the first add_sender() call has initialised the lists */
+void wait_initialization(void)
+{
+	while (!init);
+}
+
+/*
+ * Register the calling thread as a producer: create its association and
+ * push it on the assoc_tmp list under assoc_lock. The process exits if the
+ * association cannot be created, since send() would have no valid channel.
+ */
+void add_sender(void)
+{
+	struct communication_assoc *assoc;
+
+	assoc = create_comm_assoc();
+	if (assoc == NULL)
+	{
+		fprintf(stderr, "add_sender: cannot create the communication association\n");
+		exit(EXIT_FAILURE);
+	}
+	pthread_mutex_lock(&assoc_lock);
+	if (!init)
+	{
+		initialize_library();
+		init = 1;
+	}
+	assoc->next = assoc_tmp.next;
+	assoc_tmp.next->prev = assoc;
+	assoc->prev = &assoc_tmp;
+	assoc_tmp.next = assoc;
+	pthread_mutex_unlock(&assoc_lock);
+}
+
+/* Placeholder: only reports that removal is not implemented */
+void remove_sender(void)
+{
+	printf("remove_communication_channel: Not yet implemented\n");
+}
+
+/*
+ * Move every association queued in assoc_tmp to the front of the assoc_root
+ * list. The emptiness test is made without taking assoc_lock.
+ * NOTE(review): this looks safe only with a single consumer thread - confirm.
+ */
+void discover_new_producers(void)
+{
+	/* If there is some new thread for the write barrier */
+	if(&assoc_tmp != assoc_tmp.next)
+	{
+		/* printf("Adding a new set of producers\n"); */
+		pthread_mutex_lock(&assoc_lock);
+		/*
+		 * list in assoc_tmp is inserted between assoc_root
+		 * and the first elements of assoc_root list
+		 */
+		assoc_root.next->prev = assoc_tmp.prev;
+		assoc_tmp.prev->next = assoc_root.next;
+		assoc_root.next = assoc_tmp.next;
+		assoc_root.next->prev = &assoc_root;
+		/*
+		 * assoc_tmp temporary list has been copied in
+		 * assoc_root list. assoc_tmp is now alone and so
+		 * double linked to itself
+		 */
+		assoc_tmp.prev = &assoc_tmp;
+		assoc_tmp.next = &assoc_tmp;
+		pthread_mutex_unlock(&assoc_lock);
+	}
+}
diff --git a/communication_techniques/src/main.c b/communication_techniques/src/main.c
new file mode 100644
index 0000000..d0c61fe
--- /dev/null
+++ b/communication_techniques/src/main.c
@@ -0,0 +1,220 @@
+#include <errno.h>
+#include <limits.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+/* Non standards includes */
+#include <common_comm.h>
+#include <specific_comm.h>
+#include <papihighlevel.h> /* NOTE(review): name inferred from -lpapihighlevel - confirm */
+
+static long nb_cache_lines = 0;
+static long nb_prod = 0;
+static long size_buf = 1;
+static char *calculation_lib = NULL;
+
+/* Print the command line synopsis and the list of options on stdout */
+void usage(char *argv[])
+{
+	char format[] = "-n [options]";
+	char options[] = "Required options :\n"
+		"-n nb_cache_lines\tNumber of cache lines to send to another core\n"
+		"-p nb_producers\tNumber of producers which send data to another core\n"
+		"Facultative options :\n"
+		"-h\t\t\tPrint this help\n"
+		"-s\t\t\tShare the same L2 cache or not\n"
+		"-c calculation_lib\tLibrary to use for calculation\n"
+		"\t\t\tThis library must implement functions in calc.h\n";
+	printf("Usage : %s %s\n", argv[0], format);
+	printf("Options :\n");
+	printf("%s\n", options);
+}
+
+/*
+ * Parse arg as a base-10 integer in [1, LONG_MAX] and store it in *out.
+ * opt is the option letter and what the quantity name used in the error
+ * messages. Returns 0 on success, -1 after printing a message on stderr.
+ */
+static int parse_positive_long(const char *arg, char opt, const char *what, long *out)
+{
+	char *inval;
+	long value;
+
+	/* strtol() only sets errno on failure, so clear it first */
+	errno = 0;
+	value = strtol(arg, &inval, 10);
+	if ((*arg == '\0') || (*inval != '\0'))
+	{
+		fprintf(stderr, "Option '-%c' needs an integer argument\n", opt);
+		return -1;
+	}
+	if ((value <= 0) || (errno == ERANGE))
+	{
+		fprintf(stderr, "%s must be between 1 and %ld, both inclusive\n", what, LONG_MAX);
+		return -1;
+	}
+	*out = value;
+	return 0;
+}
+
+/*
+ * Parse the command line into the file-scope settings.
+ * Returns 0 on success, -1 after printing a message when an option is
+ * invalid or when -n or -p is missing. -h prints the usage and exits.
+ */
+int analyse_options(int argc, char *argv[])
+{
+	int opt;
+
+	opterr = 0;
+	while ((opt = getopt(argc, argv, ":hsc:n:p:b:")) != -1)
+	{
+		switch (opt)
+		{
+			case 'b' :
+				/* Each option validates its own variable */
+				if (parse_positive_long(optarg, 'b', "Number of cache lines for each buffer", &size_buf))
+					return -1;
+				break;
+			case 'c' :
+				calculation_lib = optarg;
+				{
+					struct stat file_stat;
+					if (stat(calculation_lib, &file_stat))
+					{
+						fprintf(stderr, "%s: %s\n", optarg, strerror(errno));
+						return -1;
+					}
+				}
+				break;
+			case 'h' :
+				usage(argv);
+				exit(EXIT_SUCCESS);
+			case 'n' :
+				if (parse_positive_long(optarg, 'n', "Number of cache lines to be sent", &nb_cache_lines))
+					return -1;
+				break;
+			case 'p' :
+				if (parse_positive_long(optarg, 'p', "Number of producers", &nb_prod))
+					return -1;
+				break;
+			case 's' :
+				/* TODO: shared L2 cache */
+				break;
+			case '?' :
+				fprintf(stderr, "Option inconnue\n");
+				return -1;
+			case ':' :
+				/* getopt() stores the offending option letter in optopt */
+				fprintf(stderr, "Option -%c needs an argument\n", optopt);
+				return -1;
+			default :
+				fprintf(stderr, "Error while analysing command line options\n");
+				return -1;
+		}
+	}
+	if (!nb_cache_lines)
+	{
+		fprintf(stderr, "You must give the number of cache lines to be sent\n");
+		return -1;
+	}
+	if (!nb_prod)
+	{
+		fprintf(stderr, "You must give the number of producers\n");
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Producer thread: register with the communication layer, then send
+ * nb_cache_lines batches of CACHE_LINE_SIZE / sizeof(uintptr_t) consecutive
+ * values, starting from the thread identifier.
+ */
+void *producer(void *unused)
+{
+	long i;
+	size_t j;
+	uintptr_t k;
+
+	(void) unused;
+	printf("Registering: %p !\n", (void*) pthread_self());
+	add_sender();
+	/* uintptr_t keeps the whole identifier and wraps instead of overflowing */
+	k = (uintptr_t) pthread_self();
+	if (initialize_papi() != -1)
+	{
+		for(i = 0; i < nb_cache_lines; i++) {
+			//printf("[%p] Send a new CACHE_LINE\n", (void *) pthread_self());
+			for(j = 0; j < (CACHE_LINE_SIZE / sizeof(uintptr_t)); j++)
+				send(k++);
+		}
+		print_results();
+	}
+	printf("[%p] Producer finished !\n", (void*) pthread_self());
+	remove_sender();
+	/* Threads must wait the consumer because of thread-local storages */
+	return NULL;
+}
+
+/* Reception callback: does nothing with the received value */
+void onMessage(uintptr_t val)
+{
+	//printf("Receive value: %p\n", (void *) val);
+}
+
+/* Consumer thread: run the reception loop with onMessage as callback */
+void *receptor(void *a)
+{
+	reception(onMessage);
+	return NULL;
+}
+
+/* Start routine in a new thread, or terminate the process on failure */
+static void start_thread(pthread_t *tid, void *(*routine)(void *))
+{
+	int err = pthread_create(tid, NULL, routine, NULL);
+
+	if (err != 0)
+	{
+		fprintf(stderr, "pthread_create: %s\n", strerror(err));
+		exit(EXIT_FAILURE);
+	}
+}
+
+/*
+ * Start nb_prod producers and one consumer, wait for the producers, then
+ * clear the flag returned by init_comm() and wait for the consumer.
+ */
+int main(int argc, char *argv[])
+{
+	long i;
+	volatile int *run_flag;
+	void *return_value;
+	pthread_t *tid;
+
+	if (analyse_options(argc, argv))
+		return EXIT_FAILURE;
+	/* calloc() rejects an element count whose byte size would overflow */
+	tid = calloc((size_t) nb_prod + 1, sizeof *tid);
+	if (tid == NULL)
+	{
+		perror("calloc");
+		return EXIT_FAILURE;
+	}
+	run_flag = init_comm();
+	for(i = 0; i < nb_prod; i++)
+		start_thread(&tid[i], producer);
+	start_thread(&tid[i], receptor);
+	for(i = 0; i < nb_prod; i++)
+		pthread_join(tid[i], &return_value);
+	*run_flag = 0;
+	pthread_join(tid[i], &return_value);
+	free(tid);
+	return EXIT_SUCCESS;
+}
diff --git a/communication_techniques/src/pipe.c b/communication_techniques/src/pipe.c
new file mode 100644
index 0000000..da70d00
--- /dev/null
+++ b/communication_techniques/src/pipe.c
@@ -0,0 +1,75 @@
+#include <fcntl.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+/* Non standard include */
+#include <common_comm.h>
+#include <specific_comm.h>
+
+
+__thread int pipefd[2];
+
+/*
+ * Create the calling thread's pipe and the list node that describes it.
+ * The read end is made non-blocking so that reception() can poll it and
+ * still notice when cont is cleared. Returns NULL on failure.
+ * NOTE(review): assoc->pipefd points into this thread's thread-local array,
+ * which the consumer still reads after main() has joined the producers -
+ * confirm the lifetime.
+ */
+struct communication_assoc *create_comm_assoc(void)
+{
+	struct communication_assoc *assoc;
+	int flags;
+
+	if (pipe(pipefd) == -1)
+		return NULL;
+	flags = fcntl(pipefd[READ_IDX], F_GETFL);
+	assoc = malloc(sizeof *assoc);
+	if (flags == -1 || assoc == NULL ||
+	    fcntl(pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK) == -1)
+	{
+		free(assoc);
+		close(pipefd[READ_IDX]);
+		close(pipefd[WRITE_IDX]);
+		return NULL;
+	}
+	assoc->tid = pthread_self();
+	assoc->pipefd = pipefd;
+	return assoc;
+}
+
+/*
+ * Consumer loop: until cont is cleared, pick up newly registered producers
+ * and read up to BUF_SIZE / sizeof(uintptr_t) values from each pipe, passing
+ * every value to on_receive.
+ */
+void reception(void (*on_receive)(uintptr_t))
+{
+	wait_initialization();
+	/* printf("Activate the consumer...\n"); */
+	while(cont)
+	{
+		struct communication_assoc *cur;
+
+		discover_new_producers();
+		cur = assoc_root.next;
+		while(cur != &assoc_root)
+		{
+			size_t i;
+
+			for(i = 0; i < BUF_SIZE / sizeof(uintptr_t); i++)
+			{
+				uintptr_t tmp;
+
+				/* Empty pipe, EOF or error: move on to the next producer */
+				if (read(cur->pipefd[READ_IDX], &tmp, sizeof(tmp)) != (ssize_t) sizeof(tmp))
+					break;
+				on_receive(tmp);
+			}
+			cur = cur->next;
+		}
+	}
+}
diff --git a/communication_techniques/src/shared_mem.c b/communication_techniques/src/shared_mem.c
new file mode 100644
index 0000000..15249f6
--- /dev/null
+++ b/communication_techniques/src/shared_mem.c
@@ -0,0 +1,75 @@
+#include <pthread.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+/* Non standard include */
+#include <common_comm.h>
+#include <specific_comm.h>
+
+
+__thread uintptr_t *shared_space;
+__thread int cons_idx = 0;
+__thread int prod_idx = 0;
+
+/*
+ * Allocate the calling thread's ring buffer and the list node that points
+ * to it and to the thread's two indexes. Returns NULL when an allocation
+ * fails.
+ */
+struct communication_assoc *create_comm_assoc(void)
+{
+	struct communication_assoc *assoc;
+
+	shared_space = malloc(SHARED_SPACE_SIZE * sizeof *shared_space);
+	assoc = malloc(sizeof *assoc);
+	if (shared_space == NULL || assoc == NULL)
+	{
+		free(assoc);
+		free(shared_space);
+		shared_space = NULL;
+		return NULL;
+	}
+	assoc->tid = pthread_self();
+	assoc->shared_space = shared_space;
+	assoc->cons_idx = &cons_idx;
+	assoc->prod_idx = &prod_idx;
+	return assoc;
+}
+
+/*
+ * Consumer loop: until cont is cleared, pick up newly registered producers
+ * and pass every value between each ring's consumer and producer indexes to
+ * on_receive, re-reading the producer index until it stops moving.
+ */
+void reception(void (*on_receive)(uintptr_t))
+{
+	wait_initialization();
+	/* printf("Activate the consumer...\n"); */
+	while(cont)
+	{
+		struct communication_assoc *cur;
+
+		discover_new_producers();
+		cur = assoc_root.next;
+		while(cur != &assoc_root)
+		{
+			/* Local copies, named so as not to shadow the thread-local indexes */
+			int cons, prod;
+
+			cons = *cur->cons_idx;
+			do
+			{
+				prod = *cur->prod_idx;
+				for(; cons != prod; cons = (cons + 1) % SHARED_SPACE_SIZE)
+				{
+					uintptr_t tmp;
+					tmp = cur->shared_space[cons];
+					on_receive(tmp);
+				}
+			} while (prod != *cur->prod_idx);
+			*cur->cons_idx = cons;
+			cur = cur->next;
+		}
+	}
+}