/* Viewer metadata: C source file, 598 lines, 15 KiB. */
#define _GNU_SOURCE
|
||
#define _POSIX_SOURCE 1
|
||
|
||
#include <stdio.h>
|
||
#include <stdlib.h>
|
||
#include <pthread.h>
|
||
#include <stdint.h>
|
||
#include <unistd.h>
|
||
#include <errno.h>
|
||
#include <limits.h>
|
||
#include <sys/stat.h>
|
||
#include <string.h>
|
||
#include <dlfcn.h>
|
||
#include <sys/time.h>
|
||
|
||
/* Non-standard includes */
|
||
#include <commtech.h>
|
||
#include <specific_comm.h>
|
||
|
||
|
||
/* Two-step stringification so that a macro argument is expanded before being quoted. */
#define doStringification(x) #x
#define toString(x) doStringification(x)

/* Number of pointer-sized slots that fit in one page (page_size is read when the macro is expanded). */
#define MAX_BLOCK_ENTRIES (page_size / sizeof(void *))

/* Number of uintptr_t words that fit in CACHE_LINE_SIZE bytes. */
#define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t))

/* Role bits stored in prod_cons_thread_t.flags; they may be OR-ed together. */
#define PROD 1
#define CONS 2
|
||
|
||
/*
 * Parameters handed to one node thread of the pipeline.
 */
typedef struct prod_cons_thread
{
    void *prod_comm_channel; /* channel this node sends on (used when PROD is set) */
    void *cons_comm_channel; /* channel this node receives from (used when CONS is set) */
    int flags;               /* PROD, CONS or both */
    int cpu_binding;         /* id of the CPU to run the thread on */
} prod_cons_thread_t;
|
||
|
||
/* Counter type behind the send/receive consistency check. */
typedef int inc_check_t;

/* --- Values filled in from the command line ------------------------------ */
static long nb_bufs_sent;             /* number of buffers to send (-n) */
long nb_prod;                         /* number of producers (its option is currently disabled) */
static int shared;                    /* We are not shared by default */
static int init_calc_arg;             /* integer argument handed to init_calc() */
static int block_reception = 1;       /* 1: receive as much as available, 0: one datum at a time */
static int nb_nodes = 2;              /* Nb of nodes participating to the chain of pipelines */
static int check_recv_match_send;     /* 1 when received values must be checked against sent ones */

/* --- Calculation hooks run by the producer ------------------------------- */
static int (*init_calc)(int);
static void **(*do_calc)(void);
static int (*end_calc)(void);

/* --- State of the built-in check "calculation" --------------------------- */
static uintptr_t single_prod_check_val;      /* /!\ Implies only one real producer */
static inc_check_t *single_prod_check_ctxt;  /* /!\ Implies only one real producer */

/* --- Synchronisation objects (external linkage) -------------------------- */
pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER;

/* --- Machine description ------------------------------------------------- */
static int nb_cpus = 4;               /* TOFIX: don't hardcode this */
static int page_size;                 /* set by main() from sysconf() */
|
||
|
||
void usage(char *argv[])
|
||
{
|
||
char format[] = "-n <num_buf> -p <num_prod> [options]";
|
||
char options[] = "Required options:\n"
|
||
"-n nb_buffer_sent\t\tNumber of buffer to send to another core\n"
|
||
"\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n"
|
||
"Facultative options:\n"
|
||
"-b\t\t\t\tReceive the biggest amount of data available (The default)\n"
|
||
"-c calculation_libname arg\tLibrary to use for calculation with its argument\n"
|
||
"\t\t\t\tThis library must implement functions in calc.h\n"
|
||
"\t\t\t\t(default to none)\n"
|
||
"-k\t\t\t\tCheck we receive what is sent\n"
|
||
"-d\t\t\t\tReceive one piece of data\n"
|
||
"-h\t\t\t\tPrint this help\n"
|
||
"-s <level>\t\t\tShare the same L<level> cache or not\n"
|
||
"\t\t\t\tIf level is:\n"
|
||
"\t\t\t\t\t> 0, then the same L<level> must be shared\n"
|
||
"\t\t\t\t\t< 0, then different L<level> must be used\n"
|
||
"\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n"
|
||
"-t\t\t\tnb_nodes\t\tNumber of nodes in the pipeline chain\n";
|
||
printf("Usage : %s %s\n", argv[0], format);
|
||
printf("Options :\n");
|
||
printf("%s\n", options);
|
||
}
|
||
|
||
/*
 * Default init_calc hook: nothing to initialise.
 * The argument is ignored; always returns 0 (success).
 */
