|
|
|
@ -30,6 +30,10 @@
|
|
|
|
|
#include <stdlib.h> |
|
|
|
|
#include <string.h> |
|
|
|
|
#include <stdio.h> |
|
|
|
|
#include <unistd.h> |
|
|
|
|
#include <sys/mman.h> |
|
|
|
|
#include <sys/stat.h> |
|
|
|
|
#include <fcntl.h> |
|
|
|
|
|
|
|
|
|
#include "wait.h" |
|
|
|
|
#include "sem.h" |
|
|
|
@ -38,14 +42,22 @@
|
|
|
|
|
|
|
|
|
|
#define AGGREGATION_FACTOR 32 |
|
|
|
|
|
|
|
|
|
//#define debug_log_init(S, V1, V2) printf (S, V1, V2); fflush (stdout)
|
|
|
|
|
#define debug_log_init(S, V1, V2) |
|
|
|
|
|
|
|
|
|
//#define debug_log_init3(S, V1, V2, V3) printf (S, V1, V2, V3); fflush (stdout)
|
|
|
|
|
#define debug_log_init3(S, V1, V2, V3) |
|
|
|
|
|
|
|
|
|
//#define debug_log(S, V1, V2) printf (S, V1, V2); fflush (stdout)
|
|
|
|
|
#define debug_log(S, V1, V2) |
|
|
|
|
//#define OMP_STREAM_DEBUG
|
|
|
|
|
#ifdef OMP_STREAM_DEBUG |
|
|
|
|
#define debug_log_init(S, V1, V2) printf (S, V1, V2); fflush (stdout) |
|
|
|
|
#define debug_log_init3(S, V1, V2, V3) printf (S, V1, V2, V3); fflush (stdout) |
|
|
|
|
#define debug_log(S, V1, V2) printf (S, V1, V2); fflush (stdout) |
|
|
|
|
#define debug_log3(S, V1, V2, V3) printf (S, V1, V2, V3); fflush (stdout) |
|
|
|
|
#define debug_log4(S, V1, V2, V3, V4) printf (S, V1, V2, V3, V4); fflush (stdout) |
|
|
|
|
#define debug_log5(S, V1, V2, V3, V4, V5) printf (S, V1, V2, V3, V4, V5); fflush (stdout) |
|
|
|
|
#else |
|
|
|
|
#define debug_log_init(S, V1, V2) |
|
|
|
|
#define debug_log_init3(S, V1, V2, V3) |
|
|
|
|
#define debug_log(S, V1, V2) |
|
|
|
|
#define debug_log3(S, V1, V2, V3) |
|
|
|
|
#define debug_log4(S, V1, V2, V3, V4) |
|
|
|
|
#define debug_log5(S, V1, V2, V3, V4, V5) |
|
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
gomp_barrier_t gomp_stream_tasks_wait_until_connected_barrier; |
|
|
|
|
gomp_barrier_t gomp_stream_tasks_exit_barrier; |
|
|
|
@ -126,6 +138,89 @@ GOMP_stream_create_stream (size_t element_size, size_t buffer_size)
|
|
|
|
|
return stream; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
|
GOMP_batchQ_create_stream (size_t element_size, size_t buffer_size) |
|
|
|
|
{ |
|
|
|
|
#define BASE_SHM_NAME "/channel" |
|
|
|
|
#define ROUNDUP(size) ((size + page_size) & ~(page_size - 1)) |
|
|
|
|
int ret, shm_fd; |
|
|
|
|
static int chan_idx = 0; |
|
|
|
|
char shm_name[NAME_MAX]; |
|
|
|
|
struct gomp_batchQ *stream; |
|
|
|
|
size_t buffer_realsize, bq_size; |
|
|
|
|
unsigned int page_size; |
|
|
|
|
|
|
|
|
|
buffer_size *= element_size * AGGREGATION_FACTOR; |
|
|
|
|
buffer_realsize = 1; |
|
|
|
|
while(buffer_realsize < buffer_size) |
|
|
|
|
buffer_realsize <<= 1; |
|
|
|
|
|
|
|
|
|
/* Beginning of BatchQueue code */ |
|
|
|
|
stream = NULL; |
|
|
|
|
ret = sysconf(_SC_PAGESIZE); |
|
|
|
|
if (ret == -1) |
|
|
|
|
{ |
|
|
|
|
perror("BatchQueue init sysconf"); |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
page_size = ret; |
|
|
|
|
bq_size = ROUNDUP(sizeof(gomp_batchQ_t) - 1 + buffer_realsize * 2); |
|
|
|
|
ret = snprintf(shm_name, NAME_MAX, BASE_SHM_NAME"%d\n", chan_idx); |
|
|
|
|
if (ret < 0) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "BatchQueue init snprintf failed\n"); |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
else if (ret >= NAME_MAX) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, "Too many streams created: impossible to "); |
|
|
|
|
fprintf(stderr, "create a stream named "BASE_SHM_NAME); |
|
|
|
|
fprintf(stderr, "%d\n", chan_idx); |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
shm_fd = shm_open(shm_name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); |
|
|
|
|
if (shm_fd == -1) |
|
|
|
|
{ |
|
|
|
|
perror("BatchQueue init shm_open failed"); |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
ret = ftruncate(shm_fd, bq_size); |
|
|
|
|
if (ret == -1) |
|
|
|
|
{ |
|
|
|
|
perror("BatchQueue init ftruncate failed"); |
|
|
|
|
goto close_file; |
|
|
|
|
} |
|
|
|
|
stream = mmap(NULL, bq_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, shm_fd, 0); |
|
|
|
|
if (stream == MAP_FAILED) |
|
|
|
|
{ |
|
|
|
|
perror("Batchqueue init mmap failed"); |
|
|
|
|
stream = NULL; |
|
|
|
|
goto close_file; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
stream->state = 0; |
|
|
|
|
|
|
|
|
|
stream->sender_ptr = stream->buf; |
|
|
|
|
stream->receiver_ptr = stream->buf; |
|
|
|
|
|
|
|
|
|
stream->buf_start1 = stream->buf; |
|
|
|
|
stream->buf_start2 = stream->buf + buffer_realsize; |
|
|
|
|
stream->sender_buf = stream->buf_start1; |
|
|
|
|
stream->receiver_buf = stream->buf_start1; |
|
|
|
|
|
|
|
|
|
stream->buffer_size = buffer_realsize; |
|
|
|
|
stream->buffer_mask = buffer_realsize - 1; |
|
|
|
|
|
|
|
|
|
stream->pre_shift = 0; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
close_file: |
|
|
|
|
shm_unlink(shm_name); |
|
|
|
|
return stream; |
|
|
|
|
#undef BASE_SHM_NAME |
|
|
|
|
#undef ROUNDUP |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Allocate and initialize a generic GOMP_STREAM_VIEW that can be
|
|
|
|
|
connected to any stream to give either read or write access |
|
|
|
|
depending on its TYPE. Returns a pointer to the newly allocated |
|
|
|
@ -151,6 +246,22 @@ gomp_stream_create_view (int type, size_t view_size, size_t burst_size)
|
|
|
|
|
return view; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static inline void * |
|
|
|
|
gomp_batchQ_create_view (int type, size_t view_size, size_t burst_size) |
|
|
|
|
{ |
|
|
|
|
gomp_batchQ_view_p view = |
|
|
|
|
(gomp_batchQ_view_p) gomp_malloc (sizeof(gomp_batchQ_view_t)); |
|
|
|
|
|
|
|
|
|
view->stream = NULL; |
|
|
|
|
view->type = type | BATCHQ_VIEW; |
|
|
|
|
view->view_size = view_size; |
|
|
|
|
view->burst_size = burst_size; |
|
|
|
|
view->pxxk_size = view_size - burst_size; |
|
|
|
|
view->termination_flag = false; |
|
|
|
|
|
|
|
|
|
return view; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Wrapper for creating a READ view . */ |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
@ -160,6 +271,13 @@ GOMP_stream_create_read_view (size_t view_size, size_t burst_size)
|
|
|
|
|
return gomp_stream_create_view (READ_VIEW, view_size, burst_size); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
|
GOMP_batchQ_create_read_view (size_t view_size, size_t burst_size) |
|
|
|
|
{ |
|
|
|
|
debug_log_init ("GOMP_stream_create_read_view %zu %zu\n", view_size, burst_size); |
|
|
|
|
return gomp_batchQ_create_view(READ_VIEW, view_size, burst_size); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Wrapper for creating a WRITE view. */ |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
@ -169,6 +287,13 @@ GOMP_stream_create_write_view (size_t view_size, size_t burst_size)
|
|
|
|
|
return gomp_stream_create_view (WRITE_VIEW, view_size, burst_size); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
|
GOMP_batchQ_create_write_view (size_t view_size, size_t burst_size) |
|
|
|
|
{ |
|
|
|
|
debug_log_init ("GOMP_stream_create_write_view %zu %zu\n", view_size, burst_size); |
|
|
|
|
return gomp_batchQ_create_view(WRITE_VIEW, view_size, burst_size); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Allocate and initialize a GOMP_STREAM_TASK data structure. */ |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
@ -182,11 +307,19 @@ GOMP_stream_create_task ()
|
|
|
|
|
task->read_view_list.nr_views = 0; |
|
|
|
|
task->read_view_list.size = 0; |
|
|
|
|
gomp_mutex_init (&task->read_view_list.connect_view_mutex); |
|
|
|
|
task->batchQ_read_view_list.views = NULL; |
|
|
|
|
task->batchQ_read_view_list.nr_views = 0; |
|
|
|
|
task->batchQ_read_view_list.size = 0; |
|
|
|
|
gomp_mutex_init (&task->batchQ_read_view_list.connect_view_mutex); |
|
|
|
|
|
|
|
|
|
task->write_view_list.views = NULL; |
|
|
|
|
task->write_view_list.nr_views = 0; |
|
|
|
|
task->write_view_list.size = 0; |
|
|
|
|
gomp_mutex_init (&task->write_view_list.connect_view_mutex); |
|
|
|
|
task->batchQ_write_view_list.views = NULL; |
|
|
|
|
task->batchQ_write_view_list.nr_views = 0; |
|
|
|
|
task->batchQ_write_view_list.size = 0; |
|
|
|
|
gomp_mutex_init (&task->batchQ_write_view_list.connect_view_mutex); |
|
|
|
|
|
|
|
|
|
task->activation_counter = 0; |
|
|
|
|
task->termination_flag = false; |
|
|
|
@ -210,8 +343,22 @@ GOMP_stream_get_task_activation_counter (void *t)
|
|
|
|
|
void |
|
|
|
|
GOMP_stream_set_task_termination_flag (void *t) |
|
|
|
|
{ |
|
|
|
|
int i; |
|
|
|
|
gomp_stream_view_list_p task_list; |
|
|
|
|
gomp_stream_task_p task = (gomp_stream_task_p) t; |
|
|
|
|
int num_batchQ_read_views = task->batchQ_read_view_list.nr_views; |
|
|
|
|
int num_batchQ_write_views = task->batchQ_write_view_list.nr_views; |
|
|
|
|
|
|
|
|
|
task_list = &task->batchQ_read_view_list; |
|
|
|
|
gomp_mutex_lock (&task_list->connect_view_mutex); |
|
|
|
|
for (i = 0; i < num_batchQ_read_views; ++i) |
|
|
|
|
((gomp_batchQ_view_p) task_list->views[i])->termination_flag = true; |
|
|
|
|
gomp_mutex_unlock (&task_list->connect_view_mutex); |
|
|
|
|
task_list = &task->batchQ_write_view_list; |
|
|
|
|
gomp_mutex_lock (&task_list->connect_view_mutex); |
|
|
|
|
for (i = 0; i < num_batchQ_write_views; ++i) |
|
|
|
|
((gomp_batchQ_view_p) task_list->views[i])->termination_flag = true; |
|
|
|
|
gomp_mutex_unlock (&task_list->connect_view_mutex); |
|
|
|
|
task->termination_flag = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -290,10 +437,10 @@ GOMP_stream_connect_view (void *t, void *s, void *v)
|
|
|
|
|
gomp_stream_p stream = (gomp_stream_p) s; |
|
|
|
|
gomp_stream_view_p view = (gomp_stream_view_p) v; |
|
|
|
|
|
|
|
|
|
gomp_stream_view_handle_p vh = (view->type == READ_VIEW) ? |
|
|
|
|
gomp_stream_view_handle_p vh = ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? |
|
|
|
|
&stream->read_views : &stream->write_views; |
|
|
|
|
gomp_stream_view_list_p stream_list = &vh->view_list; |
|
|
|
|
gomp_stream_view_list_p task_list = (view->type == READ_VIEW) ? |
|
|
|
|
gomp_stream_view_list_p task_list = ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? |
|
|
|
|
&task->read_view_list : &task->write_view_list; |
|
|
|
|
|
|
|
|
|
view->stream = stream; |
|
|
|
@ -302,7 +449,7 @@ GOMP_stream_connect_view (void *t, void *s, void *v)
|
|
|
|
|
stream is initially empty. This is equivalent to releasing the |
|
|
|
|
original buffer_size elements. A write view will start with |
|
|
|
|
buffer_size free space. */ |
|
|
|
|
if (view->type == READ_VIEW) |
|
|
|
|
if ((view->type & VIEW_TYPE_MASK) == READ_VIEW) |
|
|
|
|
view->lower_index = stream->buffer_size; |
|
|
|
|
else |
|
|
|
|
view->local_min_value = stream->buffer_size; |
|
|
|
@ -321,6 +468,32 @@ GOMP_stream_connect_view (void *t, void *s, void *v)
|
|
|
|
|
gomp_stream_add_view_to_list (view, stream_list); |
|
|
|
|
gomp_mutex_unlock (&vh->view_list.connect_view_mutex); |
|
|
|
|
__sync_fetch_and_add (&vh->nr_registered_views, 1); |
|
|
|
|
debug_log5 ("GOMP_stream_connect_view %p %lu %p %s %p\n", stream, |
|
|
|
|
pthread_self(), view, ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? "In" : "Out", |
|
|
|
|
task); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void |
|
|
|
|
GOMP_batchQ_connect_view (void *t, void *s, void *v) |
|
|
|
|
{ |
|
|
|
|
gomp_stream_task_p task = (gomp_stream_task_p) t; |
|
|
|
|
gomp_batchQ_p stream = (gomp_batchQ_p) s; |
|
|
|
|
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v; |
|
|
|
|
|
|
|
|
|
gomp_stream_view_list_p task_list = ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? |
|
|
|
|
&task->batchQ_read_view_list : &task->batchQ_write_view_list; |
|
|
|
|
|
|
|
|
|
/* Register the view with the TASK to which it belongs. This
|
|
|
|
|
operation is local to the task, so there is no need to |
|
|
|
|
synchronize. */ |
|
|
|
|
gomp_mutex_lock (&task_list->connect_view_mutex); |
|
|
|
|
gomp_stream_add_view_to_list ((gomp_stream_view_p) view, task_list); |
|
|
|
|
gomp_mutex_unlock (&task_list->connect_view_mutex); |
|
|
|
|
|
|
|
|
|
view->stream = stream; |
|
|
|
|
debug_log5 ("GOMP_batchQ_connect_view %p %lu %p %s %p\n", stream, |
|
|
|
|
pthread_self(), view, ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? "In" : "Out", |
|
|
|
|
task); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Check whether all the expected views on STREAM have already
|
|
|
|
@ -362,8 +535,12 @@ GOMP_stream_wait_until_connected (void *t)
|
|
|
|
|
|
|
|
|
|
gomp_mutex_lock (&task->read_view_list.connect_view_mutex); |
|
|
|
|
for (i = 0; i < task->read_view_list.nr_views; ++i) |
|
|
|
|
if (!gomp_stream_check_connected (task->read_view_list.views[i]->stream)) |
|
|
|
|
done = false; |
|
|
|
|
{ |
|
|
|
|
if (task->read_view_list.views[i]->type & BATCHQ_VIEW) |
|
|
|
|
continue; |
|
|
|
|
if (!gomp_stream_check_connected (task->read_view_list.views[i]->stream)) |
|
|
|
|
done = false; |
|
|
|
|
} |
|
|
|
|
gomp_mutex_unlock (&task->read_view_list.connect_view_mutex); |
|
|
|
|
|
|
|
|
|
gomp_mutex_lock (&task->write_view_list.connect_view_mutex); |
|
|
|
@ -490,7 +667,7 @@ GOMP_stream_update (void *v, const unsigned long long act_start,
|
|
|
|
|
gomp_stream_p stream = view->stream; |
|
|
|
|
void *buffer_pointer; |
|
|
|
|
|
|
|
|
|
debug_log ("GOMP_stream_update [in] %llu %llu\n", act_start, act_end); |
|
|
|
|
debug_log4 ("GOMP_stream_update [in] %p %zd %llu %llu\n", stream, view->burst_size, act_start, act_end); |
|
|
|
|
|
|
|
|
|
/* This update requests access to the buffer in [low_idx,up_idx[.
|
|
|
|
|
We will release up to low_idx-1 and acquire up to up_idx-1. */ |
|
|
|
@ -532,11 +709,33 @@ GOMP_stream_update (void *v, const unsigned long long act_start,
|
|
|
|
|
guaranteed access to all the requested data. */ |
|
|
|
|
buffer_pointer = stream->buffer + low_idx_loc; |
|
|
|
|
|
|
|
|
|
debug_log ("GOMP_stream_update [out] %llu %llu\n", act_start, act_end); |
|
|
|
|
debug_log4 ("GOMP_stream_update [out] %p %zd %llu %llu\n", stream, view->burst_size, act_start, act_end); |
|
|
|
|
|
|
|
|
|
return buffer_pointer; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
|
GOMP_batchQ_update (void *v, const unsigned long long act_start, |
|
|
|
|
const unsigned long long act_end) |
|
|
|
|
{ |
|
|
|
|
unsigned long long low_idx, up_idx; |
|
|
|
|
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v; |
|
|
|
|
gomp_batchQ_p stream = view->stream; |
|
|
|
|
|
|
|
|
|
/* This update requests access to the buffer in [low_idx,up_idx[.
|
|
|
|
|
We will release up to low_idx-1 and acquire up to up_idx-1. */ |
|
|
|
|
low_idx = act_start * view->burst_size; |
|
|
|
|
up_idx = act_end * view->burst_size + view->pxxk_size - 1; |
|
|
|
|
|
|
|
|
|
if (up_idx - low_idx + 1 != stream->buffer_size) |
|
|
|
|
if (!view->termination_flag) |
|
|
|
|
gomp_fatal ("GOMP_batchQ: update requested access to more than buffer_size data."); |
|
|
|
|
|
|
|
|
|
while (!stream->state); |
|
|
|
|
|
|
|
|
|
return (void *) stream->receiver_ptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Request write access for the view V to the stream up to INDEX. */ |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
@ -548,7 +747,7 @@ GOMP_stream_stall (void *v, const unsigned long long act_start,
|
|
|
|
|
gomp_stream_p stream = view->stream; |
|
|
|
|
void *buffer_pointer; |
|
|
|
|
|
|
|
|
|
debug_log ("GOMP_stream_stall [in] %llu %llu\n", act_start, act_end); |
|
|
|
|
debug_log4 ("GOMP_stream_stall [in] %p %zd %llu %llu\n", stream, view->burst_size, act_start, act_end); |
|
|
|
|
|
|
|
|
|
/* This update requests access to the buffer in [low_idx,up_idx[.
|
|
|
|
|
We will release up to low_idx-1 and acquire up to up_idx-1. */ |
|
|
|
@ -574,11 +773,31 @@ GOMP_stream_stall (void *v, const unsigned long long act_start,
|
|
|
|
|
|
|
|
|
|
buffer_pointer = stream->buffer + (low_idx & stream->buffer_mask); |
|
|
|
|
|
|
|
|
|
debug_log ("GOMP_stream_stall [out] %llu %llu\n", act_start, act_end); |
|
|
|
|
debug_log4 ("GOMP_stream_stall [out] %p %zd %llu %llu\n", stream, view->burst_size, act_start, act_end); |
|
|
|
|
|
|
|
|
|
return buffer_pointer; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
|
GOMP_batchQ_stall (void *v, const unsigned long long act_start, |
|
|
|
|
const unsigned long long act_end) |
|
|
|
|
{ |
|
|
|
|
unsigned long long low_idx, up_idx; |
|
|
|
|
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v; |
|
|
|
|
gomp_batchQ_p stream = view->stream; |
|
|
|
|
|
|
|
|
|
/* This update requests access to the buffer in [low_idx,up_idx[.
|
|
|
|
|
We will release up to low_idx-1 and acquire up to up_idx-1. */ |
|
|
|
|
low_idx = act_start * view->burst_size + stream->pre_shift; |
|
|
|
|
up_idx = act_end * view->burst_size + view->pxxk_size + stream->pre_shift - 1; |
|
|
|
|
|
|
|
|
|
if (up_idx - low_idx + 1 != stream->buffer_size) |
|
|
|
|
if (!view->termination_flag) |
|
|
|
|
gomp_fatal ("GOMP_stream: stall requested access to an amount of data different that buffer_size."); |
|
|
|
|
|
|
|
|
|
return (void *) stream->sender_ptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Relinquish read access for the view V to the stream up to
|
|
|
|
|
INDEX. */ |
|
|
|
|
|
|
|
|
@ -590,6 +809,25 @@ GOMP_stream_release (void *v, const unsigned long long act_idx)
|
|
|
|
|
debug_log ("GOMP_stream_release %llu %llu\n", act_idx, act_idx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void |
|
|
|
|
GOMP_batchQ_release (void *v, const unsigned long long act_idx) |
|
|
|
|
{ |
|
|
|
|
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v; |
|
|
|
|
gomp_batchQ_p stream = view->stream; |
|
|
|
|
|
|
|
|
|
stream->state = 0; |
|
|
|
|
if (stream->receiver_buf == stream->buf_start1) |
|
|
|
|
{ |
|
|
|
|
stream->receiver_ptr = stream->buf_start2; |
|
|
|
|
stream->receiver_buf = stream->buf_start2; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
stream->receiver_ptr = stream->buf_start1; |
|
|
|
|
stream->receiver_buf = stream->buf_start1; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Relinquish write access for the view V to the stream up to
|
|
|
|
|
INDEX. */ |
|
|
|
|
|
|
|
|
@ -622,6 +860,26 @@ GOMP_stream_commit (void *v, const unsigned long long act_idx)
|
|
|
|
|
debug_log ("GOMP_stream_commit %llu %llu\n", act_idx, act_idx); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void |
|
|
|
|
GOMP_batchQ_commit (void *v, const unsigned long long act_idx) |
|
|
|
|
{ |
|
|
|
|
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v; |
|
|
|
|
gomp_batchQ_p stream = view->stream; |
|
|
|
|
|
|
|
|
|
while (stream->state); |
|
|
|
|
stream->state = 1; |
|
|
|
|
if (stream->sender_buf == stream->buf_start1) |
|
|
|
|
{ |
|
|
|
|
stream->sender_ptr = stream->buf_start2; |
|
|
|
|
stream->sender_buf = stream->buf_start2; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
stream->sender_ptr = stream->buf_start1; |
|
|
|
|
stream->sender_buf = stream->buf_start1; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Finalization and destruction of the streaming data structures. */ |
|
|
|
|
|
|
|
|
|
/* Disconnects VIEW from the stream to which it is connected and free
|
|
|
|
@ -632,7 +890,7 @@ gomp_stream_unregister_view (gomp_stream_view_p view)
|
|
|
|
|
{ |
|
|
|
|
gomp_stream_p stream = view->stream; |
|
|
|
|
gomp_stream_view_handle_p vh = |
|
|
|
|
(view->type == READ_VIEW) ? &stream->read_views : &stream->write_views; |
|
|
|
|
((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? &stream->read_views : &stream->write_views; |
|
|
|
|
int unregistered_views; |
|
|
|
|
|
|
|
|
|
__sync_fetch_and_add (&(vh->nr_unregistered_views), 1); |
|
|
|
@ -641,7 +899,7 @@ gomp_stream_unregister_view (gomp_stream_view_p view)
|
|
|
|
|
|
|
|
|
|
/* Make sure that when multiple views access a stream, the finished
|
|
|
|
|
views do not hinder the others in the min computation. */ |
|
|
|
|
if (view->type == READ_VIEW) |
|
|
|
|
if ((view->type & VIEW_TYPE_MASK) == READ_VIEW) |
|
|
|
|
GOMP_stream_release (view, GOMP_STREAM_MAX_INDEX); |
|
|
|
|
/* The last producer exiting will set the eos_p flag and allow the
|
|
|
|
|
consumers to read up to the highest committed index. */ |
|
|
|
@ -674,6 +932,12 @@ gomp_stream_unregister_view (gomp_stream_view_p view)
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* TODO */ |
|
|
|
|
static inline void |
|
|
|
|
gomp_batchQ_unregister_view (gomp_stream_view_p view) |
|
|
|
|
{ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Invoked before terminating a stream TASK, this disconnects all the
|
|
|
|
|
views and for all streams for which it is the last one to |
|
|
|
|
disconnect from, it frees up all data structures. */ |
|
|
|
@ -683,7 +947,9 @@ GOMP_stream_task_exit (void *t)
|
|
|
|
|
{ |
|
|
|
|
gomp_stream_task_p task = (gomp_stream_task_p) t; |
|
|
|
|
int num_read_views = task->read_view_list.nr_views; |
|
|
|
|
int num_batchQ_read_views = task->batchQ_read_view_list.nr_views; |
|
|
|
|
int num_write_views = task->write_view_list.nr_views; |
|
|
|
|
int num_batchQ_write_views = task->batchQ_write_view_list.nr_views; |
|
|
|
|
int i, res; |
|
|
|
|
debug_log_init ("GOMP_stream_task_exit %zu %zu\n", (size_t) t, (size_t) t); |
|
|
|
|
|
|
|
|
@ -694,11 +960,19 @@ GOMP_stream_task_exit (void *t)
|
|
|
|
|
for (i = 0; i < num_read_views; ++i) |
|
|
|
|
gomp_stream_unregister_view (task->read_view_list.views[i]); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < num_batchQ_read_views; ++i) |
|
|
|
|
gomp_batchQ_unregister_view (task->batchQ_read_view_list.views[i]); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < num_write_views; ++i) |
|
|
|
|
gomp_stream_unregister_view (task->write_view_list.views[i]); |
|
|
|
|
|
|
|
|
|
for (i = 0; i < num_batchQ_write_views; ++i) |
|
|
|
|
gomp_batchQ_unregister_view (task->batchQ_write_view_list.views[i]); |
|
|
|
|
|
|
|
|
|
free (task->read_view_list.views); |
|
|
|
|
free (task->batchQ_read_view_list.views); |
|
|
|
|
free (task->write_view_list.views); |
|
|
|
|
free (task->batchQ_write_view_list.views); |
|
|
|
|
free (task); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -748,6 +1022,8 @@ GOMP_stream_get_available_work (void *t, unsigned long long *start_idx)
|
|
|
|
|
return AGGREGATION_FACTOR; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* TODO? GOMP_batchQ_get_available_work */ |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* Initialize streaming in this region. */ |
|
|
|
|
|
|
|
|
@ -760,6 +1036,8 @@ GOMP_stream_init ()
|
|
|
|
|
gomp_barrier_init (&gomp_stream_tasks_exit_barrier, gomp_stream_tasks_count); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* No GOMP_batchQ_init */ |
|
|
|
|
|
|
|
|
|
/* Wait until all streaming threads complete. */ |
|
|
|
|
|
|
|
|
|
void |
|
|
|
@ -768,6 +1046,8 @@ GOMP_stream_exit ()
|
|
|
|
|
gomp_barrier_wait (&gomp_stream_tasks_exit_barrier); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* No GOMP_batchQ_exit */ |
|
|
|
|
|
|
|
|
|
/* Request SIZE bytes for a PRE operator on stream S. Return a
|
|
|
|
|
pointer where data should be stored. */ |
|
|
|
|
void * |
|
|
|
@ -783,6 +1063,19 @@ GOMP_stream_pre (void *s, const unsigned long long size)
|
|
|
|
|
return stream->buffer; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void * |
|
|
|
|
GOMP_batchQ_pre (void *s, const unsigned long long size) |
|
|
|
|
{ |
|
|
|
|
gomp_batchQ_p stream = (gomp_batchQ_p) s; |
|
|
|
|
|
|
|
|
|
debug_log_init ("GOMP_stream_pre %zu \t %llu\n", (size_t) s, size); |
|
|
|
|
|
|
|
|
|
gomp_fatal ("GOMP_batchQ_pre not supported now: missing code in GOMP_batchQ_commit and GOMP_batchQ_stall\n"); |
|
|
|
|
stream->pre_shift = size; |
|
|
|
|
|
|
|
|
|
return (void *) stream->sender_ptr; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* This function is a pthread_create entry point for streaming
|
|
|
|
|
tasks. */ |
|
|
|
|