#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <pthread.h>  /* Non standard include */

#define LOG_BYTES_IN_PAGE      12
#define LOG_PAGES_PER_BUFFER   0
#define LOG_BYTES_IN_ADDRESS   ((sizeof(void *) == 8) ? 3 : 2)
#define LOG_BUFFER_SIZE        (LOG_BYTES_IN_PAGE + LOG_PAGES_PER_BUFFER)
#define BYTES_IN_ADDRESS       (1 << LOG_BYTES_IN_ADDRESS)
#define BUFFER_MASK            ((1 << LOG_BUFFER_SIZE) - 1)
#define BUFFER_SIZE            (1 << LOG_BUFFER_SIZE)
#define META_DATA_SIZE         (2 * BYTES_IN_ADDRESS)
#define USABLE_BUFFER_BYTES    (BUFFER_SIZE - META_DATA_SIZE)

/* Linkage node stored in the metadata area at the end of each buffer. */
struct double_linked_list {
    struct double_linked_list *prev;
    struct double_linked_list *next;
};

/* NOTE: minimal definition inferred from the code below; the project's own
   headers may define additional fields. */
struct communication_assoc {
    pthread_t tid;
};

/* Per-producer (thread-local) fill pointer and end of the current buffer. */
static __thread void **local_tail = NULL;
static __thread void **local_tail_buffer_end = NULL;

/* Shared queue of published buffers, protected by the ticket lock below. */
static struct double_linked_list *global_tail = NULL;
static struct double_linked_list *global_head = NULL;
static int bufsenqueued = 0;
static unsigned int lock = 0;

struct communication_assoc *create_comm_assoc(void)
{
    struct communication_assoc *assoc;
    void *new_buffer;

    /* The buffer must be BUFFER_SIZE-aligned: buffer boundaries are later
       recovered by masking addresses with BUFFER_MASK. */
    if (posix_memalign(&new_buffer, BUFFER_SIZE, BUFFER_SIZE))
        fprintf(stderr, "Failed to allocate space for queue.\n");
    local_tail = (void **) ((uintptr_t) new_buffer
                            + (USABLE_BUFFER_BYTES - BYTES_IN_ADDRESS
                               - (USABLE_BUFFER_BYTES % (1 << LOG_BYTES_IN_ADDRESS)))
                            + BYTES_IN_ADDRESS); /* the modulo term is 0, so this is buffer + USABLE_BUFFER_BYTES */
    local_tail_buffer_end = local_tail;
    assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
    assoc->tid = pthread_self();
    return assoc;
}

void set_next(struct double_linked_list *list, struct double_linked_list *next)
{
    list->next = next;
}

void set_prev(struct double_linked_list *list, struct double_linked_list *prev)
{
    list->prev = prev;
}

/* x86 ticket lock: the upper 16 bits of *lock hold the next ticket to hand
   out, the lower 16 bits hold the ticket currently being served. */
static void spin_lock(unsigned int *lock)
{
    int inc = 0x00010000;
    int tmp;

    asm volatile("lock xaddl %0, %1\n"
                 "movzwl %w0, %2\n\t"
                 "shrl $16, %0\n\t"
                 "1:\t"
                 "cmpl %0, %2\n\t"
                 "je 2f\n\t"
                 "rep ; nop\n\t"
                 "movzwl %1, %2\n\t"
                 /* don't need lfence here, because loads are in-order */
                 "jmp 1b\n"
                 "2:"
                 : "+r" (inc), "+m" (*lock), "=&r" (tmp)
                 :
                 : "memory", "cc");
}

static void spin_unlock(unsigned int *lock)
{
    asm volatile("incw %0"
                 : "+m" (*lock)
                 :
                 : "memory", "cc");
}
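/*
 * For reference only (not part of the original source): a sketch of the same
 * ticket lock written with the GCC/Clang __atomic builtins instead of x86
 * inline assembly.  The names spin_lock_portable/spin_unlock_portable are
 * illustrative; the unlock assumes a little-endian layout so the low 16-bit
 * "now serving" half of the word can be addressed directly, as "incw" does above.
 */
static void spin_lock_portable(unsigned int *lock)
{
    /* Take a ticket: the upper half of the word is the next-ticket counter. */
    unsigned int ticket = __atomic_fetch_add(lock, 0x00010000, __ATOMIC_ACQUIRE) >> 16;

    /* Spin until the "now serving" field (lower 16 bits) matches our ticket. */
    while ((__atomic_load_n(lock, __ATOMIC_ACQUIRE) & 0xffff) != ticket)
        ;  /* busy-wait */
}

static void spin_unlock_portable(unsigned int *lock)
{
    /* Serve the next ticket by bumping only the low 16-bit half. */
    __atomic_fetch_add((unsigned short *) lock, 1, __ATOMIC_RELEASE);
}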
/* Insert a buffer in the shared queue; "tail" here means the tail of the
   shared queue (to_tail selects which end the buffer is linked on). */
void enqueue(struct double_linked_list *list, int arity, int to_tail)
{
    spin_lock(&lock);
    if (to_tail) {
        /* Add to the tail of the queue */
        set_next(list, NULL);
        if (global_tail == NULL)
            global_head = list;
        else
            set_next(global_tail, list);
        set_prev(list, global_tail);
        global_tail = list;
    } else {
        /* Add to the head of the queue */
        set_prev(list, NULL);
        if (global_head == NULL)
            global_tail = list;
        else
            set_prev(global_head, list);
        set_next(list, global_head);
        global_head = list;
    }
    bufsenqueued++;
    spin_unlock(&lock);
}

void **normalizeTail(int arity)
{
    void **src = local_tail;
    void **buffer_start = (void **) ((uintptr_t) local_tail & ~BUFFER_MASK);
    /* Address that the last entry will occupy once the data has been packed
       at the beginning of the buffer. */
    void **last = (void **) ((uintptr_t) buffer_start
                             + (USABLE_BUFFER_BYTES - BYTES_IN_ADDRESS
                                - (USABLE_BUFFER_BYTES % (arity << LOG_BYTES_IN_ADDRESS)))
                             - ((uintptr_t) local_tail & BUFFER_MASK));

    /* Copy the addresses from the end of the buffer (where insert() stored
       them, growing downwards) to the beginning of the buffer. */
    while (buffer_start <= last) {
        *buffer_start = *src;
        src++;
        buffer_start++;
    }
    return last; /* address of the last entry (if entries go from 0 to n, this is &buf[n]) */
}

void closeAndEnqueueTail(int arity)
{
    void **last;

    if ((((uintptr_t) local_tail) & BUFFER_MASK) != 0)
        /* Prematurely closed buffer; insert() never takes this path. */
        last = normalizeTail(arity);
    else
        /* A full tail buffer: the last slot sits just before the 8/16 bytes
           of per-buffer metadata. */
        last = (void **) ((uintptr_t) local_tail_buffer_end - BYTES_IN_ADDRESS);

    /* The double_linked_list node is placed right after the last entry. */
    enqueue((struct double_linked_list *) ((uintptr_t) last + BYTES_IN_ADDRESS), arity, 1);
}

void checkForAsyncCollection(void)
{
}

void tailOverflow(int arity)
{
    void *new_buffer;

    if (local_tail != NULL)
        closeAndEnqueueTail(arity); /* publish the current buffer on the shared queue */
    if (posix_memalign(&new_buffer, BUFFER_SIZE, BUFFER_SIZE))
        fprintf(stderr, "Failed to allocate space for queue. Is metadata virtual memory exhausted?\n");
    local_tail = (void **) ((uintptr_t) new_buffer
                            + (USABLE_BUFFER_BYTES - BYTES_IN_ADDRESS
                               - (USABLE_BUFFER_BYTES % (arity << LOG_BYTES_IN_ADDRESS)))
                            + BYTES_IN_ADDRESS); /* the modulo term is 0, so this is buffer + USABLE_BUFFER_BYTES */
    local_tail_buffer_end = local_tail;
    checkForAsyncCollection(); /* possible side-effect of alloc() */
}

void insert(void *addr)
{
    /* local_tail sitting on a buffer boundary means the current buffer is
       full (or that no buffer has been allocated yet). */
    if ((((uintptr_t) local_tail) & BUFFER_MASK) == 0)
        tailOverflow(1);
    local_tail--;
    *local_tail = addr;
}

/* Assumed to be provided elsewhere in the project: the consumer's run flag
   and its initialization barrier. */
extern volatile int cont;
void wait_initialization(void);

void reception(void (*on_receive)(void *))
{
    struct double_linked_list *list_cur;
    void **buf_start, **buf_ptr;

    wait_initialization();
    /* printf("Activate the consumer...\n"); */

    /* Busy-wait for the first published buffer. */
    while (cont && (global_head == NULL))
        ;
    if (!cont)
        return;

    list_cur = global_head;
    do {
        /* The buffer starts at the enclosing BUFFER_SIZE boundary; its data
           ends right where the embedded list node begins. */
        buf_start = (void **) (((uintptr_t) &list_cur->prev) & ~BUFFER_MASK);
        for (buf_ptr = buf_start; buf_ptr != (void **) &list_cur->prev; buf_ptr++)
            on_receive(*buf_ptr);

        /* Busy-wait for the next buffer, then move on to it. */
        while (cont && (list_cur->next == NULL))
            ;
        if (cont)
            list_cur = list_cur->next;
    } while (cont);
}
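/*
 * Illustrative usage sketch (not part of the original source).  It shows one
 * producer publishing addresses through insert() while a consumer thread
 * drains them with reception().  The stand-in definitions of `cont` and
 * wait_initialization(), the QUEUE_USAGE_EXAMPLE guard, and the helper names
 * are assumptions made for this example only.
 */
#ifdef QUEUE_USAGE_EXAMPLE
volatile int cont = 1;
void wait_initialization(void) { /* nothing to wait for in this sketch */ }

static void print_address(void *addr)
{
    printf("received %p\n", addr);
}

static void *consumer_main(void *arg)
{
    (void) arg;
    reception(print_address); /* runs until cont drops to 0 */
    return NULL;
}

int main(void)
{
    static int data[100000];
    pthread_t consumer;
    int i;

    pthread_create(&consumer, NULL, consumer_main, NULL);

    /* Producer: record the address of every element.  Full buffers are
       published automatically by tailOverflow(). */
    for (i = 0; i < 100000; i++)
        insert(&data[i]);

    closeAndEnqueueTail(1); /* publish the last, partially filled buffer */
    cont = 0;               /* ask the consumer to stop; as in reception(),
                               it may exit before draining the final buffers */
    pthread_join(consumer, NULL);
    return 0;
}
#endif /* QUEUE_USAGE_EXAMPLE */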