communication techniques bench: refactoring (2)

Main changes:

* Better separation of what is common from what is specific to a
  communication technique
* Consumer wait initialization of all producer threads
This commit is contained in:
Thomas Preud'homme 2009-06-18 14:48:07 +02:00 committed by Thomas Preud'homme
parent b5575aa710
commit 274f3cfd1b
18 changed files with 207 additions and 203 deletions

View File

@ -1,15 +1,15 @@
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
#include <stdint.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
/* This is not an error, we need this two-macro system */
#define toString(x) doStringification(x)
#define doStringification(x) #x
__BEGIN_DECLS
struct comm_channel
{
volatile void *buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE)));
@ -21,13 +21,13 @@ struct thread_comm
{
struct comm_channel *channel;
int receiver_idx;
int unused;
};
extern struct thread_comm *tcomms;
extern int swap_buffer;
__BEGIN_DECLS
void init_thread_comm(void);
void init_thread_comm(struct thread_comm *);
static inline void send(void **addr)
{
asm volatile("mov %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) " + " toString(CACHE_LINE_SIZE) ", %%eax\n\t"

View File

@ -1,10 +1,8 @@
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
#include <stdint.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
/* This is not an error, we need this two-macro system */
#define toString(x) doStringification(x)
@ -28,7 +26,7 @@ extern __thread struct comm_channel channel;
__BEGIN_DECLS
void init_thread_comm(void);
void init_thread_comm(struct thread_comm *);
static inline void send(void **addr)
{
channel.buf[channel.idx++] = addr;

View File

@ -1,7 +1,5 @@
#ifndef _COMMON_COMM_H_
#define _COMMON_COMM_H_ 1
#include <stdint.h>
#ifndef _COMMTECH_H_
#define _COMMTECH_H_ 1
#define CACHE_LINE_SIZE 128
#define BUF_SIZE CACHE_LINE_SIZE
@ -15,8 +13,8 @@ extern long nb_prod;
__BEGIN_DECLS
int init_library(void);
void reception(void (*)(volatile void *));
extern int swap_buffer;
void init_producer_thread(void);
void reception(void (*)(void *));
__END_DECLS

View File

@ -1,25 +1,18 @@
#ifndef __COMM_H_
#define __COMM_H_ 1
#include <stdint.h>
/* Non standard include */
#include <common_comm.h>
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
#define FAKE_NURSERY_START 1
struct communication_assoc
struct thread_comm
{
struct communication_assoc *next;
struct communication_assoc *prev;
pthread_t tid;
int unused;
};
extern struct communication_assoc assoc_root;
extern struct thread_comm *tcomms;
__BEGIN_DECLS
struct communication_assoc *create_comm_assoc(void);
void init_thread_comm(struct thread_comm *);
static inline void send(void **addr) {
static __thread void **store_var = NULL;
store_var = addr;

View File

@ -1,32 +1,28 @@
#ifndef __COMM_H_
#define __COMM_H_ 1
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
#include <stdint.h>
/* Non standard include */
#include <common_comm.h>
#define FAKE_NURSERY_START 1
#define REALLY_FAKE_NURSERY_START ((uintptr_t) -1)
#define ANOTHER_FAKE_NURSERY_START ((uintptr_t) -1)
struct communication_assoc
struct thread_comm
{
struct communication_assoc *next;
struct communication_assoc *prev;
pthread_t tid;
int unused;
};
extern struct communication_assoc assoc_root;
extern struct thread_comm *tcomms;
__BEGIN_DECLS
struct communication_assoc *create_comm_assoc(void);
void init_thread_comm(struct thread_comm *);
void insert(void *);
static inline void send(void **addr) {
static inline void send(void **addr)
{
void *ptd_addr;
ptd_addr = *addr; /* NOTE: Not done in real jikes barrier */
if (!((uintptr_t) addr >= REALLY_FAKE_NURSERY_START) && ((uintptr_t) ptd_addr >= FAKE_NURSERY_START))
if (!((uintptr_t) addr >= ANOTHER_FAKE_NURSERY_START) && ((uintptr_t) ptd_addr >= FAKE_NURSERY_START))
insert(addr);
}

View File

@ -1,31 +1,22 @@
#ifndef __COMM_H_
#define __COMM_H_ 1
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
#include <stdint.h>
#include <unistd.h>
/* Non standart include */
#include <specific_comm.h>
#define READ_IDX 0
#define WRITE_IDX 1
struct communication_assoc
struct thread_comm
{
struct communication_assoc *next;
struct communication_assoc *prev;
pthread_t tid;
int *pipefd;
struct communication_channel *channel;
int receiver_idx;
};
__BEGIN_DECLS
extern __thread int pipefd[];
extern struct communication_assoc assoc_root;
extern struct thread_comm *tcomms;
struct communication_assoc *create_comm_assoc(void);
void init_thread_comm(struct thread_comm *);
static inline void send(void **addr) {
write(pipefd[WRITE_IDX], &addr, sizeof(void *));
}

View File

@ -0,0 +1,10 @@
#ifndef _PRIVATE_COMMON_H_
#define _PRIVATE_COMMON_H_ 1
__BEGIN_DECLS
void wait_initialization(void);
__END_DECLS
#endif

View File

@ -1,33 +1,28 @@
#ifndef __COMM_H_
#define __COMM_H_ 1
#include <stdint.h>
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#define SHARED_SPACE_SIZE (2 * BUF_SIZE)
#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *))
struct communication_assoc
struct thread_comm
{
struct communication_assoc *next;
struct communication_assoc *prev;
pthread_t tid;
void **shared_space;
volatile void **shared_space;
volatile int *cons_idx;
volatile int *prod_idx;
};
extern struct communication_assoc assoc_root;
extern struct thread_comm *tcomms;
__BEGIN_DECLS
extern __thread void **shared_space;
extern __thread volatile void **shared_space;
extern __thread volatile int prod_idx;
extern __thread volatile int cons_idx;
struct communication_assoc *create_comm_assoc(void);
void init_thread_comm(struct thread_comm *);
static inline void send(void **addr) {
while ((prod_idx + 1) % SHARED_SPACE_VOIDPTR == cons_idx);
shared_space[prod_idx] = addr;

View File

@ -1,33 +1,28 @@
#ifndef __COMM_H_
#define __COMM_H_ 1
#include <stdint.h>
#ifndef _SPECIFIC_COMM_H_
#define _SPECIFIC_COMM_H_ 1
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#define SHARED_SPACE_SIZE (2 * BUF_SIZE)
#define SHARED_SPACE_VOIDPTR (SHARED_SPACE_SIZE / sizeof(void *))
struct communication_assoc
struct thread_comm
{
struct communication_assoc *next;
struct communication_assoc *prev;
pthread_t tid;
void **shared_space;
volatile void **shared_space;
volatile int *cons_idx;
volatile int *prod_idx;
};
extern struct communication_assoc assoc_root;
extern struct thread_comm *tcomms;
__BEGIN_DECLS
extern __thread void **shared_space;
extern __thread volatile void **shared_space;
extern __thread volatile int prod_idx;
extern __thread volatile int cons_idx;
struct communication_assoc *create_comm_assoc(void);
void init_thread_comm(struct thread_comm *);
static inline void send(void **addr) {
static __thread int local_cons_idx = 0;

View File

@ -3,28 +3,19 @@
#include <pthread.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread struct comm_channel channel;
void init_thread_comm(void)
void init_thread_comm(struct thread_comm *comm)
{
static int i = 0;
static pthread_mutex_t i_lock = PTHREAD_MUTEX_INITIALIZER;
int i_local;
pthread_mutex_lock(&i_lock);
i_local = i;
pthread_mutex_unlock(&i_lock);
tcomms[i].receiver_idx = 0;
tcomms[i].channel = &channel;
tcomms[i].channel->state = 0;
tcomms[i].channel->idx = 0;
pthread_mutex_lock(&i_lock);
i++;
pthread_mutex_unlock(&i_lock);
comm->receiver_idx = 0;
comm->channel = &channel;
comm->channel->state = 0;
comm->channel->idx = 0;
}
char *dstr="buffer transition\n";
@ -42,8 +33,9 @@ void _swap_buffer()
: : "m"(dstr));
}
void reception(void (*on_receive)(volatile void *))
void reception(void (*on_receive)(void *))
{
wait_initialization();
/* printf("Activate the consumer...\n"); */
while (cont)
{
@ -67,7 +59,13 @@ void reception(void (*on_receive)(volatile void *))
n = tcomms[i].receiver_idx + (BUF_SIZE / sizeof(void *));
tcomms[i].receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
for(; j<n; j++)
on_receive(tcomms[i].channel->buf[j]);
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
on_receive((void *) tcomms[i].channel->buf[j]);
}
tcomms[i].channel->state = 0;
}
}

View File

@ -3,34 +3,26 @@
#include <pthread.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread struct comm_channel channel;
void init_thread_comm(void)
void init_thread_comm(struct thread_comm *comm)
{
static int i = 0;
static pthread_mutex_t i_lock = PTHREAD_MUTEX_INITIALIZER;
int i_local;
pthread_mutex_lock(&i_lock);
i_local = i;
pthread_mutex_unlock(&i_lock);
tcomms[i].receiver_idx = 0;
tcomms[i].channel = &channel;
tcomms[i].channel->state = 0;
tcomms[i].channel->idx = 0;
pthread_mutex_lock(&i_lock);
i++;
pthread_mutex_unlock(&i_lock);
comm->receiver_idx = 0;
comm->channel = &channel;
comm->channel->state = 0;
comm->channel->idx = 0;
}
char *dstr="buffer transition\n";
void reception(void (*on_receive)(volatile void *))
void reception(void (*on_receive)(void *))
{
wait_initialization();
/* printf("Activate the consumer...\n"); */
while (cont)
{
@ -54,7 +46,13 @@ void reception(void (*on_receive)(volatile void *))
n = tcomms[i].receiver_idx + (BUF_SIZE / sizeof(void *));
tcomms[i].receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
for(; j<n; j++)
on_receive(tcomms[i].channel->buf[j]);
{
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
on_receive((void *) tcomms[i].channel->buf[j]);
}
tcomms[i].channel->state = 0;
}
}

View File

@ -5,12 +5,16 @@
/* Non standard include */
#include <specific_comm.h>
#include <commtech.h>
struct thread_comm *tcomms;
volatile int cont = 1;
static int init = 0;
static pthread_mutex_t init_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t init_cond = PTHREAD_COND_INITIALIZER;
int init_library()
int init_library(void)
{
tcomms = (struct thread_comm *) malloc(nb_prod * sizeof(struct thread_comm));
if (tcomms == NULL)
@ -20,3 +24,40 @@ int init_library()
}
return 0;
}
int get_thread_number(void)
{
static int i = 0;
static pthread_mutex_t i_lock = PTHREAD_MUTEX_INITIALIZER;
int i_local;
pthread_mutex_lock(&i_lock);
i_local = i;
i++;
pthread_mutex_unlock(&i_lock);
return i_local;
}
void init_producer_thread(void)
{
static int i = 0;
static pthread_mutex_t i_lock = PTHREAD_MUTEX_INITIALIZER;
int i_local;
pthread_mutex_lock(&i_lock);
i_local = i;
i++;
pthread_mutex_unlock(&i_lock);
init_thread_comm(&tcomms[i_local]);
pthread_mutex_lock(&init_lock);
init = 1;
pthread_mutex_unlock(&init_lock);
}
void wait_initialization(void)
{
pthread_mutex_lock(&init_lock);
if (!init)
pthread_cond_wait(&init_cond, &init_lock);
pthread_mutex_unlock(&init_lock);
}

View File

@ -4,22 +4,18 @@
#include <unistd.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
struct communication_assoc *create_comm_assoc(void)
void init_thread_comm(struct thread_comm *comm)
{
struct communication_assoc *assoc;
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
assoc->tid = pthread_self();
return assoc;
}
void reception(void (*on_receive)(void *))
{
wait_initialization();
wait_initialization(); /* Not needed but here for equity with others techniques */
/* printf("Activate the consumer...\n"); */
while (cont);
}

View File

@ -4,7 +4,8 @@
#include <unistd.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
@ -32,18 +33,14 @@ static struct double_linked_list *global_head = NULL;
static int bufsenqueued = 0;
static unsigned int lock = 0;
struct communication_assoc *create_comm_assoc(void)
void init_thread_comm(struct thread_comm *comm)
{
struct communication_assoc *assoc;
void **new_buffer;
new_buffer = (void **) malloc(BUFFER_SIZE);
local_tail = new_buffer + (USABLE_BUFFER_BYTES - BYTES_IN_ADDRESS -
(USABLE_BUFFER_BYTES % (1 << LOG_BYTES_IN_ADDRESS))) + BYTES_IN_ADDRESS; // The second parenthesis is equal to 0
local_tail_buffer_end = local_tail;
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
assoc->tid = pthread_self();
return assoc;
}
void set_next(struct double_linked_list *list, struct double_linked_list *next)
@ -169,7 +166,8 @@ void reception(void (*on_receive)(void *))
{
struct double_linked_list *list_cur;
void **buf_start, **buf_ptr;
wait_initialization();
wait_initialization(); /* Not needed but here for equity with others techniques */
/* printf("Activate the consumer...\n"); */
while (cont && (global_head == NULL));
if (!cont)

View File

@ -12,7 +12,7 @@
/* Non standards includes */
#include <papihighlevel.h>
#include <common_comm.h>
#include <commtech.h>
#include <specific_comm.h>
static long nb_cache_lines = 0;
@ -150,7 +150,7 @@ void *producer(void *unused)
int i, j;
void *k;
init_thread_comm();
init_producer_thread();
if (shared)
{
pthread_t tid;
@ -183,8 +183,8 @@ void *producer(void *unused)
if (initialize_papi() != -1)
{
for(i = 0; i < nb_cache_lines; i++) {
//printf("[%p] Send a new CACHE_LINE\n", (void *) pthread_self());
for(j = 0; j < (CACHE_LINE_SIZE / sizeof(uintptr_t)); j++)
//printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE);
for(j = 0; j < (BUF_SIZE / sizeof(uintptr_t)); j++)
send(&k);
}
print_results();
@ -203,7 +203,7 @@ void *producer(void *unused)
return NULL;
}
void onMessage(volatile void *val)
void onMessage(void *val)
{
//printf("Receive value: %p\n", (void *) val);
}

View File

@ -2,46 +2,53 @@
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdint.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread int pipefd[2];
struct communication_assoc *create_comm_assoc(void)
void init_thread_comm(struct thread_comm *comm)
{
struct communication_assoc *assoc;
int flags;
pipe(pipefd);
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
assoc->tid = pthread_self();
assoc->pipefd = pipefd;
return assoc;
flags = fcntl(pipefd[READ_IDX], F_GETFL);
fcntl(pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK);
comm->pipefd = pipefd;
}
void reception(void (*on_receive)(void *))
{
wait_initialization();
wait_initialization(); /* Not needed but here for equity with others techniques */
/* printf("Activate the consumer...\n"); */
while(cont)
{
struct communication_assoc *cur;
int i;
discover_new_producers();
cur = assoc_root.next;
while(cur != &assoc_root)
for (i = 0; i < nb_prod; i++)
{
int i;
int nb_read;
for(i = 0; i < BUF_SIZE / sizeof(void *); i++)
for(nb_read = 0; nb_read < BUF_SIZE / sizeof(void *); nb_read++)
{
void *tmp;
read(cur->pipefd[READ_IDX], &tmp, sizeof(void *));
on_receive(tmp);
int j, n;
void *tmp_buf[BUF_SIZE / sizeof(void *)];
j = nb_read / sizeof(void *);
n = read(tcomms[i].pipefd[READ_IDX], (void *) ((uintptr_t) tmp_buf + nb_read), BUF_SIZE - nb_read);
if (n > 0)
{
nb_read += n;
for (; j + sizeof(void *) <= nb_read / sizeof(void *); j += sizeof(void *))
on_receive(tmp_buf[j]);
}
}
cur = cur->next;
}
}
}

View File

@ -4,25 +4,21 @@
#include <unistd.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread void **shared_space;
__thread volatile void **shared_space;
__thread volatile int cons_idx = 0;
__thread volatile int prod_idx = 0;
struct communication_assoc *create_comm_assoc(void)
void init_thread_comm(struct thread_comm *comm)
{
struct communication_assoc *assoc;
shared_space = (void **) malloc(SHARED_SPACE_SIZE);
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
assoc->tid = pthread_self();
assoc->shared_space = shared_space;
assoc->cons_idx = &cons_idx;
assoc->prod_idx = &prod_idx;
return assoc;
shared_space = (volatile void **) malloc(SHARED_SPACE_SIZE);
comm->shared_space = shared_space;
comm->cons_idx = &cons_idx;
comm->prod_idx = &prod_idx;
}
void reception(void (*on_receive)(void *))
@ -31,21 +27,20 @@ void reception(void (*on_receive)(void *))
/* printf("Activate the consumer...\n"); */
while(cont)
{
struct communication_assoc *cur;
int i;
discover_new_producers();
cur = assoc_root.next;
while(cur != &assoc_root)
for (i = 0; i < nb_prod; i++)
{
int cons_idx;
for(cons_idx = *cur->cons_idx; cons_idx != *cur->prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, *cur->cons_idx = cons_idx)
for(cons_idx = *tcomms[i].cons_idx; cons_idx != *tcomms[i].prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, *tcomms[i].cons_idx = cons_idx)
{
void *tmp;
tmp = cur->shared_space[cons_idx];
on_receive(tmp);
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
on_receive((void *) tcomms[i].shared_space[cons_idx]);
}
cur = cur->next;
}
}
}

View File

@ -4,25 +4,21 @@
#include <unistd.h>
/* Non standard include */
#include <common_comm.h>
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
__thread void **shared_space;
__thread volatile void **shared_space;
__thread volatile int cons_idx = 0;
__thread volatile int prod_idx = 0;
struct communication_assoc *create_comm_assoc(void)
void init_thread_comm(struct thread_comm *comm)
{
struct communication_assoc *assoc;
shared_space = (void **) malloc(SHARED_SPACE_SIZE);
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
assoc->tid = pthread_self();
assoc->shared_space = shared_space;
assoc->cons_idx = &cons_idx;
assoc->prod_idx = &prod_idx;
return assoc;
shared_space = (volatile void **) malloc(SHARED_SPACE_SIZE);
comm->shared_space = shared_space;
comm->cons_idx = &cons_idx;
comm->prod_idx = &prod_idx;
}
void reception(void (*on_receive)(void *))
@ -31,27 +27,26 @@ void reception(void (*on_receive)(void *))
/* printf("Activate the consumer...\n"); */
while(cont)
{
struct communication_assoc *cur;
int i;
discover_new_producers();
cur = assoc_root.next;
while(cur != &assoc_root)
for( i = 0; i < nb_prod; i++)
{
int cons_idx, prod_idx;
cons_idx = *cur->cons_idx;
cons_idx = *tcomms[i].cons_idx;
do
{
prod_idx = *cur->prod_idx;
prod_idx = *tcomms[i].prod_idx;
for(; cons_idx != prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR)
{
void *tmp;
tmp = cur->shared_space[cons_idx];
on_receive(tmp);
/*
* The behaviour of this is not documented but we know
* the values inside buf won't change during this affectation
*/
on_receive((void *) tcomms[i].shared_space[cons_idx]);
}
} while (prod_idx != *cur->prod_idx);
*cur->cons_idx = cons_idx;
cur = cur->next;
} while (prod_idx != *tcomms[i].prod_idx);
*tcomms[i].cons_idx = cons_idx;
}
}
}