rt_benchs/communication_techniques/src/communication/jikes_barrier.c

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <pthread.h>
#include <unistd.h>
/* Non-standard includes */
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
#define LOG_BYTES_IN_PAGE 12
#define LOG_PAGES_PER_BUFFER 0
#define LOG_BYTES_IN_ADDRESS ((sizeof(void *) == 8) ? 3 : 2)
#define LOG_BUFFER_SIZE (LOG_BYTES_IN_PAGE + LOG_PAGES_PER_BUFFER)
#define BYTES_IN_ADDRESS (1 << LOG_BYTES_IN_ADDRESS)
#define BUFFER_MASK ((1 << LOG_BUFFER_SIZE) - 1)
#define BUFFER_SIZE (1 << LOG_BUFFER_SIZE)
#define META_DATA_SIZE (2 * BYTES_IN_ADDRESS)
#define USABLE_BUFFER_BYTES (BUFFER_SIZE - META_DATA_SIZE)
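
/*
 * Each buffer is BUFFER_SIZE bytes and BUFFER_SIZE-aligned:
 *   [0, USABLE_BUFFER_BYTES)            data slots, filled downward by insert()
 *   [USABLE_BUFFER_BYTES, BUFFER_SIZE)  metadata: the prev/next links below
 * The alignment lets any pointer into a buffer recover the buffer start
 * with (ptr & ~BUFFER_MASK).
 */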

/* The intrusive links stored in each buffer's metadata area */
struct double_linked_list
{
    struct double_linked_list *prev;
    struct double_linked_list *next;
};

/* Producer-local write cursor and the end of the current buffer's data area */
static __thread void **local_tail = NULL;
static __thread void **local_tail_buffer_end = NULL;
/* Shared queue of closed buffers, protected by the ticket lock below */
static struct double_linked_list *global_tail = NULL;
static struct double_linked_list *global_head = NULL;
static int bufsenqueued = 0;    /* total number of buffers enqueued */
static unsigned int lock = 0;

int init_thread_comm(struct thread_comm *comm)
{
    void **new_buffer;

    /* The buffer must be BUFFER_SIZE-aligned: the rest of the code
     * recovers a buffer's start by masking a pointer with ~BUFFER_MASK. */
    if (posix_memalign((void *) &new_buffer, BUFFER_SIZE, BUFFER_SIZE))
    {
        fprintf(stderr, "Failed to allocate a new buffer for the thread\n");
        return -1;
    }
    /* Point just past the data area; insert() fills the buffer downward.
     * The modulo term is equal to 0. */
    local_tail = (void **) ((uintptr_t) new_buffer + (USABLE_BUFFER_BYTES - BYTES_IN_ADDRESS -
            (USABLE_BUFFER_BYTES % (1 << LOG_BYTES_IN_ADDRESS))) + BYTES_IN_ADDRESS);
    local_tail_buffer_end = local_tail;
    return 0;
}

int end_thread_comm(void)
{
    return 0;
}

void set_next(struct double_linked_list *list, struct double_linked_list *next)
{
    list->next = next;
}

void set_prev(struct double_linked_list *list, struct double_linked_list *prev)
{
    list->prev = prev;
}
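
/*
 * x86 ticket spinlock: the 32-bit word packs the currently served ticket
 * in its low 16 bits and the next free ticket in its high 16 bits.
 * spin_lock() takes a ticket with a locked xadd and spins until the
 * served field reaches it; spin_unlock() increments the served field.
 */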
static void spin_lock(unsigned int *lock)
{
    int inc = 0x00010000;
    int tmp;

    asm volatile("lock xaddl %0, %1\n"  /* take a ticket; old value in %0 */
                 "movzwl %w0, %2\n\t"   /* %2 = currently served ticket */
                 "shrl $16, %0\n\t"     /* %0 = our ticket */
                 "1:\t"
                 "cmpl %0, %2\n\t"
                 "je 2f\n\t"
                 "rep ; nop\n\t"        /* pause while spinning */
                 "movzwl %1, %2\n\t"    /* reload the served ticket */
                 /* don't need lfence here, because loads are in-order */
                 "jmp 1b\n"
                 "2:"
                 : "+r" (inc), "+m" (*lock), "=&r" (tmp)
                 :
                 : "memory", "cc");
}

static void spin_unlock(unsigned int *lock)
{
    asm volatile("incw %0"  /* bump the served ticket: the next waiter proceeds */
                 : "+m" (*lock)
                 :
                 : "memory", "cc");
}

/* Insert a closed buffer into the shared queue; "tail" here means the
 * tail of that queue. */
void enqueue(struct double_linked_list *list, int arity, int to_tail)
{
    spin_lock(&lock);
    if (to_tail)
    {
        /* Add to the tail of the queue */
        set_next(list, NULL);
        if (global_tail == NULL)
            global_head = list;
        else
            set_next(global_tail, list);
        set_prev(list, global_tail);
        global_tail = list;
    }
    else
    {
        /* Add to the head of the queue */
        set_prev(list, NULL);
        if (global_head == NULL)
            global_tail = list;
        else
            set_prev(global_head, list);
        set_next(list, global_head);
        global_head = list;
    }
    bufsenqueued++;
    spin_unlock(&lock);
}
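
/*
 * insert() fills a buffer downward from the end of its data area, so a
 * prematurely closed buffer has its entries at the top. normalizeTail()
 * slides them down to the start of the buffer, letting reception() scan
 * every buffer the same way: from the buffer start up to the list node.
 */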
void **normalizeTail(int arity)
{
    void **src = local_tail;
    void **buffer_start = (void **) ((uintptr_t) local_tail & ~BUFFER_MASK);
    /* Address of the slot that will hold the last copied entry */
    void **last = (void **) ((uintptr_t) buffer_start
            + (USABLE_BUFFER_BYTES - BYTES_IN_ADDRESS
               - (USABLE_BUFFER_BYTES % (arity << LOG_BYTES_IN_ADDRESS)))
            - ((uintptr_t) local_tail & BUFFER_MASK));

    /* Copy the entries to the beginning of the buffer; the regions may
     * overlap, but the destination is below the source and we copy
     * upward, so nothing is clobbered before it is read. */
    while (buffer_start <= last)
    {
        *buffer_start = *src;
        src++;
        buffer_start++;
    }
    return last;    /* if the entries become buf[0..n], this is &buf[n] */
}

void closeAndEnqueueTail(int arity)
{
    void **last;

    if ((((uintptr_t) local_tail) & BUFFER_MASK) != 0)
        /* Prematurely closed buffer; calls coming from insert() never
         * take this branch. */
        last = normalizeTail(arity);
    else
        /* A full tail buffer: the last slot sits just before the
         * buffer's metadata. */
        last = (void **) ((uintptr_t) local_tail_buffer_end - BYTES_IN_ADDRESS);
    /* The list node lives in the metadata area, right after the last slot. */
    enqueue((struct double_linked_list *) ((uintptr_t) last + BYTES_IN_ADDRESS), arity, 1);
}

void checkForAsyncCollection(void)
{
    if (bufsenqueued >= 262144) /* 262144 buffers of 4 KiB: more than 1 GiB in use */
    {
        fprintf(stderr, "The queue uses more than 1 GiB. Program terminated before memory exhaustion\n");
        exit(1);
    }
}

void tailOverflow(int arity)
{
    void *new_buffer;

    if (local_tail != NULL)
        closeAndEnqueueTail(arity); /* hand the current buffer to the shared queue */
    if (posix_memalign(&new_buffer, BUFFER_SIZE, BUFFER_SIZE))
    {
        fprintf(stderr, "Failed to allocate space for queue. Is metadata virtual memory exhausted?\n");
        exit(1);    /* continuing would use an uninitialized pointer */
    }
    /* As in init_thread_comm(): point just past the data area. The modulo
     * term is equal to 0 for arity 1. */
    local_tail = (void **) ((uintptr_t) new_buffer + (USABLE_BUFFER_BYTES - BYTES_IN_ADDRESS -
            (USABLE_BUFFER_BYTES % (arity << LOG_BYTES_IN_ADDRESS))) + BYTES_IN_ADDRESS);
    local_tail_buffer_end = local_tail;
    checkForAsyncCollection();  /* the allocation above may have crossed the 1 GiB cap */
}
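
/*
 * Producer fast path: one decrement and one store per address. Only when
 * local_tail reaches the aligned buffer start (offset 0) does the buffer
 * get published on the shared queue and a fresh one allocated.
 */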
void insert(void *addr)
{
    if ((((uintptr_t) local_tail) & BUFFER_MASK) == 0)
        tailOverflow(1);
    local_tail--;
    *local_tail = addr;
}
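
/*
 * Consumer side: walk the shared queue of closed buffers. Each list node
 * lives in a buffer's metadata area, just past the data, so a buffer's
 * entries occupy [node & ~BUFFER_MASK, node).
 */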
void reception(void (*on_receive)(void *))
{
    void **buf_start, **buf_ptr;
    struct double_linked_list *list_cur = NULL;

    wait_initialization();  /* not needed, but kept for fairness with the other techniques */
    /* printf("Activate the consumer...\n"); */
    while (cont && (global_head == NULL))
        ;   /* wait for the first buffer */
    if (!cont)
        return;
    list_cur = global_head;
    do
    {
        /* Masking the node address yields the buffer start; the data
         * stops where the metadata (the node itself) begins. */
        buf_start = (void **) (((uintptr_t) &list_cur->prev) & ~BUFFER_MASK);
        for (buf_ptr = buf_start; buf_ptr != (void **) &list_cur->prev; buf_ptr++)
            on_receive(*buf_ptr);
        while (cont && (list_cur->next == NULL))
            ;   /* wait for the next buffer */
        if (list_cur->next != NULL)
            list_cur = list_cur->next;
    } while (cont);
}
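
/*
 * A minimal usage sketch, assuming the declarations pulled in from
 * commtech.h / specific_comm.h (cont, wait_initialization(), struct
 * thread_comm). The driver below is hypothetical, not the benchmark's
 * actual harness:
 *
 *     static void print_addr(void *p) { printf("received %p\n", p); }
 *
 *     // Producer thread
 *     struct thread_comm comm;
 *     init_thread_comm(&comm);
 *     for (i = 0; i < n; i++)
 *         insert(addrs[i]);      // addrs/n are placeholders
 *     closeAndEnqueueTail(1);    // flush the partially filled last buffer
 *
 *     // Consumer thread
 *     reception(print_addr);     // drains buffers until cont drops to 0
 *
 * The shared lock is taken once per 4 KiB buffer rather than once per
 * address, which is the point of the technique.
 */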