2010-10-05 14:58:55 +02:00
/* Copyright (C) 2010 Free Software Foundation, Inc.
Contributed by Antoniu Pop < antoniu . pop @ gmail . com > .
This file is part of the GNU OpenMP Library ( libgomp ) .
Libgomp is free software ; you can redistribute it and / or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation ; either version 2.1 of the License , or
( at your option ) any later version .
Libgomp is distributed in the hope that it will be useful , but WITHOUT ANY
WARRANTY ; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE . See the GNU Lesser General Public License for
more details .
You should have received a copy of the GNU Lesser General Public License
along with libgomp ; see the file COPYING . LIB . If not , write to the
Free Software Foundation , Inc . , 51 Franklin Street , Fifth Floor , Boston ,
MA 02110 - 1301 , USA . */
/* As a special exception, if you link this library with other files, some
of which are compiled with GCC , to produce an executable , this library
does not by itself cause the resulting executable to be covered by the
GNU General Public License . This exception does not however invalidate
any other reasons why the executable file might be covered by the GNU
General Public License . */
/* This implements the stream communication layer for libGOMP. */
# include <stdlib.h>
# include <string.h>
# include <stdio.h>
# include "wait.h"
# include "sem.h"
# include "mutex.h"
# include "libgomp.h"
2011-06-14 17:13:31 +02:00
/* Number of activations reserved per request by
   GOMP_stream_get_available_work.  */
# define AGGREGATION_FACTOR 32
/* Debug tracing hooks.  The active definitions below expand to nothing;
   the commented-out variants print through printf and flush stdout.  */
//#define debug_log_init(S, V1, V2) printf (S, V1, V2); fflush (stdout)
# define debug_log_init(S, V1, V2)
//#define debug_log_init3(S, V1, V2, V3) printf (S, V1, V2, V3); fflush (stdout)
# define debug_log_init3(S, V1, V2, V3)
//#define debug_log(S, V1, V2) printf (S, V1, V2); fflush (stdout)
# define debug_log(S, V1, V2)
/* NOTE(review): this barrier is not referenced in the visible part of
   this file -- confirm whether it is still needed.  */
gomp_barrier_t gomp_stream_tasks_wait_until_connected_barrier ;
/* Barrier initialized by GOMP_stream_init, waited on by GOMP_stream_exit
   and by every streaming thread once its work function has returned.  */
gomp_barrier_t gomp_stream_tasks_exit_barrier ;
/* Participant count used to size the exit barrier: set to 1 by
   GOMP_stream_init and increased by GOMP_stream_task.  */
unsigned gomp_stream_tasks_count ;
/* This structure is used to communicate across pthread_create: it is
   filled in by GOMP_stream_task and read by gomp_stream_thread_start.  */