int do_noinit(int unused)
{
    (void) unused;
    return 0;
}
|
||
|
||
/*
 * Default do_calc hook: performs no calculation.
 * Returns the address of a function-static pointer (which itself points to a
 * function-static int), so every call yields the same non-NULL value.
 */
void **do_nocalc(void)
{
    static int placeholder_value;
    static int *placeholder_ptr = &placeholder_value;

    return (void **) &placeholder_ptr;
}
|
||
|
||
/*
 * Default end_calc hook: nothing to tear down. Always returns 0 (success).
 */
int do_noend(void)
{
    const int success = 0;

    return success;
}
|
||
|
||
/*
 * Allocate an incrementing-check counter and set it to init_value.
 *
 * init_value: first value the counter will hand out.
 * context:    receives the newly allocated counter; untouched on failure.
 *
 * Returns 0 on success, -1 when the allocation fails. The counter is released
 * with inc_check_end().
 */
int inc_check_init(int init_value, inc_check_t **context)
{
    inc_check_t *counter = malloc(sizeof(*counter));

    if (counter != NULL)
    {
        *counter = init_value;
        *context = counter;
        return 0;
    }
    return -1;
}
|
||
|
||
/*
 * Hand out the counter's current value, then advance the counter by one.
 *
 * context:    counter created by inc_check_init().
 * next_value: receives the value the counter held before the increment.
 *
 * Always returns 0.
 */
int inc_check_next(inc_check_t *context, uintptr_t *next_value)
{
    const inc_check_t current = *context;

    *context = current + 1;
    *next_value = current;
    return 0;
}
|
||
|
||
/*
 * Release a counter obtained from inc_check_init(). Always returns 0.
 */
int inc_check_end(inc_check_t *context)
{
    int status = 0;

    free(context);
    return status;
}
|
||
|
||
int do_checkinit(int init_value)
|
||
{
|
||
return inc_check_init(init_value, &single_prod_check_ctxt);
|
||
}
|
||
|
||
void **do_checkcalc(void)
|
||
{
|
||
int ret;
|
||
|
||
ret = inc_check_next(single_prod_check_ctxt, &single_prod_check_val);
|
||
if (ret)
|
||
return NULL;
|
||
else
|
||
return (void **) single_prod_check_val;
|
||
}
|
||
|
||
int do_checkend(void)
|
||
{
|
||
return inc_check_end(single_prod_check_ctxt);
|
||
}
|
||
|
||
int analyse_options(int argc, char *argv[])
|
||
{
|
||
int opt;
|
||
|
||
opterr = 0;
|
||
while ((opt = getopt(argc, argv, ":bc:dhkn:s::"/*p:"*/)) != -1)
|
||
{
|
||
switch (opt)
|
||
{
|
||
case 'b' :
|
||
block_reception = 1;
|
||
break;
|
||
case 'c' :
|
||
{
|
||
struct stat file_stat;
|
||
void *dl_descriptor;
|
||
|
||
if (stat(optarg, &file_stat))
|
||
{
|
||
fprintf(stderr, "%s: %s\n", optarg, strerror(errno));
|
||
return -1;
|
||
}
|
||
dl_descriptor = dlopen(optarg, RTLD_LAZY | RTLD_LOCAL);
|
||
if (dl_descriptor == NULL)
|
||
{
|
||
fprintf(stderr, "dlopen error: %s\n", dlerror());
|
||
return -1;
|
||
}
|
||
init_calc = (int (*)(int)) dlsym(dl_descriptor, "init_calc");
|
||
do_calc = (void ** (*)(void)) dlsym(dl_descriptor, "do_calc");
|
||
end_calc = (int (*)(void)) dlsym(dl_descriptor, "end_calc");
|
||
if ((init_calc == NULL) || (do_calc == NULL) || (end_calc == NULL))
|
||
{
|
||
fprintf(stderr, "A symbol cannot be loaded: %s\n", dlerror());
|
||
return -1;
|
||
}
|
||
if ((optind == argc) || (*argv[optind] == '-'))
|
||
{
|
||
fprintf(stderr, "Missing argument for -c option\n");
|
||
return -1;
|
||
}
|
||
{
|
||
char *inval;
|
||
init_calc_arg = strtol(argv[optind], &inval, 10);
|
||
if ((*argv[optind] == '\0') || (*inval != '\0'))
|
||
{
|
||
fprintf(stderr, "Option '-c' needs also an integer argument\n");
|
||
return -1;
|
||
}
|
||
if ((init_calc_arg <= 0) || ((init_calc_arg == LONG_MAX) && errno == ERANGE))
|
||
{
|
||
fprintf(stderr, "Number of useless loop to be done between 2 send must be"
|
||
" between 1 and %ld, both inclusive\n", LONG_MAX);
|
||
return -1;
|
||
}
|
||
}
|
||
optind++;
|
||
}
|
||
break;
|
||
case 'd' :
|
||
block_reception = 0;
|
||
break;
|
||
case 'h' :
|
||
usage(argv);
|
||
exit(EXIT_SUCCESS);
|
||
case 'k' :
|
||
check_recv_match_send = 1;
|
||
break;
|
||
case 'l' :
|
||
{
|
||
char *inval;
|
||
nb_nodes = strtol(optarg, &inval, 10);
|
||
if ((*optarg == '\0') || (*inval != '\0'))
|
||
{
|
||
fprintf(stderr, "Option '-l' needs an integer argument\n");
|
||
return -1;
|
||
}
|
||
if ((nb_nodes < 2) || ((nb_nodes == LONG_MAX) && errno == ERANGE))
|
||
{
|
||
fprintf(stderr, "Number of links to participate in the pipeline chain must be between 2 and %ld, both inclusive\n", LONG_MAX);
|
||
return -1;
|
||
}
|
||
}
|
||
break;
|
||
case 'n' :
|
||
{
|
||
char *inval;
|
||
nb_bufs_sent = strtol(optarg, &inval, 10);
|
||
if ((*optarg == '\0') || (*inval != '\0'))
|
||
{
|
||
fprintf(stderr, "Option '-n' needs an integer argument\n");
|
||
return -1;
|
||
}
|
||
if ((nb_bufs_sent <= 0) || ((nb_bufs_sent == LONG_MAX) && errno == ERANGE))
|
||
{
|
||
fprintf(stderr, "Number of cache lines to be sent must be between 1 and %ld, both inclusive\n", LONG_MAX);
|
||
return -1;
|
||
}
|
||
}
|
||
break;
|
||
#if 0
|
||
case 'p' :
|
||
{
|
||
char *inval;
|
||
nb_prod = strtol(optarg, &inval, 10);
|
||
if ((*optarg == '\0') || (*inval != '\0'))
|
||
{
|
||
fprintf(stderr, "Option '-p' needs an integer argument\n");
|
||
return -1;
|
||
}
|
||
if ((nb_prod <= 0) || ((nb_prod == LONG_MAX) && errno == ERANGE))
|
||
{
|
||
fprintf(stderr, "Number of producers must be between 1 and %ld, both inclusive\n", LONG_MAX);
|
||
return -1;
|
||
}
|
||
}
|
||
break;
|
||
#endif
|
||
case 's' :
|
||
if ((optind != argc) && (*argv[optind] != '-'))
|
||
{
|
||
int share_level;
|
||
char *inval;
|
||
share_level = strtol(argv[optind], &inval, 10);
|
||
if ((*argv[optind] == '\0') || (*inval != '\0'))
|
||
{
|
||
fprintf(stderr, "Option '-p' needs an integer argument\n");
|
||
return -1;
|
||
}
|
||
if ((share_level == LONG_MIN) || ((share_level == LONG_MAX) && errno == ERANGE))
|
||
{
|
||
fprintf(stderr, "Shared memory level must be between %ld and %ld, both inclusive\n", LONG_MIN, LONG_MAX);
|
||
return -1;
|
||
}
|
||
/* TODO: Real management of shared memory level */
|
||
/* TODO: -x: We want level x not to be shared; 0 do as we want, only memory is guaranteed to be shared */
|
||
if (share_level <= 0)
|
||
shared = 0;
|
||
else
|
||
shared = 1;
|
||
optind++;
|
||
}
|
||
break;
|
||
case '?' :
|
||
fprintf(stderr, "Option inconnue\n");
|
||
/*if (!strncmp("--check", argv[optind], strlen("--check")))
|
||
{
|
||
check_recv_match_send = 1;
|
||
optind++;
|
||
optopt = (int) *argv[optind];
|
||
fprintf(stderr, "--check required\n");
|
||
break;
|
||
}*/
|
||
return -1;
|
||
case ':' :
|
||
fprintf(stderr, "Option %s needs an argument\n", argv[optind]);
|
||
return -1;
|
||
default :
|
||
fprintf(stderr, "Error while analysing command line options\n");
|
||
return -1;
|
||
}
|
||
}
|
||
if (!nb_bufs_sent)
|
||
{
|
||
fprintf(stderr, "You must give the number of cache lines to be sent\n");
|
||
return -1;
|
||
}
|
||
#if 0
|
||
if (!nb_prod)
|
||
{
|
||
fprintf(stderr, "You must give the number of producers\n");
|
||
return -1;
|
||
}
|
||
#endif
|
||
if (shared && (nb_prod > 1))
|
||
{
|
||
fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n");
|
||
return -1;
|
||
}
|
||
if (check_recv_match_send && do_calc)
|
||
{
|
||
fprintf(stderr, "Can't specifying a computation library with check activated\n");
|
||
return -1;
|
||
}
|
||
if (do_calc == NULL)
|
||
{
|
||
if (check_recv_match_send)
|
||
{
|
||
init_calc = do_checkinit;
|
||
do_calc = do_checkcalc;
|
||
end_calc = do_checkend;
|
||
}
|
||
else
|
||
{
|
||
init_calc = do_noinit;
|
||
do_calc = do_nocalc;
|
||
end_calc = do_noend;
|
||
}
|
||
}
|
||
printf("buf size: %lu\n", WORDS_PER_LINE);
|
||
return 0;
|
||
}
|
||
|
||
int producer(void *prod_channel)
|
||
{
|
||
int i, j;
|
||
|
||
if (init_calc(init_calc_arg))
|
||
{
|
||
fprintf(stderr, "Initialization of calculation has failed\n");
|
||
return 1;
|
||
}
|
||
for(i = 0; i < nb_bufs_sent; i++) {
|
||
//printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE);
|
||
for(j = 0; j < WORDS_PER_LINE; j++)
|
||
send(prod_channel, do_calc());
|
||
}
|
||
if (end_calc())
|
||
{
|
||
fprintf(stderr, "uninitialization of calculation has failed\n");
|
||
return 1;
|
||
}
|
||
printf("[%p] Producer finished !\n", (void*) pthread_self());
|
||
return 0;
|
||
}
|
||
|
||
/*
 * Called for each value received by the consumer.
 * Currently does nothing with it.
 */
