#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include /* Non standards includes */ #include #include #include #define toString(x) doStringification(x) #define doStringification(x) #x #define INIT_CALC_ARG 16 static long nb_bufs_sent = 0; long nb_prod = 0; static void (*init_calc)(int) = NULL; static void *(*do_calc)(void) = NULL; static void (*end_calc)(void) = NULL; static int shared = 0; pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER; static int consumer_has_finished = 0; static int producers_ended = 0; void usage(char *argv[]) { char format[] = "-n [options]"; char options[] = "Required options :\n" "-n nb_buffer_sent\tNumber of buffer to send to another core\n" "\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n" "-p nb_producers\tNumber of producers which send data to another core\n" "Facultative options :\n" "-h\t\t\tPrint this help\n" "-s\t\t\tShare the same L2 cache or not\n" "-c calculation_libname\tLibrary to use for calculation\n" "\t\t\tThis library must implement functions in calc.h\n"; printf("Usage : %s %s\n", argv[0], format); printf("Options :\n"); printf("%s\n", options); } void do_noinit(int unused) { } void *do_nocalc(void) { static int an_int; return &an_int; } void do_noend(void) { } int analyse_options(int argc, char *argv[]) { int opt; opterr = 0; while ((opt = getopt(argc, argv, ":hsc:n:p:")) != -1) { switch (opt) { case 'c' : { struct stat file_stat; void *dl_descriptor; if (stat(optarg, &file_stat)) { fprintf(stderr, "%s: %s\n", optarg, strerror(errno)); return -1; } dl_descriptor = dlopen(optarg, RTLD_LAZY | RTLD_LOCAL); if (dl_descriptor == NULL) { fprintf(stderr, "dlopen error: %s\n", dlerror()); return -1; } init_calc = dlsym(dl_descriptor, "init_calc"); do_calc = dlsym(dl_descriptor, "do_calc"); end_calc = dlsym(dl_descriptor, "end_calc"); if ((init_calc == NULL) || (do_calc == NULL) || (end_calc == NULL)) { fprintf(stderr, "A symbol cannot be loaded: %s\n", dlerror()); return -1; } } break; case 'h' : usage(argv); exit(EXIT_SUCCESS); case 'n' : { char *inval; nb_bufs_sent = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-n' needs an integer argument\n"); return -1; } if ((nb_bufs_sent <= 0) || ((nb_bufs_sent == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of cache lines to be sent must be between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; case 'p' : { char *inval; nb_prod = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-p' needs an integer argument\n"); return -1; } if ((nb_prod <= 0) || ((nb_prod == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of producers must be between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; case 's' : shared = 1; /* TODO: shared Ln cache */ break; case '?' : fprintf(stderr, "Option inconnue\n"); return -1; case ':' : fprintf(stderr, "Option %s needs an argument\n", argv[optind]); return -1; default : fprintf(stderr, "Error while analysing command line options\n"); return -1; } } if (!nb_bufs_sent) { fprintf(stderr, "You must give the number of cache lines to be sent\n"); return -1; } if (!nb_prod) { fprintf(stderr, "You must give the number of producers\n"); return -1; } if (shared && (nb_prod > 1)) { fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n"); return -1; } if (do_calc == NULL) { init_calc = do_noinit; do_calc = do_nocalc; end_calc = do_noend; } return 0; } void *producer(void *unused) { int i, j; struct timeval tv1, tv2, tv_result; init_producer_thread(); if (shared) { pthread_t tid; cpu_set_t cpuset; tid = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(1, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); return NULL; } } else { pthread_t tid; cpu_set_t cpuset; tid = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(2, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); return NULL; } } init_calc(INIT_CALC_ARG); gettimeofday(&tv1, NULL); if (initialize_papi() != -1) { for(i = 0; i < nb_bufs_sent; i++) { //printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE); for(j = 0; j < (BUF_SIZE / sizeof(uintptr_t)); j++) send(do_calc()); } print_results(BUF_SIZE / sizeof(uintptr_t), nb_bufs_sent); } gettimeofday(&tv2, NULL); tv_result.tv_sec = tv2.tv_sec - tv1.tv_sec; if (tv2.tv_usec < tv1.tv_usec) { tv_result.tv_usec = tv1.tv_usec - tv2.tv_usec; tv_result.tv_sec--; } else tv_result.tv_usec = tv2.tv_usec - tv1.tv_usec; printf("total_time: %u.%06u / %u.%06u / %u.%06u\n", (unsigned) tv_result.tv_sec, (unsigned) tv_result.tv_usec, (unsigned) tv_result.tv_sec, (unsigned) tv_result.tv_usec, (unsigned) tv_result.tv_sec, (unsigned) tv_result.tv_usec); end_calc(); printf("[%p] Producer finished !\n", (void*) pthread_self()); /* * When a producer end its thread-local storage vanished. Thus, * producers must finish only after consumer has stopped using them */ pthread_mutex_lock(&mutex_cons_has_finished); if (++producers_ended == nb_prod) cont = 0; if (!consumer_has_finished) pthread_cond_wait(&cond_cons_has_finished, &mutex_cons_has_finished); pthread_mutex_unlock(&mutex_cons_has_finished); return NULL; } void onMessage(void *val) { //printf("Receive value: %p\n", (void *) val); } void *receptor(void *a) { if (shared) { pthread_t tid; cpu_set_t cpuset; tid = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(0, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); return NULL; } } reception(onMessage); pthread_mutex_lock(&mutex_cons_has_finished); consumer_has_finished = 1; pthread_cond_broadcast(&cond_cons_has_finished); pthread_mutex_unlock(&mutex_cons_has_finished); return NULL; } int main(int argc, char *argv[]) { int i; void *return_value; pthread_t *tid; if (analyse_options(argc, argv)) return EXIT_FAILURE; if (init_library()) return EXIT_FAILURE; tid = (pthread_t *) malloc((nb_prod + 1) * sizeof(pthread_t)); if (tid == NULL) { fprintf(stderr, "Failed to allocate %lu bytes needed for thread creation\n", (nb_prod + 1) * sizeof(pthread_t)); return EXIT_FAILURE; } for(i = 0; i < nb_prod; i++) pthread_create(&tid[i], NULL, producer, NULL); pthread_create(&tid[i], NULL, receptor, NULL); for(i = 0; i < nb_prod; i++) pthread_join(tid[i], &return_value); pthread_join(tid[i], &return_value); free(tid); return EXIT_SUCCESS; }