struct gomp_stream_thread_start_data
{
/* Work function executed by the new thread.  */
void ( * fn ) ( void * ) ;
/* Argument handed to FN.  */
void * fn_data ;
/* Identifier of the task instance; only referenced by debug logging.  */
int id ;
} ;
2010-10-05 14:58:55 +02:00
/* Data structures creation and pipeline initialization. */
/* Allocate and initialize a GOMP_STREAM for data elements of size
ELEMENT_SIZE using a circular buffer of STREAM_BUFFER_SIZE such
elements . Returns a pointer to the newly allocated stream . The
user may provide a pointer to pre - allocated memory to be used as
BUFFER for the stream . */
void *
2011-06-14 17:13:31 +02:00
GOMP_stream_create_stream ( size_t element_size , size_t buffer_size )
2010-10-05 14:58:55 +02:00
{
gomp_stream_p stream = ( gomp_stream_p ) gomp_malloc ( sizeof ( gomp_stream_t ) ) ;
2011-06-14 17:13:31 +02:00
debug_log_init ( " GOMP_stream_create_stream %zu %zu \n " , element_size , buffer_size ) ;
if ( buffer_size < 2 )
gomp_fatal ( " GOMP_stream: insufficient stream buffer size. " ) ;
2010-10-05 14:58:55 +02:00
/* Initialize and allocate the data buffer. We force the
buffer_size to be a power of 2 for efficient modulo computation
of the indices in the circular buffer . */
2011-06-14 17:13:31 +02:00
/* To avoid excessive multiplication operations, we convert the
accounting from elements to bytes . */
buffer_size * = element_size ;
2010-10-05 14:58:55 +02:00
stream - > buffer_size = 1 ;
2011-06-14 17:13:31 +02:00
while ( stream - > buffer_size < buffer_size )
2010-10-05 14:58:55 +02:00
stream - > buffer_size < < = 1 ;
stream - > buffer_mask = stream - > buffer_size - 1 ;
2011-06-14 17:13:31 +02:00
stream - > buffer =
( void * ) gomp_malloc ( stream - > buffer_size * 2 ) ;
2010-10-05 14:58:55 +02:00
stream - > expected_ready_p = false ;
stream - > connected_p = false ;
stream - > eos_p = false ;
2011-06-14 17:13:31 +02:00
stream - > pre_shift = 0 ;
2011-09-08 15:20:05 +02:00
stream - > unregistered_views = 0 ;
2010-10-05 14:58:55 +02:00
/* Initialize the view_handles. */
stream - > read_views . current_min = stream - > buffer_size ;
stream - > read_views . current_max = 0 ;
stream - > read_views . view_list . views = NULL ;
stream - > read_views . view_list . nr_views = 0 ;
stream - > read_views . view_list . size = 0 ;
stream - > read_views . nr_expected_views = 0 ;
stream - > read_views . nr_registered_views = 0 ;
stream - > read_views . nr_unregistered_views = 0 ;
2011-06-14 17:13:31 +02:00
gomp_mutex_init ( & stream - > read_views . view_list . connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
stream - > write_views . current_min = 0 ;
stream - > write_views . current_max = stream - > buffer_size ;
stream - > write_views . view_list . views = NULL ;
stream - > write_views . view_list . nr_views = 0 ;
stream - > write_views . view_list . size = 0 ;
stream - > write_views . nr_expected_views = 0 ;
stream - > write_views . nr_registered_views = 0 ;
stream - > write_views . nr_unregistered_views = 0 ;
2011-06-14 17:13:31 +02:00
gomp_mutex_init ( & stream - > write_views . view_list . connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
# ifndef HAVE_SYNC_BUILTINS
gomp_mutex_init ( & stream - > stream_mutex ) ;
# endif
return stream ;
}
/* Allocate and initialize a generic GOMP_STREAM_VIEW that can be
connected to any stream to give either read or write access
depending on its TYPE . Returns a pointer to the newly allocated
2011-06-14 17:13:31 +02:00
view . This view accesses VIEW_SIZE bytes in the stream and
commits / releases BURST_SIZE bytes per activation . */
2010-10-05 14:58:55 +02:00
static inline void *
2011-06-14 17:13:31 +02:00
gomp_stream_create_view ( int type , size_t view_size , size_t burst_size )
2010-10-05 14:58:55 +02:00
{
gomp_stream_view_p view =
( gomp_stream_view_p ) gomp_malloc ( sizeof ( gomp_stream_view_t ) ) ;
view - > lower_index = 0 ;
view - > upper_index = 0 ;
view - > stream = NULL ;
view - > end_p = false ;
view - > type = type ;
view - > local_min_value = 0 ;
2011-06-14 17:13:31 +02:00
view - > view_size = view_size ;
view - > burst_size = burst_size ;
view - > pxxk_size = view_size - burst_size ;
2010-10-05 14:58:55 +02:00
return view ;
}
2011-06-14 17:13:31 +02:00
/* Wrapper for creating a READ view . */
2010-10-05 14:58:55 +02:00
void *
2011-06-14 17:13:31 +02:00
GOMP_stream_create_read_view ( size_t view_size , size_t burst_size )
2010-10-05 14:58:55 +02:00
{
2011-06-14 17:13:31 +02:00
debug_log_init ( " GOMP_stream_create_read_view %zu %zu \n " , view_size , burst_size ) ;
return gomp_stream_create_view ( READ_VIEW , view_size , burst_size ) ;
2010-10-05 14:58:55 +02:00
}
/* Wrapper for creating a WRITE view. */
void *
2011-06-14 17:13:31 +02:00
GOMP_stream_create_write_view ( size_t view_size , size_t burst_size )
2010-10-05 14:58:55 +02:00
{
2011-06-14 17:13:31 +02:00
debug_log_init ( " GOMP_stream_create_write_view %zu %zu \n " , view_size , burst_size ) ;
return gomp_stream_create_view ( WRITE_VIEW , view_size , burst_size ) ;
2010-10-05 14:58:55 +02:00
}
/* Allocate and initialize a GOMP_STREAM_TASK data structure. */
void *
2011-06-14 17:13:31 +02:00
GOMP_stream_create_task ( )
2010-10-05 14:58:55 +02:00
{
2011-06-14 17:13:31 +02:00
gomp_stream_task_p task =
2010-10-05 14:58:55 +02:00
( gomp_stream_task_p ) gomp_malloc ( sizeof ( gomp_stream_task_t ) ) ;
2011-06-14 17:13:31 +02:00
debug_log_init3 ( " GOMP_stream_create_task %d %d \t %15zu \n " , 0 , 0 , ( size_t ) task ) ;
2010-10-05 14:58:55 +02:00
task - > read_view_list . views = NULL ;
task - > read_view_list . nr_views = 0 ;
task - > read_view_list . size = 0 ;
2011-06-14 17:13:31 +02:00
gomp_mutex_init ( & task - > read_view_list . connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
task - > write_view_list . views = NULL ;
task - > write_view_list . nr_views = 0 ;
task - > write_view_list . size = 0 ;
2011-06-14 17:13:31 +02:00
gomp_mutex_init ( & task - > write_view_list . connect_view_mutex ) ;
task - > activation_counter = 0 ;
task - > termination_flag = false ;
task - > first_unassigned_activation_counter = 0 ;
task - > num_instances = 0 ;
__sync_synchronize ( ) ;
2010-10-05 14:58:55 +02:00
return task ;
}
2011-06-14 17:13:31 +02:00
volatile void *
GOMP_stream_get_task_activation_counter ( void * t )
{
gomp_stream_task_p task = ( gomp_stream_task_p ) t ;
return & ( task - > activation_counter ) ;
}
void
GOMP_stream_set_task_termination_flag ( void * t )
{
gomp_stream_task_p task = ( gomp_stream_task_p ) t ;
task - > termination_flag = true ;
}
void
GOMP_stream_task_add_instance ( void * t )
{
gomp_stream_task_p task = ( gomp_stream_task_p ) t ;
__sync_fetch_and_add ( & task - > num_instances , 1 ) ;
__sync_synchronize ( ) ;
}
2010-10-05 14:58:55 +02:00
/* Declare additional READ_VIEWS and WRITE_VIEWS expected views on
stream S . When possible , the thread that creates the streaming
tasks should declare , for each stream , the number of read / write
views that will connect to a stream before the streaming tasks are
started . If this function is called on a stream , there will be no
further checks for the number of tasks partaking in the
initialization synchronization . */
void
GOMP_stream_add_expected_views ( void * s , int read_views , int write_views ,
int final )
{
gomp_stream_p stream = ( gomp_stream_p ) s ;
2011-06-14 17:13:31 +02:00
debug_log_init ( " GOMP_stream_add_expected_views %d %d \n " , read_views , write_views ) ;
2010-10-05 14:58:55 +02:00
if ( stream - > expected_ready_p )
gomp_fatal
( " GOMP_stream: attempting to modify a final number of expected views. " ) ;
stream - > expected_ready_p = final ;
# ifdef HAVE_SYNC_BUILTINS
__sync_fetch_and_add ( & stream - > read_views . nr_expected_views , read_views ) ;
__sync_fetch_and_add ( & stream - > write_views . nr_expected_views , write_views ) ;
# else
gomp_mutex_lock ( & stream - > stream_mutex ) ;
stream - > read_views . nr_expected_views + = read_views ;
stream - > write_views . nr_expected_views + = write_views ;
gomp_mutex_unlock ( & stream - > stream_mutex ) ;
# endif
}
/* Append VIEW to VIEW_LIST.  The list is stored as an array that
   starts with room for 4 entries and doubles whenever it is full;
   entries are never removed here.  The caller is responsible for
   any locking.  */

static inline void
gomp_stream_add_view_to_list (gomp_stream_view_p view,
			      gomp_stream_view_list_p view_list)
{
  const bool needs_storage = (view_list->views == NULL
			      || view_list->nr_views == view_list->size);

  if (needs_storage)
    {
      /* First use: pick the initial capacity.  */
      if (view_list->size == 0)
	view_list->size = 4;

      /* Still full with the (possibly just initialized) capacity:
	 double it.  */
      if (view_list->nr_views == view_list->size)
	view_list->size <<= 1;

      view_list->views
	= (gomp_stream_view_p *) gomp_realloc (view_list->views,
					       view_list->size
					       * sizeof (gomp_stream_view_p));
    }

  view_list->views[view_list->nr_views] = view;
  view_list->nr_views += 1;
}
/* Connect a VIEW to a STREAM and also to the TASK which will use it.
This effectively builds the runtime task graph . */
void
GOMP_stream_connect_view ( void * t , void * s , void * v )
{
gomp_stream_task_p task = ( gomp_stream_task_p ) t ;
gomp_stream_p stream = ( gomp_stream_p ) s ;
gomp_stream_view_p view = ( gomp_stream_view_p ) v ;
gomp_stream_view_handle_p vh = ( view - > type = = READ_VIEW ) ?
& stream - > read_views : & stream - > write_views ;
gomp_stream_view_list_p stream_list = & vh - > view_list ;
gomp_stream_view_list_p task_list = ( view - > type = = READ_VIEW ) ?
& task - > read_view_list : & task - > write_view_list ;
view - > stream = stream ;
/* A read view's lower index is shifted by the buffer_size as the
stream is initially empty . This is equivalent to releasing the
original buffer_size elements . A write view will start with
buffer_size free space . */
if ( view - > type = = READ_VIEW )
view - > lower_index = stream - > buffer_size ;
else
view - > local_min_value = stream - > buffer_size ;
/* Register the view with the TASK to which it belongs. This
operation is local to the task , so there is no need to
synchronize . */
2011-06-14 17:13:31 +02:00
gomp_mutex_lock ( & task_list - > connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
gomp_stream_add_view_to_list ( view , task_list ) ;
2011-06-14 17:13:31 +02:00
gomp_mutex_unlock ( & task_list - > connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
/* Connect the view to the stream. This must be done atomically as
this data structure is shared with the other producer / consumer
tasks . */
2011-06-14 17:13:31 +02:00
gomp_mutex_lock ( & vh - > view_list . connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
gomp_stream_add_view_to_list ( view , stream_list ) ;
2011-06-14 17:13:31 +02:00
gomp_mutex_unlock ( & vh - > view_list . connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
__sync_fetch_and_add ( & vh - > nr_registered_views , 1 ) ;
}
/* Return true when all the views expected on STREAM have connected.
   The answer is always false until the expected counts have been
   declared final; once the counts match, the result is cached in
   STREAM->connected_p.  */

static inline bool
gomp_stream_check_connected (gomp_stream_p stream)
{
  /* Expected counts may still change: nothing can be concluded.  */
  if (!stream->expected_ready_p)
    return false;

  /* Already established by an earlier call.  */
  if (stream->connected_p)
    return true;

  if (stream->read_views.view_list.nr_views
      != stream->read_views.nr_expected_views)
    return false;

  if (stream->write_views.view_list.nr_views
      != stream->write_views.nr_expected_views)
    return false;

  stream->connected_p = true;
  return true;
}
/* Wait until all the streams to which TASK connects are ready and
connected to all producer / consumer tasks . */
void
GOMP_stream_wait_until_connected ( void * t )
{
gomp_stream_task_p task = ( gomp_stream_task_p ) t ;
bool done ;
2011-06-14 17:13:31 +02:00
int i ;
2010-10-05 14:58:55 +02:00
do
{
done = true ;
2011-06-14 17:13:31 +02:00
gomp_mutex_lock ( & task - > read_view_list . connect_view_mutex ) ;
for ( i = 0 ; i < task - > read_view_list . nr_views ; + + i )
2010-10-05 14:58:55 +02:00
if ( ! gomp_stream_check_connected ( task - > read_view_list . views [ i ] - > stream ) )
done = false ;
2011-06-14 17:13:31 +02:00
gomp_mutex_unlock ( & task - > read_view_list . connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
gomp_mutex_lock ( & task - > write_view_list . connect_view_mutex ) ;
for ( i = 0 ; i < task - > write_view_list . nr_views ; + + i )
2010-10-05 14:58:55 +02:00
if ( ! gomp_stream_check_connected ( task - > write_view_list . views [ i ] - > stream ) )
done = false ;
2011-06-14 17:13:31 +02:00
gomp_mutex_unlock ( & task - > write_view_list . connect_view_mutex ) ;
2010-10-05 14:58:55 +02:00
}
while ( ! done ) ;
2011-06-14 17:13:31 +02:00
debug_log_init ( " GOMP_stream_wait_until_connected %zu %zu \n " , ( size_t ) task , ( size_t ) task ) ;
2010-10-05 14:58:55 +02:00
}
/* Stream communication/synchronization. */
/* Compute the maximum of the LOWER_INDEX fields of all views in
   LIST (0 for an empty list).  This is used during the termination
   phase, when the last producer unregisters, to give the readers
   access up to the highest committed index.  */

static inline unsigned long long
gomp_stream_compute_lower_max (gomp_stream_view_list_p list)
{
  gomp_stream_view_p *entries = list->views;
  unsigned long long highest = 0;
  int k;

  for (k = 0; k < list->nr_views; ++k)
    highest = (entries[k]->lower_index > highest
	       ? entries[k]->lower_index : highest);

  return highest;
}
/* Compute the minimum of the LOWER_INDEX fields of all views in
   LIST (GOMP_STREAM_MAX_INDEX for an empty list).  For a list of
   read views this is the highest index every consumer has released,
   hence the highest index a writer may acquire.  For a list of write
   views it is the highest index every producer has committed, hence
   the highest index available for reading.  */

static inline unsigned long long
gomp_stream_compute_lower_min (gomp_stream_view_list_p list)
{
  gomp_stream_view_p *entries = list->views;
  unsigned long long lowest = GOMP_STREAM_MAX_INDEX;
  int k;

  for (k = 0; k < list->nr_views; ++k)
    lowest = (entries[k]->lower_index < lowest
	      ? entries[k]->lower_index : lowest);

  return lowest;
}
/* Compute the minimum of the UPPER_INDEX fields of all views in
   LIST (GOMP_STREAM_MAX_INDEX for an empty list).  This is only a
   hint on the resources the other producers (resp. consumers) on the
   stream have already acquired through GOMP_stream_stall
   (resp. GOMP_stream_update).  */

static inline unsigned long long
gomp_stream_compute_upper_min (gomp_stream_view_list_p list)
{
  gomp_stream_view_p *entries = list->views;
  unsigned long long lowest = GOMP_STREAM_MAX_INDEX;
  int k;

  for (k = 0; k < list->nr_views; ++k)
    lowest = (entries[k]->upper_index < lowest
	      ? entries[k]->upper_index : lowest);

  return lowest;
}
/* Wait until the producers (resp. consumers) registered in VH have
   committed (resp. released) up to position INDEX in the stream, or
   until the end-of-stream flag is raised.  When that happens, the
   consumer (resp. producer) connected through VIEW may access the
   stream up to INDEX.  The value observed in VH->current_min is
   cached in VIEW->local_min_value so that later requests below it
   return immediately.  */

static inline void
gomp_stream_wait_release (gomp_stream_view_p view,
			  gomp_stream_view_handle_p vh,
			  const unsigned long long index)
{
  /* This view already knows of a large enough bound.  */
  if (view->local_min_value >= index)
    return;

  while (vh->current_min < index && !view->stream->eos_p)
    {
      unsigned long long fresh_min
	= gomp_stream_compute_lower_min (&vh->view_list);

      if (vh->current_min != fresh_min)
	vh->current_min = fresh_min;
      else
	/* No progress since the last look: back off briefly.  */
	__asm volatile (" pause " : : : "memory");
    }

  view->local_min_value = vh->current_min;
}
/* Request read access for the view V to the stream up to INDEX. In
case the producers have finished and there is not enough data , the
returned value is the highest index to which the view is allowed to
access the stream . */
2011-06-14 17:13:31 +02:00
void *
GOMP_stream_update ( void * v , const unsigned long long act_start ,
const unsigned long long act_end )
2010-10-05 14:58:55 +02:00
{
2011-06-14 17:13:31 +02:00
unsigned long long low_idx , up_idx ;
size_t low_idx_loc , up_idx_loc ;
2010-10-05 14:58:55 +02:00
gomp_stream_view_p view = ( gomp_stream_view_p ) v ;
2011-06-14 17:13:31 +02:00
gomp_stream_p stream = view - > stream ;
void * buffer_pointer ;
debug_log ( " GOMP_stream_update [in] %llu %llu \n " , act_start , act_end ) ;
/* This update requests access to the buffer in [low_idx,up_idx[.
We will release up to low_idx - 1 and acquire up to up_idx - 1. */
low_idx = act_start * view - > burst_size ;
up_idx = act_end * view - > burst_size + view - > pxxk_size - 1 ;
if ( up_idx - low_idx > stream - > buffer_size )
gomp_fatal ( " GOMP_stream: update requested access to more than buffer_size data. " ) ;
view - > lower_index = low_idx + view - > stream - > buffer_size ;
view - > upper_index = up_idx ;
2010-10-05 14:58:55 +02:00
/* In case another consumer has received permission to read up to a
yet higher index , then there is no need to check for this one . */
2011-06-14 17:13:31 +02:00
if ( up_idx > view - > stream - > read_views . current_max )
2010-10-05 14:58:55 +02:00
{
2011-06-14 17:13:31 +02:00
gomp_stream_wait_release ( view , & view - > stream - > write_views , up_idx ) ;
view - > stream - > read_views . current_max = up_idx ;
}
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
low_idx_loc = low_idx & stream - > buffer_mask ;
up_idx_loc = up_idx & stream - > buffer_mask ;
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
/* Once we know enough data is available for reading, we need to
check whether the data between the lower and upper buonds is
contiguous or if the buffer wrap - around occurs in the middle . */
if ( low_idx_loc > up_idx_loc )
{
/* FIXME: does this require synchronization or is concurrent
overwriting acceptable as long as enough data has been copied
at the end ? */
memcpy ( stream - > buffer + stream - > buffer_size , stream - > buffer ,
up_idx_loc + 1 ) ;
//printf ("Update copy: (%llu,%llu) %llu - %llu | %zu - %zu (size: %zu)\n ", act_start, act_end, low_idx, up_idx, low_idx_loc, up_idx_loc, up_idx_loc + 1);
2010-10-05 14:58:55 +02:00
}
2011-06-14 17:13:31 +02:00
/* We return a pointer to a contiguous array where this view is
guaranteed access to all the requested data . */
buffer_pointer = stream - > buffer + low_idx_loc ;
debug_log ( " GOMP_stream_update [out] %llu %llu \n " , act_start , act_end ) ;
return buffer_pointer ;
2010-10-05 14:58:55 +02:00
}
/* Request write access for the view V to the stream up to INDEX. */
2011-06-14 17:13:31 +02:00
void *
GOMP_stream_stall ( void * v , const unsigned long long act_start ,
const unsigned long long act_end )
2010-10-05 14:58:55 +02:00
{
2011-06-14 17:13:31 +02:00
unsigned long long low_idx , up_idx ;
2010-10-05 14:58:55 +02:00
gomp_stream_view_p view = ( gomp_stream_view_p ) v ;
2011-06-14 17:13:31 +02:00
gomp_stream_p stream = view - > stream ;
void * buffer_pointer ;
debug_log ( " GOMP_stream_stall [in] %llu %llu \n " , act_start , act_end ) ;
/* This update requests access to the buffer in [low_idx,up_idx[.
We will release up to low_idx - 1 and acquire up to up_idx - 1. */
low_idx = act_start * view - > burst_size + stream - > pre_shift ;
up_idx = act_end * view - > burst_size + view - > pxxk_size + stream - > pre_shift - 1 ;
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
if ( up_idx - low_idx > stream - > buffer_size )
2010-10-05 14:58:55 +02:00
{
2011-06-14 17:13:31 +02:00
fprintf ( stderr , " Requesting data from low: %llu to up: %llu act [%llu,%llu] for burst:%zu size: %zu \n " , low_idx , up_idx , act_start , act_end , view - > burst_size , view - > view_size ) ;
gomp_fatal ( " GOMP_stream: stall requested access to more than buffer_size data. " ) ;
2010-10-05 14:58:55 +02:00
}
2011-06-14 17:13:31 +02:00
/* We do not need to worry about wrap-around copying as this
" commit " only means that we do not want to write to those
indices below low_idx . */
view - > lower_index = low_idx ;
view - > upper_index = up_idx ;
if ( up_idx > stream - > write_views . current_max )
{
gomp_stream_wait_release ( view , & stream - > read_views , up_idx ) ;
stream - > write_views . current_max = up_idx ;
}
buffer_pointer = stream - > buffer + ( low_idx & stream - > buffer_mask ) ;
debug_log ( " GOMP_stream_stall [out] %llu %llu \n " , act_start , act_end ) ;
return buffer_pointer ;
2010-10-05 14:58:55 +02:00
}
/* Relinquish read access for the view V to the stream up to
INDEX . */
void
2011-06-14 17:13:31 +02:00
GOMP_stream_release ( void * v , const unsigned long long act_idx )
2010-10-05 14:58:55 +02:00
{
gomp_stream_view_p view = ( gomp_stream_view_p ) v ;
2011-06-14 17:13:31 +02:00
view - > lower_index = act_idx * view - > burst_size + view - > stream - > buffer_size - 1 ;
debug_log ( " GOMP_stream_release %llu %llu \n " , act_idx , act_idx ) ;
2010-10-05 14:58:55 +02:00
}
/* Relinquish write access for the view V to the stream up to
INDEX . */
void
2011-06-14 17:13:31 +02:00
GOMP_stream_commit ( void * v , const unsigned long long act_idx )
2010-10-05 14:58:55 +02:00
{
gomp_stream_view_p view = ( gomp_stream_view_p ) v ;
2011-06-14 17:13:31 +02:00
gomp_stream_p stream = view - > stream ;
unsigned long long up_idx = act_idx * view - > burst_size + stream - > pre_shift - 1 ;
size_t low_idx_loc , up_idx_loc ;
low_idx_loc = view - > lower_index & stream - > buffer_mask ;
up_idx_loc = up_idx & stream - > buffer_mask ;
/* Once we know enough data is available for reading, we need to
check whether the data between the lower and upper buonds is
contiguous or if the buffer wrap - around occurs in the middle . */
if ( low_idx_loc > up_idx_loc )
{
/* FIXME: does this require synchronization or is concurrent
overwriting acceptable as long as enough data has been copied
at the end ? */
memcpy ( stream - > buffer , stream - > buffer + stream - > buffer_size ,
up_idx_loc + 1 ) ;
//printf ("Commit copy: (%llu) %llu - %llu | %zu - %zu (size: %zu)\n ", act_idx, view->lower_index, up_idx, low_idx_loc, up_idx_loc, up_idx_loc + 1);
}
view - > lower_index = up_idx ;
debug_log ( " GOMP_stream_commit %llu %llu \n " , act_idx , act_idx ) ;
2010-10-05 14:58:55 +02:00
}
/* Finalization and destruction of the streaming data structures. */
/* Disconnect VIEW from the stream it is connected to.  When this is
   the last registered view to disconnect, free the stream, its
   buffer and all the views that were connected to it.  */

static inline void
gomp_stream_unregister_view (gomp_stream_view_p view)
{
  gomp_stream_p stream = view->stream;
  const bool is_reader = (view->type == READ_VIEW);
  gomp_stream_view_handle_p vh
    = is_reader ? &stream->read_views : &stream->write_views;
  gomp_stream_view_list_p lists[2];
  int total_unregistered;
  int l, i;

  __sync_fetch_and_add (&(vh->nr_unregistered_views), 1);
  total_unregistered = __sync_add_and_fetch (&(stream->unregistered_views), 1);

  if (is_reader)
    /* Make sure a finished reader does not hold back the others in
       the computation of the minimum released index.  */
    GOMP_stream_release (view, GOMP_STREAM_MAX_INDEX);
  else if (vh->nr_unregistered_views == vh->nr_registered_views)
    {
      /* The last producer to exit raises the end-of-stream flag and
	 lets the consumers read up to the highest committed index.  */
      stream->eos_p = true;
      vh->current_min = gomp_stream_compute_lower_max (&vh->view_list);
    }

  /* Some views are still in use: nothing to free yet.  */
  if (total_unregistered != (stream->read_views.nr_registered_views
			     + stream->write_views.nr_registered_views))
    return;

  /* All known views are accounted for, so this is the last one
     unregistering: release everything attached to the stream.  */
  lists[0] = &stream->read_views.view_list;
  lists[1] = &stream->write_views.view_list;

  for (l = 0; l < 2; ++l)
    for (i = 0; i < lists[l]->nr_views; ++i)
      free (lists[l]->views[i]);

  free (stream->buffer);
  free (lists[0]->views);
  free (lists[1]->views);
  free (stream);
}
/* Invoked before terminating a stream TASK, this disconnects all the
views and for all streams for which it is the last one to
disconnect from , it frees up all data structures . */
void
GOMP_stream_task_exit ( void * t )
{
gomp_stream_task_p task = ( gomp_stream_task_p ) t ;
int num_read_views = task - > read_view_list . nr_views ;
int num_write_views = task - > write_view_list . nr_views ;
2011-06-14 17:13:31 +02:00
int i , res ;
debug_log_init ( " GOMP_stream_task_exit %zu %zu \n " , ( size_t ) t , ( size_t ) t ) ;
res = __sync_sub_and_fetch ( & task - > num_instances , 1 ) ;
if ( res = = 0 )
{
for ( i = 0 ; i < num_read_views ; + + i )
gomp_stream_unregister_view ( task - > read_view_list . views [ i ] ) ;
for ( i = 0 ; i < num_write_views ; + + i )
gomp_stream_unregister_view ( task - > write_view_list . views [ i ] ) ;
free ( task - > read_view_list . views ) ;
free ( task - > write_view_list . views ) ;
free ( task ) ;
}
}
/* Get info on the amount of work/data available in the stream
starting from INDEX and considering a constant burst of size BURST .
This function does not wait , except in case there is no work yet ,
but the EOS flag has not yet been set . The function only returns 0
on termination . */
unsigned long long
GOMP_stream_get_available_work ( void * t , unsigned long long * start_idx )
{
gomp_stream_task_p task = ( gomp_stream_task_p ) t ;
long result = 0 ;
unsigned long long start ;
/* Atomically acquire a range, then wait until the range is either
fully available or termination occurs . */
start = __sync_fetch_and_add ( & task - > first_unassigned_activation_counter ,
AGGREGATION_FACTOR ) ;
result = task - > activation_counter - start ;
debug_log_init3 ( " GOMP_stream_get_available_work [entry] %llu %llu \t %15zu \n " , start , task - > activation_counter , ( size_t ) task ) ;
if ( result > = AGGREGATION_FACTOR )
{
* start_idx = start ;
return AGGREGATION_FACTOR ;
}
while ( result < AGGREGATION_FACTOR )
{
if ( task - > termination_flag )
{
__sync_synchronize ( ) ;
result = task - > activation_counter - start ;
debug_log ( " GOMP_stream_get_available_work [final] %llu %ld \n " , start , result ) ;
* start_idx = start ;
if ( result > AGGREGATION_FACTOR )
return AGGREGATION_FACTOR ;
return ( result > 0 ) ? result : 0 ;
}
result = task - > activation_counter - start ;
}
debug_log ( " GOMP_stream_get_available_work %llu %ld \n " , start , result ) ;
* start_idx = start ;
return AGGREGATION_FACTOR ;
}
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
/* Initialize streaming in this region. */
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
void
GOMP_stream_init ( )
{
/* Add self to ensure at least one member of the team barrier will
be waiting for the streaming tasks . */
gomp_stream_tasks_count = 1 ;
gomp_barrier_init ( & gomp_stream_tasks_exit_barrier , gomp_stream_tasks_count ) ;
}
/* Wait on the exit barrier until all the streaming threads have
   completed.  */

void
GOMP_stream_exit ()
{
  gomp_barrier_wait (&gomp_stream_tasks_exit_barrier);
}
/* Request SIZE bytes for a PRE operator on stream S. Return a
pointer where data should be stored . */
void *
GOMP_stream_pre ( void * s , const unsigned long long size )
{
gomp_stream_p stream = ( gomp_stream_p ) s ;
debug_log_init ( " GOMP_stream_pre %zu \t %llu \n " , ( size_t ) s , size ) ;
stream - > pre_shift = size ;
stream - > write_views . current_min = size ;
return stream - > buffer ;
}
/* This function is a pthread_create entry point for streaming
tasks . */
static void *
gomp_stream_thread_start ( void * xdata )
{
struct gomp_stream_thread_start_data * data = xdata ;
void ( * local_fn ) ( void * ) ;
void * local_data ;
local_fn = data - > fn ;
local_data = data - > fn_data ;
local_fn ( local_data ) ;
gomp_barrier_wait_last ( & gomp_stream_tasks_exit_barrier ) ;
debug_log_init ( " ** exiting task: %d (%u) \n " , data - > id , gomp_stream_tasks_count ) ;
return NULL ;
2010-10-05 14:58:55 +02:00
}
2011-06-14 17:13:31 +02:00
/* Called for starting a streaming task. These tasks do not partake
in existing thread teams and are not subject to scheduling
points . */
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
void
GOMP_stream_task ( void ( * fn ) ( void * ) , void * data ,
void ( * cpyfn ) ( void * , void * ) ,
long arg_size , long arg_align ,
long num_instances , bool auto_replicable )
{
pthread_attr_t thread_attr , * attr ;
pthread_t pt ;
int err , i , base_id ;
char * arg , * buf ;
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
base_id = __sync_fetch_and_add ( & gomp_stream_tasks_count , num_instances ) ;
gomp_barrier_reinit ( & gomp_stream_tasks_exit_barrier ,
gomp_stream_tasks_count ) ;
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
debug_log_init ( " ** adding tasks: %ld (%u) \n " , num_instances , gomp_stream_tasks_count ) ;
2010-10-05 14:58:55 +02:00
2011-06-14 17:13:31 +02:00
attr = & gomp_thread_attr ;
if ( __builtin_expect ( gomp_cpu_affinity ! = NULL , 0 ) )
{
size_t stacksize ;
pthread_attr_init ( & thread_attr ) ;
pthread_attr_setdetachstate ( & thread_attr , PTHREAD_CREATE_DETACHED ) ;
if ( ! pthread_attr_getstacksize ( & gomp_thread_attr , & stacksize ) )
pthread_attr_setstacksize ( & thread_attr , stacksize ) ;
attr = & thread_attr ;
#if 0 /* This should be handled separately ... we will have to build a
stream mapping and prevent other OMP threads from touching
the cores running streaming tasks . */
gomp_init_thread_affinity ( attr ) ;
# endif
}
for ( i = 0 ; i < num_instances ; + + i )
{
struct gomp_stream_thread_start_data * start_data ;
buf = ( char * ) gomp_malloc ( arg_size + arg_align - 1 ) ;
arg = ( char * ) ( ( ( uintptr_t ) ( buf ) + arg_align - 1 )
& ~ ( uintptr_t ) ( arg_align - 1 ) ) ;
if ( cpyfn )
cpyfn ( arg , data ) ;
else
memcpy ( arg , data , arg_size ) ;
start_data = gomp_malloc ( sizeof ( struct gomp_stream_thread_start_data ) ) ;
start_data - > fn = fn ;
start_data - > fn_data = arg ;
start_data - > id = base_id + i ;
err = pthread_create ( & pt , attr , gomp_stream_thread_start , start_data ) ;
if ( err ! = 0 )
gomp_fatal ( " Thread creation failed: %s " , strerror ( err ) ) ;
}
if ( __builtin_expect ( gomp_cpu_affinity ! = NULL , 0 ) )
pthread_attr_destroy ( & thread_attr ) ;
}