From c6786815cd5459746c2fa385beda634106f6346c Mon Sep 17 00:00:00 2001 From: Thomas Preud'homme Date: Mon, 30 Jan 2012 19:40:50 +0100 Subject: [PATCH] Add native algo from OpenMP stream extension Add native algorithm from OpenMP stream extension. This requires adding one function to commtech.h: end_producer(). This function does nothing for every communication algorithm except gomp_stream (the algorithm added by this commit). --- communication_techniques/Makefile | 2 +- communication_techniques/include/commtech.h | 10 + .../include/gomp_stream_comm.h | 329 ++++++ .../src/communication/batch_queue.c | 5 + .../src/communication/csq.c | 5 + .../src/communication/fast_forward.c | 5 + .../src/communication/gomp_stream.c | 1025 +++++++++++++++++ .../src/communication/lamport.c | 5 + .../src/communication/mcringbuffer.c | 5 + .../src/communication/none.c | 5 + .../src/communication/pipe.c | 5 + .../src/communication/shared_mem_opt.c | 5 + communication_techniques/src/main.c | 6 + 13 files changed, 1411 insertions(+), 1 deletion(-) create mode 100644 communication_techniques/include/gomp_stream_comm.h create mode 100644 communication_techniques/src/communication/gomp_stream.c diff --git a/communication_techniques/Makefile b/communication_techniques/Makefile index 3e98577..2f5ddd6 100644 --- a/communication_techniques/Makefile +++ b/communication_techniques/Makefile @@ -25,7 +25,7 @@ BINNAMES+=batch_queue_16_comm batch_queue_32_comm batch_queue_64_comm BINNAMES+=batch_queue_128_comm batch_queue_256_comm batch_queue_512_comm BINNAMES+=batch_queue_1024_comm batch_queue_2048_comm batch_queue_4096_comm BINNAMES+=lamport_comm shared_mem_opt_comm none_comm csq_2_comm csq_64_comm -BINNAMES+=fast_forward_comm mcringbuffer_comm pipe_comm +BINNAMES+=fast_forward_comm mcringbuffer_comm pipe_comm gomp_stream_comm #BINNAMES+=jikes_barrier_comm CALCLIBSNAMES:=calc_mat calc_line calc_useless_loop BINS:=$(patsubst %,$(BINDIR)/%,$(BINNAMES)) diff --git 
a/communication_techniques/include/commtech.h b/communication_techniques/include/commtech.h index a73d731..c63e795 100644 --- a/communication_techniques/include/commtech.h +++ b/communication_techniques/include/commtech.h @@ -32,6 +32,16 @@ void *create_comm_channel(void); */ int destroy_comm_channel(void *); +/* + * @param channel The production channel + * @return 0 if success, -1 else + * + * Notify the communication algorithm that the producer has finished producing + * + * @comment Must be called by the producer when it has finished producing + */ +int end_producer(void *); + /* * @param channel Channel from which to receive data diff --git a/communication_techniques/include/gomp_stream_comm.h b/communication_techniques/include/gomp_stream_comm.h new file mode 100644 index 0000000..9285f77 --- /dev/null +++ b/communication_techniques/include/gomp_stream_comm.h @@ -0,0 +1,329 @@ +/* Copyright (C) 2010 Free Software Foundation, Inc. + Contributed by Antoniu Pop . + + This file is part of the GNU OpenMP Library (libgomp). + + Libgomp is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + Libgomp is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + more details. + + You should have received a copy of the GNU Lesser General Public License + along with libgomp; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. 
*/ + +/* As a special exception, if you link this library with other files, some + of which are compiled with GCC, to produce an executable, this library + does not by itself cause the resulting executable to be covered by the + GNU General Public License. This exception does not however invalidate + any other reasons why the executable file might be covered by the GNU + General Public License. */ + +/* This implements the stream communication layer for libGOMP. */ + +#ifndef GOMP_STREAM_H +#define GOMP_STREAM_H 1 +#define _CSQ_COMMON_COMM_H_ 1 + +#include + +/* Define the type and maximum value for the indices used within the + stream. The indices are strictly monotonically increasing + integers, so we need a type that does not wrap around too fast. An + implementation of the wrap-around is necessary for systems where + this poses a problem. */ +/* typedef unsigned long long int gomp_stream_index_t; */ +#define GOMP_STREAM_MAX_INDEX ULLONG_MAX + +/*#define LAZY_SYNCH + #define GUARDED_WAKE*/ + +/* +typedef enum { + GOMP_STREAM_INITIALIZED = 1, + GOMP_STREAM_ALLOCATED = 2, + GOMP_STREAM_PRODUCED = 4, + GOMP_STREAM_CONSUMED = 8, + GOMP_STREAM_ZOMBIE = 16, + GOMP_STREAM_STALL = 32 +} gomp_stream_state; +*/ + +// Things added by RoboTux +#include + +#ifndef BUF_SIZE +#define BUF_SIZE (64 * CACHE_LINE_SIZE) +#endif +#define BURST_SIZE 128 +typedef enum +{ + false, + true +} bool; + +typedef pthread_mutex_t gomp_mutex_t; + +struct channel +{ + void *stream; + void *task; + void *rview; + void *wview; + void **read_ptr __attribute__ ((aligned (CACHE_LINE_SIZE))); + void **read_end_ptr; + unsigned long long read_act; + void **write_ptr __attribute__ ((aligned (CACHE_LINE_SIZE))); + void **write_end_ptr; + unsigned long long write_act; +}; + +void * +GOMP_stream_stall (void *, const unsigned long long, + const unsigned long long); + +void +GOMP_stream_commit (void *, const unsigned long long); + +static inline void send(struct channel *channel, void **addr) +{ + if 
(channel->write_ptr == channel->write_end_ptr) + { + channel->write_ptr = (void **) GOMP_stream_stall(channel->wview, + channel->write_act, channel->write_act + 1); + channel->write_act++; + channel->write_end_ptr = channel->write_ptr + BURST_SIZE / sizeof(void *); + } + *channel->write_ptr++ = addr; + if (channel->write_ptr == channel->write_end_ptr) + GOMP_stream_commit(channel->wview, channel->write_act); +} +// End of things added by RoboTux + +typedef enum +{ + READ_VIEW, + WRITE_VIEW +} gomp_stream_view_type_t; + +struct gomp_stream; +struct gomp_stream_task; + + +/* GOMP_STREAM_VIEW data structure. Producer and consumer tasks + connect to a GOMP_STREAM using views. */ + +typedef struct gomp_stream_view +{ + /* The stream accessed through this view. WARNING: this field needs + to be first. */ + struct gomp_stream *stream; + + /* Size in bytes of the burst associated to this view. Later this + may become a stream. */ + size_t view_size; + size_t burst_size; + size_t pxxk_size; + + unsigned long long min_update; + unsigned long long max_update; + unsigned long long nb_updates; + unsigned long long sum_updates; + unsigned long long min_commit; + unsigned long long max_commit; + unsigned long long nb_commits; + unsigned long long sum_commits; + /* The alignment directives are needed to ensure these + high-potential false-sharing fields are on their own cache + lines. */ + /* Lower and upper bounds accessible in the stream through this + view. */ + unsigned long long lower_index __attribute__((aligned (64))); + unsigned long long upper_index __attribute__((aligned (64))); + + /* The task using this view. */ + struct gomp_stream_task *task __attribute__((aligned (64))); + + /* Setting this flag means the process accessing the stream through + this view relinquishes his access to the stream (generally when + terminating). */ + bool end_p; + + /* Type of this view (read or write). 
*/ + gomp_stream_view_type_t type; + + /* In order to avoid excessive accesses to the global minimum index + (released or consumed), which strongly impacts cache traffic, use + this duplcated field for an additional level of caching and only + update when needed. */ + unsigned long long local_min_value; + +} gomp_stream_view_t, *gomp_stream_view_p; + +/* List of GOMP_STREAM_VIEWs. As this list is only modified in the + initialization phase and we never remove items, we'll use an + array. */ + +typedef struct gomp_stream_view_list +{ + gomp_stream_view_p *views; + int nr_views; + int size; + + /* Enforce atomic connection of the views in this list. */ + gomp_mutex_t connect_view_mutex; + +} gomp_stream_view_list_t, *gomp_stream_view_list_p; + + +/* GOMP_STREAM_VIEW_HANDLE data structure. This allows the tasks + interacting through the GOMP_STREAM to which this handle is + attached to keep updated information global to all similar views on + this stream. */ + +typedef struct gomp_stream_view_handle +{ + /* The alignment directives are needed to ensure these + high-potential false-sharing fields are on their own cache + lines. */ + /* Latest computed value of the min released index and max acquired + index values across views. */ + volatile unsigned long long current_min __attribute__((aligned (64))); + volatile unsigned long long current_max __attribute__((aligned (64))); + + /* Bookkeeping for view connections. */ + gomp_stream_view_list_t view_list __attribute__((aligned (64))); + int nr_expected_views; + int nr_registered_views; + int nr_unregistered_views; + +} gomp_stream_view_handle_t, *gomp_stream_view_handle_p; + + +/* GOMP_STREAM data structure. */ + +typedef struct gomp_stream +{ + /* WARNING: the first two fields (BUFFER and BUFFER_MASK) need to + remain in their respective positions. */ + + /* The pointer to the buffer, as well as the subsequent buffer + description, should be on a single mostly read cache line. 
The + EOS_P flag is only set at the very end of the use of this + stream. */ + /* Circular buffer containing the data communicated through this + stream. */ + char *buffer __attribute__((aligned (64))); + + /* Size of the buffer and the bitmask used for modulo computation + for the wrap-around. The size is expressed in basic elements for + this stream. The size in bytes of the buffer is + BUFFER_SIZE * ELEMENT_SIZE. */ + unsigned long long buffer_size; + unsigned long long buffer_mask; + unsigned long long pre_shift; + + /* True once all the tasks that should be expected to connect to + this stream been declared. */ + bool expected_ready_p; + + /* True once all the tasks expected to connect to this stream have + done so. */ + bool connected_p; + + /* End of stream: true when all producers have finished committing + all the data and are terminating. */ + bool eos_p; + + /* Handles for read and write views on this stream. */ + gomp_stream_view_handle_t read_views; + gomp_stream_view_handle_t write_views; + + /* Barrier used both for waiting for all views to connect to the + stream and to find the last view disconnecting (and who therefore + frees this stream). */ + /*gomp_barrier_t view_handling_barrier;*/ + + /* Counter of the number of total unregistered views, both read and + write, used to determine the last task deconnecting its view. + The last task will also free the data structures. */ + int unregistered_views; + +#ifndef HAVE_SYNC_BUILTINS + /* We may need a lock for atomicity if no atomic operations are + available. */ + gomp_mutex_t stream_mutex; +#endif +} gomp_stream_t, *gomp_stream_p; + + +/* GOMP_STREAM_TASK data structure. Runtime node in the task + graph. */ + +typedef struct gomp_stream_task +{ + /* Lists of views on streams used by this task. */ + gomp_stream_view_list_t read_view_list; + gomp_stream_view_list_t write_view_list; + + /* The following are used directly in the generated code and should + only be read here. 
A memory fence is guaranteed before the + termination flag is set to true. */ + + /* Number of activations allowed for the task. */ + volatile unsigned long long activation_counter; + volatile unsigned long long first_unassigned_activation_counter; + + int num_instances; + + /* True only when the activation counter has reached the maximum + number of activations allowed for this task. */ + volatile bool termination_flag; + +} gomp_stream_task_t, *gomp_stream_task_p; + + +#if 0 +/* GOMP_STREAM_CONTROL_STREAM data structure. Implements a simple + if-conversion analog that allows a non data-driven task to conform + to its original control dependences. */ + +typedef struct gomp_stream_control_stream +{ + /* In all cases where the streams bypass control (i.e., for + sequential control flow only), an activation counter is + sufficient to carry the control flow. */ + + /* Local counter of the number of times a task has been activated. */ + unsigned long long activation_counter __attribute__((aligned (64))); + unsigned long long local_enabled_activations; + + /* Number of times this task is allowed to activate. */ + unsigned long long enabled_activations __attribute__((aligned (64))); + + /* When a task's inputs or outputs cross a parallel control flow + boundary (i.e., worksharing construct), their activation pattern + can be sparse wrt. the actual stream of data that they share with + their sibling tasks. We use activation ranges as an optimization + to streaming the activation indexes themselves, but the two + options are equivalent. These streams are inherently 1-to-1, so + a simpler implementation of the synchronization should be used in + this case.*/ + /* gomp_stream_p activation_range_stream; */ + + /* End of stream: true when all producers have finished committing + all the data and are terminating. 
*/ + bool eos_p; + +} gomp_stream_control_stream_t, *gomp_stream_control_stream_p; + +#endif + + +#endif /* GOMP_STREAM_H */ diff --git a/communication_techniques/src/communication/batch_queue.c b/communication_techniques/src/communication/batch_queue.c index 594874c..19a6ab6 100644 --- a/communication_techniques/src/communication/batch_queue.c +++ b/communication_techniques/src/communication/batch_queue.c @@ -21,6 +21,11 @@ void *create_comm_channel(void) return NULL; } +int end_producer(void *unused __attribute__ ((unused))) +{ + return 0; +} + int destroy_comm_channel(void *channel) { free(channel); diff --git a/communication_techniques/src/communication/csq.c b/communication_techniques/src/communication/csq.c index ff2d278..1e6a389 100644 --- a/communication_techniques/src/communication/csq.c +++ b/communication_techniques/src/communication/csq.c @@ -26,6 +26,11 @@ void *create_comm_channel(void) return NULL; } +int end_producer(void *unused __attribute__ ((unused))) +{ + return 0; +} + int destroy_comm_channel(void *channel) { free(channel); diff --git a/communication_techniques/src/communication/fast_forward.c b/communication_techniques/src/communication/fast_forward.c index 089c83e..d774a8d 100644 --- a/communication_techniques/src/communication/fast_forward.c +++ b/communication_techniques/src/communication/fast_forward.c @@ -30,6 +30,11 @@ void *create_comm_channel(void) return NULL; } +int end_producer(void *unused __attribute__ ((unused))) +{ + return 0; +} + int destroy_comm_channel(void *channel) { free((void *) ((struct channel *) channel)->shared_space); diff --git a/communication_techniques/src/communication/gomp_stream.c b/communication_techniques/src/communication/gomp_stream.c new file mode 100644 index 0000000..bdef6d2 --- /dev/null +++ b/communication_techniques/src/communication/gomp_stream.c @@ -0,0 +1,1025 @@ +/* Copyright (C) 2010 Free Software Foundation, Inc. + Contributed by Antoniu Pop . 
+ + This file is part of the GNU OpenMP Library (libgomp). + + Libgomp is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + Libgomp is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for + more details. + + You should have received a copy of the GNU Lesser General Public License + along with libgomp; see the file COPYING.LIB. If not, write to the + Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, + MA 02110-1301, USA. */ + +/* As a special exception, if you link this library with other files, some + of which are compiled with GCC, to produce an executable, this library + does not by itself cause the resulting executable to be covered by the + GNU General Public License. This exception does not however invalidate + any other reasons why the executable file might be covered by the GNU + General Public License. */ + +/* This implements the stream communication layer for libGOMP. 
*/ + +#include +#include +#include + +// Things disabled by RoboTux +#if 0 +#include "wait.h" +#include "sem.h" +#include "mutex.h" +#include "libgomp.h" +#endif +// End of things disabled by RoboTux +// Things added by RoboTux +#include +#include +#include + +static inline void *gomp_malloc(size_t size) +{ + void *ret; + + ret = malloc(size); + if (ret == NULL) + exit(EXIT_FAILURE); + return ret; +} + +static inline void *gomp_realloc(void *ptr, size_t size) +{ + void *ret; + + ret = realloc(ptr, size); + if (ret == NULL) + exit(EXIT_FAILURE); + return ret; +} + +static inline void gomp_fatal(char *msg) +{ + fprintf(stderr, "%s\n", msg); + exit(EXIT_FAILURE); +} +#define gomp_mutex_init(mutex) pthread_mutex_init(mutex, NULL) +#define gomp_mutex_lock(mutex) pthread_mutex_lock(mutex) +#define gomp_mutex_unlock(mutex) pthread_mutex_unlock(mutex) +// End of things added by RoboTux + +#define AGGREGATION_FACTOR 32 + +//#define OMP_STREAM_DEBUG +#ifdef OMP_STREAM_DEBUG + #define debug_log_init(S, V1, V2) printf (S, V1, V2); fflush (stdout) + #define debug_log_init3(S, V1, V2, V3) printf (S, V1, V2, V3); fflush (stdout) + #define debug_log(S, V1, V2) printf (S, V1, V2); fflush (stdout) +#else + #define debug_log_init(S, V1, V2) + #define debug_log_init3(S, V1, V2, V3) + #define debug_log(S, V1, V2) +#endif + +// Things disabled by RoboTux +#if 0 +gomp_barrier_t gomp_stream_tasks_wait_until_connected_barrier; +gomp_barrier_t gomp_stream_tasks_exit_barrier; +unsigned gomp_stream_tasks_count; +#endif +// End of things disabled by RoboTux + +/* This structure is used to communicate across pthread_create. */ + +struct gomp_stream_thread_start_data +{ + void (*fn) (void *); + void *fn_data; + + int id; +}; + +/* Data structures creation and pipeline initialization. */ + +/* Allocate and initialize a GOMP_STREAM for data elements of size + ELEMENT_SIZE using a circular buffer of BUFFER_SIZE such + elements. Returns a pointer to the newly allocated stream. 
The + BUFFER is always allocated by this function; this signature takes no + pointer to pre-allocated memory. */ + +void * +GOMP_stream_create_stream (size_t element_size, size_t buffer_size) +{ + gomp_stream_p stream = (gomp_stream_p) gomp_malloc (sizeof (gomp_stream_t)); + if (stream == NULL) + exit(EXIT_FAILURE); + debug_log_init ("GOMP_stream_create_stream %zu %zu\n", element_size, buffer_size); + + if (buffer_size < 2) + gomp_fatal ("GOMP_stream: insufficient stream buffer size."); + + /* Initialize and allocate the data buffer. We force the + buffer_size to be a power of 2 for efficient modulo computation + of the indices in the circular buffer. */ + + /* To avoid excessive multiplication operations, we convert the + accounting from elements to bytes. */ + buffer_size *= element_size; + stream->buffer_size = 1; + while(stream->buffer_size < buffer_size) + stream->buffer_size <<= 1; + stream->buffer_mask = stream->buffer_size - 1; + stream->buffer = + (void *) gomp_malloc (stream->buffer_size * 2); + + stream->expected_ready_p = false; + stream->connected_p = false; + stream->eos_p = false; + stream->pre_shift = 0; + + /* Initialize the view_handles. NOTE(review): stream->unregistered_views is not set anywhere in this function -- confirm it is initialized before use. 
*/ + stream->read_views.current_min = stream->buffer_size; + stream->read_views.current_max = 0; + stream->read_views.view_list.views = NULL; + stream->read_views.view_list.nr_views = 0; + stream->read_views.view_list.size = 0; + stream->read_views.nr_expected_views = 0; + stream->read_views.nr_registered_views = 0; + stream->read_views.nr_unregistered_views = 0; + gomp_mutex_init (&stream->read_views.view_list.connect_view_mutex); + + stream->write_views.current_min = 0; + stream->write_views.current_max = stream->buffer_size; + stream->write_views.view_list.views = NULL; + stream->write_views.view_list.nr_views = 0; + stream->write_views.view_list.size = 0; + stream->write_views.nr_expected_views = 0; + stream->write_views.nr_registered_views = 0; + stream->write_views.nr_unregistered_views = 0; + gomp_mutex_init (&stream->write_views.view_list.connect_view_mutex); + +#ifndef HAVE_SYNC_BUILTINS + gomp_mutex_init (&stream->stream_mutex); +#endif + + return stream; +} + +/* Allocate and initialize a generic GOMP_STREAM_VIEW that can be + connected to any stream to give either read or write access + depending on its TYPE. Returns a pointer to the newly allocated + view. This view accesses VIEW_SIZE bytes in the stream and + commits/releases BURST_SIZE bytes per activation. NOTE(review): the task pointer and the update/commit statistics fields of the view are not set here. */ + +static inline void * +gomp_stream_create_view (int type, size_t view_size, size_t burst_size) +{ + gomp_stream_view_p view = + (gomp_stream_view_p) gomp_malloc (sizeof(gomp_stream_view_t)); + + view->lower_index = 0; + view->upper_index = 0; + view->stream = NULL; + view->end_p = false; + view->type = type; + view->local_min_value = 0; + view->view_size = view_size; + view->burst_size = burst_size; + view->pxxk_size = view_size - burst_size; + + return view; +} + +/* Wrapper for creating a READ view. 
*/ + +void * +GOMP_stream_create_read_view (size_t view_size, size_t burst_size) +{ + debug_log_init ("GOMP_stream_create_read_view %zu %zu\n", view_size, burst_size); + return gomp_stream_create_view (READ_VIEW, view_size, burst_size); +} + +/* Wrapper for creating a WRITE view. */ + +void * +GOMP_stream_create_write_view (size_t view_size, size_t burst_size) +{ + debug_log_init ("GOMP_stream_create_write_view %zu %zu\n", view_size, burst_size); + return gomp_stream_create_view (WRITE_VIEW, view_size, burst_size); +} + +/* Allocate and initialize a GOMP_STREAM_TASK data structure. */ + +void * +GOMP_stream_create_task () +{ + gomp_stream_task_p task = + (gomp_stream_task_p) gomp_malloc (sizeof(gomp_stream_task_t)); + debug_log_init3 ("GOMP_stream_create_task %d %d \t %15zu\n", 0, 0, (size_t) task); + + task->read_view_list.views = NULL; + task->read_view_list.nr_views = 0; + task->read_view_list.size = 0; + gomp_mutex_init (&task->read_view_list.connect_view_mutex); + + task->write_view_list.views = NULL; + task->write_view_list.nr_views = 0; + task->write_view_list.size = 0; + gomp_mutex_init (&task->write_view_list.connect_view_mutex); + + task->activation_counter = 0; + task->termination_flag = false; + + task->first_unassigned_activation_counter = 0; + task->num_instances = 0; + + __sync_synchronize (); + + return task; +} + +volatile void * +GOMP_stream_get_task_activation_counter (void *t) +{ + gomp_stream_task_p task = (gomp_stream_task_p) t; + + return &(task->activation_counter); +} + +void +GOMP_stream_set_task_termination_flag (void *t) +{ + gomp_stream_task_p task = (gomp_stream_task_p) t; + + task->termination_flag = true; +} + +void +GOMP_stream_task_add_instance (void *t) +{ + gomp_stream_task_p task = (gomp_stream_task_p) t; + + __sync_fetch_and_add (&task->num_instances, 1); + __sync_synchronize (); +} + +/* Declare additional READ_VIEWS and WRITE_VIEWS expected views on + stream S. 
When possible, the thread that creates the streaming + tasks should declare, for each stream, the number of read/write + views that will connect to a stream before the streaming tasks are + started. If this function is called on a stream, there will be no + further checks for the number of tasks partaking in the + initialization synchronization. */ + +void +GOMP_stream_add_expected_views (void *s, int read_views, int write_views, + int final) +{ + gomp_stream_p stream = (gomp_stream_p) s; + debug_log_init ("GOMP_stream_add_expected_views %d %d\n", read_views, write_views); + + if (stream->expected_ready_p) + gomp_fatal + ("GOMP_stream: attempting to modify a final number of expected views."); + stream->expected_ready_p = final; + +#ifdef HAVE_SYNC_BUILTINS + __sync_fetch_and_add (&stream->read_views.nr_expected_views, read_views); + __sync_fetch_and_add (&stream->write_views.nr_expected_views, write_views); +#else + gomp_mutex_lock (&stream->stream_mutex); + stream->read_views.nr_expected_views += read_views; + stream->write_views.nr_expected_views += write_views; + gomp_mutex_unlock (&stream->stream_mutex); +#endif +} + +/* Add VIEW to the VIEW_LIST. We actually use an array as this list + is only modified in the initialization phase and we never remove + any items from it. */ + +static inline void +gomp_stream_add_view_to_list (gomp_stream_view_p view, + gomp_stream_view_list_p view_list) +{ + /* Allocate memory when needed. */ + if (view_list->views == NULL || view_list->nr_views == view_list->size) + { + if (view_list->size == 0) + view_list->size = 4; + if (view_list->nr_views == view_list->size) + view_list->size <<= 1; + + view_list->views = + (gomp_stream_view_p *) gomp_realloc (view_list->views, + view_list->size * sizeof (gomp_stream_view_p)); + } + + view_list->views[view_list->nr_views] = view; + view_list->nr_views += 1; +} + +/* Connect a VIEW to a STREAM and also to the TASK which will use it. + This effectively builds the runtime task graph. 
*/ + +void +GOMP_stream_connect_view (void *t, void *s, void *v) +{ + gomp_stream_task_p task = (gomp_stream_task_p) t; + gomp_stream_p stream = (gomp_stream_p) s; + gomp_stream_view_p view = (gomp_stream_view_p) v; + + gomp_stream_view_handle_p vh = (view->type == READ_VIEW) ? + &stream->read_views : &stream->write_views; + gomp_stream_view_list_p stream_list = &vh->view_list; + gomp_stream_view_list_p task_list = (view->type == READ_VIEW) ? + &task->read_view_list : &task->write_view_list; + + view->stream = stream; + + /* A read view's lower index is shifted by the buffer_size as the + stream is initially empty. This is equivalent to releasing the + original buffer_size elements. A write view will start with + buffer_size free space. */ + if (view->type == READ_VIEW) + view->lower_index = stream->buffer_size; + else + view->local_min_value = stream->buffer_size; + + /* Register the view with the TASK to which it belongs. The + task's own list is nevertheless updated while holding its + connect_view_mutex. */ + gomp_mutex_lock (&task_list->connect_view_mutex); + gomp_stream_add_view_to_list (view, task_list); + gomp_mutex_unlock (&task_list->connect_view_mutex); + + /* Connect the view to the stream. This must be done atomically as + this data structure is shared with the other producer/consumer + tasks. */ + gomp_mutex_lock (&vh->view_list.connect_view_mutex); + gomp_stream_add_view_to_list (view, stream_list); + gomp_mutex_unlock (&vh->view_list.connect_view_mutex); + __sync_fetch_and_add (&vh->nr_registered_views, 1); +} + +/* Check whether all the expected views on STREAM have already + connected. 
*/ + +static inline bool +gomp_stream_check_connected (gomp_stream_p stream) +{ + if (!stream->expected_ready_p) + return false; + + if (stream->connected_p) + return true; + + if ((stream->read_views.view_list.nr_views + == stream->read_views.nr_expected_views) + && (stream->write_views.view_list.nr_views + == stream->write_views.nr_expected_views)) + { + stream->connected_p = true; + return true; + } + return false; +} + +/* Wait until all the streams to which TASK connects are ready and + connected to all producer/consumer tasks. */ + +void +GOMP_stream_wait_until_connected (void *t) +{ + gomp_stream_task_p task = (gomp_stream_task_p) t; + bool done; + int i; + + do + { + done = true; + + gomp_mutex_lock (&task->read_view_list.connect_view_mutex); + for (i = 0; i < task->read_view_list.nr_views; ++i) + if (!gomp_stream_check_connected (task->read_view_list.views[i]->stream)) + done = false; + gomp_mutex_unlock (&task->read_view_list.connect_view_mutex); + + gomp_mutex_lock (&task->write_view_list.connect_view_mutex); + for (i = 0; i < task->write_view_list.nr_views; ++i) + if (!gomp_stream_check_connected (task->write_view_list.views[i]->stream)) + done = false; + gomp_mutex_unlock (&task->write_view_list.connect_view_mutex); + } + while (!done); + debug_log_init ("GOMP_stream_wait_until_connected %zu %zu\n", (size_t) task, (size_t) task); +} + +/* Stream communication/synchronization. */ + +/* Compute the maximum of the LOWER_INDEX fields of all views in the + LIST of views. This is used during the termination phase to give + access to the readers up to the highest committed index. This is + only useful when producers, not too far apart in indices they + committed, forget to commit up to the last index that should appear + in the stream. 
*/ + +static inline unsigned long long +gomp_stream_compute_lower_max (gomp_stream_view_list_p list) +{ + unsigned long long local_max = 0; + gomp_stream_view_p *views = list->views; + int i; + + for (i = 0; i < list->nr_views; ++i) + if (views[i]->lower_index > local_max) + local_max = views[i]->lower_index; + + return local_max; +} + +/* Compute the minimum of the LOWER_INDEX fields of all views in the + LIST of views. For a LIST of read views, this minimum represents + the highest index released by all read views on a stream (i.e. the + index of elements that all consumers have already discarded) and + therefore the highest index a write view will be allowed to acquire + for writing. For a LIST of write views, this minimum represents + the highest index all producers have committed and thus the highest + index available for reading. */ + +static inline unsigned long long +gomp_stream_compute_lower_min (gomp_stream_view_list_p list) +{ + unsigned long long local_min = GOMP_STREAM_MAX_INDEX; + gomp_stream_view_p *views = list->views; + int i; + + for (i = 0; i < list->nr_views; ++i) + if (views[i]->lower_index < local_min) + local_min = views[i]->lower_index; + + return local_min; +} + +/* Compute the minimum of the UPPER_INDEX fields of all views in the + LIST of views. Similar to the above, but this is only a hint on + the resources that another producer (resp. consumer) has already + acquired. If a producer (resp. consumer) has successfully acquired + an index with GOMP_stream_stall (resp. GOMP_stream_update) for + writing (resp. reading), then all other producers (resp. consumers) + on the same stream can access up to the same index without further + verification. 
*/ + +static inline unsigned long long +gomp_stream_compute_upper_min (gomp_stream_view_list_p list) +{ + unsigned long long local_min = GOMP_STREAM_MAX_INDEX; + gomp_stream_view_p *views = list->views; + int i; + + for (i = 0; i < list->nr_views; ++i) + if (views[i]->upper_index < local_min) + local_min = views[i]->upper_index; + + return local_min; +} + +/* Wait until the producers (resp. consumers) on this stream have + committed (resp. released) up to the INDEX position in the stream. + When that happens, the consumer (resp. producer) connected to the + stream through VIEW is allowed to access the elements up to + INDEX. */ + +static inline void +gomp_stream_wait_release (gomp_stream_view_p view, + gomp_stream_view_handle_p vh, + const unsigned long long index) +{ + /* Test whether someone already got a hold of a bigger index + yet. */ + + if (view->local_min_value < index) + { + while (vh->current_min < index && !view->stream->eos_p) + { + unsigned long long local_min = + gomp_stream_compute_lower_min (&vh->view_list); + + if (vh->current_min == local_min) + __asm volatile ("pause" : : : "memory"); + else + vh->current_min = local_min; + } + view->local_min_value = vh->current_min; + } +} + +/* Request read access for the view V to the stream for activations + ACT_START to ACT_END. Returns a pointer into the stream buffer at the + lower bound of the requested range; if end-of-stream is flagged while + waiting, the wait stops and the pointer is returned regardless. */ + +void * +GOMP_stream_update (void *v, const unsigned long long act_start, + const unsigned long long act_end) +{ + unsigned long long low_idx, up_idx; + size_t low_idx_loc, up_idx_loc; + gomp_stream_view_p view = (gomp_stream_view_p) v; + gomp_stream_p stream = view->stream; + void *buffer_pointer; + + debug_log ("GOMP_stream_update [in] %llu %llu\n", act_start, act_end); + + /* This update requests access to the buffer in [low_idx,up_idx[. 
+ We will release up to low_idx-1 and acquire up to up_idx-1. 
*/ + low_idx = act_start * view->burst_size; + up_idx = act_end * view->burst_size + view->pxxk_size - 1; + + if (up_idx - low_idx > stream->buffer_size) + gomp_fatal ("GOMP_stream: update requested access to more than buffer_size data."); + + + view->lower_index = low_idx + view->stream->buffer_size; + view->upper_index = up_idx; + + /* In case another consumer has received permission to read up to a + yet higher index, then there is no need to check for this one. */ + if (up_idx > view->stream->read_views.current_max) + { + gomp_stream_wait_release (view, &view->stream->write_views, up_idx); + view->stream->read_views.current_max = up_idx; + } + + low_idx_loc = low_idx & stream->buffer_mask; + up_idx_loc = up_idx & stream->buffer_mask; + + /* Once we know enough data is available for reading, we need to + check whether the data between the lower and upper bounds is + contiguous or if the buffer wrap-around occurs in the middle. */ + if (low_idx_loc > up_idx_loc) + { + /* FIXME: does this require synchronization or is concurrent + overwriting acceptable as long as enough data has been copied + at the end? */ + memcpy (stream->buffer + stream->buffer_size, stream->buffer, + up_idx_loc + 1); + //printf ("Update copy: (%llu,%llu) %llu - %llu | %zu - %zu (size: %zu)\n ", act_start, act_end, low_idx, up_idx, low_idx_loc, up_idx_loc, up_idx_loc + 1); + } + + /* We return a pointer to a contiguous array where this view is + guaranteed access to all the requested data. */ + buffer_pointer = stream->buffer + low_idx_loc; + + debug_log ("GOMP_stream_update [out] %llu %llu\n", act_start, act_end); + + return buffer_pointer; +} + +/* Request write access for the view V to the stream up to INDEX. 
*/
+
+void *
+GOMP_stream_stall (void *v, const unsigned long long act_start,
+                   const unsigned long long act_end)
+{
+  unsigned long long low_idx, up_idx;
+  gomp_stream_view_p view = (gomp_stream_view_p) v;
+  gomp_stream_p stream = view->stream;
+  void *buffer_pointer;
+
+  debug_log ("GOMP_stream_stall [in] %llu %llu\n", act_start, act_end);
+
+  /* This update requests access to the buffer in [low_idx,up_idx[.
+     We will release up to low_idx-1 and acquire up to up_idx-1.
+     Both bounds are offset by the stream's PRE_SHIFT.  */
+  low_idx = act_start * view->burst_size + stream->pre_shift;
+  up_idx = act_end * view->burst_size + view->pxxk_size + stream->pre_shift - 1;
+
+  /* A single request may not span more than the whole buffer; the
+     offending bounds are printed before aborting through
+     gomp_fatal.  */
+  if (up_idx - low_idx > stream->buffer_size)
+    {
+      fprintf (stderr, "Requesting data from low: %llu to up: %llu act [%llu,%llu] for burst:%zu size: %zu\n", low_idx, up_idx, act_start, act_end, view->burst_size, view->view_size);
+      gomp_fatal ("GOMP_stream: stall requested access to more than buffer_size data.");
+    }
+  /* We do not need to worry about wrap-around copying as this
+     "commit" only means that we do not want to write to those
+     indices below low_idx.  */
+  view->lower_index = low_idx;
+  view->upper_index = up_idx;
+
+  /* Only wait on the read views when no write view has already been
+     granted an index at least as high (write_views.current_max).  */
+  if (up_idx > stream->write_views.current_max)
+    {
+      gomp_stream_wait_release (view, &stream->read_views, up_idx);
+      stream->write_views.current_max = up_idx;
+    }
+
+  /* The returned pointer addresses the buffer position of LOW_IDX.  */
+  buffer_pointer = stream->buffer + (low_idx & stream->buffer_mask);
+
+  debug_log ("GOMP_stream_stall [out] %llu %llu\n", act_start, act_end);
+
+  return buffer_pointer;
+}
+
+/* Relinquish read access for the view V to the stream up to
+   INDEX.  The view's LOWER_INDEX becomes
+   ACT_IDX * burst_size + buffer_size - 1.
+
+   NOTE(review): gomp_stream_unregister_view passes
+   GOMP_STREAM_MAX_INDEX as ACT_IDX; if that constant is close to
+   ULLONG_MAX the multiplication wraps in unsigned arithmetic --
+   confirm the resulting LOWER_INDEX still exceeds every live
+   index.  */
+
+void
+GOMP_stream_release (void *v, const unsigned long long act_idx)
+{
+  gomp_stream_view_p view = (gomp_stream_view_p) v;
+  view->lower_index = act_idx * view->burst_size + view->stream->buffer_size - 1;
+  debug_log ("GOMP_stream_release %llu %llu\n", act_idx, act_idx);
+}
+
+/* Relinquish write access for the view V to the stream up to
+   INDEX.
*/
+
+void
+GOMP_stream_commit (void *v, const unsigned long long act_idx)
+{
+  gomp_stream_view_p view = (gomp_stream_view_p) v;
+  gomp_stream_p stream = view->stream;
+  /* Last index being committed, shifted by the stream's PRE_SHIFT
+     like the bounds computed in GOMP_stream_stall.  */
+  unsigned long long up_idx = act_idx * view->burst_size + stream->pre_shift - 1;
+  size_t low_idx_loc, up_idx_loc;
+
+  low_idx_loc = view->lower_index & stream->buffer_mask;
+  up_idx_loc = up_idx & stream->buffer_mask;
+
+  /* Check whether the range between the view's current lower bound
+     and the committed upper bound is contiguous in the buffer or if
+     the buffer wrap-around occurs in the middle.  When it wraps, the
+     UP_IDX_LOC + 1 bytes stored past the end of the buffer are
+     copied back to its start (the mirror image of the copy done in
+     GOMP_stream_update).  */
+  if (low_idx_loc > up_idx_loc)
+    {
+      /* FIXME: does this require synchronization or is concurrent
+         overwriting acceptable as long as enough data has been copied
+         at the end?  */
+      memcpy (stream->buffer, stream->buffer + stream->buffer_size,
+              up_idx_loc + 1);
+
+      //printf ("Commit copy: (%llu) %llu - %llu | %zu - %zu (size: %zu)\n ", act_idx, view->lower_index, up_idx, low_idx_loc, up_idx_loc, up_idx_loc + 1);
+    }
+
+  /* Publishing the new lower bound is what makes the data visible to
+     readers polling the write views' LOWER_INDEX.  */
+  view->lower_index = up_idx;
+  debug_log ("GOMP_stream_commit %llu %llu\n", act_idx, act_idx);
+}
+
+/* Finalization and destruction of the streaming data structures.  */
+
+/* Disconnects VIEW from the stream to which it is connected and frees
+   the stream if it was the last view to disconnect.  Calling this
+   more than once for the same VIEW counts it more than once in both
+   unregistration counters.  */
+
+static inline void
+gomp_stream_unregister_view (gomp_stream_view_p view)
+{
+  gomp_stream_p stream = view->stream;
+  gomp_stream_view_handle_p vh =
+    (view->type == READ_VIEW) ? &stream->read_views : &stream->write_views;
+  int unregistered_views;
+
+  /* Count this view as gone, both on its side of the stream (VH) and
+     on the stream as a whole.  */
+  __sync_fetch_and_add (&(vh->nr_unregistered_views), 1);
+  unregistered_views = __sync_add_and_fetch (&(stream->unregistered_views), 1);
+
+  /* Make sure that when multiple views access a stream, the finished
+     views do not hinder the others in the min computation.  */
+  if (view->type == READ_VIEW)
+    GOMP_stream_release (view, GOMP_STREAM_MAX_INDEX);
+  /* The last producer exiting will set the eos_p flag and allow the
+     consumers to read up to the highest committed index.
+     NOTE(review): EOS_P is stored before CURRENT_MIN is raised; a
+     consumer spinning in gomp_stream_wait_release can leave its loop
+     on EOS_P and cache the old CURRENT_MIN -- confirm this ordering
+     is intended.  */
+  else if (vh->nr_unregistered_views == vh->nr_registered_views)
+    {
+      stream->eos_p = true;
+      vh->current_min = gomp_stream_compute_lower_max (&vh->view_list);
+    }
+
+  /* If all known views are accounted for, this is the last one
+     unregistering.  It frees the memory allocated for the stream as
+     well as all the views on this stream.  VIEW itself is among the
+     freed views and must not be used afterwards.  */
+  if (unregistered_views == (stream->read_views.nr_registered_views
+                             + stream->write_views.nr_registered_views))
+    {
+      gomp_stream_view_list_p read_view_list = &stream->read_views.view_list;
+      gomp_stream_view_list_p write_view_list = &stream->write_views.view_list;
+      int i;
+
+      for (i = 0; i < read_view_list->nr_views; ++i)
+        free (read_view_list->views[i]);
+
+      for (i = 0; i < write_view_list->nr_views; ++i)
+        free (write_view_list->views[i]);
+
+      free (stream->buffer);
+      free (read_view_list->views);
+      free (write_view_list->views);
+      free (stream);
+    }
+}
+
+// Things disabled by RoboTux
+#if 0
+
+/* Invoked before terminating a stream TASK, this disconnects all the
+   views and for all streams for which it is the last one to
+   disconnect from, it frees up all data structures.
*/
+
+void
+GOMP_stream_task_exit (void *t)
+{
+  gomp_stream_task_p task = (gomp_stream_task_p) t;
+  int num_read_views = task->read_view_list.nr_views;
+  int num_write_views = task->write_view_list.nr_views;
+  int i, res;
+  debug_log_init ("GOMP_stream_task_exit %zu %zu\n", (size_t) t, (size_t) t);
+
+  /* Only the instance that brings NUM_INSTANCES down to zero tears
+     the task down.  */
+  res = __sync_sub_and_fetch (&task->num_instances, 1);
+
+  if (res == 0)
+    {
+      for (i = 0; i < num_read_views; ++i)
+        gomp_stream_unregister_view (task->read_view_list.views[i]);
+
+      for (i = 0; i < num_write_views; ++i)
+        gomp_stream_unregister_view (task->write_view_list.views[i]);
+
+      free (task->read_view_list.views);
+      free (task->write_view_list.views);
+      free (task);
+    }
+}
+
+/* Get info on the amount of work/data available for the task T.
+   A range of AGGREGATION_FACTOR activations is claimed atomically
+   and its first index is stored in *START_IDX.  This function does
+   not wait, except in case the claimed range is not fully available
+   yet and the task's termination flag has not yet been set.
+
+   Returns AGGREGATION_FACTOR when the whole range is available.
+   After termination it returns what is left of the range, between 0
+   and AGGREGATION_FACTOR; the function only returns 0 on
+   termination.  */
+unsigned long long
+GOMP_stream_get_available_work (void *t, unsigned long long *start_idx)
+{
+  gomp_stream_task_p task = (gomp_stream_task_p) t;
+  long result = 0;
+  unsigned long long start;
+
+  /* Atomically acquire a range, then wait until the range is either
+     fully available or termination occurs.  */
+  start = __sync_fetch_and_add (&task->first_unassigned_activation_counter,
+                                AGGREGATION_FACTOR);
+  result = task->activation_counter - start;
+  debug_log_init3 ("GOMP_stream_get_available_work [entry] %llu %llu \t %15zu\n", start, task->activation_counter, (size_t) task);
+
+  /* Fast path: the claimed range is already fully available.  */
+  if (result >= AGGREGATION_FACTOR)
+    {
+      *start_idx = start;
+      return AGGREGATION_FACTOR;
+    }
+
+  /* Busy-wait, re-reading the activation counter on each turn.  */
+  while (result < AGGREGATION_FACTOR)
+    {
+      if (task->termination_flag)
+        {
+          /* Full barrier before the final read of the counter.  */
+          __sync_synchronize ();
+          result = task->activation_counter - start;
+          debug_log ("GOMP_stream_get_available_work [final] %llu %ld\n", start, result);
+          *start_idx = start;
+          if (result > AGGREGATION_FACTOR)
+            return AGGREGATION_FACTOR;
+          return (result > 0) ? result : 0;
+        }
+      result = task->activation_counter - start;
+    }
+  debug_log ("GOMP_stream_get_available_work %llu %ld\n", start, result);
+
+  *start_idx = start;
+  return AGGREGATION_FACTOR;
+}
+
+
+/* Initialize streaming in this region.  */
+
+void
+GOMP_stream_init ()
+{
+  /* Add self to ensure at least one member of the team barrier will
+     be waiting for the streaming tasks.  */
+  gomp_stream_tasks_count = 1;
+  gomp_barrier_init (&gomp_stream_tasks_exit_barrier, gomp_stream_tasks_count);
+}
+
+/* Wait until all streaming threads complete.  */
+
+void
+GOMP_stream_exit ()
+{
+  gomp_barrier_wait (&gomp_stream_tasks_exit_barrier);
+}
+
+/* Request SIZE bytes for a PRE operator on stream S.  Return a
+   pointer where data should be stored.  SIZE is recorded as the
+   stream's PRE_SHIFT and as the write views' CURRENT_MIN; the
+   returned pointer is the start of the stream buffer.  */
+void *
+GOMP_stream_pre (void *s, const unsigned long long size)
+{
+  gomp_stream_p stream = (gomp_stream_p) s;
+
+  debug_log_init ("GOMP_stream_pre %zu \t %llu\n", (size_t) s, size);
+
+  stream->pre_shift = size;
+  stream->write_views.current_min = size;
+
+  return stream->buffer;
+}
+
+
+/* This function is a pthread_create entry point for streaming
+   tasks.
*/
+
+static void *
+gomp_stream_thread_start (void *xdata)
+{
+  struct gomp_stream_thread_start_data *data = xdata;
+  void (*local_fn) (void *);
+  void *local_data;
+
+  local_fn = data->fn;
+  local_data = data->fn_data;
+
+  local_fn (local_data);
+
+  gomp_barrier_wait_last (&gomp_stream_tasks_exit_barrier);
+  debug_log_init ("** exiting task: %d (%u)\n", data->id, gomp_stream_tasks_count);
+
+  return NULL;
+}
+
+/* Called for starting a streaming task.  These tasks do not partake
+   in existing thread teams and are not subject to scheduling
+   points.  */
+
+void
+GOMP_stream_task (void (*fn) (void *), void *data,
+                  void (*cpyfn) (void *, void *),
+                  long arg_size, long arg_align,
+                  long num_instances, bool auto_replicable)
+{
+  pthread_attr_t thread_attr, *attr;
+  pthread_t pt;
+  int err, i, base_id;
+  char *arg, *buf;
+
+  base_id = __sync_fetch_and_add (&gomp_stream_tasks_count, num_instances);
+  gomp_barrier_reinit (&gomp_stream_tasks_exit_barrier,
+                       gomp_stream_tasks_count);
+
+  debug_log_init ("** adding tasks: %ld (%u)\n", num_instances, gomp_stream_tasks_count);
+
+  attr = &gomp_thread_attr;
+  if (__builtin_expect (gomp_cpu_affinity != NULL, 0))
+    {
+      size_t stacksize;
+      pthread_attr_init (&thread_attr);
+      pthread_attr_setdetachstate (&thread_attr, PTHREAD_CREATE_DETACHED);
+      if (! pthread_attr_getstacksize (&gomp_thread_attr, &stacksize))
+        pthread_attr_setstacksize (&thread_attr, stacksize);
+      attr = &thread_attr;
+#if 0 /* This should be handled separately ... we will have to build a
+         stream mapping and prevent other OMP threads from touching
+         the cores running streaming tasks.  */
+      gomp_init_thread_affinity (attr);
+#endif
+    }
+
+  for (i = 0; i < num_instances; ++i)
+    {
+      struct gomp_stream_thread_start_data *start_data;
+
+      buf = (char *) gomp_malloc (arg_size + arg_align - 1);
+      arg = (char *) (((uintptr_t) (buf) + arg_align - 1)
+                      & ~(uintptr_t) (arg_align - 1));
+      if (cpyfn)
+        cpyfn (arg, data);
+      else
+        memcpy (arg, data, arg_size);
+
+      start_data = gomp_malloc (sizeof (struct gomp_stream_thread_start_data));
+      start_data->fn = fn;
+      start_data->fn_data = arg;
+      start_data->id = base_id + i;
+
+      err = pthread_create (&pt, attr, gomp_stream_thread_start, start_data);
+      if (err != 0)
+        gomp_fatal ("Thread creation failed: %s", strerror (err));
+    }
+
+  if (__builtin_expect (gomp_cpu_affinity != NULL, 0))
+    pthread_attr_destroy (&thread_attr);
+}
+#endif
+// End of things disabled by RoboTux
+
+/*
+ * Create a channel backed by one stream with a single read view and a
+ * single write view, both connected through one stream task.
+ *
+ * @return The new channel, or NULL if any allocation failed (everything
+ *         obtained so far is then freed)
+ */
+void *create_comm_channel(void)
+{
+    void *stream, *task, *rview, *wview;
+    struct channel *channel;
+
+    assert(BUF_SIZE >= BURST_SIZE * 2 + CACHE_LINE_SIZE * 2);
+    stream = GOMP_stream_create_stream(sizeof(void *),
+            BUF_SIZE / sizeof(void *));
+    if (stream == NULL)
+        return NULL;
+    GOMP_stream_add_expected_views(stream, 1, 1, 1);
+
+    task = GOMP_stream_create_task();
+    if (task == NULL)
+        goto create_task_error;
+
+    // BURST_SIZE = 128 or 1024
+    rview = GOMP_stream_create_read_view(BURST_SIZE + CACHE_LINE_SIZE,
+            BURST_SIZE);
+    if (rview == NULL)
+        goto create_rview_error;
+    GOMP_stream_connect_view(task, stream, rview);
+
+    wview = GOMP_stream_create_write_view(BURST_SIZE + CACHE_LINE_SIZE,
+            BURST_SIZE);
+    if (wview == NULL)
+        goto create_wview_error;
+    GOMP_stream_connect_view(task, stream, wview);
+
+    channel = malloc(sizeof(struct channel));
+    if (channel == NULL)
+        goto create_channel_error;
+    channel->stream = stream;
+    channel->task = task;
+    channel->rview = rview;
+    channel->wview = wview;
+    channel->read_ptr = NULL;
+    channel->read_end_ptr = NULL;
+    channel->read_act = 0;
+    return channel;
+
+create_channel_error:
+    /* channel is NULL on this path; what has to be undone is the write
+     * view created just before the failed malloc. */
+    free(wview);
+create_wview_error:
+    free(rview);
+create_rview_error:
+    free(task);
+create_task_error:
+    free(stream);
+    return NULL;
+}
+
+/*
+ * Notify the stream that the producer has finished: the write view is
+ * unregistered, which lets gomp_stream_unregister_view flag the end of
+ * the stream once all write views are gone.
+ *
+ * The write view is cleared in the channel afterwards so that a second
+ * call, or the later destroy_comm_channel(), does not unregister the
+ * same view twice.
+ *
+ * @return Always 0
+ */
+int end_producer(void *channel_voidptr)
+{
+    struct channel *channel;
+
+    channel = (struct channel *) channel_voidptr;
+    if (channel->wview != NULL)
+    {
+        gomp_stream_unregister_view(channel->wview);
+        channel->wview = NULL;
+    }
+    return 0;
+}
+
+/*
+ * Unregister the views still attached to the channel and free the
+ * channel itself.  The write view is skipped when end_producer() has
+ * already unregistered it: counting it twice would make the stream and
+ * its views be freed before the read view is unregistered.
+ *
+ * NOTE(review): channel->task is not released here; no teardown routine
+ * for tasks is enabled in this file -- confirm whether it should be
+ * freed as well.
+ *
+ * @return Always 0
+ */
+int destroy_comm_channel(void *channel_voidptr)
+{
+    struct channel *channel;
+
+    channel = (struct channel *) channel_voidptr;
+    if (channel->wview != NULL)
+        gomp_stream_unregister_view(channel->wview);
+    gomp_stream_unregister_view(channel->rview);
+    free(channel);
+    return 0;
+}
+
+/*
+ * Receive one element.  A new burst is requested from the stream when
+ * the current one is exhausted, and the burst is released back to the
+ * producer once its last element has been consumed.
+ *
+ * @return The next element of the stream
+ */
+void *recv_one_data(struct channel *channel)
+{
+    void *result;
+
+    if (channel->read_ptr == channel->read_end_ptr)
+    {
+        channel->read_ptr = (void **) GOMP_stream_update(channel->rview,
+                channel->read_act, channel->read_act + 1);
+        channel->read_act++;
+        channel->read_end_ptr = channel->read_ptr + BURST_SIZE / sizeof(void *);
+    }
+    result = *channel->read_ptr++;
+    if (channel->read_ptr == channel->read_end_ptr)
+        GOMP_stream_release(channel->rview, channel->read_act);
+    return result;
+}
+
+/*
+ * Receive up to count elements into buf, never crossing the end of the
+ * current burst.
+ *
+ * The burst is released only once it has been read completely, as in
+ * recv_one_data(): releasing it earlier would allow the producer to
+ * overwrite elements that have not been copied out yet.
+ *
+ * @return The number of elements stored in buf
+ */
+ssize_t recv_some_data(struct channel *channel, void **buf, size_t count)
+{
+    size_t i, nb_read;
+
+    if (channel->read_ptr == channel->read_end_ptr)
+    {
+        channel->read_ptr = (void **) GOMP_stream_update(channel->rview,
+                channel->read_act, channel->read_act + 1);
+        channel->read_act++;
+        channel->read_end_ptr = channel->read_ptr + BURST_SIZE / sizeof(void *);
+    }
+    nb_read = channel->read_end_ptr - channel->read_ptr;
+    nb_read = (nb_read > count) ? count : nb_read;
+    for (i = 0; i < nb_read; i++)
+        buf[i] = *channel->read_ptr++;
+    if (channel->read_ptr == channel->read_end_ptr)
+        GOMP_stream_release(channel->rview, channel->read_act);
+    return nb_read;
+}
diff --git a/communication_techniques/src/communication/lamport.c b/communication_techniques/src/communication/lamport.c
index 8529ee3..6f43ba8 100644
--- a/communication_techniques/src/communication/lamport.c
+++ b/communication_techniques/src/communication/lamport.c
@@ -26,6 +26,11 @@ void *create_comm_channel(void)
     return NULL;
 }
 
+int end_producer(void *unused __attribute__ ((unused)))
+{
+    return 0;
+}
+
 int destroy_comm_channel(void *channel)
 {
     free((void *) ((struct channel *) channel)->shared_space);
diff --git a/communication_techniques/src/communication/mcringbuffer.c b/communication_techniques/src/communication/mcringbuffer.c
index ba40aa3..a60464f 100644
--- a/communication_techniques/src/communication/mcringbuffer.c
+++ b/communication_techniques/src/communication/mcringbuffer.c
@@ -34,6 +34,11 @@ void *create_comm_channel(void)
     return NULL;
 }
 
+int end_producer(void *unused __attribute__ ((unused)))
+{
+    return 0;
+}
+
 int destroy_comm_channel(void *channel)
 {
     free((void *) ((struct channel *) channel)->shared_space);
diff --git a/communication_techniques/src/communication/none.c b/communication_techniques/src/communication/none.c
index 81b5309..0ed282b 100644
--- a/communication_techniques/src/communication/none.c
+++ b/communication_techniques/src/communication/none.c
@@ -16,6 +16,11 @@ void *create_comm_channel(void)
     return (void *) &store_var;
 }
 
+int end_producer(void *unused __attribute__ ((unused)))
+{
+    return 0;
+}
+
 int destroy_comm_channel(void *unused __attribute__ ((unused)))
 {
     return 0;
diff --git a/communication_techniques/src/communication/pipe.c b/communication_techniques/src/communication/pipe.c
index ad07867..86c5a09 100644
--- a/communication_techniques/src/communication/pipe.c
+++ b/communication_techniques/src/communication/pipe.c
@@ -30,6 +30,11 @@ void
*create_comm_channel(void) return NULL; } +int end_producer(void *unused __attribute__ ((unused))) +{ + return 0; +} + int destroy_comm_channel(void *channel) { free(channel); diff --git a/communication_techniques/src/communication/shared_mem_opt.c b/communication_techniques/src/communication/shared_mem_opt.c index e3c0a04..20fc40d 100644 --- a/communication_techniques/src/communication/shared_mem_opt.c +++ b/communication_techniques/src/communication/shared_mem_opt.c @@ -26,6 +26,11 @@ void *create_comm_channel(void) return NULL; } +int end_producer(void *unused __attribute__ ((unused))) +{ + return 0; +} + int destroy_comm_channel(void *channel) { free((void *) ((struct channel *) channel)->shared_space); diff --git a/communication_techniques/src/main.c b/communication_techniques/src/main.c index 555db04..847f1cc 100644 --- a/communication_techniques/src/main.c +++ b/communication_techniques/src/main.c @@ -337,6 +337,12 @@ static int initial_producer(node_param_t *node_param) for(j = 0; j < WORDS_PER_LINE; j++) send(node_param->next_comm_channel, do_calc()); } + if (end_producer(node_param->next_comm_channel)) + { + fprintf(stderr, "Notification of end of production to the " + "communication algorithm failed\n"); + return 1; + } if (end_calc()) { fprintf(stderr, "uninitialization of calculation has failed\n");