rt_gccstream/libgomp/stream.c

1298 lines
41 KiB
C

/* Copyright (C) 2010 Free Software Foundation, Inc.
Contributed by Antoniu Pop <antoniu.pop@gmail.com>
and Thomas Preud'homme <thomas.preud-homme@lip6.fr>.
This file is part of the GNU OpenMP Library (libgomp).
Libgomp is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
Libgomp is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
more details.
You should have received a copy of the GNU Lesser General Public License
along with libgomp; see the file COPYING.LIB. If not, write to the
Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
MA 02110-1301, USA. */
/* As a special exception, if you link this library with other files, some
of which are compiled with GCC, to produce an executable, this library
does not by itself cause the resulting executable to be covered by the
GNU General Public License. This exception does not however invalidate
any other reasons why the executable file might be covered by the GNU
General Public License. */
/* This implements the stream communication layer for libGOMP. */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/time.h>
#include "wait.h"
#include "sem.h"
#include "mutex.h"
#include "libgomp.h"
#define AGGREGATION_FACTOR 32
#define ENSURE_BUFSIZE
//#define OMP_STREAM_DEBUG
#ifdef OMP_STREAM_DEBUG
#define debug_log_init(S, V1, V2) printf (S, V1, V2); fflush (stdout)
#define debug_log_init3(S, V1, V2, V3) printf (S, V1, V2, V3); fflush (stdout)
#define debug_log(S, V1, V2) printf (S, V1, V2); fflush (stdout)
#define debug_log3(S, V1, V2, V3) printf (S, V1, V2, V3); fflush (stdout)
#define debug_log4(S, V1, V2, V3, V4) printf (S, V1, V2, V3, V4); fflush (stdout)
#define debug_log5(S, V1, V2, V3, V4, V5) printf (S, V1, V2, V3, V4, V5); fflush (stdout)
#else
#define debug_log_init(S, V1, V2)
#define debug_log_init3(S, V1, V2, V3)
#define debug_log(S, V1, V2)
#define debug_log3(S, V1, V2, V3)
#define debug_log4(S, V1, V2, V3, V4)
#define debug_log5(S, V1, V2, V3, V4, V5)
#endif
gomp_barrier_t gomp_stream_tasks_wait_until_connected_barrier;
gomp_barrier_t gomp_stream_tasks_exit_barrier;
unsigned gomp_stream_tasks_count;
/* This structure is used to communicate across pthread_create. */
struct gomp_stream_thread_start_data
{
void (*fn) (void *);
void *fn_data;
int id;
};
/* Data structures creation and pipeline initialization. */
/* Allocate and initialize a GOMP_STREAM for data elements of size
ELEMENT_SIZE using a circular buffer of STREAM_BUFFER_SIZE such
elements. Returns a pointer to the newly allocated stream. The
user may provide a pointer to pre-allocated memory to be used as
BUFFER for the stream. */
void *
GOMP_stream_create_stream (size_t element_size, size_t buffer_size)
{
gomp_stream_p stream = (gomp_stream_p) gomp_malloc (sizeof (gomp_stream_t));
debug_log_init ("GOMP_stream_create_stream %zu %zu\n", element_size, buffer_size);
if (buffer_size < 2)
gomp_fatal ("GOMP_stream: insufficient stream buffer size.");
/* Initialize and allocate the data buffer. We force the
buffer_size to be a power of 2 for efficient modulo computation
of the indices in the circular buffer. */
/* To avoid excessive multiplication operations, we convert the
accounting from elements to bytes. */
buffer_size *= element_size;
stream->buffer_size = 1;
while(stream->buffer_size < buffer_size)
stream->buffer_size <<= 1;
stream->buffer_mask = stream->buffer_size - 1;
stream->buffer =
(void *) gomp_malloc (stream->buffer_size * 2);
stream->expected_ready_p = false;
stream->connected_p = false;
stream->eos_p = false;
stream->pre_shift = 0;
stream->unregistered_views = 0;
/* Initialize the view_handles. */
stream->read_views.current_min = stream->buffer_size;
stream->read_views.current_max = 0;
stream->read_views.view_list.views = NULL;
stream->read_views.view_list.nr_views = 0;
stream->read_views.view_list.size = 0;
stream->read_views.nr_expected_views = 0;
stream->read_views.nr_registered_views = 0;
stream->read_views.nr_unregistered_views = 0;
gomp_mutex_init (&stream->read_views.view_list.connect_view_mutex);
stream->write_views.current_min = 0;
stream->write_views.current_max = stream->buffer_size;
stream->write_views.view_list.views = NULL;
stream->write_views.view_list.nr_views = 0;
stream->write_views.view_list.size = 0;
stream->write_views.nr_expected_views = 0;
stream->write_views.nr_registered_views = 0;
stream->write_views.nr_unregistered_views = 0;
stream->tvr.tv_sec = 0;
stream->tvr.tv_usec = 0;
stream->tvw.tv_sec = 0;
stream->tvw.tv_usec = 0;
gomp_mutex_init (&stream->write_views.view_list.connect_view_mutex);
#ifndef HAVE_SYNC_BUILTINS
gomp_mutex_init (&stream->stream_mutex);
#endif
return stream;
}
void *
GOMP_batchQ_create_stream (size_t element_size, size_t buffer_size)
{
#define BASE_SHM_NAME "/channel"
#define ROUNDUP(size) ((size + page_size) & ~(page_size - 1))
int ret, shm_fd;
static int chan_idx = 0;
char shm_name[NAME_MAX];
struct gomp_batchQ *stream;
size_t buffer_realsize, bq_size;
unsigned int page_size;
buffer_size *= element_size * AGGREGATION_FACTOR;
buffer_realsize = 1;
while(buffer_realsize < buffer_size)
buffer_realsize <<= 1;
/* Beginning of BatchQueue code */
stream = NULL;
ret = sysconf(_SC_PAGESIZE);
if (ret == -1)
{
perror("BatchQueue init sysconf");
return NULL;
}
page_size = ret;
bq_size = ROUNDUP(sizeof(gomp_batchQ_t) - 1 + buffer_realsize * 2);
ret = snprintf(shm_name, NAME_MAX, BASE_SHM_NAME"%d\n", chan_idx);
if (ret < 0)
{
fprintf(stderr, "BatchQueue init snprintf failed\n");
return NULL;
}
else if (ret >= NAME_MAX)
{
fprintf(stderr, "Too many streams created: impossible to ");
fprintf(stderr, "create a stream named "BASE_SHM_NAME);
fprintf(stderr, "%d\n", chan_idx);
return NULL;
}
shm_fd = shm_open(shm_name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
if (shm_fd == -1)
{
perror("BatchQueue init shm_open failed");
return NULL;
}
ret = ftruncate(shm_fd, bq_size);
if (ret == -1)
{
perror("BatchQueue init ftruncate failed");
goto close_file;
}
stream = mmap(NULL, bq_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, shm_fd, 0);
if (stream == MAP_FAILED)
{
perror("Batchqueue init mmap failed");
stream = NULL;
goto close_file;
}
stream->state = 0;
stream->sender_ptr = stream->buf;
stream->receiver_ptr = stream->buf;
stream->buf_start1 = stream->buf;
stream->buf_start2 = stream->buf + buffer_realsize;
stream->sender_buf = stream->buf_start1;
stream->receiver_buf = stream->buf_start1;
stream->buffer_size = buffer_realsize;
stream->buffer_mask = buffer_realsize - 1;
stream->pre_shift = 0;
stream->tvr.tv_sec = 0;
stream->tvr.tv_usec = 0;
stream->tvw.tv_sec = 0;
stream->tvw.tv_usec = 0;
close_file:
shm_unlink(shm_name);
return stream;
#undef BASE_SHM_NAME
#undef ROUNDUP
}
/* Allocate and initialize a generic GOMP_STREAM_VIEW that can be
connected to any stream to give either read or write access
depending on its TYPE. Returns a pointer to the newly allocated
view. This view accesses VIEW_SIZE bytes in the stream and
commits/releases BURST_SIZE bytes per activation. */
static inline void *
gomp_stream_create_view (int type, size_t view_size, size_t burst_size)
{
gomp_stream_view_p view =
(gomp_stream_view_p) gomp_malloc (sizeof(gomp_stream_view_t));
view->lower_index = 0;
view->upper_index = 0;
view->stream = NULL;
view->end_p = false;
view->type = type;
view->local_min_value = 0;
view->view_size = view_size;
view->burst_size = burst_size;
view->pxxk_size = view_size - burst_size;
return view;
}
static inline void *
gomp_batchQ_create_view (int type, size_t view_size, size_t burst_size)
{
gomp_batchQ_view_p view =
(gomp_batchQ_view_p) gomp_malloc (sizeof(gomp_batchQ_view_t));
view->stream = NULL;
view->type = type | BATCHQ_VIEW;
view->view_size = view_size;
view->burst_size = burst_size;
view->pxxk_size = view_size - burst_size;
view->termination_flag = false;
return view;
}
/* Wrapper for creating a READ view . */
void *
GOMP_stream_create_read_view (size_t view_size, size_t burst_size)
{
debug_log_init ("GOMP_stream_create_read_view %zu %zu\n", view_size, burst_size);
return gomp_stream_create_view (READ_VIEW, view_size, burst_size);
}
void *
GOMP_batchQ_create_read_view (size_t view_size, size_t burst_size)
{
debug_log_init ("GOMP_stream_create_read_view %zu %zu\n", view_size, burst_size);
return gomp_batchQ_create_view(READ_VIEW, view_size, burst_size);
}
/* Wrapper for creating a WRITE view. */
void *
GOMP_stream_create_write_view (size_t view_size, size_t burst_size)
{
debug_log_init ("GOMP_stream_create_write_view %zu %zu\n", view_size, burst_size);
return gomp_stream_create_view (WRITE_VIEW, view_size, burst_size);
}
void *
GOMP_batchQ_create_write_view (size_t view_size, size_t burst_size)
{
debug_log_init ("GOMP_stream_create_write_view %zu %zu\n", view_size, burst_size);
return gomp_batchQ_create_view(WRITE_VIEW, view_size, burst_size);
}
/* Allocate and initialize a GOMP_STREAM_TASK data structure. */
void *
GOMP_stream_create_task ()
{
gomp_stream_task_p task =
(gomp_stream_task_p) gomp_malloc (sizeof(gomp_stream_task_t));
debug_log_init3 ("GOMP_stream_create_task %d %d \t %15zu\n", 0, 0, (size_t) task);
task->read_view_list.views = NULL;
task->read_view_list.nr_views = 0;
task->read_view_list.size = 0;
gomp_mutex_init (&task->read_view_list.connect_view_mutex);
task->batchQ_read_view_list.views = NULL;
task->batchQ_read_view_list.nr_views = 0;
task->batchQ_read_view_list.size = 0;
gomp_mutex_init (&task->batchQ_read_view_list.connect_view_mutex);
task->write_view_list.views = NULL;
task->write_view_list.nr_views = 0;
task->write_view_list.size = 0;
gomp_mutex_init (&task->write_view_list.connect_view_mutex);
task->batchQ_write_view_list.views = NULL;
task->batchQ_write_view_list.nr_views = 0;
task->batchQ_write_view_list.size = 0;
gomp_mutex_init (&task->batchQ_write_view_list.connect_view_mutex);
task->activation_counter = 0;
task->termination_flag = false;
task->first_unassigned_activation_counter = 0;
task->num_instances = 0;
__sync_synchronize ();
return task;
}
volatile void *
GOMP_stream_get_task_activation_counter (void *t)
{
gomp_stream_task_p task = (gomp_stream_task_p) t;
return &(task->activation_counter);
}
void
GOMP_stream_set_task_termination_flag (void *t)
{
int i;
gomp_stream_view_list_p task_list;
gomp_stream_task_p task = (gomp_stream_task_p) t;
int num_batchQ_read_views = task->batchQ_read_view_list.nr_views;
int num_batchQ_write_views = task->batchQ_write_view_list.nr_views;
task_list = &task->batchQ_read_view_list;
gomp_mutex_lock (&task_list->connect_view_mutex);
for (i = 0; i < num_batchQ_read_views; ++i)
((gomp_batchQ_view_p) task_list->views[i])->termination_flag = true;
gomp_mutex_unlock (&task_list->connect_view_mutex);
task_list = &task->batchQ_write_view_list;
gomp_mutex_lock (&task_list->connect_view_mutex);
for (i = 0; i < num_batchQ_write_views; ++i)
((gomp_batchQ_view_p) task_list->views[i])->termination_flag = true;
gomp_mutex_unlock (&task_list->connect_view_mutex);
task->termination_flag = true;
}
void
GOMP_stream_task_add_instance (void *t)
{
gomp_stream_task_p task = (gomp_stream_task_p) t;
__sync_fetch_and_add (&task->num_instances, 1);
__sync_synchronize ();
}
/* Declare additional READ_VIEWS and WRITE_VIEWS expected views on
stream S. When possible, the thread that creates the streaming
tasks should declare, for each stream, the number of read/write
views that will connect to a stream before the streaming tasks are
started. If this function is called on a stream, there will be no
further checks for the number of tasks partaking in the
initialization synchronization. */
void
GOMP_stream_add_expected_views (void *s, int read_views, int write_views,
int final)
{
gomp_stream_p stream = (gomp_stream_p) s;
debug_log_init ("GOMP_stream_add_expected_views %d %d\n", read_views, write_views);
if (stream->expected_ready_p)
gomp_fatal
("GOMP_stream: attempting to modify a final number of expected views.");
stream->expected_ready_p = final;
#ifdef HAVE_SYNC_BUILTINS
__sync_fetch_and_add (&stream->read_views.nr_expected_views, read_views);
__sync_fetch_and_add (&stream->write_views.nr_expected_views, write_views);
#else
gomp_mutex_lock (&stream->stream_mutex);
stream->read_views.nr_expected_views += read_views;
stream->write_views.nr_expected_views += write_views;
gomp_mutex_unlock (&stream->stream_mutex);
#endif
}
/* Add VIEW to the VIEW_LIST. We actually use an array as this list
is only modified in the initialization phase and we never remove
any items from it. */
static inline void
gomp_stream_add_view_to_list (gomp_stream_view_p view,
gomp_stream_view_list_p view_list)
{
/* Allocate memory when needed. */
if (view_list->views == NULL || view_list->nr_views == view_list->size)
{
if (view_list->size == 0)
view_list->size = 4;
if (view_list->nr_views == view_list->size)
view_list->size <<= 1;
view_list->views =
(gomp_stream_view_p *) gomp_realloc (view_list->views,
view_list->size * sizeof (gomp_stream_view_p));
}
view_list->views[view_list->nr_views] = view;
view_list->nr_views += 1;
}
/* Connect a VIEW to a STREAM and also to the TASK which will use it.
This effectively builds the runtime task graph. */
void
GOMP_stream_connect_view (void *t, void *s, void *v)
{
gomp_stream_task_p task = (gomp_stream_task_p) t;
gomp_stream_p stream = (gomp_stream_p) s;
gomp_stream_view_p view = (gomp_stream_view_p) v;
gomp_stream_view_handle_p vh = ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ?
&stream->read_views : &stream->write_views;
gomp_stream_view_list_p stream_list = &vh->view_list;
gomp_stream_view_list_p task_list = ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ?
&task->read_view_list : &task->write_view_list;
view->stream = stream;
/* A read view's lower index is shifted by the buffer_size as the
stream is initially empty. This is equivalent to releasing the
original buffer_size elements. A write view will start with
buffer_size free space. */
if ((view->type & VIEW_TYPE_MASK) == READ_VIEW)
view->lower_index = stream->buffer_size;
else
view->local_min_value = stream->buffer_size;
/* Register the view with the TASK to which it belongs. This
operation is local to the task, so there is no need to
synchronize. */
gomp_mutex_lock (&task_list->connect_view_mutex);
gomp_stream_add_view_to_list (view, task_list);
gomp_mutex_unlock (&task_list->connect_view_mutex);
/* Connect the view to the stream. This must be done atomically as
this data structure is shared with the other producer/consumer
tasks. */
gomp_mutex_lock (&vh->view_list.connect_view_mutex);
gomp_stream_add_view_to_list (view, stream_list);
gomp_mutex_unlock (&vh->view_list.connect_view_mutex);
__sync_fetch_and_add (&vh->nr_registered_views, 1);
debug_log5 ("GOMP_stream_connect_view %p %lu %p %s %p\n", stream,
pthread_self(), view, ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? "In" : "Out",
task);
}
void
GOMP_batchQ_connect_view (void *t, void *s, void *v)
{
gomp_stream_task_p task = (gomp_stream_task_p) t;
gomp_batchQ_p stream = (gomp_batchQ_p) s;
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v;
gomp_stream_view_list_p task_list = ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ?
&task->batchQ_read_view_list : &task->batchQ_write_view_list;
/* Register the view with the TASK to which it belongs. This
operation is local to the task, so there is no need to
synchronize. */
gomp_mutex_lock (&task_list->connect_view_mutex);
gomp_stream_add_view_to_list ((gomp_stream_view_p) view, task_list);
gomp_mutex_unlock (&task_list->connect_view_mutex);
view->stream = stream;
debug_log5 ("GOMP_batchQ_connect_view %p %lu %p %s %p\n", stream,
pthread_self(), view, ((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? "In" : "Out",
task);
}
/* Check whether all the expected views on STREAM have already
connected. */
static inline bool
gomp_stream_check_connected (gomp_stream_p stream)
{
if (!stream->expected_ready_p)
return false;
if (stream->connected_p)
return true;
if ((stream->read_views.view_list.nr_views
== stream->read_views.nr_expected_views)
&& (stream->write_views.view_list.nr_views
== stream->write_views.nr_expected_views))
{
stream->connected_p = true;
return true;
}
return false;
}
/* Wait until all the streams to which TASK connects are ready and
connected to all producer/consumer tasks. */
void
GOMP_stream_wait_until_connected (void *t)
{
gomp_stream_task_p task = (gomp_stream_task_p) t;
bool done;
int i;
do
{
done = true;
gomp_mutex_lock (&task->read_view_list.connect_view_mutex);
for (i = 0; i < task->read_view_list.nr_views; ++i)
{
if (task->read_view_list.views[i]->type & BATCHQ_VIEW)
continue;
if (!gomp_stream_check_connected (task->read_view_list.views[i]->stream))
done = false;
}
gomp_mutex_unlock (&task->read_view_list.connect_view_mutex);
gomp_mutex_lock (&task->write_view_list.connect_view_mutex);
for (i = 0; i < task->write_view_list.nr_views; ++i)
if (!gomp_stream_check_connected (task->write_view_list.views[i]->stream))
done = false;
gomp_mutex_unlock (&task->write_view_list.connect_view_mutex);
}
while (!done);
debug_log_init ("GOMP_stream_wait_until_connected %zu %zu\n", (size_t) task, (size_t) task);
}
/* Stream communication/synchronization. */
/* Compute the minimum of the LOWER_INDEX fields of all views in the
LIST of views. This is used during the termination phase to give
access to the readers up to the highest committed index. This is
only useful when producers, not too far apart in indices they
committed, forget to commit up to the last index that should appear
in the stream. */
static inline unsigned long long
gomp_stream_compute_lower_max (gomp_stream_view_list_p list)
{
unsigned long long local_max = 0;
gomp_stream_view_p *views = list->views;
int i;
for (i = 0; i < list->nr_views; ++i)
if (views[i]->lower_index > local_max)
local_max = views[i]->lower_index;
return local_max;
}
/* Compute the minimum of the LOWER_INDEX fields of all views in the
LIST of views. For a LIST of read views, this minimum represents
the highest index released by all read views on a stream (i.e. the
index of elements that all consumers have already discarded) and
therefore the highest index a write view will be allowed to acquire
for writing. For a LIST of write views, this minimum represents
the highest index all producers have committed and thus the highest
index available for reading. */
static inline unsigned long long
gomp_stream_compute_lower_min (gomp_stream_view_list_p list)
{
unsigned long long local_min = GOMP_STREAM_MAX_INDEX;
gomp_stream_view_p *views = list->views;
int i;
for (i = 0; i < list->nr_views; ++i)
if (views[i]->lower_index < local_min)
local_min = views[i]->lower_index;
return local_min;
}
/* Compute the minimum of the UPPER_INDEX fields of all views in the
LIST of views. Similar to the above, but this is only a hint on
the resources that another producer (resp. consumer) has already
acquired. If a producer (resp. consumer) has successfully acquired
an index with GOMP_stream_stall (resp. GOMP_stream_update) for
writing (resp. reading), then all other producers (resp. consumers)
on the same stream can access up to the same index without further
verification. */
static inline unsigned long long
gomp_stream_compute_upper_min (gomp_stream_view_list_p list)
{
unsigned long long local_min = GOMP_STREAM_MAX_INDEX;
gomp_stream_view_p *views = list->views;
int i;
for (i = 0; i < list->nr_views; ++i)
if (views[i]->upper_index < local_min)
local_min = views[i]->upper_index;
return local_min;
}
/* Wait until the producers (resp. consumers) on this stream have
committed (resp. released) up to the INDEX position in the stream.
When that hapens, the consumer (resp. producer) connected to the
stream through VIEW is allowed to access the elements up to
INDEX. */
static inline void
gomp_stream_wait_release (gomp_stream_view_p view,
gomp_stream_view_handle_p vh,
const unsigned long long index)
{
/* Test whether someone already got a hold of a bigger index
yet. */
if (view->local_min_value < index)
{
while (vh->current_min < index && !view->stream->eos_p)
{
unsigned long long local_min =
gomp_stream_compute_lower_min (&vh->view_list);
if (vh->current_min == local_min)
__asm volatile ("pause" : : : "memory");
else
vh->current_min = local_min;
}
view->local_min_value = vh->current_min;
}
}
/* Request read access for the view V to the stream up to INDEX. In
case the producers have finished and there is not enough data, the
returned value is the highest index to which the view is allowed to
access the stream. */
void *
GOMP_stream_update (void *v, const unsigned long long act_start,
const unsigned long long act_end)
{
unsigned long long low_idx, up_idx;
size_t low_idx_loc, up_idx_loc;
gomp_stream_view_p view = (gomp_stream_view_p) v;
gomp_stream_p stream = view->stream;
void *buffer_pointer;
debug_log4 ("GOMP_stream_update [in] %p %zd %llu %llu\n", stream, view->burst_size, act_start, act_end);
/* This update requests access to the buffer in [low_idx,up_idx[.
We will release up to low_idx-1 and acquire up to up_idx-1. */
low_idx = act_start * view->burst_size;
up_idx = act_end * view->burst_size + view->pxxk_size - 1;
if (up_idx - low_idx > stream->buffer_size)
gomp_fatal ("GOMP_stream: update requested access to more than buffer_size data.");
view->lower_index = low_idx + view->stream->buffer_size;
view->upper_index = up_idx;
/* In case another consumer has received permission to read up to a
yet higher index, then there is no need to check for this one. */
if (up_idx > view->stream->read_views.current_max)
{
struct timeval tv1, tv2;
time_t sec;
unsigned int usec;
gettimeofday(&tv1, NULL);
gomp_stream_wait_release (view, &view->stream->write_views, up_idx);
gettimeofday(&tv2, NULL);
if (tv2.tv_usec < tv1.tv_usec)
{
sec = tv2.tv_sec - tv1.tv_sec - 1;
usec = 1000000 + tv2.tv_usec - tv1.tv_usec;
}
else
{
sec = tv2.tv_sec - tv1.tv_sec;
usec = tv2.tv_usec - tv1.tv_usec;
}
stream->tvr.tv_usec = (stream->tvr.tv_usec + usec) % 1000000;
stream->tvr.tv_sec += sec + ((stream->tvr.tv_usec + usec) / 1000000);
view->stream->read_views.current_max = up_idx;
}
low_idx_loc = low_idx & stream->buffer_mask;
up_idx_loc = up_idx & stream->buffer_mask;
/* Once we know enough data is available for reading, we need to
check whether the data between the lower and upper buonds is
contiguous or if the buffer wrap-around occurs in the middle. */
if (low_idx_loc > up_idx_loc)
{
/* FIXME: does this require synchronization or is concurrent
overwriting acceptable as long as enough data has been copied
at the end? */
memcpy (stream->buffer + stream->buffer_size, stream->buffer,
up_idx_loc + 1);
//printf ("Update copy: (%llu,%llu) %llu - %llu | %zu - %zu (size: %zu)\n ", act_start, act_end, low_idx, up_idx, low_idx_loc, up_idx_loc, up_idx_loc + 1);
}
/* We return a pointer to a contiguous array where this view is
guaranteed access to all the requested data. */
buffer_pointer = stream->buffer + low_idx_loc;
debug_log4 ("GOMP_stream_update [out] %p %zd %llu %llu\n", stream, view->burst_size, act_start, act_end);
return buffer_pointer;
}
void *
GOMP_batchQ_update (void *v, const unsigned long long act_start,
const unsigned long long act_end)
{
unsigned long long low_idx, up_idx;
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v;
gomp_batchQ_p stream = view->stream;
/* This update requests access to the buffer in [low_idx,up_idx[.
We will release up to low_idx-1 and acquire up to up_idx-1. */
low_idx = act_start * view->burst_size;
up_idx = act_end * view->burst_size + view->pxxk_size - 1;
#ifdef ENSURE_BUFSIZE
if (up_idx - low_idx + 1 != stream->buffer_size)
{
#endif
if (up_idx - low_idx + 1 > stream->buffer_size)
gomp_fatal ("GOMP_batchQ: update requested access to more than buffer_size data.");
#ifdef ENSURE_BUFSIZE
else if (!view->termination_flag)
gomp_fatal ("GOMP_batchQ: update requested access to less than buffer_size data.");
}
#endif
if (!stream->state)
{
struct timeval tv1, tv2;
time_t sec;
unsigned int usec;
gettimeofday(&tv1, NULL);
while (!stream->state)
sched_yield();
gettimeofday(&tv2, NULL);
if (tv2.tv_usec < tv1.tv_usec)
{
sec = tv2.tv_sec - tv1.tv_sec - 1;
usec = 1000000 + tv2.tv_usec - tv1.tv_usec;
}
else
{
sec = tv2.tv_sec - tv1.tv_sec;
usec = tv2.tv_usec - tv1.tv_usec;
}
stream->tvr.tv_usec = (stream->tvr.tv_usec + usec) % 1000000;
stream->tvr.tv_sec += sec + ((stream->tvr.tv_usec + usec) / 1000000);
}
return (void *) stream->receiver_ptr;
}
/* Request write access for the view V to the stream up to INDEX. */
void *
GOMP_stream_stall (void *v, const unsigned long long act_start,
const unsigned long long act_end)
{
unsigned long long low_idx, up_idx;
gomp_stream_view_p view = (gomp_stream_view_p) v;
gomp_stream_p stream = view->stream;
void *buffer_pointer;
debug_log4 ("GOMP_stream_stall [in] %p %zd %llu %llu\n", stream, view->burst_size, act_start, act_end);
/* This update requests access to the buffer in [low_idx,up_idx[.
We will release up to low_idx-1 and acquire up to up_idx-1. */
low_idx = act_start * view->burst_size + stream->pre_shift;
up_idx = act_end * view->burst_size + view->pxxk_size + stream->pre_shift - 1;
if (up_idx - low_idx > stream->buffer_size)
{
fprintf (stderr, "Requesting data from low: %llu to up: %llu act [%llu,%llu] for burst:%zu size: %zu\n", low_idx, up_idx, act_start, act_end, view->burst_size, view->view_size);
gomp_fatal ("GOMP_stream: stall requested access to more than buffer_size data.");
}
/* We do not need to worry about wrap-around copying as this
"commit" only means that we do not want to write to those
indices below low_idx. */
view->lower_index = low_idx;
view->upper_index = up_idx;
if (up_idx > stream->write_views.current_max)
{
struct timeval tv1, tv2;
time_t sec;
unsigned int usec;
gettimeofday(&tv1, NULL);
gomp_stream_wait_release (view, &stream->read_views, up_idx);
gettimeofday(&tv2, NULL);
if (tv2.tv_usec < tv1.tv_usec)
{
sec = tv2.tv_sec - tv1.tv_sec - 1;
usec = 1000000 + tv2.tv_usec - tv1.tv_usec;
}
else
{
sec = tv2.tv_sec - tv1.tv_sec;
usec = tv2.tv_usec - tv1.tv_usec;
}
stream->tvw.tv_usec = (stream->tvw.tv_usec + usec) % 1000000;
stream->tvw.tv_sec += sec + ((stream->tvw.tv_usec + usec) / 1000000);
stream->write_views.current_max = up_idx;
}
buffer_pointer = stream->buffer + (low_idx & stream->buffer_mask);
debug_log4 ("GOMP_stream_stall [out] %p %zd %llu %llu\n", stream, view->burst_size, act_start, act_end);
return buffer_pointer;
}
void *
GOMP_batchQ_stall (void *v, const unsigned long long act_start,
const unsigned long long act_end)
{
unsigned long long low_idx, up_idx;
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v;
gomp_batchQ_p stream = view->stream;
/* This update requests access to the buffer in [low_idx,up_idx[.
We will release up to low_idx-1 and acquire up to up_idx-1. */
low_idx = act_start * view->burst_size + stream->pre_shift;
up_idx = act_end * view->burst_size + view->pxxk_size + stream->pre_shift - 1;
#ifdef ENSURE_BUFSIZE
if (up_idx - low_idx + 1 != stream->buffer_size)
{
#endif
if (up_idx - low_idx + 1 > stream->buffer_size)
{
gomp_fatal ("GOMP_batchQ: stall requested access to more than buffer_size.");
}
#ifdef ENSURE_BUFSIZE
else if (!view->termination_flag)
gomp_fatal ("GOMP_stream: stall requested access to less than buffer_size.");
}
#endif
return (void *) stream->sender_ptr;
}
/* Relinquish read access for the view V to the stream up to
INDEX. */
void
GOMP_stream_release (void *v, const unsigned long long act_idx)
{
gomp_stream_view_p view = (gomp_stream_view_p) v;
view->lower_index = act_idx * view->burst_size + view->stream->buffer_size - 1;
debug_log ("GOMP_stream_release %llu %llu\n", act_idx, act_idx);
}
void
GOMP_batchQ_release (void *v, const unsigned long long act_idx)
{
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v;
gomp_batchQ_p stream = view->stream;
stream->state = 0;
if (stream->receiver_buf == stream->buf_start1)
{
stream->receiver_ptr = stream->buf_start2;
stream->receiver_buf = stream->buf_start2;
}
else
{
stream->receiver_ptr = stream->buf_start1;
stream->receiver_buf = stream->buf_start1;
}
}
/* Relinquish write access for the view V to the stream up to
INDEX. */
void
GOMP_stream_commit (void *v, const unsigned long long act_idx)
{
gomp_stream_view_p view = (gomp_stream_view_p) v;
gomp_stream_p stream = view->stream;
unsigned long long up_idx = act_idx * view->burst_size + stream->pre_shift - 1;
size_t low_idx_loc, up_idx_loc;
low_idx_loc = view->lower_index & stream->buffer_mask;
up_idx_loc = up_idx & stream->buffer_mask;
/* Once we know enough data is available for reading, we need to
check whether the data between the lower and upper buonds is
contiguous or if the buffer wrap-around occurs in the middle. */
if (low_idx_loc > up_idx_loc)
{
/* FIXME: does this require synchronization or is concurrent
overwriting acceptable as long as enough data has been copied
at the end? */
memcpy (stream->buffer, stream->buffer + stream->buffer_size,
up_idx_loc + 1);
//printf ("Commit copy: (%llu) %llu - %llu | %zu - %zu (size: %zu)\n ", act_idx, view->lower_index, up_idx, low_idx_loc, up_idx_loc, up_idx_loc + 1);
}
view->lower_index = up_idx;
debug_log ("GOMP_stream_commit %llu %llu\n", act_idx, act_idx);
}
void
GOMP_batchQ_commit (void *v, const unsigned long long act_idx)
{
gomp_batchQ_view_p view = (gomp_batchQ_view_p) v;
gomp_batchQ_p stream = view->stream;
if (stream->state)
{
struct timeval tv1, tv2;
time_t sec;
unsigned int usec;
gettimeofday(&tv1, NULL);
while (stream->state)
sched_yield();
gettimeofday(&tv2, NULL);
if (tv2.tv_usec < tv1.tv_usec)
{
sec = tv2.tv_sec - tv1.tv_sec - 1;
usec = 1000000 + tv2.tv_usec - tv1.tv_usec;
}
else
{
sec = tv2.tv_sec - tv1.tv_sec;
usec = tv2.tv_usec - tv1.tv_usec;
}
stream->tvw.tv_usec = (stream->tvw.tv_usec + usec) % 1000000;
stream->tvw.tv_sec += sec + ((stream->tvw.tv_usec + usec) / 1000000);
}
stream->state = 1;
if (stream->sender_buf == stream->buf_start1)
{
stream->sender_ptr = stream->buf_start2;
stream->sender_buf = stream->buf_start2;
}
else
{
stream->sender_ptr = stream->buf_start1;
stream->sender_buf = stream->buf_start1;
}
}
/* Finalization and destruction of the streaming data structures. */
/* Disconnects VIEW from the stream to which it is connected and free
the stream if it was the last task to disconnect. */
static inline void
gomp_stream_unregister_view (gomp_stream_view_p view)
{
gomp_stream_p stream = view->stream;
gomp_stream_view_handle_p vh =
((view->type & VIEW_TYPE_MASK) == READ_VIEW) ? &stream->read_views : &stream->write_views;
int unregistered_views;
__sync_fetch_and_add (&(vh->nr_unregistered_views), 1);
unregistered_views = __sync_add_and_fetch (&(stream->unregistered_views), 1);
/* Make sure that when multiple views access a stream, the finished
views do not hinder the others in the min computation. */
if ((view->type & VIEW_TYPE_MASK) == READ_VIEW)
GOMP_stream_release (view, GOMP_STREAM_MAX_INDEX);
/* The last producer exiting will set the eos_p flag and allow the
consumers to read up to the highest committed index. */
else if (vh->nr_unregistered_views == vh->nr_registered_views)
{
stream->eos_p = true;
vh->current_min = gomp_stream_compute_lower_max (&vh->view_list);
}
/* If all known views arre accounted for, this is the last one
unregistering. It frees the memory allocated for the stream as
well as all the views on this stream. */
if (unregistered_views == (stream->read_views.nr_registered_views
+ stream->write_views.nr_registered_views))
{
gomp_stream_view_list_p read_view_list = &stream->read_views.view_list;
gomp_stream_view_list_p write_view_list = &stream->write_views.view_list;
int i;
for (i = 0; i < read_view_list->nr_views; ++i)
free (read_view_list->views[i]);
for (i = 0; i < write_view_list->nr_views; ++i)
free (write_view_list->views[i]);
free (stream->buffer);
free (read_view_list->views);
free (write_view_list->views);
printf("GOMP stream %p spinned for %jd secs %jd usecs on output\n",
stream, (uintmax_t) stream->tvr.tv_sec,
(uintmax_t) stream->tvr.tv_usec);
printf("GOMP stream %p spinned for %jd secs %jd usecs on input\n",
stream, (uintmax_t) stream->tvw.tv_sec,
(uintmax_t) stream->tvw.tv_usec);
free (stream);
}
}
/* TODO */
static inline void
gomp_batchQ_unregister_view (gomp_stream_view_p view)
{
gomp_batchQ_p stream = ((gomp_batchQ_view_p) view)->stream;
printf("BatchQueue stream %p spinned for %jd secs %jd usecs on output\n",
stream, (uintmax_t) stream->tvr.tv_sec,
(uintmax_t) stream->tvr.tv_usec);
printf("BatchQueue stream %p spinned for %jd secs %jd usecs on input\n",
stream, (uintmax_t) stream->tvw.tv_sec,
(uintmax_t) stream->tvw.tv_usec);
}
/* Invoked before terminating a stream TASK, this disconnects all the
views and for all streams for which it is the last one to
disconnect from, it frees up all data structures. */
void
GOMP_stream_task_exit (void *t)
{
gomp_stream_task_p task = (gomp_stream_task_p) t;
int num_read_views = task->read_view_list.nr_views;
int num_batchQ_read_views = task->batchQ_read_view_list.nr_views;
int num_write_views = task->write_view_list.nr_views;
int num_batchQ_write_views = task->batchQ_write_view_list.nr_views;
int i, res;
debug_log_init ("GOMP_stream_task_exit %zu %zu\n", (size_t) t, (size_t) t);
res = __sync_sub_and_fetch (&task->num_instances, 1);
if (res == 0)
{
for (i = 0; i < num_read_views; ++i)
gomp_stream_unregister_view (task->read_view_list.views[i]);
for (i = 0; i < num_batchQ_read_views; ++i)
gomp_batchQ_unregister_view (task->batchQ_read_view_list.views[i]);
for (i = 0; i < num_write_views; ++i)
gomp_stream_unregister_view (task->write_view_list.views[i]);
for (i = 0; i < num_batchQ_write_views; ++i)
gomp_batchQ_unregister_view (task->batchQ_write_view_list.views[i]);
free (task->read_view_list.views);
free (task->batchQ_read_view_list.views);
free (task->write_view_list.views);
free (task->batchQ_write_view_list.views);
free (task);
}
}
/* Get info on the amount of work/data available in the stream
starting from INDEX and considering a constant burst of size BURST.
This function does not wait, except in case there is no work yet,
but the EOS flag has not yet been set. The function only returns 0
on termination. */
unsigned long long
GOMP_stream_get_available_work (void *t, unsigned long long *start_idx)
{
gomp_stream_task_p task = (gomp_stream_task_p) t;
long result = 0;
unsigned long long start;
/* Atomically acquire a range, then wait until the range is either
fully available or termination occurs. */
start = __sync_fetch_and_add (&task->first_unassigned_activation_counter,
AGGREGATION_FACTOR);
result = task->activation_counter - start;
debug_log_init3 ("GOMP_stream_get_available_work [entry] %llu %llu \t %15zu\n", start, task->activation_counter, (size_t) task);
if (result >= AGGREGATION_FACTOR)
{
*start_idx = start;
return AGGREGATION_FACTOR;
}
while (result < AGGREGATION_FACTOR)
{
if (task->termination_flag)
{
__sync_synchronize ();
result = task->activation_counter - start;
debug_log ("GOMP_stream_get_available_work [final] %llu %ld\n", start, result);
*start_idx = start;
if (result > AGGREGATION_FACTOR)
return AGGREGATION_FACTOR;
return (result > 0) ? result : 0;
}
result = task->activation_counter - start;
}
debug_log ("GOMP_stream_get_available_work %llu %ld\n", start, result);
*start_idx = start;
return AGGREGATION_FACTOR;
}
/* TODO? GOMP_batchQ_get_available_work */
/* Initialize streaming in this region. */
void
GOMP_stream_init ()
{
/* Add self to ensure at least one member of the team barrier will
be waiting for the streaming tasks. */
gomp_stream_tasks_count = 1;
gomp_barrier_init (&gomp_stream_tasks_exit_barrier, gomp_stream_tasks_count);
}
/* No GOMP_batchQ_init */
/* Wait until all streaming threads complete. */
void
GOMP_stream_exit ()
{
gomp_barrier_wait (&gomp_stream_tasks_exit_barrier);
}
/* No GOMP_batchQ_exit */
/* Request SIZE bytes for a PRE operator on stream S. Return a
pointer where data should be stored. */
void *
GOMP_stream_pre (void *s, const unsigned long long size)
{
gomp_stream_p stream = (gomp_stream_p) s;
debug_log_init ("GOMP_stream_pre %zu \t %llu\n", (size_t) s, size);
stream->pre_shift = size;
stream->write_views.current_min = size;
return stream->buffer;
}
void *
GOMP_batchQ_pre (void *s, const unsigned long long size)
{
gomp_batchQ_p stream = (gomp_batchQ_p) s;
debug_log_init ("GOMP_stream_pre %zu \t %llu\n", (size_t) s, size);
gomp_fatal ("GOMP_batchQ_pre not supported now: missing code in GOMP_batchQ_commit and GOMP_batchQ_stall\n");
stream->pre_shift = size;
return (void *) stream->sender_ptr;
}
/* This function is a pthread_create entry point for streaming
tasks. */
static void *
gomp_stream_thread_start (void *xdata)
{
struct gomp_stream_thread_start_data *data = xdata;
void (*local_fn) (void *);
void *local_data;
local_fn = data->fn;
local_data = data->fn_data;
local_fn (local_data);
gomp_barrier_wait_last (&gomp_stream_tasks_exit_barrier);
debug_log_init ("** exiting task: %d (%u)\n", data->id, gomp_stream_tasks_count);
return NULL;
}
/* Called for starting a streaming task. These tasks do not partake
in existing thread teams and are not subject to scheduling
points. */
void
GOMP_stream_task (void (*fn) (void *), void *data,
void (*cpyfn) (void *, void *),
long arg_size, long arg_align,
long num_instances, bool auto_replicable)
{
pthread_attr_t thread_attr, *attr;
pthread_t pt;
int err, i, base_id;
char *arg, *buf;
base_id = __sync_fetch_and_add (&gomp_stream_tasks_count, num_instances);
gomp_barrier_reinit (&gomp_stream_tasks_exit_barrier,
gomp_stream_tasks_count);
debug_log_init ("** adding tasks: %ld (%u)\n", num_instances, gomp_stream_tasks_count);
attr = &gomp_thread_attr;
if (__builtin_expect (gomp_cpu_affinity != NULL, 0))
{
size_t stacksize;
pthread_attr_init (&thread_attr);
pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
if (! pthread_attr_getstacksize (&gomp_thread_attr, &stacksize))
pthread_attr_setstacksize (&thread_attr, stacksize);
attr = &thread_attr;
#if 0 /* This should be handled separately ... we will have to build a
stream mapping and prevent other OMP threads from touching
the cores running streaming tasks. */
gomp_init_thread_affinity (attr);
#endif
}
for (i = 0; i < num_instances; ++i)
{
struct gomp_stream_thread_start_data *start_data;
buf = (char *) gomp_malloc (arg_size + arg_align - 1);
arg = (char *) (((uintptr_t) (buf) + arg_align - 1)
& ~(uintptr_t) (arg_align - 1));
if (cpyfn)
cpyfn (arg, data);
else
memcpy (arg, data, arg_size);
start_data = gomp_malloc (sizeof (struct gomp_stream_thread_start_data));
start_data->fn = fn;
start_data->fn_data = arg;
start_data->id = base_id + i;
err = pthread_create (&pt, attr, gomp_stream_thread_start, start_data);
if (err != 0)
gomp_fatal ("Thread creation failed: %s", strerror (err));
}
if (__builtin_expect (gomp_cpu_affinity != NULL, 0))
pthread_attr_destroy (&thread_attr);
}