[commtech] Use only 1 thread per core

Creating 2 threads per core for the purpose of receiving while sending is
counterproductive. First, it needs 2 threads synchronizing with each other,
which has a cost. Second, since only one thread can run at a time, the
threads slow each other down (using BatchQueue where the sender is on the
same core as the receiver yields bad performance). This patch removes all
this complexity so that one thread receives, computes and then resends the
data, which improves performance dramatically.
This commit is contained in:
Thomas Preud'homme 2012-07-07 22:58:49 +02:00
parent 4914b0dcdd
commit df09d89933
1 changed file with 20 additions and 99 deletions

View File

@ -29,19 +29,10 @@
#define INTERM 1
#define SINK 2 /* Final consumer (doing the check) */
/* Double-buffered hand-off area shared between the consumer (receiving)
 * and producer (sending) threads of one node.  The consumer fills one
 * buffer while the producer drains the other; buf_status is the flag
 * both sides spin on to pass ownership back and forth. */
typedef struct data_xchg
{
/* Two alternating buffers of received word-sized data. */
void ** volatile data_buf[2][MAX_BLOCK_ENTRIES];
/* Number of valid entries currently stored in each buffer. */
volatile int filled_buf_entries[2];
int unused[20]; /* padding — presumably to separate the fields above from buf_status in cache; TODO confirm */
/* 1 = a buffer is handed to the producer, 0 = free for the consumer.
 * Placed on its own cache line to limit cache-line ping-pong. */
volatile int buf_status:1 __attribute__((aligned (CACHE_LINE_SIZE)));
} data_xchg_t;
/* Per-thread parameters describing one node of the pipeline. */
typedef struct node_param
{
void *prev_comm_channel; /* Channel with previous node */
void *next_comm_channel; /* Channel with next node */
data_xchg_t *c2p_xfer; /* Consumer to producer local transfer */
int type; /* SOURCE, INTERM or SINK */
int thread_idx; /* Index used for CPU binding */
} node_param_t;
@ -352,38 +343,18 @@ static int initial_producer(node_param_t *node_param)
return 0;
}
/* Forward every word handed over by the local consumer thread to the
 * next node in the pipeline.  Alternates between the two buffers of the
 * c2p_xfer area: spins until the consumer raises buf_status, drains the
 * current buffer with send(), then clears the flag to give the buffer
 * back.  Returns 0 once the expected total has been forwarded. */
static int intermediate_producer(node_param_t *node_param)
{
data_xchg_t *xfer = node_param->c2p_xfer;
long long forwarded = 0;
long long goal = nb_bufs_sent * WORDS_PER_LINE;
int cur = 0;
while (forwarded < goal)
{
int entry;
/* Busy-wait until the consumer hands us a filled buffer. */
while (!xfer->buf_status)
;
for (entry = 0; entry < xfer->filled_buf_entries[cur]; entry++)
send(node_param->next_comm_channel, xfer->data_buf[cur][entry]);
forwarded += xfer->filled_buf_entries[cur];
/* Release the buffer to the consumer and switch to the other one. */
xfer->buf_status = 0;
cur = !cur;
}
printf("[%p] Producer finished !\n", (void*) pthread_self());
return 0;
}
/* Hook invoked for every word of data received from the previous node.
 * Deliberately a no-op: the benchmark measures transfer cost, not work.
 * Uncomment the trace below to dump each received pointer. */
