#define _GNU_SOURCE #define _POSIX_SOURCE 1 #include #include #include #include #include #include #include #include #include #include #include /* Non standards includes */ #include #include #define MAX_BLOCK_ENTRIES (page_size / sizeof(void *)) #define toString(x) doStringification(x) #define doStringification(x) #x #define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t)) typedef int inc_check_t; static long nb_bufs_sent = 0; long nb_prod = 0; static int (*init_calc)(int) = NULL; static void **(*do_calc)(void) = NULL; static int (*end_calc)(void) = NULL; static int shared = 0; /* We are not shared by default */ pthread_cond_t cond_cons_has_finished = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex_cons_has_finished = PTHREAD_MUTEX_INITIALIZER; static int init_calc_arg = 0; static int block_reception = 1; static int check_recv_match_send = 0; static int page_size = 0; static uintptr_t single_prod_check_val; // /!\ Implies only one real producer static inc_check_t *single_prod_check_ctxt; // /!\ Implies only one real producer void usage(char *argv[]) { char format[] = "-n -p [options]"; char options[] = "Required options :\n" "-n nb_buffer_sent\t\tNumber of buffer to send to another core\n" "\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n" "Facultative options :\n" "-b\t\t\t\tReceive the biggest amount of data available (The default)\n" "-c calculation_libname arg\tLibrary to use for calculation with its argument\n" "\t\t\t\tThis library must implement functions in calc.h\n" "\t\t\t\t(default to none)\n" "--check\t\t\t\tCheck we receive what is sent\n" "-d\t\t\t\tReceive one piece of data\n" "-h\t\t\t\tPrint this help\n" "-s \t\t\tShare the same L cache or not\n" "\t\t\t\tIf level is:\n" "\t\t\t\t\t> 0, then the same L must be shared\n" "\t\t\t\t\t< 0, then different L must be used\n" "\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n"; printf("Usage : %s %s\n", argv[0], format); printf("Options :\n"); printf("%s\n", options); } int do_noinit(int unused) { return 0; } void **do_nocalc(void) { static int an_int, *an_int_ptr = &an_int; return (void **) &an_int_ptr; } int do_noend(void) { return 0; } int inc_check_init(int init_value, inc_check_t **context) { inc_check_t *ctxt; ctxt = malloc(sizeof(*ctxt)); if (ctxt == NULL) return -1; *ctxt = init_value; *context = ctxt; return 0; } int inc_check_next(inc_check_t *context, uintptr_t *next_value) { *next_value = (*context)++; return 0; } int inc_check_end(inc_check_t *context) { free(context); return 0; } int do_checkinit(int init_value) { return inc_check_init(init_value, &single_prod_check_ctxt); } void **do_checkcalc(void) { int ret; ret = inc_check_next(single_prod_check_ctxt, &single_prod_check_val); if (ret) return NULL; else return (void **) single_prod_check_val; } int do_checkend(void) { return inc_check_end(single_prod_check_ctxt); } int analyse_options(int argc, char *argv[]) { int opt; opterr = 0; while ((opt = getopt(argc, argv, ":bc:dhkn:s::"/*p:"*/)) != -1) { switch (opt) { case 'b' : block_reception = 1; break; case 'c' : { struct stat file_stat; void *dl_descriptor; if (stat(optarg, &file_stat)) { fprintf(stderr, "%s: %s\n", optarg, strerror(errno)); return -1; } dl_descriptor = dlopen(optarg, RTLD_LAZY | RTLD_LOCAL); if (dl_descriptor == NULL) { fprintf(stderr, "dlopen error: %s\n", dlerror()); return -1; } init_calc = (int (*)(int)) dlsym(dl_descriptor, "init_calc"); do_calc = (void ** (*)(void)) dlsym(dl_descriptor, "do_calc"); end_calc = (int (*)(void)) dlsym(dl_descriptor, "end_calc"); if ((init_calc == NULL) || (do_calc == NULL) || (end_calc == NULL)) { fprintf(stderr, "A symbol cannot be loaded: %s\n", dlerror()); return -1; } if ((optind == argc) || (*argv[optind] == '-')) { fprintf(stderr, "Missing argument for -c option\n"); return -1; } { char *inval; init_calc_arg = strtol(argv[optind], &inval, 10); if ((*argv[optind] == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-c' needs also an integer argument\n"); return -1; } if ((init_calc_arg <= 0) || ((init_calc_arg == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of useless loop to be done between 2 send must be" " between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } optind++; } break; case 'd' : block_reception = 0; break; case 'h' : usage(argv); exit(EXIT_SUCCESS); case 'k' : check_recv_match_send = 1; break; case 'n' : { char *inval; nb_bufs_sent = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-n' needs an integer argument\n"); return -1; } if ((nb_bufs_sent <= 0) || ((nb_bufs_sent == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of cache lines to be sent must be between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; #if 0 case 'p' : { char *inval; nb_prod = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-p' needs an integer argument\n"); return -1; } if ((nb_prod <= 0) || ((nb_prod == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of producers must be between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; #endif case 's' : if ((optind != argc) && (*argv[optind] != '-')) { int share_level; char *inval; share_level = strtol(argv[optind], &inval, 10); if ((*argv[optind] == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-p' needs an integer argument\n"); return -1; } if ((share_level == LONG_MIN) || ((share_level == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Shared memory level must be between %ld and %ld, both inclusive\n", LONG_MIN, LONG_MAX); return -1; } /* TODO: Real management of shared memory level */ /* TODO: -x: We want level x not to be shared; 0 do as we want, only memory is guaranteed to be shared */ if (share_level <= 0) shared = 0; else shared = 1; optind++; } break; case '?' : fprintf(stderr, "Option inconnue\n"); /*if (!strncmp("--check", argv[optind], strlen("--check"))) { check_recv_match_send = 1; optind++; optopt = (int) *argv[optind]; fprintf(stderr, "--check required\n"); break; }*/ return -1; case ':' : fprintf(stderr, "Option %s needs an argument\n", argv[optind]); return -1; default : fprintf(stderr, "Error while analysing command line options\n"); return -1; } } if (!nb_bufs_sent) { fprintf(stderr, "You must give the number of cache lines to be sent\n"); return -1; } #if 0 if (!nb_prod) { fprintf(stderr, "You must give the number of producers\n"); return -1; } #endif if (shared && (nb_prod > 1)) { fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n"); return -1; } if (check_recv_match_send && do_calc) { fprintf(stderr, "Can't specifying a computation library with check activated\n"); return -1; } if (do_calc == NULL) { if (check_recv_match_send) { init_calc = do_checkinit; do_calc = do_checkcalc; end_calc = do_checkend; } else { init_calc = do_noinit; do_calc = do_nocalc; end_calc = do_noend; } } printf("buf size: %lu\n", WORDS_PER_LINE); return 0; } void *producer(void *channel) { int i, j; if (init_producer_thread(channel)) { fprintf(stderr, "Initialization of thread has failed\n"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } if (shared) { pthread_t tid; cpu_set_t cpuset; tid = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(1, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } } else { pthread_t tid; cpu_set_t cpuset; tid = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(2, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } } if (init_calc(init_calc_arg)) { fprintf(stderr, "Initialization of calculation has failed\n"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } for(i = 0; i < nb_bufs_sent; i++) { //printf("[%p] Send %d new CACHE_LINE\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE); for(j = 0; j < WORDS_PER_LINE; j++) send(do_calc()); } if (end_calc()) { fprintf(stderr, "uninitialization of calculation has failed\n"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } printf("[%p] Producer finished !\n", (void*) pthread_self()); /* * When a producer end its thread-local storage vanished. Thus, * producers must finish only after consumer has stopped using them */ if (finalize_producer_thread(channel)) { fprintf(stderr, "Finalization of thread has failed\n"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } return NULL; } void on_message(void *val) { //printf("Receive value: %p\n", (void *) val); } void *consumer(void *channel) { int delayed_error; uintptr_t cons_check_value; inc_check_t *cons_check_context; delayed_error = 0; if (shared) { pthread_t tid; cpu_set_t cpuset; tid = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(0, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); return NULL; } } if (init_consumer_thread(channel)) { fprintf(stderr, "Initialization of thread has failed\n"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } if (inc_check_init(init_calc_arg, &cons_check_context)) { fprintf(stderr, "Initialization of check has failed\n"); finalize_consumer_thread(channel); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } cons_check_value = init_calc_arg; if (block_reception) { long long total_data_received = 0; void *data_buf[MAX_BLOCK_ENTRIES]; while (total_data_received < nb_bufs_sent * WORDS_PER_LINE) { int i; ssize_t nb_data_received; nb_data_received = recv_some_data(data_buf, MAX_BLOCK_ENTRIES); total_data_received += nb_data_received; for (i = 0; i < nb_data_received; i++) { if (inc_check_next(cons_check_context, &cons_check_value)) { if (!delayed_error) { fprintf(stderr, "Error while checking received value match sent value\n"); delayed_error = 1; } } if (cons_check_value != (uintptr_t) data_buf[i]) { if (!delayed_error) { fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data_buf[i]); delayed_error = 1; } } on_message(data_buf[i]); } //printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : ""); } } else { int i, j; for(i = 0; i < nb_bufs_sent; i++) { //printf("[%p] About to receive %d new cache line%s\n", (void *) pthread_self(), BUF_SIZE / CACHE_LINE_SIZE, (BUF_SIZE / CACHE_LINE_SIZE > 1) ? "s" : ""); for(j = 0; j < WORDS_PER_LINE; j++) { void *data; data = recv_one_data(); if (inc_check_next(cons_check_context, &cons_check_value)) { if (!delayed_error) { fprintf(stderr, "Error while checking received value match sent value\n"); delayed_error = 1; } } if (cons_check_value != (uintptr_t) data) { if (!delayed_error) { fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data); delayed_error = 1; } } on_message(data); } } } if (inc_check_end(cons_check_context)) { fprintf(stderr, "Finalization of check has failed\n"); finalize_consumer_thread(channel); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } if (finalize_consumer_thread(channel)) { fprintf(stderr, "Finalization of thread has failed\n"); return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ } if (delayed_error) return &page_size; /* &page_size can't be NULL, whatever NULL is bound to */ return NULL; } int main(int argc, char *argv[]) { pthread_t tid[2]; int return_value; void *pthread_return_value; void *channel; return_value = EXIT_SUCCESS; if (analyse_options(argc, argv)) return EXIT_FAILURE; page_size = sysconf(_SC_PAGE_SIZE); if (page_size <= 0) return EXIT_FAILURE; if (init_library()) return EXIT_FAILURE; channel = create_comm_channel(); if (channel != NULL) { pthread_create(&tid[0], NULL, producer, channel); pthread_create(&tid[1], NULL, consumer, channel); pthread_join(tid[0], &pthread_return_value); if (pthread_return_value != NULL) return_value = EXIT_FAILURE; pthread_join(tid[1], &pthread_return_value); if (pthread_return_value != NULL) return_value = EXIT_FAILURE; } else return_value = EXIT_FAILURE; if (destroy_comm_channel(channel)) return_value = EXIT_FAILURE; if (finalize_library()) return EXIT_FAILURE; return return_value; }