#define _GNU_SOURCE #define _POSIX_SOURCE 1 #include #include #include #include #include #include #include #include #include #include #include /* Non standards includes */ #include #include /* Must be a multiple of BUF_SIZE */ #define MAX_BLOCK_ENTRIES (2048 * CACHE_LINE_SIZE / sizeof(void *)) /*#define MAX_BLOCK_ENTRIES ((BUF_SIZE * 4 + page_size) & ~(page_size - 1)) // Big buffer size is not a good idea */ #define toString(x) doStringification(x) #define doStringification(x) #x #define MIN(x,y) ((x < y) ? x : y) #define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t)) #define SOURCE 0 /* Initial producer (calling do_calc()) */ #define INTERM 1 #define SINK 2 /* Final consumer (doing the check) */ typedef struct data_xchg { void ** volatile data_buf[2][MAX_BLOCK_ENTRIES]; volatile int filled_buf_entries[2]; int unused[20]; volatile int buf_status:1 __attribute__((aligned (CACHE_LINE_SIZE))); } data_xchg_t; typedef struct node_param { void *prev_comm_channel; /* Channel with previous node */ void *next_comm_channel; /* Channel with next mode */ data_xchg_t *c2p_xfer; /* Consumer to producer local transfer */ int type; /* SOURCE, INTERM or SINK */ int thread_idx; } node_param_t; typedef int inc_check_t; static long nb_bufs_sent = 0; long nb_prod = 0; static int (*init_calc)(int) = NULL; static void **(*do_calc)(void) = NULL; static int (*end_calc)(void) = NULL; static int shared = 0; /* We are not shared by default */ static long init_calc_arg = 0; static int block_reception = 1; static long nb_nodes = 2; /* Nb of nodes in the chain of pipelines */ static int check_recv_match_send = 0; static uintptr_t single_prod_check_val; /* /!\ Only one real producer */ static inc_check_t *single_prod_check_ctxt; /* /!\ Only one real producer */ static int nb_cpus = 4; /* TOFIX: don't hardcode this */ static int page_size = 0; static void usage(char *argv[]) { char format[] = "-n -p [options]"; char options[] = "Required options:\n" "-n nb_buffer_sent\t\tNumber of buffer to send to another core\n" "\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n" "Facultative options:\n" "-b\t\t\t\tReceive the biggest amount of data available (The default)\n" "-c calculation_libname arg\tLibrary to use for calculation with its argument\n" "\t\t\t\tThis library must implement functions in calc.h\n" "\t\t\t\t(default to none)\n" "-k\t\t\t\tCheck we receive what is sent\n" "-l nb_nodes\t\t\tNumber of nodes in the pipeline chain\n" "-d\t\t\t\tReceive one piece of data\n" "-h\t\t\t\tPrint this help\n" "-s\t\t\t\tShare the same L2 cache or not\n" "\t\t\t\tIf level is:\n" "\t\t\t\t\t> 0, then the same L must be shared\n" "\t\t\t\t\t< 0, then different L must be used\n" "\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n"; printf("Usage : %s %s\n", argv[0], format); printf("Options :\n"); printf("%s\n", options); } static int do_noinit(int unused __attribute__ ((unused))) { return 0; } static void **do_nocalc(void) { static int an_int, *an_int_ptr = &an_int; return (void **) &an_int_ptr; } static int do_noend(void) { return 0; } static int inc_check_init(int init_value, inc_check_t **context) { inc_check_t *ctxt; ctxt = malloc(sizeof(*ctxt)); if (ctxt == NULL) return -1; *ctxt = init_value; *context = ctxt; return 0; } static int inc_check_next(inc_check_t *context, uintptr_t *next_value) { *next_value = (*context)++; return 0; } static int inc_check_end(inc_check_t *context) { free(context); return 0; } static int do_checkinit(int init_value) { return inc_check_init(init_value, &single_prod_check_ctxt); } static void **do_checkcalc(void) { int ret; ret = inc_check_next(single_prod_check_ctxt, &single_prod_check_val); if (ret) return NULL; else return (void **) single_prod_check_val; } static int do_checkend(void) { return inc_check_end(single_prod_check_ctxt); } static int analyse_options(int argc, char *argv[]) { int opt; opterr = 0; while ((opt = getopt(argc, argv, ":bc:dhl:kn:s"/*:p:"*/)) != -1) { switch (opt) { case 'b' : block_reception = 1; break; case 'c' : { struct stat file_stat; void *dl_descriptor; if (stat(optarg, &file_stat)) { fprintf(stderr, "%s: %s\n", optarg, strerror(errno)); return -1; } dl_descriptor = dlopen(optarg, RTLD_LAZY | RTLD_LOCAL); if (dl_descriptor == NULL) { fprintf(stderr, "dlopen error: %s\n", dlerror()); return -1; } init_calc = (int (*)(int)) dlsym(dl_descriptor, "init_calc"); do_calc = (void ** (*)(void)) dlsym(dl_descriptor, "do_calc"); end_calc = (int (*)(void)) dlsym(dl_descriptor, "end_calc"); if ((init_calc == NULL) || (do_calc == NULL) || (end_calc == NULL)) { fprintf(stderr, "A symbol cannot be loaded: %s\n", dlerror()); return -1; } if ((optind == argc) || (*argv[optind] == '-')) { fprintf(stderr, "Missing argument for -c option\n"); return -1; } { char *inval; init_calc_arg = strtol(argv[optind], &inval, 10); if ((*argv[optind] == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-c' needs also an integer argument\n"); return -1; } if ((init_calc_arg <= 0) || ((init_calc_arg == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of useless loop to be done between 2 send must be" " between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } optind++; } break; case 'd' : block_reception = 0; break; case 'h' : usage(argv); exit(EXIT_SUCCESS); case 'k' : check_recv_match_send = 1; break; case 'l' : { char *inval; nb_nodes = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-l' needs an integer argument\n"); return -1; } if ((nb_nodes < 2) || ((nb_nodes == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of links to participate in the pipeline chain must be between 2 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; case 'n' : { char *inval; nb_bufs_sent = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-n' needs an integer argument\n"); return -1; } if ((nb_bufs_sent <= 0) || ((nb_bufs_sent == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of cache lines to be sent must be between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; #if 0 case 'p' : { char *inval; nb_prod = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-p' needs an integer argument\n"); return -1; } if ((nb_prod <= 0) || ((nb_prod == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of producers must be between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; #endif case 's' : shared = 1; break; case '?' : fprintf(stderr, "Option inconnue\n"); return -1; case ':' : fprintf(stderr, "Option %s needs an argument\n", argv[optind]); return -1; default : fprintf(stderr, "Error while analysing command line options\n"); return -1; } } if (!nb_bufs_sent) { fprintf(stderr, "You must give the number of cache lines to be sent\n"); return -1; } #if 0 if (!nb_prod) { fprintf(stderr, "You must give the number of producers\n"); return -1; } #endif if (shared && (nb_prod > 1)) { fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n"); return -1; } if (check_recv_match_send && do_calc) { fprintf(stderr, "Can't specifying a computation library with check activated\n"); return -1; } if (do_calc == NULL) { if (check_recv_match_send) { init_calc = do_checkinit; do_calc = do_checkcalc; end_calc = do_checkend; } else { init_calc = do_noinit; do_calc = do_nocalc; end_calc = do_noend; } } printf("buf size: %lu\n", WORDS_PER_LINE); return 0; } static int initial_producer(node_param_t *node_param) { unsigned int i, j; if (init_calc(init_calc_arg)) { fprintf(stderr, "Initialization of calculation has failed\n"); return 1; } for(i = 0; i < nb_bufs_sent; i++) { for(j = 0; j < WORDS_PER_LINE; j++) send(node_param->next_comm_channel, do_calc()); } if (end_producer(node_param->next_comm_channel)) { fprintf(stderr, "Notification of end of production to the " "communication algorithm failed\n"); return 1; } if (end_calc()) { fprintf(stderr, "uninitialization of calculation has failed\n"); return 1; } printf("[%p] Producer finished !\n", (void*) pthread_self()); return 0; } static int intermediate_producer(node_param_t *node_param) { int i, j; long long total_data_sent = 0, total_to_send; i = 0; total_to_send = nb_bufs_sent * WORDS_PER_LINE; while (total_data_sent < total_to_send) { while (!node_param->c2p_xfer->buf_status); for(j = 0; j < node_param->c2p_xfer->filled_buf_entries[i]; j++) send(node_param->next_comm_channel, node_param->c2p_xfer->data_buf[i][j]); total_data_sent += node_param->c2p_xfer->filled_buf_entries[i]; node_param->c2p_xfer->buf_status = 0; i = !i; } printf("[%p] Producer finished !\n", (void*) pthread_self()); return 0; } static void on_message(void *val __attribute__ ((unused))) { /*printf("Receive value: %p\n", (void *) val);*/ } static void *consumer_block(node_param_t *node_param) { int i = 0; int delayed_error; unsigned long long total_data_received = 0, total_to_receive; uintptr_t cons_check_value; inc_check_t *cons_check_context; cons_check_context = NULL; delayed_error = 0; if (check_recv_match_send) { if (inc_check_init(init_calc_arg, &cons_check_context)) { fprintf(stderr, "Initialization of check has failed\n"); return &page_size; /* &page_size can't be NULL */ } } cons_check_value = init_calc_arg; total_to_receive = nb_bufs_sent * WORDS_PER_LINE; while (total_data_received < total_to_receive) { int j; ssize_t nb_data_received; size_t to_receive; to_receive = MIN(MAX_BLOCK_ENTRIES, total_to_receive - total_data_received); nb_data_received = recv_some_data(node_param->prev_comm_channel, (void **) node_param->c2p_xfer->data_buf[i], to_receive); node_param->c2p_xfer->filled_buf_entries[i] = nb_data_received; if (unlikely(!(node_param->type & SINK))) { /* Check with nb lines sent & recv */ while (node_param->c2p_xfer->buf_status); node_param->c2p_xfer->buf_status = 1; } total_data_received += nb_data_received; for (j = 0; j < nb_data_received; j++) { if (unlikely(check_recv_match_send)) { if (inc_check_next(cons_check_context, &cons_check_value)) { if (!delayed_error) { fprintf(stderr, "Error while checking received value match sent value\n"); delayed_error = 1; } } if (cons_check_value != (uintptr_t) node_param->c2p_xfer->data_buf[i][j]) { if (!delayed_error) { fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) node_param->c2p_xfer->data_buf[i][j]); delayed_error = 1; } } } on_message(node_param->c2p_xfer->data_buf[i][j]); } i = !i; /*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");*/ } if (unlikely(!(node_param->type & SINK))) printf("[%p] Consumer finished !\n", (void*) pthread_self()); if (delayed_error) return &page_size; /* &page_size can't be NULL */ return NULL; } static int consumer_data(node_param_t *node_param) { unsigned int i, j; int delayed_error; uintptr_t cons_check_value; inc_check_t *cons_check_context; cons_check_context = NULL; delayed_error = 0; if (check_recv_match_send) { if (inc_check_init(init_calc_arg, &cons_check_context)) { fprintf(stderr, "Initialization of check has failed\n"); return -1; /* &page_size can't be NULL */ } } cons_check_value = init_calc_arg; for(i = 0; i < nb_bufs_sent; i++) { for(j = 0; j < WORDS_PER_LINE; j++) { void *data; data = recv_one_data(node_param->prev_comm_channel); if (unlikely(check_recv_match_send)) { if (inc_check_next(cons_check_context, &cons_check_value)) { if (!delayed_error) { fprintf(stderr, "Error while checking received value match sent value\n"); delayed_error = 1; } } if (cons_check_value != (uintptr_t) data) { if (!delayed_error) { fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data); delayed_error = 1; } } } on_message(data); if (likely(node_param->type == INTERM)) send(node_param->next_comm_channel, data); /*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");*/ } } if (unlikely(!!(node_param->type & SINK))) printf("[%p] Consumer finished !\n", (void*) pthread_self()); if (delayed_error) return -1; return 0; } static int alloc_consprod_databuf(data_xchg_t **c2p_xfer_addr) { int ret; data_xchg_t *c2p_xfer; ret = posix_memalign((void **) &c2p_xfer, page_size, sizeof(*c2p_xfer)); c2p_xfer->buf_status = 0; if (ret) return EXIT_FAILURE; *c2p_xfer_addr = c2p_xfer; return 0; } static int interm_block_xfer(node_param_t *node_param) { int return_value; void *pthread_return_value; pthread_t prod_tid; return_value = 0; if (alloc_consprod_databuf(&node_param->c2p_xfer)) return -1; /* Create producer to next node, we are consumer of previous node */ if (pthread_create(&prod_tid, NULL, (void *(*)(void *)) consumer_block, node_param)) { perror("pthread_create cons (block_reception)"); return_value = -1; goto free_c2p_xfer_ressources; } return_value = intermediate_producer(node_param); pthread_join(prod_tid, &pthread_return_value); if (pthread_return_value != NULL) return_value = -1; free_c2p_xfer_ressources: free(node_param->c2p_xfer); return return_value; } static int set_cpu_binding(int thread_idx) { int cpu_binding; pthread_t tid; cpu_set_t cpuset; /* Should work in most cases */ if (shared) cpu_binding = thread_idx % nb_cpus; else cpu_binding = (2 * thread_idx) % nb_cpus; tid = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(cpu_binding, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); return -1; } return 0; } static void *node(node_param_t *node_param) { int return_value; if (set_cpu_binding(node_param->thread_idx)) return &page_size; switch (node_param->type) { case SOURCE: return_value = initial_producer(node_param); break; case INTERM: if (block_reception) return_value = interm_block_xfer(node_param); else return_value = consumer_data(node_param); break; case SINK: if (block_reception) { if (alloc_consprod_databuf(&node_param->c2p_xfer)) return &page_size; if (consumer_block(node_param) == NULL) //TODO: allocate data_buf return_value = 0; else return_value = -1; free(node_param->c2p_xfer); } else return_value = consumer_data(node_param); break; default: return &page_size; /* &page_size can't be NULL */ } if (return_value) return &page_size; /* &page_size can't be NULL */ return NULL; } static int set_node_params(node_param_t *node_param, node_param_t *prev_node_param, int thread_idx) { if (thread_idx == nb_nodes - 1) node_param->type = SINK; else { if (thread_idx) node_param->type = INTERM; else node_param->type = SOURCE; node_param->next_comm_channel = create_comm_channel(); if (node_param->next_comm_channel == NULL) return -1; } if (prev_node_param != NULL) { node_param->prev_comm_channel = prev_node_param->next_comm_channel; } node_param->thread_idx = thread_idx; return 0; } static int create_threads(int nb_nodes, int *last_node, pthread_t *node_tids, node_param_t *node_params) { int i; for (i = 0; i < nb_nodes; i++) { node_param_t *prev_node_param; prev_node_param = (i) ? &node_params[i - 1] : NULL; if (set_node_params(&node_params[i], prev_node_param, i)) { *last_node = i - 1; return -1; } if (pthread_create(&node_tids[i], NULL, (void *(*)(void *)) node, &node_params[i])) { perror("pthread_create node"); destroy_comm_channel(node_params[i].next_comm_channel); *last_node = i - 1; return -1; } } *last_node = i - 1; return 0; } static int join_threads(int last_node, pthread_t *tids) { int i, return_value; void *pthread_return_value; for (i = last_node, return_value = 0; i >= 0; i--) { pthread_join(tids[i], &pthread_return_value); if (pthread_return_value != NULL) return_value = EXIT_FAILURE; } return return_value; } static int destroy_threads(int last_node, node_param_t *node_params) { int i, return_value; for (i = last_node, return_value = 0; i >= 0; i--) { if (node_params[i].type != SINK) { if (destroy_comm_channel(node_params[i].next_comm_channel)) return_value = -1; } } return return_value; } int main(int argc, char *argv[]) { int return_value, last_node; pthread_t *tids; node_param_t *node_params; return_value = EXIT_SUCCESS; if (analyse_options(argc, argv)) return EXIT_FAILURE; page_size = sysconf(_SC_PAGE_SIZE); if (page_size <= 0) return EXIT_FAILURE; node_params = malloc(nb_nodes * sizeof(node_param_t)); if (node_params == NULL) return EXIT_FAILURE; tids = malloc(nb_nodes * sizeof(pthread_t)); if (tids == NULL) { return_value = EXIT_FAILURE; goto error_alloc_tids; } if (create_threads(nb_nodes, &last_node, tids, node_params)) goto error_create_channels; if (join_threads(last_node, tids)) return_value = EXIT_FAILURE; error_create_channels: if (destroy_threads(last_node, node_params)) return_value = EXIT_FAILURE; free(tids); error_alloc_tids: free(node_params); return return_value; }