From 4cdee1503e9cc20ba12f9d882dbc5d1a8a103bbc Mon Sep 17 00:00:00 2001 From: Thomas Preud'homme Date: Wed, 17 Jun 2009 18:15:16 +0200 Subject: [PATCH] communication techniques bench: refactoring (1) Main changes: * change library initialization: initialization is done with init_library once and init_thread_comm by each thread * cont is now directly accessed by main * list of struct communication_assoc -> array of struct thread_comm * struct communication_channel -> struct comm_channel --- .../include/asm_cache_comm.h | 17 ++--- .../include/common_comm.h | 9 +-- communication_techniques/src/asm_cache.c | 51 +++++++------ communication_techniques/src/common.c | 73 ++----------------- communication_techniques/src/main.c | 27 +++---- 5 files changed, 58 insertions(+), 119 deletions(-) diff --git a/communication_techniques/include/asm_cache_comm.h b/communication_techniques/include/asm_cache_comm.h index 16b8871..4c4b9eb 100644 --- a/communication_techniques/include/asm_cache_comm.h +++ b/communication_techniques/include/asm_cache_comm.h @@ -10,27 +10,24 @@ #define toString(x) doStringification(x) #define doStringification(x) #x -struct communication_channel +struct comm_channel { - void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE))); - int state __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE))); + volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE))); int idx __attribute__ ((aligned (CACHE_LINE_SIZE))); }; -struct communication_assoc +struct thread_comm { - struct communication_assoc *next; - struct communication_assoc *prev; - pthread_t tid; - struct communication_channel *channel; + struct comm_channel *channel; int receiver_idx; }; -extern struct communication_assoc assoc_root; +extern struct thread_comm *tcomms; __BEGIN_DECLS -struct communication_assoc *create_comm_assoc(void); +void init_thread_comm(void); static inline void send(void **addr) { asm volatile("mov %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) " + " toString(CACHE_LINE_SIZE) ", %%eax\n\t" "mov %0, %%gs:channel@NTPOFF(%%eax)\n\t" diff --git a/communication_techniques/include/common_comm.h b/communication_techniques/include/common_comm.h index 2f5d17b..8e6d2e8 100644 --- a/communication_techniques/include/common_comm.h +++ b/communication_techniques/include/common_comm.h @@ -10,16 +10,13 @@ #define unlikely(x) __builtin_expect(!!(x), 0) extern volatile int cont; +extern long nb_prod; __BEGIN_DECLS -void add_sender(void); -void remove_sender(void); -volatile int *init_comm(void); -void reception(void (*)(void *)); +int init_library(void); +void reception(void (*)(volatile void *)); extern int swap_buffer; -void wait_initialization(void); -void discover_new_producers(void); __END_DECLS diff --git a/communication_techniques/src/asm_cache.c b/communication_techniques/src/asm_cache.c index f4fffb9..e6903b1 100644 --- a/communication_techniques/src/asm_cache.c +++ b/communication_techniques/src/asm_cache.c @@ -7,17 +7,24 @@ #include -__thread struct communication_channel channel; +__thread struct comm_channel channel; -struct communication_assoc *create_comm_assoc(void) +void init_thread_comm(void) { - struct communication_assoc *assoc; + static int i = 0; + static pthread_mutex_t i_lock = PTHREAD_MUTEX_INITIALIZER; + int i_local; - assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc)); - assoc->tid = pthread_self(); - assoc->receiver_idx = 0; - assoc->channel = &channel; - return assoc; + pthread_mutex_lock(&i_lock); + i_local = i; + pthread_mutex_unlock(&i_lock); + tcomms[i].receiver_idx = 0; + tcomms[i].channel = &channel; + tcomms[i].channel->state = 0; + tcomms[i].channel->idx = 0; + pthread_mutex_lock(&i_lock); + i++; + pthread_mutex_unlock(&i_lock); } char *dstr="buffer transition\n"; @@ -35,21 +42,18 @@ void _swap_buffer() : : "m"(dstr)); } -void reception(void (*on_receive)(void *)) +void reception(void (*on_receive)(volatile void *)) { - wait_initialization(); /* printf("Activate the consumer...\n"); */ - while(cont) + while (cont) { - struct communication_assoc *cur; + int i; - discover_new_producers(); - cur = assoc_root.next; - while(cur != &assoc_root) + for (i = 0; i < nb_prod; i++) { - struct communication_channel *channel = cur->channel; - if(channel->state) + if(tcomms[i].channel->state) { + int j, n; /* * cur->receiver_idx point to the last cache * line we have read. We go to the next cache @@ -59,14 +63,13 @@ void reception(void (*on_receive)(void *)) * line we correct the pointer to point to * the first one (this is done by the modulo) */ - int i = cur->receiver_idx; - int n = cur->receiver_idx + (BUF_SIZE / sizeof(void *)); - cur->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); - for(; ibuf[i]); - channel->state = 0; + j = tcomms[i].receiver_idx; + n = tcomms[i].receiver_idx + (BUF_SIZE / sizeof(void *)); + tcomms[i].receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); + for(; jbuf[j]); + tcomms[i].channel->state = 0; } - cur = cur->next; } } } diff --git a/communication_techniques/src/common.c b/communication_techniques/src/common.c index febfa5d..4c41510 100644 --- a/communication_techniques/src/common.c +++ b/communication_techniques/src/common.c @@ -7,75 +7,16 @@ #include -struct communication_assoc assoc_root; -static pthread_mutex_t assoc_lock = PTHREAD_MUTEX_INITIALIZER; -static struct communication_assoc assoc_tmp; -static volatile int init = 0; +struct thread_comm *tcomms; volatile int cont = 1; -void initialize_library(void) +int init_library() { - assoc_tmp.prev = &assoc_tmp; - assoc_tmp.next = &assoc_tmp; - assoc_root.prev = &assoc_root; - assoc_root.next = &assoc_root; -} - -volatile int *init_comm(void) -{ - return &cont; -} - -void wait_initialization(void) -{ - while (!init); -} - -void add_sender(void) -{ - struct communication_assoc *assoc; - - assoc = create_comm_assoc(); - pthread_mutex_lock(&assoc_lock); - if (!init) + tcomms = (struct thread_comm *) malloc(nb_prod * sizeof(struct thread_comm)); + if (tcomms == NULL) { - initialize_library(); - init = 1; - } - assoc->next = assoc_tmp.next; - assoc_tmp.next->prev = assoc; - assoc->prev = &assoc_tmp; - assoc_tmp.next = assoc; - pthread_mutex_unlock(&assoc_lock); -} - -void remove_sender() -{ - printf("remove_communication_channel: Not yet implemented\n"); -} - -void discover_new_producers(void) -{ - /* If there is some new thread for the write barrier */ - if(&assoc_tmp != assoc_tmp.next) - { - /* printf("Adding a new set of producers\n"); */ - pthread_mutex_lock(&assoc_lock); - /* - * list in assoc_tmp is inserted between assoc_root - * and the first elements of assoc_root list - */ - assoc_root.next->prev = assoc_tmp.prev; - assoc_tmp.prev->next = assoc_root.next; - assoc_root.next = assoc_tmp.next; - assoc_root.next->prev = &assoc_root; - /* - * assoc_tmp temporary list has been copied in - * assoc_root list. assoc_tmp is now alone and so - * double linked to itself - */ - assoc_tmp.prev = &assoc_tmp; - assoc_tmp.next = &assoc_tmp; - pthread_mutex_unlock(&assoc_lock); + fprintf(stderr, "Failed to allocate %lu bytes needed by the library to work\n", nb_prod * sizeof(struct thread_comm)); + return -1; } + return 0; } diff --git a/communication_techniques/src/main.c b/communication_techniques/src/main.c index fa34f72..7fc9685 100644 --- a/communication_techniques/src/main.c +++ b/communication_techniques/src/main.c @@ -16,7 +16,7 @@ #include static long nb_cache_lines = 0; -static long nb_prod = 0; +long nb_prod = 0; static long size_buf = 1; static char *calculation_lib = NULL; static int shared = 0; @@ -145,13 +145,12 @@ int analyse_options(int argc, char *argv[]) return 0; } -void *producer(void *cont_ptr_void) +void *producer(void *unused) { int i, j; void *k; - volatile int *cont; - cont = *((volatile int **) cont_ptr_void); + init_thread_comm(); if (shared) { pthread_t tid; @@ -166,9 +165,7 @@ void *producer(void *cont_ptr_void) return NULL; } } - printf("Registering: %p !\n", (void*) pthread_self()); - add_sender(); - k = cont_ptr_void; + k = (void *) 0x6384923; if (initialize_papi() != -1) { for(i = 0; i < nb_cache_lines; i++) { @@ -179,21 +176,20 @@ void *producer(void *cont_ptr_void) print_results(); } printf("[%p] Producer finished !\n", (void*) pthread_self()); - remove_sender(); /* * When a producer end its thread-local storage vanished. Thus, * producers must finish only after consumer has stopped using them */ pthread_mutex_lock(&mutex_cons_has_finished); if (++producers_ended == nb_prod) - *cont = 0; + cont = 0; if (!consumer_has_finished) pthread_cond_wait(&cond_cons_has_finished, &mutex_cons_has_finished); pthread_mutex_unlock(&mutex_cons_has_finished); return NULL; } -void onMessage(void *val) +void onMessage(volatile void *val) { //printf("Receive value: %p\n", (void *) val); } @@ -225,16 +221,21 @@ void *receptor(void *a) int main(int argc, char *argv[]) { int i; - volatile int *cont; void *return_value; pthread_t *tid; if (analyse_options(argc, argv)) return EXIT_FAILURE; + if (init_library()) + return EXIT_FAILURE; tid = (pthread_t *) malloc((nb_prod + 1) * sizeof(pthread_t)); - cont = init_comm(); + if (tid == NULL) + { + fprintf(stderr, "Failed to allocate %lu bytes needed for thread creation\n", (nb_prod + 1) * sizeof(pthread_t)); + return EXIT_FAILURE; + } for(i = 0; i < nb_prod; i++) - pthread_create(&tid[i], NULL, producer, &cont); + pthread_create(&tid[i], NULL, producer, NULL); pthread_create(&tid[i], NULL, receptor, NULL); for(i = 0; i < nb_prod; i++) pthread_join(tid[i], &return_value);