Improve pipeline (cons and prod in //)
This commit is contained in:
parent
f01db158c2
commit
f05cfdcd92
|
@ -13,7 +13,7 @@ COMMDIR:=communication
|
|||
# Compilation flags
|
||||
# I know -finline-functions and -finline-functions-called-once are enabled by
|
||||
# -O3 but I did this in case gcc behaviour change one day
|
||||
CFLAGS:=-c -g -O3 -finline-functions -finline-functions-called-once -Wall -Werror
|
||||
CFLAGS:=-c -g -O3 -finline-functions -finline-functions-called-once -Wall -Wextra -Werror
|
||||
LDFLAGS:=-L$(LIBDIR) -lpthread -ldl
|
||||
|
||||
# Executables
|
||||
|
|
|
@ -12,9 +12,9 @@ struct channel
|
|||
{
|
||||
void * volatile buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
int unused[20] __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
volatile int state __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
int sender_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
int receiver_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
volatile unsigned int state:1 __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
unsigned int sender_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
unsigned int receiver_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -22,8 +22,8 @@ struct lvl_2
|
|||
struct channel
|
||||
{
|
||||
struct lvl_2 queue[SLOTS] __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
int head __attribute__ ((aligned(CACHE_LINE_SIZE)));
|
||||
int tail __attribute__ ((aligned(CACHE_LINE_SIZE)));
|
||||
unsigned int head __attribute__ ((aligned(CACHE_LINE_SIZE)));
|
||||
unsigned int tail __attribute__ ((aligned(CACHE_LINE_SIZE)));
|
||||
};
|
||||
|
||||
__BEGIN_DECLS
|
||||
|
@ -31,7 +31,7 @@ __BEGIN_DECLS
|
|||
// TODO: Make it send only one data
|
||||
static inline void send(struct channel *channel, void **addr)
|
||||
{
|
||||
static __thread int chkidx = 0;
|
||||
static __thread unsigned int chkidx = 0;
|
||||
|
||||
// If all slots are full, spin
|
||||
if (!chkidx)
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
struct channel
|
||||
{
|
||||
void * volatile *shared_space;
|
||||
int head __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
int tail __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
unsigned int head __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
unsigned int tail __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
};
|
||||
|
||||
__BEGIN_DECLS
|
||||
|
@ -29,7 +29,7 @@ extern int adjust_slip(struct channel *channel);
|
|||
|
||||
static inline void send(struct channel *channel, void **addr)
|
||||
{
|
||||
static __thread int nb_iter = 0;
|
||||
static __thread unsigned int nb_iter = 0;
|
||||
|
||||
assert(addr != NULL);
|
||||
if (nb_iter == ADJUST_FREQ)
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
struct channel
|
||||
{
|
||||
void * volatile *shared_space;
|
||||
volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
volatile int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
volatile unsigned int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
volatile unsigned int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
};
|
||||
|
||||
__BEGIN_DECLS
|
||||
|
|
|
@ -9,22 +9,22 @@
|
|||
|
||||
struct control
|
||||
{
|
||||
volatile int read;
|
||||
volatile int write;
|
||||
volatile unsigned int read;
|
||||
volatile unsigned int write;
|
||||
};
|
||||
|
||||
struct cons
|
||||
{
|
||||
int localWrite;
|
||||
int nextRead;
|
||||
int rBatch;
|
||||
unsigned int localWrite;
|
||||
unsigned int nextRead;
|
||||
unsigned int rBatch;
|
||||
};
|
||||
|
||||
struct prod
|
||||
{
|
||||
int localRead;
|
||||
int nextWrite;
|
||||
int wBatch;
|
||||
unsigned int localRead;
|
||||
unsigned int nextWrite;
|
||||
unsigned int wBatch;
|
||||
};
|
||||
|
||||
|
||||
|
@ -38,13 +38,15 @@ struct channel
|
|||
|
||||
__BEGIN_DECLS
|
||||
|
||||
extern const int batchSize;
|
||||
extern const unsigned int batchSize;
|
||||
|
||||
static inline void send(struct channel *channel, void **addr)
|
||||
{
|
||||
while (1)
|
||||
{
|
||||
int afterNextWrite = (channel->prod.nextWrite + 1) % SHARED_SPACE_VOIDPTR;
|
||||
unsigned int afterNextWrite;
|
||||
|
||||
afterNextWrite = (channel->prod.nextWrite + 1) % SHARED_SPACE_VOIDPTR;
|
||||
if (afterNextWrite == channel->prod.localRead)
|
||||
{
|
||||
if (afterNextWrite == channel->ctrl.read)
|
||||
|
|
|
@ -7,7 +7,8 @@ struct channel
|
|||
{
|
||||
};
|
||||
|
||||
static inline void send(struct channel *channel, void **addr) {}
|
||||
static inline void send(struct channel *channel __attribute__ ((unused)),
|
||||
void **addr __attribute__ ((unused))) {}
|
||||
|
||||
__END_DECLS
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ __BEGIN_DECLS
|
|||
|
||||
static inline void send(struct channel *channel, void **addr)
|
||||
{
|
||||
int nb_read;
|
||||
unsigned int nb_read;
|
||||
void *addr_ptr;
|
||||
|
||||
nb_read = 0;
|
||||
|
|
|
@ -10,8 +10,8 @@
|
|||
struct channel
|
||||
{
|
||||
void * volatile *shared_space;
|
||||
volatile int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
volatile int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
volatile unsigned int cons_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
volatile unsigned int prod_idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
};
|
||||
|
||||
|
||||
|
@ -19,8 +19,8 @@ __BEGIN_DECLS
|
|||
|
||||
static inline void send(struct channel *channel, void **addr)
|
||||
{
|
||||
static __thread int local_cons_idx = 0;
|
||||
int local_prod, next_prod;
|
||||
static __thread unsigned int local_cons_idx = 0;
|
||||
unsigned int local_prod, next_prod;
|
||||
|
||||
local_prod = channel->prod_idx;
|
||||
next_prod = (local_prod + 1) % SHARED_SPACE_VOIDPTR;
|
||||
|
|
|
@ -17,7 +17,7 @@ static int lines_per_calc;
|
|||
|
||||
int init_calc(int lines_per_calc_param)
|
||||
{
|
||||
int i;
|
||||
unsigned int i;
|
||||
|
||||
assert(!(CACHE_SIZE % (lines_per_calc_param * CACHE_LINE_SIZE)));
|
||||
lines_per_calc = lines_per_calc_param;
|
||||
|
|
|
@ -59,7 +59,7 @@ void *recv_one_data(struct channel *channel)
|
|||
*/
|
||||
ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
||||
{
|
||||
int nb_read;
|
||||
size_t nb_read;
|
||||
|
||||
nb_read = 0;
|
||||
while (channel->state && nb_read < count)
|
||||
|
@ -76,13 +76,7 @@ ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
|||
n = channel->receiver_idx + (BUF_SIZE / sizeof(void *));
|
||||
channel->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *));
|
||||
for(; i < n; i++)
|
||||
{
|
||||
/*
|
||||
* The behaviour of this is not documented but we know
|
||||
* the values inside buf won't change during this affectation
|
||||
*/
|
||||
*buf++ = channel->buf[i];
|
||||
}
|
||||
nb_read += BUF_SIZE / sizeof(void *);
|
||||
channel->state = 0;
|
||||
}
|
||||
|
|
|
@ -51,16 +51,17 @@ void *recv_one_data(struct channel *channel)
|
|||
|
||||
ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
||||
{
|
||||
int n;
|
||||
size_t n;
|
||||
|
||||
n = 0;
|
||||
// If all slots are empty, spin
|
||||
while (channel->queue[channel->head].flag)
|
||||
{
|
||||
unsigned int i;
|
||||
|
||||
// Dequeue a chunk of data items
|
||||
memcpy(buf, (const void *)
|
||||
channel->queue[channel->head].chunk,
|
||||
SUB_SLOTS * sizeof(*buf));
|
||||
for(i = 0; i < SUB_SLOTS; i++)
|
||||
*buf++ = channel->queue[channel->head].chunk[i];
|
||||
n += SUB_SLOTS;
|
||||
channel->queue[channel->head].flag = 0;
|
||||
channel->head = (channel->head + 1) % SLOTS;
|
||||
|
|
|
@ -16,7 +16,7 @@ void *create_comm_channel(void)
|
|||
{
|
||||
if (!posix_memalign((void *) &channel->shared_space, CACHE_LINE_SIZE, SHARED_SPACE_SIZE))
|
||||
{
|
||||
int i;
|
||||
unsigned int i;
|
||||
|
||||
channel->head = 0;
|
||||
channel->tail = 0;
|
||||
|
@ -39,9 +39,9 @@ int destroy_comm_channel(void *channel)
|
|||
|
||||
int adjust_slip(struct channel *channel)
|
||||
{
|
||||
int dist, dist_old, unused;
|
||||
unsigned int dist, dist_old, unused;
|
||||
|
||||
puts("adjust_slip is called"); /* Must be removed after calibration */
|
||||
//puts("adjust_slip is called"); /* Must be removed after calibration */
|
||||
unused = 0;
|
||||
dist = (channel->head + SHARED_SPACE_VOIDPTR - channel->tail) % SHARED_SPACE_VOIDPTR;
|
||||
if (dist < DANGER)
|
||||
|
@ -49,7 +49,7 @@ int adjust_slip(struct channel *channel)
|
|||
dist_old = 0;
|
||||
do
|
||||
{
|
||||
int i;
|
||||
unsigned int i;
|
||||
|
||||
dist_old = dist;
|
||||
|
||||
|
@ -89,14 +89,12 @@ void *recv_one_data(struct channel *channel)
|
|||
|
||||
ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
||||
{
|
||||
int n, next_adjust;
|
||||
unsigned int n;
|
||||
static __thread int nb_iter = 0;
|
||||
|
||||
next_adjust = ADJUST_FREQ - nb_iter;
|
||||
for(n = 0; n < count; n++)
|
||||
for(n = 0; n < count; n++, nb_iter++)
|
||||
{
|
||||
/* if ((nb_iter + n) % ADJUST_FREQ == 0) */
|
||||
if (n && (n % next_adjust == ADJUST_FREQ))
|
||||
if (nb_iter % ADJUST_FREQ == 0)
|
||||
{
|
||||
adjust_slip(channel);
|
||||
nb_iter = 0;
|
||||
|
@ -112,6 +110,5 @@ ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
|||
channel->shared_space[channel->tail] = NULL;
|
||||
channel->tail = (channel->tail + 1) % SHARED_SPACE_VOIDPTR;
|
||||
}
|
||||
nb_iter = (nb_iter + n) % ADJUST_FREQ;
|
||||
return n;
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ int destroy_comm_channel(void *channel)
|
|||
|
||||
void *recv_one_data(struct channel *channel)
|
||||
{
|
||||
int cons_idx;
|
||||
unsigned int cons_idx;
|
||||
void *result;
|
||||
|
||||
cons_idx = channel->cons_idx;
|
||||
|
@ -48,7 +48,7 @@ void *recv_one_data(struct channel *channel)
|
|||
|
||||
ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
||||
{
|
||||
int n, cons_idx;
|
||||
unsigned int n, cons_idx;
|
||||
|
||||
n = 0;
|
||||
for(cons_idx = channel->cons_idx; cons_idx != channel->prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_VOIDPTR, channel->cons_idx = cons_idx)
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
#include <specific_comm.h>
|
||||
|
||||
|
||||
const int batchSize = 50; // Check with SHARED_SPACE_SIZE
|
||||
const unsigned int batchSize = 64; // Check with SHARED_SPACE_SIZE
|
||||
|
||||
void *create_comm_channel(void)
|
||||
{
|
||||
|
@ -68,7 +68,7 @@ void *recv_one_data(struct channel *channel)
|
|||
|
||||
ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
||||
{
|
||||
int n;
|
||||
unsigned int n;
|
||||
|
||||
for(n = 0; n < count; n++)
|
||||
{
|
||||
|
@ -79,10 +79,6 @@ ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
|||
channel->cons.localWrite = channel->ctrl.write;
|
||||
}
|
||||
|
||||
/*
|
||||
* The behaviour of this is not documented but we know
|
||||
* the values inside buf won't change during this affectation
|
||||
*/
|
||||
*buf++ = channel->shared_space[channel->cons.nextRead];
|
||||
channel->cons.nextRead = (channel->cons.nextRead + 1) % SHARED_SPACE_VOIDPTR;
|
||||
channel->cons.rBatch++;
|
||||
|
|
|
@ -16,7 +16,7 @@ void *create_comm_channel(void)
|
|||
return (void *) &store_var;
|
||||
}
|
||||
|
||||
int destroy_comm_channel(void *unused)
|
||||
int destroy_comm_channel(void *unused __attribute__ ((unused)))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ int destroy_comm_channel(void *unused)
|
|||
* @warning recv_one_data should not be used in conjonction of
|
||||
* recv_some_data
|
||||
*/
|
||||
void *recv_one_data(struct channel *channel)
|
||||
void *recv_one_data(struct channel *channel __attribute__ ((unused)))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
@ -43,7 +43,9 @@ void *recv_one_data(struct channel *channel)
|
|||
* recv_one_data
|
||||
* @warning count must be a multiple of BUF_SIZE
|
||||
*/
|
||||
ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
||||
ssize_t recv_some_data(struct channel *channel __attribute__ ((unused)),
|
||||
void **buf __attribute__ ((unused)),
|
||||
size_t count __attribute__ ((unused)))
|
||||
{
|
||||
return count;
|
||||
}
|
||||
|
|
|
@ -39,7 +39,8 @@ int destroy_comm_channel(void *channel)
|
|||
void *recv_one_data(struct channel *channel)
|
||||
{
|
||||
void *result, **res_ptr;
|
||||
int n, nb_read;
|
||||
int n;
|
||||
unsigned int nb_read;
|
||||
|
||||
nb_read = 0;
|
||||
res_ptr = &result;
|
||||
|
|
|
@ -36,7 +36,7 @@ int destroy_comm_channel(void *channel)
|
|||
void *recv_one_data(struct channel *channel)
|
||||
{
|
||||
void *result;
|
||||
int cons_idx, prod_idx;
|
||||
unsigned int cons_idx, prod_idx;
|
||||
|
||||
cons_idx = channel->cons_idx;
|
||||
prod_idx = channel->prod_idx;
|
||||
|
@ -55,7 +55,7 @@ void *recv_one_data(struct channel *channel)
|
|||
|
||||
ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
|
||||
{
|
||||
int n, cons_idx, prod_idx;
|
||||
unsigned int n, cons_idx, prod_idx;
|
||||
|
||||
n = 0;
|
||||
cons_idx = channel->cons_idx;
|
||||
|
|
|
@ -18,22 +18,33 @@
|
|||
#include <specific_comm.h>
|
||||
|
||||
|
||||
//#define MAX_BLOCK_ENTRIES (page_size / sizeof(void *))
|
||||
#define MAX_BLOCK_ENTRIES (4096 * CACHE_LINE_SIZE / sizeof(void *)) // Must be a multiple of BUF_SIZE
|
||||
//#define MAX_BLOCK_ENTRIES ((BUF_SIZE * 4 + page_size) & ~(page_size - 1)) // Big buffer size is not a good idea
|
||||
/* Must be a multiple of BUF_SIZE */
|
||||
#define MAX_BLOCK_ENTRIES (2048 * CACHE_LINE_SIZE / sizeof(void *))
|
||||
/*#define MAX_BLOCK_ENTRIES ((BUF_SIZE * 4 + page_size) & ~(page_size - 1)) // Big buffer size is not a good idea */
|
||||
#define toString(x) doStringification(x)
|
||||
#define doStringification(x) #x
|
||||
#define MIN(x,y) ((x < y) ? x : y)
|
||||
#define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t))
|
||||
#define PROD 1
|
||||
#define CONS 2
|
||||
#define SOURCE 0 /* Initial producer (calling do_calc()) */
|
||||
#define INTERM 1
|
||||
#define SINK 2 /* Final consumer (doing the check) */
|
||||
|
||||
typedef struct prod_cons_thread
|
||||
typedef struct data_xchg
|
||||
{
|
||||
void *prod_comm_channel;
|
||||
void *cons_comm_channel;
|
||||
int flags; // PROD, CONS or both
|
||||
int cpu_binding; // id of the CPU to run the thread on
|
||||
} prod_cons_thread_t;
|
||||
void ** volatile data_buf[2][MAX_BLOCK_ENTRIES];
|
||||
volatile int filled_buf_entries[2];
|
||||
int unused[20];
|
||||
volatile int buf_status:1 __attribute__((aligned (CACHE_LINE_SIZE)));
|
||||
} data_xchg_t;
|
||||
|
||||
typedef struct node_param
|
||||
{
|
||||
void *prev_comm_channel; /* Channel with previous node */
|
||||
void *next_comm_channel; /* Channel with next mode */
|
||||
data_xchg_t *c2p_xfer; /* Consumer to producer local transfer */
|
||||
int type; /* SOURCE, INTERM or SINK */
|
||||
int thread_idx;
|
||||
} node_param_t;
|
||||
|
||||
typedef int inc_check_t;
|
||||
|
||||
|
@ -44,18 +55,16 @@ static int (*init_calc)(int) = NULL;
|
|||
static void **(*do_calc)(void) = NULL;
|
||||
static int (*end_calc)(void) = NULL;
|
||||
static int shared = 0; /* We are not shared by default */
|
||||
pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER;
|
||||
pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER;
|
||||
static int init_calc_arg = 0;
|
||||
static long init_calc_arg = 0;
|
||||
static int block_reception = 1;
|
||||
static int nb_nodes = 2; // Nb of nodes participating to the chain of pipelines
|
||||
static long nb_nodes = 2; /* Nb of nodes in the chain of pipelines */
|
||||
static int check_recv_match_send = 0;
|
||||
static uintptr_t single_prod_check_val; // /!\ Implies only one real producer
|
||||
static inc_check_t *single_prod_check_ctxt; // /!\ Implies only one real producer
|
||||
static int nb_cpus = 4; // TOFIX: don't hardcode this
|
||||
static uintptr_t single_prod_check_val; /* /!\ Only one real producer */
|
||||
static inc_check_t *single_prod_check_ctxt; /* /!\ Only one real producer */
|
||||
static int nb_cpus = 4; /* TOFIX: don't hardcode this */
|
||||
static int page_size = 0;
|
||||
|
||||
void usage(char *argv[])
|
||||
static void usage(char *argv[])
|
||||
{
|
||||
char format[] = "-n <num_buf> -p <num_prod> [options]";
|
||||
char options[] = "Required options:\n"
|
||||
|
@ -70,7 +79,7 @@ void usage(char *argv[])
|
|||
"-l nb_nodes\t\t\tNumber of nodes in the pipeline chain\n"
|
||||
"-d\t\t\t\tReceive one piece of data\n"
|
||||
"-h\t\t\t\tPrint this help\n"
|
||||
"-s <level>\t\t\tShare the same L<level> cache or not\n"
|
||||
"-s\t\t\t\tShare the same L2 cache or not\n"
|
||||
"\t\t\t\tIf level is:\n"
|
||||
"\t\t\t\t\t> 0, then the same L<level> must be shared\n"
|
||||
"\t\t\t\t\t< 0, then different L<level> must be used\n"
|
||||
|
@ -80,24 +89,24 @@ void usage(char *argv[])
|
|||
printf("%s\n", options);
|
||||
}
|
||||
|
||||
int do_noinit(int unused)
|
||||
static int do_noinit(int unused __attribute__ ((unused)))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
void **do_nocalc(void)
|
||||
static void **do_nocalc(void)
|
||||
{
|
||||
static int an_int, *an_int_ptr = &an_int;
|
||||
|
||||
return (void **) &an_int_ptr;
|
||||
}
|
||||
|
||||
int do_noend(void)
|
||||
static int do_noend(void)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
int inc_check_init(int init_value, inc_check_t **context)
|
||||
static int inc_check_init(int init_value, inc_check_t **context)
|
||||
{
|
||||
inc_check_t *ctxt;
|
||||
|
||||
|
@ -109,24 +118,24 @@ int inc_check_init(int init_value, inc_check_t **context)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int inc_check_next(inc_check_t *context, uintptr_t *next_value)
|
||||
static int inc_check_next(inc_check_t *context, uintptr_t *next_value)
|
||||
{
|
||||
*next_value = (*context)++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int inc_check_end(inc_check_t *context)
|
||||
static int inc_check_end(inc_check_t *context)
|
||||
{
|
||||
free(context);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int do_checkinit(int init_value)
|
||||
static int do_checkinit(int init_value)
|
||||
{
|
||||
return inc_check_init(init_value, &single_prod_check_ctxt);
|
||||
}
|
||||
|
||||
void **do_checkcalc(void)
|
||||
static void **do_checkcalc(void)
|
||||
{
|
||||
int ret;
|
||||
|
||||
|
@ -137,17 +146,17 @@ void **do_checkcalc(void)
|
|||
return (void **) single_prod_check_val;
|
||||
}
|
||||
|
||||
int do_checkend(void)
|
||||
static int do_checkend(void)
|
||||
{
|
||||
return inc_check_end(single_prod_check_ctxt);
|
||||
}
|
||||
|
||||
int analyse_options(int argc, char *argv[])
|
||||
static int analyse_options(int argc, char *argv[])
|
||||
{
|
||||
int opt;
|
||||
|
||||
opterr = 0;
|
||||
while ((opt = getopt(argc, argv, ":bc:dhl:kn:s::"/*p:"*/)) != -1)
|
||||
while ((opt = getopt(argc, argv, ":bc:dhl:kn:s"/*:p:"*/)) != -1)
|
||||
{
|
||||
switch (opt)
|
||||
{
|
||||
|
@ -261,40 +270,10 @@ int analyse_options(int argc, char *argv[])
|
|||
break;
|
||||
#endif
|
||||
case 's' :
|
||||
if ((optind != argc) && (*argv[optind] != '-'))
|
||||
{
|
||||
int share_level;
|
||||
char *inval;
|
||||
share_level = strtol(argv[optind], &inval, 10);
|
||||
if ((*argv[optind] == '\0') || (*inval != '\0'))
|
||||
{
|
||||
fprintf(stderr, "Option '-p' needs an integer argument\n");
|
||||
return -1;
|
||||
}
|
||||
if ((share_level == LONG_MIN) || ((share_level == LONG_MAX) && errno == ERANGE))
|
||||
{
|
||||
fprintf(stderr, "Shared memory level must be between %ld and %ld, both inclusive\n", LONG_MIN, LONG_MAX);
|
||||
return -1;
|
||||
}
|
||||
/* TODO: Real management of shared memory level */
|
||||
/* TODO: -x: We want level x not to be shared; 0 do as we want, only memory is guaranteed to be shared */
|
||||
if (share_level <= 0)
|
||||
shared = 0;
|
||||
else
|
||||
shared = 1;
|
||||
optind++;
|
||||
}
|
||||
shared = 1;
|
||||
break;
|
||||
case '?' :
|
||||
fprintf(stderr, "Option inconnue\n");
|
||||
/*if (!strncmp("--check", argv[optind], strlen("--check")))
|
||||
{
|
||||
check_recv_match_send = 1;
|
||||
optind++;
|
||||
optopt = (int) *argv[optind];
|
||||
fprintf(stderr, "--check required\n");
|
||||
break;
|
||||
}*/
|
||||
return -1;
|
||||
case ':' :
|
||||
fprintf(stderr, "Option %s needs an argument\n", argv[optind]);
|
||||
|
@ -345,9 +324,9 @@ int analyse_options(int argc, char *argv[])
|
|||
return 0;
|
||||
}
|
||||
|
||||
int producer(void *prod_channel)
|
||||
static int initial_producer(node_param_t *node_param)
|
||||
{
|
||||
int i, j;
|
||||
unsigned int i, j;
|
||||
|
||||
if (init_calc(init_calc_arg))
|
||||
{
|
||||
|
@ -355,9 +334,8 @@ int producer(void *prod_channel)
|
|||
return 1;
|
||||
}
|
||||
for(i = 0; i < nb_bufs_sent; i++) {
|
||||
//printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE);
|
||||
for(j = 0; j < WORDS_PER_LINE; j++)
|
||||
send(prod_channel, do_calc());
|
||||
send(node_param->next_comm_channel, do_calc());
|
||||
}
|
||||
if (end_calc())
|
||||
{
|
||||
|
@ -368,178 +346,348 @@ int producer(void *prod_channel)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void on_message(void *val)
|
||||
static int intermediate_producer(node_param_t *node_param)
|
||||
{
|
||||
//printf("Receive value: %p\n", (void *) val);
|
||||
int i, j;
|
||||
long long total_data_sent = 0, total_to_send;
|
||||
|
||||
i = 0;
|
||||
total_to_send = nb_bufs_sent * WORDS_PER_LINE;
|
||||
while (total_data_sent < total_to_send)
|
||||
{
|
||||
while (!node_param->c2p_xfer->buf_status);
|
||||
for(j = 0; j < node_param->c2p_xfer->filled_buf_entries[i]; j++)
|
||||
send(node_param->next_comm_channel, node_param->c2p_xfer->data_buf[i][j]);
|
||||
total_data_sent += node_param->c2p_xfer->filled_buf_entries[i];
|
||||
node_param->c2p_xfer->buf_status = 0;
|
||||
i = !i;
|
||||
}
|
||||
printf("[%p] Producer finished !\n", (void*) pthread_self());
|
||||
return 0;
|
||||
}
|
||||
|
||||
int consumer(void *cons_channel)
|
||||
static void on_message(void *val __attribute__ ((unused)))
|
||||
{
|
||||
/*printf("Receive value: %p\n", (void *) val);*/
|
||||
}
|
||||
|
||||
static void *consumer_block(node_param_t *node_param)
|
||||
{
|
||||
int i = 0;
|
||||
int delayed_error;
|
||||
unsigned long long total_data_received = 0, total_to_receive;
|
||||
uintptr_t cons_check_value;
|
||||
inc_check_t *cons_check_context;
|
||||
|
||||
cons_check_context = NULL;
|
||||
delayed_error = 0;
|
||||
if (check_recv_match_send)
|
||||
{
|
||||
if (inc_check_init(init_calc_arg, &cons_check_context))
|
||||
{
|
||||
fprintf(stderr, "Initialization of check has failed\n");
|
||||
return &page_size; /* &page_size can't be NULL */
|
||||
}
|
||||
}
|
||||
cons_check_value = init_calc_arg;
|
||||
total_to_receive = nb_bufs_sent * WORDS_PER_LINE;
|
||||
while (total_data_received < total_to_receive)
|
||||
{
|
||||
int j;
|
||||
ssize_t nb_data_received;
|
||||
size_t to_receive;
|
||||
|
||||
to_receive = MIN(MAX_BLOCK_ENTRIES, total_to_receive - total_data_received);
|
||||
nb_data_received = recv_some_data(node_param->prev_comm_channel,
|
||||
(void **) node_param->c2p_xfer->data_buf[i], to_receive);
|
||||
node_param->c2p_xfer->filled_buf_entries[i] = nb_data_received;
|
||||
if (unlikely(!(node_param->type & SINK)))
|
||||
{
|
||||
/* Check with nb lines sent & recv */
|
||||
while (node_param->c2p_xfer->buf_status);
|
||||
node_param->c2p_xfer->buf_status = 1;
|
||||
}
|
||||
total_data_received += nb_data_received;
|
||||
for (j = 0; j < nb_data_received; j++)
|
||||
{
|
||||
if (unlikely(check_recv_match_send))
|
||||
{
|
||||
if (inc_check_next(cons_check_context, &cons_check_value))
|
||||
{
|
||||
if (!delayed_error)
|
||||
{
|
||||
fprintf(stderr, "Error while checking received value match sent value\n");
|
||||
delayed_error = 1;
|
||||
}
|
||||
}
|
||||
if (cons_check_value != (uintptr_t) node_param->c2p_xfer->data_buf[i][j])
|
||||
{
|
||||
if (!delayed_error)
|
||||
{
|
||||
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) node_param->c2p_xfer->data_buf[i][j]);
|
||||
delayed_error = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
on_message(node_param->c2p_xfer->data_buf[i][j]);
|
||||
}
|
||||
i = !i;
|
||||
/*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");*/
|
||||
}
|
||||
if (unlikely(!(node_param->type & SINK)))
|
||||
printf("[%p] Consumer finished !\n", (void*) pthread_self());
|
||||
if (delayed_error)
|
||||
return &page_size; /* &page_size can't be NULL */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int consumer_data(node_param_t *node_param)
|
||||
{
|
||||
unsigned int i, j;
|
||||
int delayed_error;
|
||||
uintptr_t cons_check_value;
|
||||
inc_check_t *cons_check_context;
|
||||
|
||||
cons_check_context = NULL;
|
||||
delayed_error = 0;
|
||||
if (inc_check_init(init_calc_arg, &cons_check_context))
|
||||
if (check_recv_match_send)
|
||||
{
|
||||
fprintf(stderr, "Initialization of check has failed\n");
|
||||
return -1; /* &page_size can't be NULL, whatever NULL is bound to */
|
||||
if (inc_check_init(init_calc_arg, &cons_check_context))
|
||||
{
|
||||
fprintf(stderr, "Initialization of check has failed\n");
|
||||
return -1; /* &page_size can't be NULL */
|
||||
}
|
||||
}
|
||||
cons_check_value = init_calc_arg;
|
||||
if (block_reception)
|
||||
{
|
||||
long long total_data_received = 0;
|
||||
void *data_buf[MAX_BLOCK_ENTRIES];
|
||||
|
||||
while (total_data_received < nb_bufs_sent * WORDS_PER_LINE)
|
||||
for(i = 0; i < nb_bufs_sent; i++) {
|
||||
for(j = 0; j < WORDS_PER_LINE; j++)
|
||||
{
|
||||
int i;
|
||||
ssize_t nb_data_received;
|
||||
void *data;
|
||||
|
||||
nb_data_received = recv_some_data(cons_channel, data_buf, MAX_BLOCK_ENTRIES);
|
||||
total_data_received += nb_data_received;
|
||||
for (i = 0; i < nb_data_received; i++)
|
||||
data = recv_one_data(node_param->prev_comm_channel);
|
||||
if (unlikely(check_recv_match_send))
|
||||
{
|
||||
if (check_recv_match_send)
|
||||
if (inc_check_next(cons_check_context, &cons_check_value))
|
||||
{
|
||||
if (inc_check_next(cons_check_context, &cons_check_value))
|
||||
if (!delayed_error)
|
||||
{
|
||||
if (!delayed_error)
|
||||
{
|
||||
fprintf(stderr, "Error while checking received value match sent value\n");
|
||||
delayed_error = 1;
|
||||
}
|
||||
}
|
||||
if (cons_check_value != (uintptr_t) data_buf[i])
|
||||
{
|
||||
if (!delayed_error)
|
||||
{
|
||||
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data_buf[i]);
|
||||
delayed_error = 1;
|
||||
}
|
||||
fprintf(stderr, "Error while checking received value match sent value\n");
|
||||
delayed_error = 1;
|
||||
}
|
||||
}
|
||||
on_message(data_buf[i]);
|
||||
}
|
||||
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int i, j;
|
||||
|
||||
for(i = 0; i < nb_bufs_sent; i++) {
|
||||
for(j = 0; j < WORDS_PER_LINE; j++)
|
||||
{
|
||||
void *data;
|
||||
|
||||
data = recv_one_data(cons_channel);
|
||||
if (check_recv_match_send)
|
||||
if (cons_check_value != (uintptr_t) data)
|
||||
{
|
||||
if (inc_check_next(cons_check_context, &cons_check_value))
|
||||
if (!delayed_error)
|
||||
{
|
||||
if (!delayed_error)
|
||||
{
|
||||
fprintf(stderr, "Error while checking received value match sent value\n");
|
||||
delayed_error = 1;
|
||||
}
|
||||
}
|
||||
if (cons_check_value != (uintptr_t) data)
|
||||
{
|
||||
if (!delayed_error)
|
||||
{
|
||||
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data);
|
||||
delayed_error = 1;
|
||||
}
|
||||
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data);
|
||||
delayed_error = 1;
|
||||
}
|
||||
}
|
||||
on_message(data);
|
||||
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");
|
||||
}
|
||||
on_message(data);
|
||||
if (likely(node_param->type == INTERM))
|
||||
send(node_param->next_comm_channel, data);
|
||||
/*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");*/
|
||||
}
|
||||
}
|
||||
printf("[%p] Consumer finished !\n", (void*) pthread_self());
|
||||
if (unlikely(!(node_param->type & SINK)))
|
||||
printf("[%p] Consumer finished !\n", (void*) pthread_self());
|
||||
if (delayed_error)
|
||||
return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int consprod(void *cons_channel, void *prod_channel)
|
||||
static int alloc_consprod_databuf(data_xchg_t **c2p_xfer_addr)
|
||||
{
|
||||
if (block_reception)
|
||||
{
|
||||
long long total_data_received = 0;
|
||||
void *data_buf[MAX_BLOCK_ENTRIES];
|
||||
int ret;
|
||||
data_xchg_t *c2p_xfer;
|
||||
|
||||
while (total_data_received < nb_bufs_sent * WORDS_PER_LINE)
|
||||
{
|
||||
int i;
|
||||
ssize_t nb_data_received;
|
||||
|
||||
nb_data_received = recv_some_data(cons_channel, data_buf, MAX_BLOCK_ENTRIES);
|
||||
total_data_received += nb_data_received;
|
||||
for (i = 0; i < nb_data_received; i++)
|
||||
send(prod_channel, data_buf[i]);
|
||||
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int i, j;
|
||||
|
||||
for(i = 0; i < nb_bufs_sent; i++) {
|
||||
for(j = 0; j < WORDS_PER_LINE; j++)
|
||||
send(prod_channel, recv_one_data(cons_channel));
|
||||
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");
|
||||
}
|
||||
}
|
||||
printf("[%p] Producer/consumer finished !\n", (void*) pthread_self());
|
||||
ret = posix_memalign((void **) &c2p_xfer, page_size, sizeof(*c2p_xfer));
|
||||
c2p_xfer->buf_status = 0;
|
||||
if (ret)
|
||||
return EXIT_FAILURE;
|
||||
*c2p_xfer_addr = c2p_xfer;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void *node(prod_cons_thread_t *thread_params)
|
||||
static int interm_block_xfer(node_param_t *node_param)
|
||||
{
|
||||
int return_value;
|
||||
void *pthread_return_value;
|
||||
pthread_t prod_tid;
|
||||
|
||||
return_value = 0;
|
||||
if (alloc_consprod_databuf(&node_param->c2p_xfer))
|
||||
return -1;
|
||||
/* Create producer to next node, we are consumer of previous node */
|
||||
if (pthread_create(&prod_tid, NULL, (void *(*)(void *)) consumer_block,
|
||||
node_param))
|
||||
{
|
||||
perror("pthread_create cons (block_reception)");
|
||||
return_value = -1;
|
||||
goto free_c2p_xfer_ressources;
|
||||
}
|
||||
return_value = intermediate_producer(node_param);
|
||||
pthread_join(prod_tid, &pthread_return_value);
|
||||
if (pthread_return_value != NULL)
|
||||
return_value = -1;
|
||||
|
||||
free_c2p_xfer_ressources:
|
||||
free(node_param->c2p_xfer);
|
||||
return return_value;
|
||||
}
|
||||
|
||||
static int set_cpu_binding(int thread_idx)
|
||||
{
|
||||
int cpu_binding;
|
||||
pthread_t tid;
|
||||
cpu_set_t cpuset;
|
||||
|
||||
/* Should work in most cases */
|
||||
if (shared)
|
||||
cpu_binding = thread_idx % nb_cpus;
|
||||
else
|
||||
cpu_binding = (2 * thread_idx) % nb_cpus;
|
||||
tid = pthread_self();
|
||||
CPU_ZERO(&cpuset);
|
||||
CPU_SET(thread_params->cpu_binding, &cpuset);
|
||||
CPU_SET(cpu_binding, &cpuset);
|
||||
if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset))
|
||||
{
|
||||
perror("pthread_setaffinity_np");
|
||||
return NULL;
|
||||
return -1;
|
||||
}
|
||||
switch (thread_params->flags & (PROD | CONS))
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *node(node_param_t *node_param)
|
||||
{
|
||||
int return_value;
|
||||
|
||||
if (set_cpu_binding(node_param->thread_idx))
|
||||
return &page_size;
|
||||
switch (node_param->type)
|
||||
{
|
||||
case PROD:
|
||||
return_value = producer(thread_params->prod_comm_channel);
|
||||
case SOURCE:
|
||||
return_value = initial_producer(node_param);
|
||||
break;
|
||||
|
||||
case CONS:
|
||||
return_value = consumer(thread_params->cons_comm_channel);
|
||||
case INTERM:
|
||||
if (block_reception)
|
||||
return_value = interm_block_xfer(node_param);
|
||||
else
|
||||
return_value = consumer_data(node_param);
|
||||
break;
|
||||
|
||||
case (PROD | CONS):
|
||||
return_value = consprod(thread_params->cons_comm_channel,
|
||||
thread_params->prod_comm_channel);
|
||||
case SINK:
|
||||
if (block_reception)
|
||||
{
|
||||
if (alloc_consprod_databuf(&node_param->c2p_xfer))
|
||||
return &page_size;
|
||||
if (consumer_block(node_param) == NULL) //TODO: allocate data_buf
|
||||
return_value = 0;
|
||||
else
|
||||
return_value = -1;
|
||||
free(node_param->c2p_xfer);
|
||||
}
|
||||
else
|
||||
return_value = consumer_data(node_param);
|
||||
break;
|
||||
|
||||
default:
|
||||
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
|
||||
return &page_size; /* &page_size can't be NULL */
|
||||
}
|
||||
if (return_value)
|
||||
return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
|
||||
return &page_size; /* &page_size can't be NULL */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int set_node_params(node_param_t *node_param,
|
||||
node_param_t *prev_node_param, int thread_idx)
|
||||
{
|
||||
if (thread_idx == nb_nodes - 1)
|
||||
node_param->type = SINK;
|
||||
else
|
||||
{
|
||||
if (thread_idx)
|
||||
node_param->type = INTERM;
|
||||
else
|
||||
node_param->type = SOURCE;
|
||||
node_param->next_comm_channel = create_comm_channel();
|
||||
if (node_param->next_comm_channel == NULL)
|
||||
return -1;
|
||||
}
|
||||
if (prev_node_param != NULL)
|
||||
{
|
||||
node_param->prev_comm_channel =
|
||||
prev_node_param->next_comm_channel;
|
||||
}
|
||||
node_param->thread_idx = thread_idx;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int create_threads(int *p_nb_nodes, pthread_t *node_tids,
|
||||
node_param_t *node_params)
|
||||
{
|
||||
int i, nb_nodes;
|
||||
|
||||
nb_nodes = *p_nb_nodes;
|
||||
for (i = 0; i < nb_nodes; i++)
|
||||
{
|
||||
node_param_t *prev_node_param;
|
||||
|
||||
prev_node_param = (i) ? &node_params[i - 1] : NULL;
|
||||
if (set_node_params(&node_params[i], prev_node_param, i))
|
||||
{
|
||||
*p_nb_nodes = i - 1;
|
||||
return -1;
|
||||
}
|
||||
if (pthread_create(&node_tids[i], NULL,
|
||||
(void *(*)(void *)) node, &node_params[i]))
|
||||
{
|
||||
perror("pthread_create node");
|
||||
destroy_comm_channel(node_params[i].next_comm_channel);
|
||||
*p_nb_nodes = i - 1;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
*p_nb_nodes = i - 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int join_threads(int nb_threads, pthread_t *tids)
|
||||
{
|
||||
int i, return_value;
|
||||
void *pthread_return_value;
|
||||
|
||||
for (i = 0, return_value = 0; i < nb_threads; i++)
|
||||
{
|
||||
pthread_join(tids[i], &pthread_return_value);
|
||||
if (pthread_return_value != NULL)
|
||||
return_value = EXIT_FAILURE;
|
||||
}
|
||||
return return_value;
|
||||
}
|
||||
|
||||
static int destroy_threads(int last_allocated, node_param_t *node_params)
|
||||
{
|
||||
int i, return_value;
|
||||
|
||||
for (i = last_allocated, return_value = 0; i >= 0; i--)
|
||||
{
|
||||
if (node_params[i].type != SINK)
|
||||
{
|
||||
if (destroy_comm_channel(node_params[i].next_comm_channel))
|
||||
return_value = -1;
|
||||
}
|
||||
}
|
||||
return return_value;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
int return_value, nb_threads;
|
||||
pthread_t *tids;
|
||||
int i, return_value;
|
||||
void *pthread_return_value;
|
||||
prod_cons_thread_t *thread_params;
|
||||
node_param_t *node_params;
|
||||
|
||||
return_value = EXIT_SUCCESS;
|
||||
if (analyse_options(argc, argv))
|
||||
|
@ -547,59 +695,25 @@ int main(int argc, char *argv[])
|
|||
page_size = sysconf(_SC_PAGE_SIZE);
|
||||
if (page_size <= 0)
|
||||
return EXIT_FAILURE;
|
||||
thread_params = malloc(nb_nodes * sizeof(prod_cons_thread_t));
|
||||
if (thread_params == NULL)
|
||||
nb_threads = nb_nodes;
|
||||
node_params = malloc(nb_threads * sizeof(node_param_t));
|
||||
if (node_params == NULL)
|
||||
return EXIT_FAILURE;
|
||||
tids = malloc(nb_nodes * sizeof(pthread_t));
|
||||
tids = malloc(nb_threads * sizeof(pthread_t));
|
||||
if (tids == NULL)
|
||||
{
|
||||
return_value = EXIT_FAILURE;
|
||||
goto error_alloc_tids;
|
||||
}
|
||||
for (i = 0; i < nb_nodes - 1; i++)
|
||||
{
|
||||
if (i)
|
||||
thread_params[i].flags = PROD | CONS;
|
||||
else
|
||||
thread_params[i].flags = PROD;
|
||||
// Should work in most cases
|
||||
if (shared)
|
||||
thread_params[i].cpu_binding = i % nb_cpus;
|
||||
else
|
||||
thread_params[i].cpu_binding = (2 * i) % nb_cpus;
|
||||
thread_params[i].prod_comm_channel = create_comm_channel();
|
||||
if (thread_params[i].prod_comm_channel == NULL)
|
||||
{
|
||||
return_value = EXIT_FAILURE;
|
||||
goto error_create_channels;
|
||||
}
|
||||
if (i)
|
||||
thread_params[i].cons_comm_channel =
|
||||
thread_params[i - 1].prod_comm_channel;
|
||||
pthread_create(&tids[i], NULL, (void *(*)(void *)) node, &thread_params[i]);
|
||||
}
|
||||
thread_params[i].flags = CONS;
|
||||
if (shared)
|
||||
thread_params[i].cpu_binding = i % nb_cpus;
|
||||
else
|
||||
thread_params[i].cpu_binding = (2 * i) % nb_cpus;
|
||||
thread_params[i].cons_comm_channel =
|
||||
thread_params[i - 1].prod_comm_channel;
|
||||
pthread_create(&tids[i], NULL, (void *(*)(void *)) node, &thread_params[i]);
|
||||
for (i = 0; i < nb_nodes; i++)
|
||||
{
|
||||
pthread_join(tids[i], &pthread_return_value);
|
||||
if (pthread_return_value != NULL)
|
||||
return_value = EXIT_FAILURE;
|
||||
}
|
||||
i--;
|
||||
if (create_threads(&nb_threads, tids, node_params))
|
||||
goto error_create_channels;
|
||||
if (join_threads(nb_threads, tids))
|
||||
return_value = EXIT_FAILURE;
|
||||
error_create_channels:
|
||||
for (i-- ; i >= 0; i--) {
|
||||
if (destroy_comm_channel(thread_params[i].prod_comm_channel))
|
||||
return_value = EXIT_FAILURE;
|
||||
}
|
||||
if (destroy_threads(nb_threads, node_params))
|
||||
return_value = EXIT_FAILURE;
|
||||
free(tids);
|
||||
error_alloc_tids:
|
||||
free(thread_params);
|
||||
free(node_params);
|
||||
return return_value;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue