rt_benchs/communication_techniques/src/main.c

304 lines
7.2 KiB
C

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdint.h>
#include <unistd.h>
#include <errno.h>
#include <limits.h>
#include <sys/stat.h>
#include <string.h>
#include <dlfcn.h>
#include <sys/time.h>
/* Non-standard includes */
#include <papihighlevel.h>
#include <commtech.h>
#include <specific_comm.h>
/*
 * Two-step stringification: toString(x) lets the preprocessor expand x
 * first, then doStringification turns the expansion into a string literal.
 * Used by usage() to embed the value of BUF_SIZE in the help text.
 */
#define toString(x) doStringification(x)
#define doStringification(x) #x
/* Argument passed to init_calc() by every producer thread. */
#define INIT_CALC_ARG 16
/* Number of buffers each producer sends; set by the -n option. */
static long nb_bufs_sent = 0;
/*
 * Number of producer threads; set by the -p option.
 * NOTE(review): has external linkage, presumably because code outside this
 * file reads it -- confirm.
 */
long nb_prod = 0;
/*
 * Calculation hooks called by the producers. They point either to the
 * symbols loaded from the library given with -c, or to the do_no*()
 * defaults installed by analyse_options().
 */
static void (*init_calc)(int) = NULL;
static void *(*do_calc)(void) = NULL;
static void (*end_calc)(void) = NULL;
/* Non-zero when the -s option was given. */
static int shared = 0;
/*
 * Condition variable and mutex used by the consumer thread to tell the
 * producers that it has finished. The mutex also guards producers_ended.
 */
pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER;
/* Set to 1 by receptor() once reception() has returned. */
static int consumer_has_finished = 0;
/* Count of producers that have reached the end of their work. */
static int producers_ended = 0;
/*
 * Print the command-line help on standard output: a one-line synopsis
 * followed by the list of required and optional switches.
 *
 * argv: program argument vector; only argv[0] is read (program name).
 */
void usage(char *argv[])
{
    static const char synopsis[] = "-n [options]";
    static const char option_list[] =
        "Required options :\n"
        "-n nb_buffer_sent\tNumber of buffer to send to another core\n"
        "\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n"
        "-p nb_producers\tNumber of producers which send data to another core\n"
        "Facultative options :\n"
        "-h\t\t\tPrint this help\n"
        "-s\t\t\tShare the same L2 cache or not\n"
        "-c calculation_libname\tLibrary to use for calculation\n"
        "\t\t\tThis library must implement functions in calc.h\n";
    const char *program_name = argv[0];

    printf("Usage : %s %s\n", program_name, synopsis);
    puts("Options :");
    printf("%s\n", option_list);
}
/*
 * Default initialisation hook used when no calculation library is loaded:
 * accepts the argument and does nothing.
 */
void do_noinit(int unused)
{
    (void) unused;
}
/*
 * Default calculation hook used when no calculation library is loaded.
 *
 * Returns the address of a function-local static integer, so every call
 * yields the same non-NULL pointer.
 */
void *do_nocalc(void)
{
    static int placeholder;
    void *result = &placeholder;

    return result;
}
/*
 * Default termination hook used when no calculation library is loaded:
 * nothing to release, so it returns immediately.
 */
void do_noend(void)
{
    return;
}
/*
 * Look up one calculation hook in an already opened library.
 *
 * handle: descriptor returned by dlopen().
 * name:   symbol to resolve.
 *
 * Returns the symbol address, or NULL after printing a diagnostic. The
 * diagnostic never passes a NULL string to fprintf(): when dlerror() has
 * nothing to report, the symbol name is printed instead.
 */
static void *load_calc_symbol(void *handle, const char *name)
{
    void *symbol;

    dlerror(); /* Discard any stale error so the next dlerror() is about this lookup */
    symbol = dlsym(handle, name);
    if (symbol == NULL)
    {
        const char *reason = dlerror();

        fprintf(stderr, "A symbol cannot be loaded: %s\n", (reason != NULL) ? reason : name);
    }
    return symbol;
}

/*
 * Parse the argument of a counting option as a strictly positive long.
 *
 * opt_letter: option letter, used in the "needs an integer" diagnostic.
 * what:       description of the quantity, used in the range diagnostic.
 * text:       option argument to convert (base 10).
 * value:      receives the parsed number on success; untouched on failure.
 *
 * Returns 0 on success, -1 after printing a diagnostic on stderr.
 */
static int parse_count_option(int opt_letter, const char *what, const char *text, long *value)
{
    char *end;
    long parsed;

    /* strtol() only sets errno on failure, so it must be cleared first */
    errno = 0;
    parsed = strtol(text, &end, 10);
    if ((end == text) || (*end != '\0'))
    {
        fprintf(stderr, "Option '-%c' needs an integer argument\n", opt_letter);
        return -1;
    }
    if ((parsed <= 0) || (errno == ERANGE))
    {
        fprintf(stderr, "%s must be between 1 and %ld, both inclusive\n", what, LONG_MAX);
        return -1;
    }
    *value = parsed;
    return 0;
}

/*
 * Parse the command line and fill in the file-scope configuration:
 * nb_bufs_sent (-n), nb_prod (-p), shared (-s) and the calculation hooks
 * init_calc/do_calc/end_calc (-c, defaulting to the do_no*() stubs).
 * -h prints the help and exits the process with EXIT_SUCCESS.
 *
 * argc, argv: arguments as received by main().
 *
 * Returns 0 when the configuration is complete and consistent, -1 after
 * printing a diagnostic on stderr otherwise.
 */
int analyse_options(int argc, char *argv[])
{
    int opt;

    opterr = 0;
    while ((opt = getopt(argc, argv, ":hsc:n:p:")) != -1)
    {
        switch (opt)
        {
            case 'c' :
                {
                    struct stat file_stat;
                    void *dl_descriptor;

                    if (stat(optarg, &file_stat))
                    {
                        fprintf(stderr, "%s: %s\n", optarg, strerror(errno));
                        return -1;
                    }
                    dl_descriptor = dlopen(optarg, RTLD_LAZY | RTLD_LOCAL);
                    if (dl_descriptor == NULL)
                    {
                        fprintf(stderr, "dlopen error: %s\n", dlerror());
                        return -1;
                    }
                    init_calc = load_calc_symbol(dl_descriptor, "init_calc");
                    do_calc = load_calc_symbol(dl_descriptor, "do_calc");
                    end_calc = load_calc_symbol(dl_descriptor, "end_calc");
                    if ((init_calc == NULL) || (do_calc == NULL) || (end_calc == NULL))
                    {
                        /* Do not keep pointers into a library that is being unloaded */
                        init_calc = NULL;
                        do_calc = NULL;
                        end_calc = NULL;
                        dlclose(dl_descriptor);
                        return -1;
                    }
                }
                break;
            case 'h' :
                usage(argv);
                exit(EXIT_SUCCESS);
            case 'n' :
                if (parse_count_option('n', "Number of cache lines to be sent", optarg, &nb_bufs_sent))
                    return -1;
                break;
            case 'p' :
                if (parse_count_option('p', "Number of producers", optarg, &nb_prod))
                    return -1;
                break;
            case 's' :
                shared = 1;
                /* TODO: shared Ln cache */
                break;
            case '?' :
                fprintf(stderr, "Option inconnue\n");
                return -1;
            case ':' :
                /*
                 * getopt() stores the offending option letter in optopt;
                 * argv[optind] is the *next* argument and is NULL when the
                 * option was the last word on the command line.
                 */
                fprintf(stderr, "Option -%c needs an argument\n", optopt);
                return -1;
            default :
                fprintf(stderr, "Error while analysing command line options\n");
                return -1;
        }
    }
    if (!nb_bufs_sent)
    {
        fprintf(stderr, "You must give the number of cache lines to be sent\n");
        return -1;
    }
    if (!nb_prod)
    {
        fprintf(stderr, "You must give the number of producers\n");
        return -1;
    }
    if (shared && (nb_prod > 1))
    {
        fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n");
        return -1;
    }
    /* No -c option: fall back to the no-op calculation hooks */
    if (do_calc == NULL)
    {
        init_calc = do_noinit;
        do_calc = do_nocalc;
        end_calc = do_noend;
    }
    return 0;
}
/*
 * Restrict the calling thread to a single CPU.
 *
 * cpu: index of the CPU to run on.
 *
 * Returns 0 on success, -1 after printing a diagnostic. The diagnostic is
 * built from the value returned by pthread_setaffinity_np(), because
 * pthread functions report errors through their return value, not errno.
 */
static int producer_pin_to_cpu(int cpu)
{
    cpu_set_t cpuset;
    int rc;

    CPU_ZERO(&cpuset);
    CPU_SET(cpu, &cpuset);
    rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_setaffinity_np: %s\n", strerror(rc));
        return -1;
    }
    return 0;
}

/*
 * Producer thread entry point.
 *
 * Pins itself to CPU 1 (shared mode) or CPU 2 (otherwise), then sends
 * nb_bufs_sent buffers, each made of BUF_SIZE / sizeof(uintptr_t) values
 * obtained from do_calc(), and prints the elapsed wall-clock time.
 * Finally it registers itself as ended and blocks until the consumer
 * reports that it has finished.
 *
 * unused: ignored.
 *
 * Always returns NULL.
 */
void *producer(void *unused)
{
    long i;      /* same type as nb_bufs_sent, so large counts cannot overflow */
    size_t j;
    struct timeval tv1, tv2, tv_result;

    (void) unused;
    init_producer_thread();
    /*
     * On a pinning failure the workload is skipped, but the end-of-run
     * handshake below is still performed: returning right away would leave
     * producers_ended short of nb_prod forever.
     */
    if (producer_pin_to_cpu(shared ? 1 : 2) == 0)
    {
        init_calc(INIT_CALC_ARG);
        gettimeofday(&tv1, NULL);
        if (initialize_papi() != -1)
        {
            for (i = 0; i < nb_bufs_sent; i++)
            {
                for (j = 0; j < (BUF_SIZE / sizeof(uintptr_t)); j++)
                    send(do_calc());
            }
            print_results(BUF_SIZE / sizeof(uintptr_t), nb_bufs_sent);
        }
        gettimeofday(&tv2, NULL);
        /* Elapsed time = tv2 - tv1, borrowing one second when needed */
        tv_result.tv_sec = tv2.tv_sec - tv1.tv_sec;
        tv_result.tv_usec = tv2.tv_usec - tv1.tv_usec;
        if (tv_result.tv_usec < 0)
        {
            tv_result.tv_sec--;
            tv_result.tv_usec += 1000000;
        }
        /* %06u: the microsecond part must be zero-padded to be a valid fraction */
        printf("total_time: %u.%06u / %u.%06u / %u.%06u\n", (unsigned) tv_result.tv_sec, (unsigned) tv_result.tv_usec,
                (unsigned) tv_result.tv_sec, (unsigned) tv_result.tv_usec, (unsigned) tv_result.tv_sec,
                (unsigned) tv_result.tv_usec);
        end_calc();
        printf("[%p] Producer finished !\n", (void*) pthread_self());
    }
    /*
     * When a producer ends, its thread-local storage vanishes. Thus,
     * producers must finish only after the consumer has stopped using them.
     */
    pthread_mutex_lock(&mutex_cons_has_finished);
    /*
     * NOTE(review): cont is declared outside this file; presumably it is the
     * flag that lets the consumer stop once every producer is done -- confirm.
     */
    if (++producers_ended == nb_prod)
        cont = 0;
    /* Loop on the predicate: pthread_cond_wait() may wake up spuriously */
    while (!consumer_has_finished)
        pthread_cond_wait(&cond_cons_has_finished, &mutex_cons_has_finished);
    pthread_mutex_unlock(&mutex_cons_has_finished);
    return NULL;
}
/*
 * Callback handed to reception(): called with a received value, which it
 * deliberately ignores.
 */
void onMessage(void *val)
{
    (void) val;
}
/*
 * Consumer thread entry point.
 *
 * In shared mode the thread first pins itself to CPU 0. It then runs
 * reception() with onMessage as callback and, once reception() returns,
 * sets consumer_has_finished and wakes every producer waiting on
 * cond_cons_has_finished.
 *
 * a: ignored.
 *
 * Always returns NULL.
 */
void *receptor(void *a)
{
    int pinned = 1;

    (void) a;
    if (shared)
    {
        cpu_set_t cpuset;
        int rc;

        CPU_ZERO(&cpuset);
        CPU_SET(0, &cpuset);
        rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
        if (rc != 0)
        {
            /* pthread functions return the error code; errno is not set */
            fprintf(stderr, "pthread_setaffinity_np: %s\n", strerror(rc));
            pinned = 0;
        }
    }
    if (pinned)
        reception(onMessage);
    /*
     * Signal completion even when pinning failed: producers block on this
     * condition before exiting and nothing else would ever wake them.
     */
    pthread_mutex_lock(&mutex_cons_has_finished);
    consumer_has_finished = 1;
    pthread_cond_broadcast(&cond_cons_has_finished);
    pthread_mutex_unlock(&mutex_cons_has_finished);
    return NULL;
}
/*
 * Program entry point: parse the options, initialise the communication
 * library, start nb_prod producer threads plus one consumer thread, and
 * wait for all of them.
 *
 * Returns EXIT_SUCCESS when every thread was started and joined,
 * EXIT_FAILURE on an option, initialisation, allocation or thread-creation
 * error.
 */
int main(int argc, char *argv[])
{
    long i;             /* same type as nb_prod */
    int rc;
    size_t nb_threads;
    pthread_t *tid;

    if (analyse_options(argc, argv))
        return EXIT_FAILURE;
    if (init_library())
        return EXIT_FAILURE;
    /* nb_prod is at least 1 here; computing in size_t avoids signed overflow on nb_prod + 1 */
    nb_threads = (size_t) nb_prod + 1;
    if (nb_threads > SIZE_MAX / sizeof *tid)
    {
        fprintf(stderr, "Too many threads requested (%zu)\n", nb_threads);
        return EXIT_FAILURE;
    }
    tid = malloc(nb_threads * sizeof *tid);
    if (tid == NULL)
    {
        fprintf(stderr, "Failed to allocate %zu bytes needed for thread creation\n", nb_threads * sizeof *tid);
        return EXIT_FAILURE;
    }
    /*
     * Slots 0 .. nb_prod-1 hold the producers, slot nb_prod the consumer.
     * If a thread cannot be created, the ones already running may wait for
     * peers that will never exist, so they are not joined: returning from
     * main() terminates the whole process.
     */
    for (i = 0; i <= nb_prod; i++)
    {
        rc = pthread_create(&tid[i], NULL, (i < nb_prod) ? producer : receptor, NULL);
        if (rc != 0)
        {
            fprintf(stderr, "pthread_create: %s\n", strerror(rc));
            free(tid);
            return EXIT_FAILURE;
        }
    }
    for (i = 0; i <= nb_prod; i++)
        pthread_join(tid[i], NULL);
    free(tid);
    return EXIT_SUCCESS;
}