void on_message(void *val)
{
    (void) val; /* intentionally unused */
}
|
||
|
||
int consumer(void *cons_channel)
|
||
{
|
||
int delayed_error;
|
||
uintptr_t cons_check_value;
|
||
inc_check_t *cons_check_context;
|
||
|
||
delayed_error = 0;
|
||
if (inc_check_init(init_calc_arg, &cons_check_context))
|
||
{
|
||
fprintf(stderr, "Initialization of check has failed\n");
|
||
return -1; /* &page_size can't be NULL, whatever NULL is bound to */
|
||
}
|
||
cons_check_value = init_calc_arg;
|
||
if (block_reception)
|
||
{
|
||
long long total_data_received = 0;
|
||
void *data_buf[MAX_BLOCK_ENTRIES];
|
||
|
||
while (total_data_received < nb_bufs_sent * WORDS_PER_LINE)
|
||
{
|
||
int i;
|
||
ssize_t nb_data_received;
|
||
|
||
nb_data_received = recv_some_data(cons_channel, data_buf, MAX_BLOCK_ENTRIES);
|
||
total_data_received += nb_data_received;
|
||
for (i = 0; i < nb_data_received; i++)
|
||
{
|
||
if (inc_check_next(cons_check_context, &cons_check_value))
|
||
{
|
||
if (!delayed_error)
|
||
{
|
||
fprintf(stderr, "Error while checking received value match sent value\n");
|
||
delayed_error = 1;
|
||
}
|
||
}
|
||
if (cons_check_value != (uintptr_t) data_buf[i])
|
||
{
|
||
if (!delayed_error)
|
||
{
|
||
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data_buf[i]);
|
||
delayed_error = 1;
|
||
}
|
||
}
|
||
on_message(data_buf[i]);
|
||
}
|
||
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");
|
||
}
|
||
}
|
||
else
|
||
{
|
||
int i, j;
|
||
|
||
for(i = 0; i < nb_bufs_sent; i++) {
|
||
for(j = 0; j < WORDS_PER_LINE; j++)
|
||
{
|
||
void *data;
|
||
|
||
data = recv_one_data(cons_channel);
|
||
if (inc_check_next(cons_check_context, &cons_check_value))
|
||
{
|
||
if (!delayed_error)
|
||
{
|
||
fprintf(stderr, "Error while checking received value match sent value\n");
|
||
delayed_error = 1;
|
||
}
|
||
}
|
||
if (cons_check_value != (uintptr_t) data)
|
||
{
|
||
if (!delayed_error)
|
||
{
|
||
fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data);
|
||
delayed_error = 1;
|
||
}
|
||
}
|
||
on_message(data);
|
||
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");
|
||
}
|
||
}
|
||
}
|
||
printf("[%p] Consumer finished !\n", (void*) pthread_self());
|
||
if (delayed_error)
|
||
return -1;
|
||
return 0;
|
||
}
|
||
|
||
int consprod(void *cons_channel, void *prod_channel)
|
||
{
|
||
if (block_reception)
|
||
{
|
||
long long total_data_received = 0;
|
||
void *data_buf[MAX_BLOCK_ENTRIES];
|
||
|
||
while (total_data_received < nb_bufs_sent * WORDS_PER_LINE)
|
||
{
|
||
int i;
|
||
ssize_t nb_data_received;
|
||
|
||
nb_data_received = recv_some_data(cons_channel, data_buf, MAX_BLOCK_ENTRIES);
|
||
total_data_received += nb_data_received;
|
||
for (i = 0; i < nb_data_received; i++)
|
||
send(prod_channel, data_buf[i]);
|
||
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");
|
||
}
|
||
}
|
||
else
|
||
{
|
||
int i, j;
|
||
|
||
for(i = 0; i < nb_bufs_sent; i++) {
|
||
for(j = 0; j < WORDS_PER_LINE; j++)
|
||
send(prod_channel, recv_one_data(cons_channel));
|
||
//printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");
|
||
}
|
||
}
|
||
printf("[%p] Producer/consumer finished !\n", (void*) pthread_self());
|
||
return 0;
|
||
}
|
||
|
||
/*
 * Thread body of one pipeline node: pins the calling thread to
 * thread_params->cpu_binding, then runs the role selected by
 * thread_params->flags (producer, consumer, or relay when both bits are set).
 *
 * Returns NULL on success and a non-NULL pointer (&page_size, used purely as
 * a sentinel) on any failure, including a failed CPU pinning or an unknown
 * role.
 */