static void on_message(void *val __attribute__ ((unused)))
{
/* printf("Receive value: %p\n", (void *) val); */
}
static void *consumer_block(node_param_t *node_param)
static int consumer_block(node_param_t *node_param)
{
int i = 0;
int delayed_error;
unsigned long long total_data_received = 0, total_to_receive;
uintptr_t cons_check_value;
inc_check_t *cons_check_context;
void *data_buf[MAX_BLOCK_ENTRIES];
cons_check_context = NULL;
delayed_error = 0;
@ -392,29 +363,22 @@ static void *consumer_block(node_param_t *node_param)
if (inc_check_init(init_calc_arg, &cons_check_context))
{
fprintf(stderr, "Initialization of check has failed\n");
return &page_size; /* &page_size can't be NULL */
return -1;
}
}
cons_check_value = init_calc_arg;
total_to_receive = nb_bufs_sent * WORDS_PER_LINE;
while (total_data_received < total_to_receive)
{
int j;
int i;
ssize_t nb_data_received;
size_t to_receive;
to_receive = MIN(MAX_BLOCK_ENTRIES, total_to_receive - total_data_received);
nb_data_received = recv_some_data(node_param->prev_comm_channel,
(void **) node_param->c2p_xfer->data_buf[i], to_receive);
node_param->c2p_xfer->filled_buf_entries[i] = nb_data_received;
if (unlikely(!(node_param->type & SINK)))
{
/* Check with nb lines sent & recv */
while (node_param->c2p_xfer->buf_status);
node_param->c2p_xfer->buf_status = 1;
}
data_buf, to_receive);
total_data_received += nb_data_received;
for (j = 0; j < nb_data_received; j++)
for (i = 0; i < nb_data_received; i++)
{
if (unlikely(check_recv_match_send))
{
@ -426,25 +390,28 @@ static void *consumer_block(node_param_t *node_param)
delayed_error = 1;
}
}
if (cons_check_value != (uintptr_t) node_param->c2p_xfer->data_buf[i][j])
if (cons_check_value != (uintptr_t) data_buf[i])
{
if (!delayed_error)
{
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) node_param->c2p_xfer->data_buf[i][j]);
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data_buf[i]);
delayed_error = 1;
}
}
}
on_message(node_param->c2p_xfer->data_buf[i][j]);
on_message(data_buf[i]);
if (likely(node_param->type == INTERM))
send(node_param->next_comm_channel, data_buf[i]);
}
i = !i;
/*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");*/
}
if (unlikely(!(node_param->type & SINK)))
if (unlikely(!!(node_param->type & SINK)))
printf("[%p] Consumer finished !\n", (void*) pthread_self());
else if (likely(!!(node_param->type & INTERM)))
printf("[%p] Intermediate producer finished !\n", (void*) pthread_self());
if (delayed_error)
return &page_size; /* &page_size can't be NULL */
return NULL;
return -1;
return 0;
}
static int consumer_data(node_param_t *node_param)
@ -498,51 +465,13 @@ static int consumer_data(node_param_t *node_param)
}
if (unlikely(!!(node_param->type & SINK)))
printf("[%p] Consumer finished !\n", (void*) pthread_self());
else if (likely(!!(node_param->type & INTERM)))
printf("[%p] Intermediate producer finished !\n", (void*) pthread_self());
if (delayed_error)
return -1;
return 0;
}
/* Allocate the page-aligned consumer-to-producer exchange area and store
 * its address in *c2p_xfer_addr.  The caller owns the memory and must
 * free() it.
 * Returns 0 on success, EXIT_FAILURE if the allocation fails (in which
 * case *c2p_xfer_addr is left untouched).
 *
 * Bug fix: the original wrote c2p_xfer->buf_status before checking the
 * posix_memalign() return value; on failure posix_memalign leaves the
 * pointer indeterminate, so that store was undefined behavior.  The
 * error check now happens before any dereference. */
static int alloc_consprod_databuf(data_xchg_t **c2p_xfer_addr)
{
int ret;
data_xchg_t *c2p_xfer;
ret = posix_memalign((void **) &c2p_xfer, page_size, sizeof(*c2p_xfer));
if (ret)
return EXIT_FAILURE;
/* The buffer starts in the consumer's hands. */
c2p_xfer->buf_status = 0;
*c2p_xfer_addr = c2p_xfer;
return 0;
}
/* Run an intermediate node in block-transfer mode with two threads:
 * a spawned thread consumes blocks from the previous node into the
 * shared c2p_xfer area while this thread forwards them to the next
 * node via intermediate_producer().
 * Returns 0 on success, -1 on allocation, thread-creation or consumer
 * failure.  The c2p_xfer area is freed on every exit path after the
 * consumer thread was started. */
static int interm_block_xfer(node_param_t *node_param)
{
int return_value;
void *pthread_return_value;
pthread_t prod_tid;
return_value = 0;
if (alloc_consprod_databuf(&node_param->c2p_xfer))
return -1;
/* Spawn the consumer of the previous node; this thread then acts as
 * the producer towards the next node.
 * NOTE(review): the cast assumes consumer_block has a pthread-compatible
 * signature (void *(*)(void *)); calling through an incompatible function
 * pointer type is undefined behavior — verify consumer_block's current
 * prototype still matches. */
if (pthread_create(&prod_tid, NULL, (void *(*)(void *)) consumer_block,
node_param))
{
perror("pthread_create cons (block_reception)");
return_value = -1;
goto free_c2p_xfer_ressources;
}
return_value = intermediate_producer(node_param);
pthread_join(prod_tid, &pthread_return_value);
/* consumer_block signals failure with a non-NULL return value. */
if (pthread_return_value != NULL)
return_value = -1;
free_c2p_xfer_ressources:
free(node_param->c2p_xfer);
return return_value;
}
static int set_cpu_binding(int thread_idx)
{
int cpu_binding;
@ -579,22 +508,14 @@ static void *node(node_param_t *node_param)
case INTERM:
if (block_reception)
return_value = interm_block_xfer(node_param);
return_value = consumer_block(node_param);
else
return_value = consumer_data(node_param);
break;
case SINK:
if (block_reception)
{
if (alloc_consprod_databuf(&node_param->c2p_xfer))
return &page_size;
if (consumer_block(node_param) == NULL) //TODO: allocate data_buf
return_value = 0;
else
return_value = -1;
free(node_param->c2p_xfer);
}
return_value = consumer_block(node_param);
else
return_value = consumer_data(node_param);
break;