communication techniques bench: refactoring (1)

Main changes:
* change library initialization: initialization is done with
  init_library once and init_thread_comm by each thread
* cont is now directly accessed by main
* list of struct communication_assoc -> array of struct thread_comm
* struct communication_channel -> struct comm_channel
This commit is contained in:
Thomas Preud'homme 2009-06-17 18:15:16 +02:00 committed by Thomas Preud'homme
parent 68589566a7
commit 4cdee1503e
5 changed files with 58 additions and 119 deletions

View File

@ -10,27 +10,24 @@
#define toString(x) doStringification(x)
#define doStringification(x) #x
struct communication_channel
struct comm_channel
{
void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE)));
int state __attribute__ ((aligned (CACHE_LINE_SIZE)));
volatile void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE)));
volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE)));
int idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
};
struct communication_assoc
struct thread_comm
{
struct communication_assoc *next;
struct communication_assoc *prev;
pthread_t tid;
struct communication_channel *channel;
struct comm_channel *channel;
int receiver_idx;
};
extern struct communication_assoc assoc_root;
extern struct thread_comm *tcomms;
__BEGIN_DECLS
struct communication_assoc *create_comm_assoc(void);
void init_thread_comm(void);
static inline void send(void **addr) {
asm volatile("mov %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) " + " toString(CACHE_LINE_SIZE) ", %%eax\n\t"
"mov %0, %%gs:channel@NTPOFF(%%eax)\n\t"

View File

@ -10,16 +10,13 @@
#define unlikely(x) __builtin_expect(!!(x), 0)
extern volatile int cont;
extern long nb_prod;
__BEGIN_DECLS
void add_sender(void);
void remove_sender(void);
volatile int *init_comm(void);
void reception(void (*)(void *));
int init_library(void);
void reception(void (*)(volatile void *));
extern int swap_buffer;
void wait_initialization(void);
void discover_new_producers(void);
__END_DECLS

View File

@ -7,17 +7,24 @@
#include <specific_comm.h>
__thread struct communication_channel channel;
__thread struct comm_channel channel;
struct communication_assoc *create_comm_assoc(void)
void init_thread_comm(void)
{
struct communication_assoc *assoc;
static int i = 0;
static pthread_mutex_t i_lock = PTHREAD_MUTEX_INITIALIZER;
int i_local;
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
assoc->tid = pthread_self();
assoc->receiver_idx = 0;
assoc->channel = &channel;
return assoc;
pthread_mutex_lock(&i_lock);
i_local = i;
pthread_mutex_unlock(&i_lock);
tcomms[i].receiver_idx = 0;
tcomms[i].channel = &channel;
tcomms[i].channel->state = 0;
tcomms[i].channel->idx = 0;
pthread_mutex_lock(&i_lock);
i++;
pthread_mutex_unlock(&i_lock);
}
char *dstr="buffer transition\n";
@ -35,21 +42,18 @@ void _swap_buffer()
: : "m"(dstr));
}
void reception(void (*on_receive)(void *))
void reception(void (*on_receive)(volatile void *))
{
wait_initialization();
/* printf("Activate the consumer...\n"); */
while(cont)
while (cont)
{
struct communication_assoc *cur;
int i;
discover_new_producers();
cur = assoc_root.next;
while(cur != &assoc_root)
for (i = 0; i < nb_prod; i++)
{
struct communication_channel *channel = cur->channel;
if(channel->state)
if(tcomms[i].channel->state)
{
int j, n;
/*
* cur->receiver_idx point to the last cache
* line we have read. We go to the next cache
@ -59,14 +63,13 @@ void reception(void (*on_receive)(void *))
* line we correct the pointer to point to
* the first one (this is done by the modulo)
*/
int i = cur->receiver_idx;
int n = cur->receiver_idx + (BUF_SIZE / sizeof(void *));
cur->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
for(; i<n; i++)
on_receive(channel->buf[i]);
channel->state = 0;
j = tcomms[i].receiver_idx;
n = tcomms[i].receiver_idx + (BUF_SIZE / sizeof(void *));
tcomms[i].receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
for(; j<n; j++)
on_receive(tcomms[i].channel->buf[j]);
tcomms[i].channel->state = 0;
}
cur = cur->next;
}
}
}

View File

