/* * Copyright (C) 2009-2012 Thomas Preud'homme * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #define _GNU_SOURCE #define _POSIX_SOURCE 1 #include #include #include #include #include #include #include #include #include #include #include /* Non standards includes */ #include #include /* Must be a multiple of BUF_SIZE */ #define MAX_BLOCK_ENTRIES (2048 * CACHE_LINE_SIZE / sizeof(void *)) /*#define MAX_BLOCK_ENTRIES ((BUF_SIZE * 4 + page_size) & ~(page_size - 1)) // Big buffer size is not a good idea */ #define toString(x) doStringification(x) #define doStringification(x) #x #define MIN(x,y) ((x < y) ? x : y) #define WORDS_PER_LINE (CACHE_LINE_SIZE / sizeof(uintptr_t)) #define SOURCE 0 /* Initial producer (calling do_calc()) */ #define INTERM 1 #define SINK 2 /* Final consumer (doing the check) */ typedef struct node_param { void *prev_comm_channel; /* Channel with previous node */ void *next_comm_channel; /* Channel with next mode */ int type; /* SOURCE, INTERM or SINK */ int thread_idx; } node_param_t; typedef int inc_check_t; static long nb_bufs_sent = 0; long nb_prod = 0; static int (*init_calc)(int) = NULL; static void **(*do_calc)(void) = NULL; static int (*end_calc)(void) = NULL; static int shared = 0; /* We are not shared by default */ static long init_calc_arg = 0; static int block_reception = 1; static long nb_nodes = 2; /* Nb of nodes in the chain of pipelines */ static int check_recv_match_send = 0; static uintptr_t single_prod_check_val; /* /!\ Only one real producer */ static inc_check_t *single_prod_check_ctxt; /* /!\ Only one real producer */ static int nb_cpus = 4; /* TOFIX: don't hardcode this */ static int page_size = 0; static void usage(char *argv[]) { char format[] = "-n -p [options]"; char options[] = "Required options:\n" "-n nb_buffer_sent\t\tNumber of buffer to send to another core\n" "\t\t\t\tBuffer size is " toString(BUF_SIZE) " bytes\n" "Facultative options:\n" "-b\t\t\t\tReceive the biggest amount of data available (The default)\n" "-c calculation_libname arg\tLibrary to use for calculation with its argument\n" "\t\t\t\tThis library must implement functions in calc.h\n" "\t\t\t\t(default to none)\n" "-k\t\t\t\tCheck we receive what is sent\n" "-l nb_nodes\t\t\tNumber of nodes in the pipeline chain\n" "-d\t\t\t\tReceive one piece of data\n" "-h\t\t\t\tPrint this help\n" "-s\t\t\t\tShare the same L2 cache or not\n" "\t\t\t\tIf level is:\n" "\t\t\t\t\t> 0, then the same L must be shared\n" "\t\t\t\t\t< 0, then different L must be used\n" "\t\t\t\t\t= 0, then no constraint is given, only main memory (RAM) is guaranteed to be shared\n"; printf("Usage : %s %s\n", argv[0], format); printf("Options :\n"); printf("%s\n", options); } static int do_noinit(int unused __attribute__ ((unused))) { return 0; } static void **do_nocalc(void) { static int an_int, *an_int_ptr = &an_int; return (void **) &an_int_ptr; } static int do_noend(void) { return 0; } static int inc_check_init(int init_value, inc_check_t **context) { inc_check_t *ctxt; ctxt = malloc(sizeof(*ctxt)); if (ctxt == NULL) return -1; *ctxt = init_value; *context = ctxt; return 0; } static int inc_check_next(inc_check_t *context, uintptr_t *next_value) { *next_value = (*context)++; return 0; } static int inc_check_end(inc_check_t *context) { free(context); return 0; } static int do_checkinit(int init_value) { return inc_check_init(init_value, &single_prod_check_ctxt); } static void **do_checkcalc(void) { int ret; ret = inc_check_next(single_prod_check_ctxt, &single_prod_check_val); if (ret) return NULL; else return (void **) single_prod_check_val; } static int do_checkend(void) { return inc_check_end(single_prod_check_ctxt); } static int analyse_options(int argc, char *argv[]) { int opt; opterr = 0; while ((opt = getopt(argc, argv, ":bc:dhl:kn:s"/*:p:"*/)) != -1) { switch (opt) { case 'b' : block_reception = 1; break; case 'c' : { struct stat file_stat; void *dl_descriptor; if (stat(optarg, &file_stat)) { fprintf(stderr, "%s: %s\n", optarg, strerror(errno)); return -1; } dl_descriptor = dlopen(optarg, RTLD_LAZY | RTLD_LOCAL); if (dl_descriptor == NULL) { fprintf(stderr, "dlopen error: %s\n", dlerror()); return -1; } init_calc = (int (*)(int)) dlsym(dl_descriptor, "init_calc"); do_calc = (void ** (*)(void)) dlsym(dl_descriptor, "do_calc"); end_calc = (int (*)(void)) dlsym(dl_descriptor, "end_calc"); if ((init_calc == NULL) || (do_calc == NULL) || (end_calc == NULL)) { fprintf(stderr, "A symbol cannot be loaded: %s\n", dlerror()); return -1; } if ((optind == argc) || (*argv[optind] == '-')) { fprintf(stderr, "Missing argument for -c option\n"); return -1; } { char *inval; init_calc_arg = strtol(argv[optind], &inval, 10); if ((*argv[optind] == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-c' needs also an integer argument\n"); return -1; } if ((init_calc_arg <= 0) || ((init_calc_arg == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of useless loop to be done between 2 send must be" " between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } optind++; } break; case 'd' : block_reception = 0; break; case 'h' : usage(argv); exit(EXIT_SUCCESS); case 'k' : check_recv_match_send = 1; break; case 'l' : { char *inval; nb_nodes = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-l' needs an integer argument\n"); return -1; } if ((nb_nodes < 2) || ((nb_nodes == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of links to participate in the pipeline chain must be between 2 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; case 'n' : { char *inval; nb_bufs_sent = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-n' needs an integer argument\n"); return -1; } if ((nb_bufs_sent <= 0) || ((nb_bufs_sent == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of cache lines to be sent must be between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; #if 0 case 'p' : { char *inval; nb_prod = strtol(optarg, &inval, 10); if ((*optarg == '\0') || (*inval != '\0')) { fprintf(stderr, "Option '-p' needs an integer argument\n"); return -1; } if ((nb_prod <= 0) || ((nb_prod == LONG_MAX) && errno == ERANGE)) { fprintf(stderr, "Number of producers must be between 1 and %ld, both inclusive\n", LONG_MAX); return -1; } } break; #endif case 's' : shared = 1; break; case '?' : fprintf(stderr, "Option inconnue\n"); return -1; case ':' : fprintf(stderr, "Option %s needs an argument\n", argv[optind]); return -1; default : fprintf(stderr, "Error while analysing command line options\n"); return -1; } } if (!nb_bufs_sent) { fprintf(stderr, "You must give the number of cache lines to be sent\n"); return -1; } #if 0 if (!nb_prod) { fprintf(stderr, "You must give the number of producers\n"); return -1; } #endif if (shared && (nb_prod > 1)) { fprintf(stderr, "Too many producers to fit with the consumer in processors which share a same cache\n"); return -1; } if (check_recv_match_send && do_calc) { fprintf(stderr, "Can't specifying a computation library with check activated\n"); return -1; } if (do_calc == NULL) { if (check_recv_match_send) { init_calc = do_checkinit; do_calc = do_checkcalc; end_calc = do_checkend; } else { init_calc = do_noinit; do_calc = do_nocalc; end_calc = do_noend; } } printf("buf size: %lu\n", WORDS_PER_LINE); return 0; } static int initial_producer(node_param_t *node_param) { unsigned int i, j; if (init_calc(init_calc_arg)) { fprintf(stderr, "Initialization of calculation has failed\n"); return 1; } for(i = 0; i < nb_bufs_sent; i++) { for(j = 0; j < WORDS_PER_LINE; j++) send(node_param->next_comm_channel, do_calc()); } if (end_producer(node_param->next_comm_channel)) { fprintf(stderr, "Notification of end of production to the " "communication algorithm failed\n"); return 1; } if (end_calc()) { fprintf(stderr, "uninitialization of calculation has failed\n"); return 1; } printf("[%p] Producer finished !\n", (void*) pthread_self()); return 0; } static void on_message(void *val __attribute__ ((unused))) { /*printf("Receive value: %p\n", (void *) val);*/ } static int consumer_block(node_param_t *node_param) { int delayed_error; unsigned long long total_data_received = 0, total_to_receive; uintptr_t cons_check_value; inc_check_t *cons_check_context; void *data_buf[MAX_BLOCK_ENTRIES]; cons_check_context = NULL; delayed_error = 0; if (check_recv_match_send) { if (inc_check_init(init_calc_arg, &cons_check_context)) { fprintf(stderr, "Initialization of check has failed\n"); return -1; } } cons_check_value = init_calc_arg; total_to_receive = nb_bufs_sent * WORDS_PER_LINE; while (total_data_received < total_to_receive) { int i; ssize_t nb_data_received; size_t to_receive; to_receive = MIN(MAX_BLOCK_ENTRIES, total_to_receive - total_data_received); nb_data_received = recv_some_data(node_param->prev_comm_channel, data_buf, to_receive); total_data_received += nb_data_received; for (i = 0; i < nb_data_received; i++) { if (unlikely(check_recv_match_send)) { if (inc_check_next(cons_check_context, &cons_check_value)) { if (!delayed_error) { fprintf(stderr, "Error while checking received value match sent value\n"); delayed_error = 1; } } if (cons_check_value != (uintptr_t) data_buf[i]) { if (!delayed_error) { fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data_buf[i]); delayed_error = 1; } } } on_message(data_buf[i]); if (likely(node_param->type == INTERM)) send(node_param->next_comm_channel, data_buf[i]); } /*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), nb_data_received, nb_data_received ? "s" : "");*/ } if (unlikely(!!(node_param->type & SINK))) printf("[%p] Consumer finished !\n", (void*) pthread_self()); else if (likely(!!(node_param->type & INTERM))) printf("[%p] Intermediate producer finished !\n", (void*) pthread_self()); if (delayed_error) return -1; return 0; } static int consumer_data(node_param_t *node_param) { unsigned int i, j; int delayed_error; uintptr_t cons_check_value; inc_check_t *cons_check_context; cons_check_context = NULL; delayed_error = 0; if (check_recv_match_send) { if (inc_check_init(init_calc_arg, &cons_check_context)) { fprintf(stderr, "Initialization of check has failed\n"); return -1; /* &page_size can't be NULL */ } } cons_check_value = init_calc_arg; for(i = 0; i < nb_bufs_sent; i++) { for(j = 0; j < WORDS_PER_LINE; j++) { void *data; data = recv_one_data(node_param->prev_comm_channel); if (unlikely(check_recv_match_send)) { if (inc_check_next(cons_check_context, &cons_check_value)) { if (!delayed_error) { fprintf(stderr, "Error while checking received value match sent value\n"); delayed_error = 1; } } if (cons_check_value != (uintptr_t) data) { if (!delayed_error) { fprintf(stderr, "Mismatch between expected(%lu) and received values(%lu)\n", cons_check_value, (uintptr_t) data); delayed_error = 1; } } } on_message(data); if (likely(node_param->type == INTERM)) send(node_param->next_comm_channel, data); /*printf("[%p] Just received %d word-sized data%s\n", (void *) pthread_self(), WORDS_PER_LINE, WORDS_PER_LINE ? "s" : "");*/ } } if (unlikely(!!(node_param->type & SINK))) printf("[%p] Consumer finished !\n", (void*) pthread_self()); else if (likely(!!(node_param->type & INTERM))) printf("[%p] Intermediate producer finished !\n", (void*) pthread_self()); if (delayed_error) return -1; return 0; } static int set_cpu_binding(int thread_idx) { int cpu_binding; pthread_t tid; cpu_set_t cpuset; /* Should work in most cases */ if (shared) cpu_binding = thread_idx % nb_cpus; else cpu_binding = (2 * thread_idx) % nb_cpus; tid = pthread_self(); CPU_ZERO(&cpuset); CPU_SET(cpu_binding, &cpuset); if (pthread_setaffinity_np(tid, sizeof(cpu_set_t), &cpuset)) { perror("pthread_setaffinity_np"); return -1; } return 0; } static void *node(node_param_t *node_param) { int return_value; if (set_cpu_binding(node_param->thread_idx)) return &page_size; switch (node_param->type) { case SOURCE: return_value = initial_producer(node_param); break; case INTERM: if (block_reception) return_value = consumer_block(node_param); else return_value = consumer_data(node_param); break; case SINK: if (block_reception) return_value = consumer_block(node_param); else return_value = consumer_data(node_param); break; default: return &page_size; /* &page_size can't be NULL */ } if (return_value) return &page_size; /* &page_size can't be NULL */ return NULL; } static int set_node_params(node_param_t *node_param, node_param_t *prev_node_param, int thread_idx) { if (thread_idx == nb_nodes - 1) node_param->type = SINK; else { if (thread_idx) node_param->type = INTERM; else node_param->type = SOURCE; node_param->next_comm_channel = create_comm_channel(); if (node_param->next_comm_channel == NULL) return -1; } if (prev_node_param != NULL) { node_param->prev_comm_channel = prev_node_param->next_comm_channel; } node_param->thread_idx = thread_idx; return 0; } static int create_threads(int nb_nodes, int *last_node, pthread_t *node_tids, node_param_t *node_params) { int i; for (i = 0; i < nb_nodes; i++) { node_param_t *prev_node_param; prev_node_param = (i) ? &node_params[i - 1] : NULL; if (set_node_params(&node_params[i], prev_node_param, i)) { *last_node = i - 1; return -1; } if (pthread_create(&node_tids[i], NULL, (void *(*)(void *)) node, &node_params[i])) { perror("pthread_create node"); destroy_comm_channel(node_params[i].next_comm_channel); *last_node = i - 1; return -1; } } *last_node = i - 1; return 0; } static int join_threads(int last_node, pthread_t *tids) { int i, return_value; void *pthread_return_value; for (i = last_node, return_value = 0; i >= 0; i--) { pthread_join(tids[i], &pthread_return_value); if (pthread_return_value != NULL) return_value = EXIT_FAILURE; } return return_value; } static int destroy_threads(int last_node, node_param_t *node_params) { int i, return_value; for (i = last_node, return_value = 0; i >= 0; i--) { if (node_params[i].type != SINK) { if (destroy_comm_channel(node_params[i].next_comm_channel)) return_value = -1; } } return return_value; } int main(int argc, char *argv[]) { int return_value, last_node; pthread_t *tids; node_param_t *node_params; return_value = EXIT_SUCCESS; if (analyse_options(argc, argv)) return EXIT_FAILURE; page_size = sysconf(_SC_PAGE_SIZE); if (page_size <= 0) return EXIT_FAILURE; node_params = malloc(nb_nodes * sizeof(node_param_t)); if (node_params == NULL) return EXIT_FAILURE; tids = malloc(nb_nodes * sizeof(pthread_t)); if (tids == NULL) { return_value = EXIT_FAILURE; goto error_alloc_tids; } if (create_threads(nb_nodes, &last_node, tids, node_params)) goto error_create_channels; if (join_threads(last_node, tids)) return_value = EXIT_FAILURE; error_create_channels: if (destroy_threads(last_node, node_params)) return_value = EXIT_FAILURE; free(tids); error_alloc_tids: free(node_params); return return_value; }