724 lines
18 KiB
C
724 lines
18 KiB
C
#define _GNU_SOURCE
|
|
#define _POSIX_SOURCE 1
|
|
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <pthread.h>
|
|
#include <stdint.h>
|
|
#include <unistd.h>
|
|
#include <errno.h>
|
|
#include <limits.h>
|
|
#include <sys/stat.h>
|
|
#include <string.h>
|
|
#include <dlfcn.h>
|
|
#include <sys/time.h>
|
|
|
|
/* Non standards includes */
|
|
#include <commtech.h>
|
|
#include <specific_comm.h>
|
|
|
|
|
|
/* Must be a multiple of BUF_SIZE */
|
|
#define MAX_BLOCK_ENTRIES (2048 * CACHE_LINE_SIZE / sizeof(void *))
|
|
/*#define MAX_BLOCK_ENTRIES ((BUF_SIZE * 4 + page_size) & ~(page_size - 1)) // Big buffer size is not a good idea */
|
|
#define toString(x) doStringification(x)
|
|
#define doStringification(x) #x
|
|
#define MIN(x,y) ((x < y) ? x : y)
|
|
#define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t))
|
|
#define SOURCE 0 /* Initial producer (calling do_calc()) */
|
|
#define INTERM 1
|
|
#define SINK 2 /* Final consumer (doing the check) */
|
|
|
|
typedef struct data_xchg
|
|
{
|
|
void ** volatile data_buf[2][MAX_BLOCK_ENTRIES];
|
|
volatile int filled_buf_entries[2];
|
|
int unused[20];
|
|
volatile int buf_status:1 __attribute__((aligned (CACHE_LINE_SIZE)));
|
|
} data_xchg_t;
|
|
|
|
typedef struct node_param
|
|
{
|
|
void *prev_comm_channel; /* Channel with previous node */
|
|
void *next_comm_channel; /* Channel with next mode */
|
|
data_xchg_t *c2p_xfer; /* Consumer to producer local transfer */
|
|
int type; /* SOURCE, INTERM or SINK */
|
|
int thread_idx;
|
|
} node_param_t;
|
|
|
|
typedef int inc_check_t;
|
|
|
|
|
|
static long nb_bufs_sent = 0;
|
|
long nb_prod = 0;
|
|
static int (*init_calc)(int) = NULL;
|
|
static void **(*do_calc)(void) = NULL;
|
|
static int (*end_calc)(void) = NULL;
|
|
static int shared = 0; /* We are not shared by default */
|
|
static long init_calc_arg = 0;
|
|
static int block_reception = 1;
|
|
static long nb_nodes = 2; /* Nb of nodes in the chain of pipelines */
|
|
static int check_recv_match_send = 0;
|
|
static uintptr_t single_prod_check_val; /* /!\ Only one real producer */
|
|
static inc_check_t *single_prod_check_ctxt; /* /!\ Only one real producer */
|
|
static int nb_cpus = 4; /* TOFIX: don't hardcode this */
|
|
static int page_size = 0;
|
|
|
|
static void usage(char *argv[])
|
|
{
|
|
char format[] = "-n <num_buf> -p <num_prod> [options]";
|
|
char options[] = "Required options:\n"
|
|
"-n nb_buffer_sent\t\tNumber of buffer to send to another core\n"
|
|
"\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n"
|
|
"Facultative options:\n"
|
|
"-b\t\t\t\tReceive the biggest amount of data available (The default)\n"
|
|
"-c calculation_libname arg\tLibrary to use for calculation with its argument\n"
|
|
"\t\t\t\tThis library must implement functions in calc.h\n"
|
|
"\t\t\t\t(default to none)\n"
|
|
"-k\t\t\t\tCheck we receive what is sent\n"
|
|
"-l nb_nodes\t\t\tNumber of nodes in the pipeline chain\n"
|
|
"-d\t\t\t\tReceive one piece of data\n"
|
|
"-h\t\t\t\tPrint this help\n"
|
|
"-s\t\t\t\tShare the same L2 cache or not\n"
|
|
"\t\t\t\tIf level is:\n"
|
|
"\t\t\t\t\t> 0, then the same L<level> must be shared\n"
|
|
"\t\t\t\t\t< 0, then different L<level> must be used\n"
|
|
"\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n";
|
|
printf("Usage : %s %s\n", argv[0], format);
|
|
printf("Options :\n");
|
|
printf("%s\n", options);
|
|
}
|
|
|
|
static int do_noinit(int unused __attribute__ ((unused)))
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
static void **do_nocalc(void)
|
|
{
|
|
static int an_int, *an_int_ptr = &an_int;
|
|
|
|
return (void **) &an_int_ptr;
|
|
}
|
|
|
|
static int do_noend(void)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
static int inc_check_init(int init_value, inc_check_t **context)
|
|
{
|
|
inc_check_t *ctxt;
|
|
|
|
ctxt = malloc(sizeof(*ctxt));
|
|
if (ctxt == NULL)
|
|
return -1;
|
|
*ctxt = init_value;
|
|
*context = ctxt;
|
|
return 0;
|
|
}
|
|
|
|
static int inc_check_next(inc_check_t *context, uintptr_t *next_value)
|
|
{
|
|
*next_value = (*context)++;
|
|
return 0;
|
|
}
|
|
|
|
static int inc_check_end(inc_check_t *context)
|
|
{
|
|
free(context);
|
|
return 0;
|
|
}
|
|
|
|
static int do_checkinit(int init_value)
|
|
{
|
|
return inc_check_init(init_value, &single_prod_check_ctxt);
|
|
}
|
|
|
|
static void **do_checkcalc(void)
|
|
{
|
|
int ret;
|
|
|
|
ret = inc_check_next(single_prod_check_ctxt, &single_prod_check_val);
|
|
if (ret)
|
|
return NULL;
|
|
else
|
|
return (void **) single_prod_check_val;
|
|
}
|
|
|
|
static int do_checkend(void)
|
|
{
|
|
return inc_check_end(single_prod_check_ctxt);
|
|
}
|
|
|
|
static int analyse_options(int argc, char *argv[])
|
|
{
|
|
int opt;
|
|
|
|
opterr = 0;
|
|
while ((opt = getopt(argc, argv, ":bc:dhl:kn:s"/*:p:"*/)) != -1)
|
|
{
|
|
switch (opt)
|
|
{
|
|
case 'b' :
|
|
block_reception = 1;
|
|
break;
|
|
case 'c' :
|
|
{
|
|
struct stat file_stat;
|
|
void *dl_descriptor;
|
|
|
|
if (stat(optarg, &file_stat))
|
|
{
|
|
fprintf(stderr, "%s: %s\n", optarg, strerror(errno));
|
|
return -1;
|
|
}
|
|
dl_descriptor = dlopen(optarg, RTLD_LAZY | RTLD_LOCAL);
|
|
if (dl_descriptor == NULL)
|
|
{
|
|
fprintf(stderr, "dlopen error: %s\n", dlerror());
|
|
return -1;
|
|
}
|
|
init_calc = (int (*)(int)) dlsym(dl_descriptor, "init_calc");
|
|
do_calc = (void ** (*)(void)) dlsym(dl_descriptor, "do_calc");
|
|
end_calc = (int (*)(void)) dlsym(dl_descriptor, "end_calc");
|
|
if ((init_calc == NULL) || (do_calc == NULL) || (end_calc == NULL))
|
|
{
|
|
fprintf(stderr, "A symbol cannot be loaded: %s\n", dlerror());
|
|
return -1;
|
|
}
|
|
if ((optind == argc) || (*argv[optind] == '-'))
|
|
{
|
|
fprintf(stderr, "Missing argument for -c option\n");
|
|
return -1;
|
|
}
|
|
{
|
|
char *inval;
|
|
init_calc_arg = strtol(argv[optind], &inval, 10);
|
|
if ((*argv[optind] == '\0') || (*inval != '\0'))
|
|
{
|
|
fprintf(stderr, "Option '-c' needs also an integer argument\n");
|
|
return -1;
|
|
}
|
|
if ((init_calc_arg <= 0) || ((init_calc_arg == LONG_MAX) && errno == ERANGE))
|
|
{
|
|
fprintf(stderr, "Number of useless loop to be done between 2 send must be"
|
|
" between 1 and %ld, both inclusive\n", LONG_MAX);
|
|
return -1;
|
|
}
|
|
}
|
|
optind++;
|
|
}
|
|
break;
|
|
case 'd' :
|
|
block_reception = 0;
|
|
break;
|
|
case 'h' :
|
|
usage(argv);
|
|
exit(EXIT_SUCCESS);
|
|
case 'k' :
|
|
check_recv_match_send = 1;
|
|
break;
|
|
case 'l' :
|
|
{
|
|
char *inval;
|
|
nb_nodes = strtol(optarg, &inval, 10);
|
|
if ((*optarg == '\0') || (*inval != '\0'))
|
|
{
|
|
fprintf(stderr, "Option '-l' needs an integer argument\n");
|
|
return -1;
|
|
}
|
|
if ((nb_nodes < 2) || ((nb_nodes == LONG_MAX) && errno == ERANGE))
|
|
{
|
|
fprintf(stderr, "Number of links to participate in the pipeline chain must be between 2 and %ld, both inclusive\n", LONG_MAX);
|
|
return -1;
|
|
}
|
|
}
|
|
break;
|
|
case 'n' :
|
|
{
|
|
char *inval;
|
|
nb_bufs_sent = strtol(optarg, &inval, 10);
|
|
if ((*optarg == '\0') || (*inval != '\0'))
|
|
{
|
|
fprintf(stderr, "Option '-n' needs an integer argument\n");
|
|
return -1;
|
|
}
|
|
if ((nb_bufs_sent <= 0) || ((nb_bufs_sent == LONG_MAX) && errno == ERANGE))
|
|
{
|
|
fprintf(stderr, "Number of cache lines to be sent must be between 1 and %ld, both inclusive\n", LONG_MAX);
|
|
return -1;
|
|
}
|
|
}
|
|
break;
|
|
#if 0
|
|
case 'p' :
|
|
{
|
|
char *inval;
|
|
nb_prod = strtol(optarg, &inval, 10);
|
|
if ((*optarg == '\0') || (*inval != '\0'))
|
|
{
|
|
fprintf(stderr, "Option '-p' needs an integer argument\n");
|
|
return -1;
|
|
}
|
|
if ((nb_prod <= 0) || ((nb_prod == LONG_MAX) && errno == ERANGE))
|
|
{
|
|
fprintf(stderr, "Number of producers must be between 1 and %ld, both inclusive\n", LONG_MAX);
|
|
return -1;
|
|
}
|
|
}
|
|
break;
|
|
#endif
|
|
case 's' :
|
|
shared = 1;
|
|
break;
|
|
case '?' :
|
|
fprintf(stderr, "Option inconnue\n");
|
|
return -1;
|
|
case ':' :
|
|
fprintf(stderr, "Option %s needs an argument\n", argv[optind]);
|
|
return -1;
|
|
default :
|
|
fprintf(stderr, "Error while analysing command line options\n");
|
|
return -1;
|
|
}
|
|
}
|
|
if (!nb_bufs_sent)
|
|
{
|
|
fprintf(stderr, "You must give the number of cache lines to be sent\n");
|
|
return -1;
|
|
}
|
|
#if 0
|
|
if (!nb_prod)
|
|
{
|
|
fprintf(stderr, "You must give the number of producers\n");
|
|
return -1;
|
|
}
|
|
#endif
|
|
if (shared && (nb_prod > 1))
|
|
{
|
|
fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n");
|
|
return -1;
|
|
}
|
|
if (check_recv_match_send && do_calc)
|
|
{
|
|
fprintf(stderr, "Can't specifying a computation library with check activated\n");
|
|
return -1;
|
|
}
|
|
if (do_calc == NULL)
|
|
{
|
|
if (check_recv_match_send)
|
|
{
|
|
init_calc = do_checkinit;
|
|
do_calc = do_checkcalc;
|
|
end_calc = do_checkend;
|
|
}
|
|
else
|
|
{
|
|
init_calc = do_noinit;
|
|
do_calc = do_nocalc;
|
|
end_calc = do_noend;
|
|
}
|
|
}
|
|
printf("buf size: %lu\n", WORDS_PER_LINE);
|
|
return 0;
|
|
}
|
|
|
|
static int initial_producer(node_param_t *node_param)
|
|
{
|
|
unsigned int i, j;
|
|
|
|
if (init_calc(init_calc_arg))
|
|
{
|
|
fprintf(stderr, "Initialization of calculation has failed\n");
|
|
return 1;
|
|
}
|
|
for(i = 0; i < nb_bufs_sent; i++) {
|
|
for(j = 0; j < WORDS_PER_LINE; j++)
|
|
send(node_param->next_comm_channel, do_calc());
|
|
}
|
|
if (end_producer(node_param->next_comm_channel))
|
|
{
|
|
fprintf(stderr, "Notification of end of production to the "
|
|
"communication algorithm failed\n");
|
|
return 1;
|
|
}
|
|
if (end_calc())
|
|
{
|
|
fprintf(stderr, "uninitialization of calculation has failed\n");
|
|
return 1;
|
|
}
|
|
printf("[%p] Producer finished !\n", (void*) pthread_self());
|
|
return 0;
|
|
}
|
|
|
|
static int intermediate_producer(node_param_t *node_param)
|
|
{
|
|
int i, j;
|
|
long long total_data_sent = 0, total_to_send;
|
|
|
|
i = 0;
|
|
total_to_send = nb_bufs_sent * WORDS_PER_LINE;
|
|
while (total_data_sent < total_to_send)
|
|
{
|
|
while (!node_param->c2p_xfer->buf_status);
|
|
for(j = 0; j < node_param->c2p_xfer->filled_buf_entries[i]; j++)
|
|
send(node_param->next_comm_channel, node_param->c2p_xfer->data_buf[i][j]);
|
|
total_data_sent += node_param->c2p_xfer->filled_buf_entries[i];
|
|
node_param->c2p_xfer->buf_status = 0;
|
|
i = !i;
|
|
}
|
|
printf("[%p] Producer finished !\n", (void*) pthread_self());
|
|
return 0;
|
|
}
|
|
|
|
static void on_message(void *val __attribute__ ((unused)))
|
|
{
|
|
/*printf("Receive value: %p\n", (void *) val);*/
|
|
}
|
|
|
|
static void *consumer_block(node_param_t *node_param)
|
|
{
|
|
int i = 0;
|
|
int delayed_error;
|
|
unsigned long long total_data_received = 0, total_to_receive;
|
|
uintptr_t cons_check_value;
|
|
inc_check_t *cons_check_context;
|
|
|
|
cons_check_context = NULL;
|
|
delayed_error = 0;
|
|
if (check_recv_match_send)
|
|
{
|
|
if (inc_check_init(init_calc_arg, &cons_check_context))
|
|
{
|
|
fprintf(stderr, "Initialization of check has failed\n");
|
|
return &page_size; /* &page_size can't be NULL */
|
|
}
|
|
}
|
|
cons_check_value = init_calc_arg;
|
|
total_to_receive = nb_bufs_sent * WORDS_PER_LINE;
|
|
while (total_data_received < total_to_receive)
|
|
{
|
|
int j;
|
|
ssize_t nb_data_received;
|
|
size_t to_receive;
|
|
|
|
to_receive = MIN(MAX_BLOCK_ENTRIES, total_to_receive - total_data_received);
|
|
nb_data_received = recv_some_data(node_param->prev_comm_channel,
|
|
(void **) node_param->c2p_xfer->data_buf[i], to_receive);
|
|
node_param->c2p_xfer->filled_buf_entries[i] = nb_data_received;
|
|
if (unlikely(!(node_param->type & SINK)))
|
|
{
|
|
/* Check with nb lines sent & recv */
|
|
while (node_param->c2p_xfer->buf_status);
|
|
node_param->c2p_xfer->buf_status = 1;
|
|
}
|
|
total_data_received += nb_data_received;
|
|
for (j = 0; j < nb_data_received; j++)
|
|
{
|
|
if (unlikely(check_recv_match_send))
|
|
{
|
|
if (inc_check_next(cons_check_context, &cons_check_value))
|
|
{
|
|
if (!delayed_error)
|
|
{
|
|
fprintf(stderr, "Error while checking received value match sent value\n");
|
|
delayed_error = 1;
|
|
}
|
|
}
|
|
if (cons_check_value != (uintptr_t) node_param->c2p_xfer->data_buf[i][j])
|
|
{
|
|
if (!delayed_error)
|
|
{
|
|
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) node_param->c2p_xfer->data_buf[i][j]);
|
|
delayed_error = 1;
|
|
}
|
|
}
|
|
}
|
|
on_message(node_param->c2p_xfer->data_buf[i][j]);
|
|
}
|
|
i = !i;
|
|
/*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");*/
|
|
}
|
|
if (unlikely(!(node_param->type & SINK)))
|
|
printf("[%p] Consumer finished !\n", (void*) pthread_self());
|
|
if (delayed_error)
|
|
return &page_size; /* &page_size can't be NULL */
|
|
return NULL;
|
|
}
|
|
|
|
static int consumer_data(node_param_t *node_param)
|
|
{
|
|
unsigned int i, j;
|
|
int delayed_error;
|
|
uintptr_t cons_check_value;
|
|
inc_check_t *cons_check_context;
|
|
|
|
cons_check_context = NULL;
|
|
delayed_error = 0;
|
|
if (check_recv_match_send)
|
|
{
|
|
if (inc_check_init(init_calc_arg, &cons_check_context))
|
|
{
|
|
fprintf(stderr, "Initialization of check has failed\n");
|
|
return -1; /* &page_size can't be NULL */
|
|
}
|
|
}
|
|
cons_check_value = init_calc_arg;
|
|
for(i = 0; i < nb_bufs_sent; i++) {
|
|
for(j = 0; j < WORDS_PER_LINE; j++)
|
|
{
|
|
void *data;
|
|
|
|
data = recv_one_data(node_param->prev_comm_channel);
|
|
if (unlikely(check_recv_match_send))
|
|
{
|
|
if (inc_check_next(cons_check_context, &cons_check_value))
|
|
{
|
|
if (!delayed_error)
|
|
{
|
|
fprintf(stderr, "Error while checking received value match sent value\n");
|
|
delayed_error = 1;
|
|
}
|
|
}
|
|
if (cons_check_value != (uintptr_t) data)
|
|
{
|
|
if (!delayed_error)
|
|
{
|
|
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data);
|
|
delayed_error = 1;
|
|
}
|
|
}
|
|
}
|
|
on_message(data);
|
|
if (likely(node_param->type == INTERM))
|
|
send(node_param->next_comm_channel, data);
|
|
/*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");*/
|
|
}
|
|
}
|
|
if (unlikely(!!(node_param->type & SINK)))
|
|
printf("[%p] Consumer finished !\n", (void*) pthread_self());
|
|
if (delayed_error)
|
|
return -1;
|
|
return 0;
|
|
}
|
|
|
|
static int alloc_consprod_databuf(data_xchg_t **c2p_xfer_addr)
|
|
{
|
|
int ret;
|
|
data_xchg_t *c2p_xfer;
|
|
|
|
ret = posix_memalign((void **) &c2p_xfer, page_size, sizeof(*c2p_xfer));
|
|
c2p_xfer->buf_status = 0;
|
|
if (ret)
|
|
return EXIT_FAILURE;
|
|
*c2p_xfer_addr = c2p_xfer;
|
|
return 0;
|
|
}
|
|
|
|
static int interm_block_xfer(node_param_t *node_param)
|
|
{
|
|
int return_value;
|
|
void *pthread_return_value;
|
|
pthread_t prod_tid;
|
|
|
|
return_value = 0;
|
|
if (alloc_consprod_databuf(&node_param->c2p_xfer))
|
|
return -1;
|
|
/* Create producer to next node, we are consumer of previous node */
|
|
if (pthread_create(&prod_tid, NULL, (void *(*)(void *)) consumer_block,
|
|
node_param))
|
|
{
|
|
perror("pthread_create cons (block_reception)");
|
|
return_value = -1;
|
|
goto free_c2p_xfer_ressources;
|
|
}
|
|
return_value = intermediate_producer(node_param);
|
|
pthread_join(prod_tid, &pthread_return_value);
|
|
if (pthread_return_value != NULL)
|
|
return_value = -1;
|
|
|
|
free_c2p_xfer_ressources:
|
|
free(node_param->c2p_xfer);
|
|
return return_value;
|
|
}
|
|
|
|
static int set_cpu_binding(int thread_idx)
|
|
{
|
|
int cpu_binding;
|
|
pthread_t tid;
|
|
cpu_set_t cpuset;
|
|
|
|
/* Should work in most cases */
|
|
if (shared)
|
|
cpu_binding = thread_idx % nb_cpus;
|
|
else
|
|
cpu_binding = (2 * thread_idx) % nb_cpus;
|
|
tid = pthread_self();
|
|
CPU_ZERO(&cpuset);
|
|
CPU_SET(cpu_binding, &cpuset);
|
|
if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset))
|
|
{
|
|
perror("pthread_setaffinity_np");
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static void *node(node_param_t *node_param)
|
|
{
|
|
int return_value;
|
|
|
|
if (set_cpu_binding(node_param->thread_idx))
|
|
return &page_size;
|
|
switch (node_param->type)
|
|
{
|
|
case SOURCE:
|
|
return_value = initial_producer(node_param);
|
|
break;
|
|
|
|
case INTERM:
|
|
if (block_reception)
|
|
return_value = interm_block_xfer(node_param);
|
|
else
|
|
return_value = consumer_data(node_param);
|
|
break;
|
|
|
|
case SINK:
|
|
if (block_reception)
|
|
{
|
|
if (alloc_consprod_databuf(&node_param->c2p_xfer))
|
|
return &page_size;
|
|
if (consumer_block(node_param) == NULL) //TODO: allocate data_buf
|
|
return_value = 0;
|
|
else
|
|
return_value = -1;
|
|
free(node_param->c2p_xfer);
|
|
}
|
|
else
|
|
return_value = consumer_data(node_param);
|
|
break;
|
|
|
|
default:
|
|
return &page_size; /* &page_size can't be NULL */
|
|
}
|
|
if (return_value)
|
|
return &page_size; /* &page_size can't be NULL */
|
|
return NULL;
|
|
}
|
|
|
|
static int set_node_params(node_param_t *node_param,
|
|
node_param_t *prev_node_param, int thread_idx)
|
|
{
|
|
if (thread_idx == nb_nodes - 1)
|
|
node_param->type = SINK;
|
|
else
|
|
{
|
|
if (thread_idx)
|
|
node_param->type = INTERM;
|
|
else
|
|
node_param->type = SOURCE;
|
|
node_param->next_comm_channel = create_comm_channel();
|
|
if (node_param->next_comm_channel == NULL)
|
|
return -1;
|
|
}
|
|
if (prev_node_param != NULL)
|
|
{
|
|
node_param->prev_comm_channel =
|
|
prev_node_param->next_comm_channel;
|
|
}
|
|
node_param->thread_idx = thread_idx;
|
|
return 0;
|
|
}
|
|
|
|
static int create_threads(int nb_nodes, int *last_node, pthread_t *node_tids,
|
|
node_param_t *node_params)
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < nb_nodes; i++)
|
|
{
|
|
node_param_t *prev_node_param;
|
|
|
|
prev_node_param = (i) ? &node_params[i - 1] : NULL;
|
|
if (set_node_params(&node_params[i], prev_node_param, i))
|
|
{
|
|
*last_node = i - 1;
|
|
return -1;
|
|
}
|
|
if (pthread_create(&node_tids[i], NULL,
|
|
(void *(*)(void *)) node, &node_params[i]))
|
|
{
|
|
perror("pthread_create node");
|
|
destroy_comm_channel(node_params[i].next_comm_channel);
|
|
*last_node = i - 1;
|
|
return -1;
|
|
}
|
|
}
|
|
*last_node = i - 1;
|
|
return 0;
|
|
}
|
|
|
|
static int join_threads(int last_node, pthread_t *tids)
|
|
{
|
|
int i, return_value;
|
|
void *pthread_return_value;
|
|
|
|
for (i = last_node, return_value = 0; i >= 0; i--)
|
|
{
|
|
pthread_join(tids[i], &pthread_return_value);
|
|
if (pthread_return_value != NULL)
|
|
return_value = EXIT_FAILURE;
|
|
}
|
|
return return_value;
|
|
}
|
|
|
|
static int destroy_threads(int last_node, node_param_t *node_params)
|
|
{
|
|
int i, return_value;
|
|
|
|
for (i = last_node, return_value = 0; i >= 0; i--)
|
|
{
|
|
if (node_params[i].type != SINK)
|
|
{
|
|
if (destroy_comm_channel(node_params[i].next_comm_channel))
|
|
return_value = -1;
|
|
}
|
|
}
|
|
return return_value;
|
|
}
|
|
|
|
int main(int argc, char *argv[])
|
|
{
|
|
int return_value, last_node;
|
|
pthread_t *tids;
|
|
node_param_t *node_params;
|
|
|
|
return_value = EXIT_SUCCESS;
|
|
if (analyse_options(argc, argv))
|
|
return EXIT_FAILURE;
|
|
page_size = sysconf(_SC_PAGE_SIZE);
|
|
if (page_size <= 0)
|
|
return EXIT_FAILURE;
|
|
node_params = malloc(nb_nodes * sizeof(node_param_t));
|
|
if (node_params == NULL)
|
|
return EXIT_FAILURE;
|
|
tids = malloc(nb_nodes * sizeof(pthread_t));
|
|
if (tids == NULL)
|
|
{
|
|
return_value = EXIT_FAILURE;
|
|
goto error_alloc_tids;
|
|
}
|
|
if (create_threads(nb_nodes, &last_node, tids, node_params))
|
|
goto error_create_channels;
|
|
if (join_threads(last_node, tids))
|
|
return_value = EXIT_FAILURE;
|
|
error_create_channels:
|
|
if (destroy_threads(last_node, node_params))
|
|
return_value = EXIT_FAILURE;
|
|
free(tids);
|
|
error_alloc_tids:
|
|
free(node_params);
|
|
return return_value;
|
|
}
|