@ -7,75 +7,16 @@
#include <specific_comm.h>
struct communication_assoc assoc_root;
static pthread_mutex_t assoc_lock = PTHREAD_MUTEX_INITIALIZER;
static struct communication_assoc assoc_tmp;
static volatile int init = 0;
struct thread_comm *tcomms;
volatile int cont = 1;
void initialize_library(void)
int init_library()
{
assoc_tmp.prev = &assoc_tmp;
assoc_tmp.next = &assoc_tmp;
assoc_root.prev = &assoc_root;
assoc_root.next = &assoc_root;
}
volatile int *init_comm(void)
{
return &cont;
}
void wait_initialization(void)
{
while (!init);
}
void add_sender(void)
{
struct communication_assoc *assoc;
assoc = create_comm_assoc();
pthread_mutex_lock(&assoc_lock);
if (!init)
tcomms = (struct thread_comm *) malloc(nb_prod * sizeof(struct thread_comm));
if (tcomms == NULL)
{
initialize_library();
init = 1;
}
assoc->next = assoc_tmp.next;
assoc_tmp.next->prev = assoc;
assoc->prev = &assoc_tmp;
assoc_tmp.next = assoc;
pthread_mutex_unlock(&assoc_lock);
}
void remove_sender()
{
printf("remove_communication_channel: Not yet implemented\n");
}
void discover_new_producers(void)
{
/* If there is some new thread for the write barrier */
if(&assoc_tmp != assoc_tmp.next)
{
/* printf("Adding a new set of producers\n"); */
pthread_mutex_lock(&assoc_lock);
/*
* list in assoc_tmp is inserted between assoc_root
* and the first elements of assoc_root list
*/
assoc_root.next->prev = assoc_tmp.prev;
assoc_tmp.prev->next = assoc_root.next;
assoc_root.next = assoc_tmp.next;
assoc_root.next->prev = &assoc_root;
/*
* assoc_tmp temporary list has been copied in
* assoc_root list. assoc_tmp is now alone and so
* double linked to itself
*/
assoc_tmp.prev = &assoc_tmp;
assoc_tmp.next = &assoc_tmp;
pthread_mutex_unlock(&assoc_lock);
fprintf(stderr, "Failed to allocate %lu bytes needed by the library to work\n", nb_prod * sizeof(struct thread_comm));
return -1;
}
return 0;
}

View File

@ -16,7 +16,7 @@
#include <specific_comm.h>
static long nb_cache_lines = 0;
static long nb_prod = 0;
long nb_prod = 0;
static long size_buf = 1;
static char *calculation_lib = NULL;
static int shared = 0;
@ -145,13 +145,12 @@ int analyse_options(int argc, char *argv[])
return 0;
}
void *producer(void *cont_ptr_void)
void *producer(void *unused)
{
int i, j;
void *k;
volatile int *cont;
cont = *((volatile int **) cont_ptr_void);
init_thread_comm();
if (shared)
{
pthread_t tid;
@ -166,9 +165,7 @@ void *producer(void *cont_ptr_void)
return NULL;
}
}
printf("Registering: %p !\n", (void*) pthread_self());
add_sender();
k = cont_ptr_void;
k = (void *) 0x6384923;
if (initialize_papi() != -1)
{
for(i = 0; i < nb_cache_lines; i++) {
@ -179,21 +176,20 @@ void *producer(void *cont_ptr_void)
print_results();
}
printf("[%p] Producer finished !\n", (void*) pthread_self());
remove_sender();
/*
* When a producer end its thread-local storage vanished. Thus,
* producers must finish only after consumer has stopped using them
*/
pthread_mutex_lock(&mutex_cons_has_finished);
if (++producers_ended == nb_prod)
*cont = 0;
cont = 0;
if (!consumer_has_finished)
pthread_cond_wait(&cond_cons_has_finished, &mutex_cons_has_finished);
pthread_mutex_unlock(&mutex_cons_has_finished);
return NULL;
}
void onMessage(void *val)
void onMessage(volatile void *val)
{
//printf("Receive value: %p\n", (void *) val);
}
@ -225,16 +221,21 @@ void *receptor(void *a)
int main(int argc, char *argv[])
{
int i;
volatile int *cont;
void *return_value;
pthread_t *tid;
if (analyse_options(argc, argv))
return EXIT_FAILURE;
if (init_library())
return EXIT_FAILURE;
tid = (pthread_t *) malloc((nb_prod + 1) * sizeof(pthread_t));
cont = init_comm();
if (tid == NULL)
{
fprintf(stderr, "Failed to allocate %lu bytes needed for thread creation\n", (nb_prod + 1) * sizeof(pthread_t));
return EXIT_FAILURE;
}
for(i = 0; i < nb_prod; i++)
pthread_create(&tid[i], NULL, producer, &cont);
pthread_create(&tid[i], NULL, producer, NULL);
pthread_create(&tid[i], NULL, receptor, NULL);
for(i = 0; i < nb_prod; i++)
pthread_join(tid[i], &return_value);