From a20c9a8a219ce4ef54bebf31d1ab0d5d1fc9e552 Mon Sep 17 00:00:00 2001 From: Thomas Preud'homme Date: Mon, 30 May 2011 16:23:43 +0200 Subject: [PATCH] [commtech] BatchQueue v2 Uses two mappings of the same structure to avoid prefetching of the producer semi-buffer by the consumer. The idea is to access everything through mapping 1 except semi-buffer 2, which is accessed through mapping 2. --- communication_techniques/Makefile | 2 +- .../include/batch_queue_common_comm.h | 54 ++++++- .../src/communication/batch_queue.c | 143 ++++++++++++++---- communication_techniques/src/main.c | 3 +- 4 files changed, 165 insertions(+), 37 deletions(-) diff --git a/communication_techniques/Makefile b/communication_techniques/Makefile index bd01670..f95de07 100644 --- a/communication_techniques/Makefile +++ b/communication_techniques/Makefile @@ -14,7 +14,7 @@ COMMDIR:=communication # I know -finline-functions and -finline-functions-called-once are enabled by # -O3 but I did this in case gcc behaviour change one day CFLAGS:=-c -O3 -finline-functions -finline-functions-called-once -Wall -Wextra -Werror -LDFLAGS:=-L$(LIBDIR) -lpthread -ldl +LDFLAGS:=-L$(LIBDIR) -lpthread -ldl -lrt # Executables CC=gcc diff --git a/communication_techniques/include/batch_queue_common_comm.h b/communication_techniques/include/batch_queue_common_comm.h index e6a4b01..06c1884 100644 --- a/communication_techniques/include/batch_queue_common_comm.h +++ b/communication_techniques/include/batch_queue_common_comm.h @@ -1,6 +1,9 @@ #ifndef _BATCH_QUEUE_COMMON_COMM_H_ #define _BATCH_QUEUE_COMMON_COMM_H_ 1 +#include +#include + /* Non standard include */ #include @@ -10,27 +13,66 @@ struct channel { - void * volatile buf[2 * BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE))); + /* buf must be the first field to allow the buf_mask test */ + void * volatile buf[2][BUF_SIZE / sizeof(void *)] __attribute__ ((aligned (CACHE_LINE_SIZE))); + struct channel *mapping1; /* accesses to buf[0] and state */ +
struct channel *mapping2; /* accesses to buf[1] */ + uintptr_t buf_mask; int unused[20] __attribute__ ((aligned (CACHE_LINE_SIZE))); volatile unsigned int state:1 __attribute__ ((aligned (CACHE_LINE_SIZE))); - unsigned int sender_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); - unsigned int receiver_idx __attribute__ ((aligned (CACHE_LINE_SIZE))); + void * volatile *sender_ptr __attribute__ ((aligned (CACHE_LINE_SIZE))); + void * volatile *receiver_ptr __attribute__ ((aligned (CACHE_LINE_SIZE))); }; +#ifndef _BATCH_QUEUE_C_ +#ifndef PAGE_SIZE_HELPER +#define PAGE_SIZE_HELPER 1 +uintptr_t _bq_page_mask; +void *create_comm_channel_internal(void); +static void *create_comm_channel_helper(void) __attribute__ ((unused)); + +static void *create_comm_channel_helper(void) +{ + int ret; + struct channel *channel; + + ret = sysconf(_SC_PAGESIZE); + if (ret == -1) + return NULL; + _bq_page_mask = ~(ret - 1); + channel = create_comm_channel_internal(); + channel->buf_mask = sizeof channel->buf[0] - 1; + return channel; +} + +#else +extern unsigned int _bq_page_mask; +#endif /* PAGE_SIZE_HELPER */ + +#define create_comm_channel() create_comm_channel_helper() __BEGIN_DECLS static inline void send(struct channel *channel, void **addr) { - channel->buf[channel->sender_idx++] = addr; - channel->sender_idx %= 2 * (BUF_SIZE / sizeof(void *)); - if (!(channel->sender_idx % (BUF_SIZE / sizeof(void *)))) + uintptr_t sender_ptr_high; + + *channel->sender_ptr++ = addr; + if (unlikely(!((uintptr_t) channel->sender_ptr & channel->buf_mask))) { while (channel->state); channel->state = 1; + sender_ptr_high = + (uintptr_t) channel->sender_ptr & _bq_page_mask; + if ((void *) sender_ptr_high == channel->mapping1) + channel->sender_ptr = channel->mapping2->buf[1]; + else + channel->sender_ptr = channel->mapping1->buf[0]; } } __END_DECLS +#endif /* _BATCH_QUEUE_C_ */ + #endif diff --git a/communication_techniques/src/communication/batch_queue.c 
b/communication_techniques/src/communication/batch_queue.c index 19a6ab6..2bfb59a 100644 --- a/communication_techniques/src/communication/batch_queue.c +++ b/communication_techniques/src/communication/batch_queue.c @@ -1,24 +1,93 @@ +#define _BATCH_QUEUE_C_ 1 + #include #include -#include +#include +#include +#include +#include +#include +#include /* Non standard include */ #include #include +#define BASE_SHM_NAME "/channel" +#define ROUNDUP(size) ((size + page_size) & ~(page_size - 1)) -void *create_comm_channel(void) + +static uintptr_t buf_mask; +static uintptr_t page_mask; +static int nb_buf_entries; +static unsigned int page_size; + + +void *create_comm_channel_internal(void) { - struct channel *channel; + int ret, shm_fd; + static int chan_idx = 0; + char shm_name[NAME_MAX]; + struct channel *channel1, *channel2; - if (!posix_memalign((void **) &channel, CACHE_LINE_SIZE, sizeof(struct channel))) + channel1 = NULL; + channel2 = NULL; + ret = sysconf(_SC_PAGESIZE); + if (ret == -1) { - channel->receiver_idx = 0; - channel->state = 0; - channel->sender_idx = 0; - return channel; + perror("BatchQueue init sysconf"); + return NULL; } - return NULL; + page_size = ret; + page_mask = ~((uintptr_t) (page_size - 1)); + ret = snprintf(shm_name, NAME_MAX, BASE_SHM_NAME"%d\n", chan_idx); + if (ret < 0) + { + fprintf(stderr, "BatchQueue init snprintf failed\n"); + return NULL; + } else if (ret >= NAME_MAX) + { + fprintf(stderr, "Too many channels created: impossible to "); + fprintf(stderr, "create a channel named "BASE_SHM_NAME); + fprintf(stderr, "%d\n", chan_idx); + return NULL; + } + shm_fd = shm_open(shm_name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (shm_fd == -1) + { + perror("BatchQueue init shm_open failed"); + return NULL; + } + ret = ftruncate(shm_fd, sizeof(*channel1)); + if (ret == -1) + { + perror("BatchQueue init ftruncate failed"); + goto close_file; + } + channel1 = mmap(NULL, ROUNDUP(sizeof(*channel1)), + PROT_READ | PROT_WRITE, MAP_PRIVATE, shm_fd, 0); 
+ channel2 = mmap(NULL, ROUNDUP(sizeof(*channel1)), + PROT_READ | PROT_WRITE, MAP_PRIVATE, shm_fd, 0); + if ((channel1 == MAP_FAILED) || (channel2 == MAP_FAILED)) + { + perror("Batchqueue init mmap failed"); + channel1 = NULL; + goto close_file; + } + + channel1->mapping1 = channel1; + channel1->mapping2 = channel2; + + channel1->state = 0; + + channel1->sender_ptr = channel1->buf[0]; + channel1->receiver_ptr = channel1->buf[0]; + + buf_mask = sizeof channel1->buf[0] - 1; + nb_buf_entries = sizeof channel1->buf[0] / sizeof channel1->buf[0][0]; +close_file: + shm_unlink(shm_name); + return channel1; } int end_producer(void *unused __attribute__ ((unused))) @@ -28,8 +97,17 @@ int end_producer(void *unused __attribute__ ((unused))) int destroy_comm_channel(void *channel) { - free(channel); - return 0; + size_t mapping_size; + struct channel *mapping1, *mapping2, *channel_ptr; + + channel_ptr = (struct channel *) channel; + mapping1 = channel_ptr->mapping1; + mapping2 = channel_ptr->mapping2; + mapping_size = ROUNDUP(sizeof(*mapping1)); + if (munmap(mapping1, mapping_size) || munmap(mapping2, mapping_size)) + return -1; + else + return 0; } /* @@ -44,12 +122,20 @@ void *recv_one_data(struct channel *channel) { void *result; - if (unlikely(!(channel->receiver_idx % (BUF_SIZE / sizeof(void *))))) + if (unlikely(!((uintptr_t) channel->receiver_ptr & buf_mask))) while (!channel->state); - result = channel->buf[channel->receiver_idx++]; - channel->receiver_idx %= (2 * BUF_SIZE) / sizeof(void *); - if (unlikely(!(channel->receiver_idx % (BUF_SIZE / sizeof(void *))))) + result = *channel->receiver_ptr++; + if (unlikely(!((uintptr_t) channel->receiver_ptr & buf_mask))) + { + uintptr_t rcvr_ptr_high; + + rcvr_ptr_high = (uintptr_t) channel->receiver_ptr & page_mask; channel->state = 0; + if ((void *) rcvr_ptr_high == channel->mapping1) + channel->receiver_ptr = channel->mapping2->buf[1]; + else + channel->receiver_ptr = channel->mapping1->buf[0]; + } return result; } @@ -69,21 
+155,20 @@ ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) nb_read = 0; while (channel->state && nb_read < count) { - int i, n; - /* - * cur->receiver_idx point to the last buffer we have read. - * We go to the next cache line "+ (BUF_SIZE / sizeof(void *))" - * (because the line is full of void * and then if we are after - * the second cache line we correct the pointer to point to the - * first one (this is done by the modulo). - */ - i = channel->receiver_idx; - n = channel->receiver_idx + (BUF_SIZE / sizeof(void *)); - channel->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); - for(; i < n; i++) - *buf++ = channel->buf[i]; - nb_read += BUF_SIZE / sizeof(void *); + void * volatile *chanbuf_ptr; + uintptr_t rcvr_ptr_high; + + chanbuf_ptr = channel->receiver_ptr; + channel->receiver_ptr += nb_buf_entries; + rcvr_ptr_high = (uintptr_t) channel->receiver_ptr & page_mask; + for(; chanbuf_ptr != channel->receiver_ptr; chanbuf_ptr++) + *buf++ = *chanbuf_ptr; + nb_read += nb_buf_entries; channel->state = 0; + if ((void *) rcvr_ptr_high == channel->mapping1) + channel->receiver_ptr = channel->mapping2->buf[1]; + else + channel->receiver_ptr = channel->mapping1->buf[0]; } return nb_read; } diff --git a/communication_techniques/src/main.c b/communication_techniques/src/main.c index 847f1cc..8be663b 100644 --- a/communication_techniques/src/main.c +++ b/communication_techniques/src/main.c @@ -465,7 +465,8 @@ static int consumer_data(node_param_t *node_param) } } cons_check_value = init_calc_arg; - for(i = 0; i < nb_bufs_sent; i++) { + for(i = 0; i < nb_bufs_sent; i++) + { for(j = 0; j < WORDS_PER_LINE; j++) { void *data;