void *node(prod_cons_thread_t *thread_params)
{
    int return_value;
    int affinity_error;
    cpu_set_t cpuset;

    CPU_ZERO(&cpuset);
    CPU_SET(thread_params->cpu_binding, &cpuset);
    /*
     * pthread functions return the error number instead of setting errno, so
     * perror() would print an unrelated message here.
     */
    affinity_error = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
    if (affinity_error)
    {
        fprintf(stderr, "pthread_setaffinity_np: %s\n", strerror(affinity_error));
        /* Must be non-NULL: the joiner treats NULL as success. */
        return &page_size;
    }

    switch (thread_params->flags & (PROD | CONS))
    {
        case PROD:
            return_value = producer(thread_params->prod_comm_channel);
            break;

        case CONS:
            return_value = consumer(thread_params->cons_comm_channel);
            break;

        case (PROD | CONS):
            return_value = consprod(thread_params->cons_comm_channel,
                                    thread_params->prod_comm_channel);
            break;

        default:
            return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
    }
    if (return_value)
        return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */
    return NULL;
}
|
||
|
||
int main(int argc, char *argv[])
|
||
{
|
||
pthread_t *tids;
|
||
int i, return_value;
|
||
void *pthread_return_value;
|
||
prod_cons_thread_t *thread_params;
|
||
|
||
return_value = EXIT_SUCCESS;
|
||
if (analyse_options(argc, argv))
|
||
return EXIT_FAILURE;
|
||
page_size = sysconf(_SC_PAGE_SIZE);
|
||
if (page_size <= 0)
|
||
return EXIT_FAILURE;
|
||
thread_params = malloc(nb_nodes * sizeof(prod_cons_thread_t));
|
||
if (thread_params == NULL)
|
||
return EXIT_FAILURE;
|
||
tids = malloc(nb_nodes * sizeof(pthread_t));
|
||
if (tids == NULL)
|
||
{
|
||
return_value = EXIT_FAILURE;
|
||
goto error_alloc_tids;
|
||
}
|
||
for (i = 0; i < nb_nodes - 1; i++)
|
||
{
|
||
if (i)
|
||
thread_params[i].flags = PROD | CONS;
|
||
else
|
||
thread_params[i].flags = PROD;
|
||
// Should work in most cases
|
||
if (shared)
|
||
thread_params[i].cpu_binding = i % nb_cpus;
|
||
else
|
||
thread_params[i].cpu_binding = (2 * i) % nb_cpus;
|
||
thread_params[i].prod_comm_channel = create_comm_channel();
|
||
if (thread_params[i].prod_comm_channel == NULL)
|
||
{
|
||
return_value = EXIT_FAILURE;
|
||
goto error_create_channels;
|
||
}
|
||
if (i)
|
||
thread_params[i].cons_comm_channel =
|
||
thread_params[i - 1].prod_comm_channel;
|
||
pthread_create(&tids[i], NULL, (void *(*)(void *)) node, &thread_params[i]);
|
||
}
|
||
thread_params[i].flags = CONS;
|
||
if (shared)
|
||
thread_params[i].cpu_binding = i % nb_cpus;
|
||
else
|
||
thread_params[i].cpu_binding = (2 * i) % nb_cpus;
|
||
thread_params[i].cons_comm_channel =
|
||
thread_params[i - 1].prod_comm_channel;
|
||
pthread_create(&tids[i], NULL, (void *(*)(void *)) node, &thread_params[i]);
|
||
for (i = 0; i < nb_nodes; i++)
|
||
{
|
||
pthread_join(tids[i], &pthread_return_value);
|
||
if (pthread_return_value != NULL)
|
||
return_value = EXIT_FAILURE;
|
||
}
|
||
i--;
|
||
error_create_channels:
|
||
for (i-- ; i >= 0; i--) {
|
||
if (destroy_comm_channel(thread_params[i].prod_comm_channel))
|
||
return_value = EXIT_FAILURE;
|
||
}
|
||
free(tids);
|
||
error_alloc_tids:
|
||
free(thread_params);
|
||
return return_value;
|
||
}
|