Add benchs to compare communication techniques
This commit is contained in:
parent
46fd16312a
commit
9b4388a964
|
@ -0,0 +1 @@
|
|||
channel.o .channel.d: channel.c channel.h Makefile
|
|
@ -0,0 +1,2 @@
|
|||
main.o .main.d: main.c channel.h Makefile
|
||||
channel.o .main.d: channel.h Makefile
|
|
@ -0,0 +1,77 @@
|
|||
# Directories
|
||||
OBJDIR=obj
|
||||
BINDIR=bin
|
||||
SRCDIR=src
|
||||
INCDIR=include
|
||||
LIBDIR=lib
|
||||
|
||||
# Compilation flags
|
||||
# I know -finline-functions and -finline-functions-called-once are enabled by
|
||||
# -O3 but I did this in case gcc behaviour change one day
|
||||
CFLAGS=-g -O3 -finline-functions -finline-functions-called-once -Wall -Werror
|
||||
LDFLAGS=-L$(LIBDIR) -L$(HOME)/local/lib -Wl,-rpath-link,$(HOME)/local/lib -lpthread -lpapihighlevel
|
||||
|
||||
# Executables
|
||||
CC=gcc
|
||||
|
||||
# Files
|
||||
BINNAMES=asm_cache_comm c_cache_comm pipe_comm shared_mem_comm
|
||||
BINS=$(patsubst %, $(BINDIR)/%, $(BINNAMES))
|
||||
MAIN_OBJS=main.o common.o
|
||||
COMMON_LIB_OBJS=common.o
|
||||
|
||||
.PHONY: all tidy clean distclean symlink
|
||||
.SECONDARY:
|
||||
.SUFFIXES: .c .o
|
||||
|
||||
default: $(BINS)
|
||||
|
||||
ifneq (,$(findstring $(MAKECMDGOALS),$(BINS)))
|
||||
BASE_TARGET=$(patsubst $(BINDIR)/%_comm,%,$(MAKECMDGOALS))
|
||||
|
||||
# Compilation of binary
|
||||
$(BINDIR)/$(BASE_TARGET)_comm: $(patsubst %,$(OBJDIR)/$(BASE_TARGET)_%,$(MAIN_OBJS)) $(LIBDIR)/lib$(BASE_TARGET).a
|
||||
if [ ! -d $(BINDIR) ] ; then mkdir $(BINDIR) ; fi
|
||||
$(CC) -o $@ $^ $(LDFLAGS)
|
||||
|
||||
# Compilation of library
|
||||
$(LIBDIR)/lib$(BASE_TARGET).a: $(OBJDIR)/$(BASE_TARGET).o $(patsubst %,$(OBJDIR)/$(BASE_TARGET)_%,$(COMMON_LIB_OBJS))
|
||||
if [ ! -d $(LIBDIR) ] ; then mkdir $(LIBDIR) ; fi
|
||||
$(AR) -rcus $@ $^
|
||||
|
||||
# Compile common source files
|
||||
$(OBJDIR)/$(BASE_TARGET)_%.o: $(SRCDIR)/%.c symlink
|
||||
if [ ! -d $(OBJDIR) ] ; then mkdir $(OBJDIR) ; fi
|
||||
$(CC) $(CFLAGS) -I$(INCDIR) -I$(HOME)/local/include -c $< -o $@
|
||||
endif
|
||||
|
||||
# Compile non common source files
|
||||
$(OBJDIR)/%.o: $(SRCDIR)/%.c symlink
|
||||
if [ ! -d $(OBJDIR) ] ; then mkdir $(OBJDIR) ; fi
|
||||
$(CC) $(CFLAGS) -I$(INCDIR) -c $< -o $@
|
||||
|
||||
symlink:
|
||||
ln -sfT $(BASE_TARGET)_comm.h $(INCDIR)/specific_comm.h
|
||||
|
||||
#.%.d: %.c
|
||||
# gcc $(CFLAGS) -MM $^ | sed -e 's/\([^:]*\):\(.*\)/\1 $@: \2 Makefile/' > $@
|
||||
|
||||
tidy:
|
||||
rm -f $(SRCDIR)/*~ \#*
|
||||
|
||||
clean:
|
||||
rm -f $(INCDIR)/specific_comm.h
|
||||
rm -rf $(OBJDIR)
|
||||
|
||||
distclean: clean
|
||||
rm -rf $(BINDIR) $(LIBDIR)
|
||||
|
||||
#ifneq ($(MAKECMDGOALS),tidy)
|
||||
#ifneq ($(MAKECMDGOALS),clean)
|
||||
#ifneq ($(MAKECMDGOALS),distclean)
|
||||
# If the rules called is not a phony rules, then include the %.d makefile
|
||||
# corresponding to all objects
|
||||
#include $(patsubst %.o, .%.d, $(OBJ))
|
||||
#endif
|
||||
#endif
|
||||
#endif
|
|
@ -0,0 +1,51 @@
|
|||
#ifndef _SPECIFIC_COMM_H_
|
||||
#define _SPECIFIC_COMM_H_ 1
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
/* Non standard include */
|
||||
#include <common_comm.h>
|
||||
|
||||
/* This is not an error, we need this two-macro system */
|
||||
#define toString(x) doStringification(x)
|
||||
#define doStringification(x) #x
|
||||
|
||||
struct communication_channel
|
||||
{
|
||||
uintptr_t buf[2 * BUF_SIZE / sizeof(uintptr_t)] __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
int state __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
int idx __attribute__ ((aligned (CACHE_LINE_SIZE)));
|
||||
};
|
||||
|
||||
struct communication_assoc
|
||||
{
|
||||
struct communication_assoc *next;
|
||||
struct communication_assoc *prev;
|
||||
pthread_t tid;
|
||||
struct communication_channel *channel;
|
||||
int receiver_idx;
|
||||
};
|
||||
|
||||
extern struct communication_assoc assoc_root;
|
||||
|
||||
__BEGIN_DECLS
|
||||
|
||||
struct communication_assoc *create_comm_assoc(void);
|
||||
static inline void send(uintptr_t value) {
|
||||
asm volatile("mov %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) " + " toString(CACHE_LINE_SIZE) ", %%eax\n\t"
|
||||
"mov %0, %%gs:channel@NTPOFF(%%eax)\n\t"
|
||||
"addl $4, %%eax\n\t"
|
||||
"andl $(2*" toString(BUF_SIZE) "-1), %%eax\n\t"
|
||||
"mov %%eax, %%gs:channel@NTPOFF + 2 * " toString(BUF_SIZE)" + " toString(CACHE_LINE_SIZE) "\n\t"
|
||||
"test $(" toString(BUF_SIZE) " - 1), %%eax\n\t"
|
||||
"mov $2f, %%eax\n\t"
|
||||
"jz swap_buffer\n\t"
|
||||
"2:"
|
||||
:
|
||||
: "r"(value)
|
||||
: "%eax");
|
||||
}
|
||||
|
||||
__END_DECLS
|
||||
|
||||
#endif
|
|
@ -0,0 +1,23 @@
|
|||
#ifndef _COMMON_COMM_H_
|
||||
#define _COMMON_COMM_H_ 1
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#define CACHE_LINE_SIZE 128
|
||||
#define BUF_SIZE CACHE_LINE_SIZE
|
||||
|
||||
extern volatile int cont;
|
||||
|
||||
__BEGIN_DECLS
|
||||
|
||||
void add_sender(void);
|
||||
void remove_sender(void);
|
||||
volatile int *init_comm(void);
|
||||
void reception(void (*)(uintptr_t));
|
||||
extern int swap_buffer;
|
||||
void wait_initialization(void);
|
||||
void discover_new_producers(void);
|
||||
|
||||
__END_DECLS
|
||||
|
||||
#endif
|
|
@ -0,0 +1,35 @@
|
|||
#ifndef __COMM_H_
|
||||
#define __COMM_H_ 1
|
||||
|
||||
#include <stdint.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/* Non standart include */
|
||||
#include <specific_comm.h>
|
||||
|
||||
#define READ_IDX 0
|
||||
#define WRITE_IDX 1
|
||||
|
||||
struct communication_assoc
|
||||
{
|
||||
struct communication_assoc *next;
|
||||
struct communication_assoc *prev;
|
||||
pthread_t tid;
|
||||
int *pipefd;
|
||||
struct communication_channel *channel;
|
||||
int receiver_idx;
|
||||
};
|
||||
|
||||
__BEGIN_DECLS
|
||||
|
||||
extern __thread int pipefd[];
|
||||
extern struct communication_assoc assoc_root;
|
||||
|
||||
struct communication_assoc *create_comm_assoc(void);
|
||||
static inline void send(uintptr_t value) {
|
||||
write(pipefd[WRITE_IDX], &value, sizeof(uintptr_t));
|
||||
}
|
||||
|
||||
__END_DECLS
|
||||
|
||||
#endif
|
|
@ -0,0 +1,36 @@
|
|||
#ifndef __COMM_H_
|
||||
#define __COMM_H_ 1
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
/* Non standard include */
|
||||
#include <common_comm.h>
|
||||
|
||||
#define SHARED_SPACE_SIZE (2 * CACHE_LINE_SIZE)
|
||||
|
||||
struct communication_assoc
|
||||
{
|
||||
struct communication_assoc *next;
|
||||
struct communication_assoc *prev;
|
||||
pthread_t tid;
|
||||
uintptr_t *shared_space;
|
||||
int *cons_idx;
|
||||
int *prod_idx;
|
||||
};
|
||||
|
||||
extern struct communication_assoc assoc_root;
|
||||
|
||||
__BEGIN_DECLS
|
||||
|
||||
extern __thread uintptr_t *shared_space;
|
||||
extern __thread int prod_idx;
|
||||
|
||||
struct communication_assoc *create_comm_assoc(void);
|
||||
static inline void send(uintptr_t value) {
|
||||
shared_space[prod_idx] = value;
|
||||
prod_idx = (prod_idx + 1) % SHARED_SPACE_SIZE;
|
||||
}
|
||||
|
||||
__END_DECLS
|
||||
|
||||
#endif
|
|
@ -0,0 +1,72 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
|
||||
/* Non standard include */
|
||||
#include <common_comm.h>
|
||||
#include <specific_comm.h>
|
||||
|
||||
|
||||
__thread struct communication_channel channel;
|
||||
|
||||
struct communication_assoc *create_comm_assoc(void)
|
||||
{
|
||||
struct communication_assoc *assoc;
|
||||
|
||||
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
|
||||
assoc->tid = pthread_self();
|
||||
assoc->receiver_idx = 0;
|
||||
assoc->channel = &channel;
|
||||
return assoc;
|
||||
}
|
||||
|
||||
char *dstr="buffer transition\n";
|
||||
|
||||
void _swap_buffer()
|
||||
{
|
||||
asm volatile(".globl swap_buffer\n\t"
|
||||
"swap_buffer:\n\t"
|
||||
|
||||
"1:\n\t"
|
||||
"testl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t"
|
||||
"jnz 1b\n\t"
|
||||
"movl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t"
|
||||
"jmp *%%eax\n\t"
|
||||
: : "m"(dstr));
|
||||
}
|
||||
|
||||
void reception(void (*on_receive)(uintptr_t))
|
||||
{
|
||||
wait_initialization();
|
||||
/* printf("Activate the consumer...\n"); */
|
||||
while(cont)
|
||||
{
|
||||
struct communication_assoc *cur;
|
||||
|
||||
discover_new_producers();
|
||||
cur = assoc_root.next;
|
||||
while(cur != &assoc_root)
|
||||
{
|
||||
struct communication_channel *channel = cur->channel;
|
||||
if(channel->state)
|
||||
{
|
||||
/*
|
||||
* cur->receiver_idx point to the last cache
|
||||
* line we have read. We go to the next cache
|
||||
* line "+ (CACHE_LINE_SIZE >> 2)" (because
|
||||
* the line is full of integer (2^2 octets)
|
||||
* and then if we are after the second cache
|
||||
* line we correct the pointer to point to
|
||||
* the first one (this is done by the modulo)
|
||||
*/
|
||||
int i = cur->receiver_idx;
|
||||
int n = cur->receiver_idx + (BUF_SIZE / sizeof(uintptr_t));
|
||||
cur->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(uintptr_t));
|
||||
for(; i<n; i++)
|
||||
on_receive(channel->buf[i]);
|
||||
channel->state = 0;
|
||||
}
|
||||
cur = cur->next;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/* Non standard include */
|
||||
#include <specific_comm.h>
|
||||
|
||||
|
||||
struct communication_assoc assoc_root;
|
||||
static pthread_mutex_t assoc_lock = PTHREAD_MUTEX_INITIALIZER;
|
||||
static struct communication_assoc assoc_tmp;
|
||||
static volatile int init = 0;
|
||||
volatile int cont = 1;
|
||||
|
||||
void initialize_library(void)
|
||||
{
|
||||
assoc_tmp.prev = &assoc_tmp;
|
||||
assoc_tmp.next = &assoc_tmp;
|
||||
assoc_root.prev = &assoc_root;
|
||||
assoc_root.next = &assoc_root;
|
||||
}
|
||||
|
||||
volatile int *init_comm(void)
|
||||
{
|
||||
return &cont;
|
||||
}
|
||||
|
||||
void wait_initialization(void)
|
||||
{
|
||||
while (!init);
|
||||
}
|
||||
|
||||
void add_sender(void)
|
||||
{
|
||||
struct communication_assoc *assoc;
|
||||
|
||||
assoc = create_comm_assoc();
|
||||
pthread_mutex_lock(&assoc_lock);
|
||||
if (!init)
|
||||
{
|
||||
initialize_library();
|
||||
init = 1;
|
||||
}
|
||||
assoc->next = assoc_tmp.next;
|
||||
assoc_tmp.next->prev = assoc;
|
||||
assoc->prev = &assoc_tmp;
|
||||
assoc_tmp.next = assoc;
|
||||
pthread_mutex_unlock(&assoc_lock);
|
||||
}
|
||||
|
||||
void remove_sender()
|
||||
{
|
||||
printf("remove_communication_channel: Not yet implemented\n");
|
||||
}
|
||||
|
||||
void discover_new_producers(void)
|
||||
{
|
||||
/* If there is some new thread for the write barrier */
|
||||
if(&assoc_tmp != assoc_tmp.next)
|
||||
{
|
||||
/* printf("Adding a new set of producers\n"); */
|
||||
pthread_mutex_lock(&assoc_lock);
|
||||
/*
|
||||
* list in assoc_tmp is inserted between assoc_root
|
||||
* and the first elements of assoc_root list
|
||||
*/
|
||||
assoc_root.next->prev = assoc_tmp.prev;
|
||||
assoc_tmp.prev->next = assoc_root.next;
|
||||
assoc_root.next = assoc_tmp.next;
|
||||
assoc_root.next->prev = &assoc_root;
|
||||
/*
|
||||
* assoc_tmp temporary list has been copied in
|
||||
* assoc_root list. assoc_tmp is now alone and so
|
||||
* double linked to itself
|
||||
*/
|
||||
assoc_tmp.prev = &assoc_tmp;
|
||||
assoc_tmp.next = &assoc_tmp;
|
||||
pthread_mutex_unlock(&assoc_lock);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
#include <stdint.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <limits.h>
|
||||
#include <sys/stat.h>
|
||||
#include <string.h>
|
||||
|
||||
/* Non standards includes */
|
||||
#include <papihighlevel.h>
|
||||
#include <common_comm.h>
|
||||
#include <specific_comm.h>
|
||||
|
||||
static long nb_cache_lines = 0;
|
||||
static long nb_prod = 0;
|
||||
static long size_buf = 1;
|
||||
static char *calculation_lib = NULL;
|
||||
|
||||
void usage(char *argv[])
|
||||
{
|
||||
char format[] = "-n [options]";
|
||||
char options[] = "Required options :\n"
|
||||
"-n nb_cache_lines\tNumber of cache lines to send to another core\n"
|
||||
"-p nb_producers\tNumber of producers which send data to another core\n"
|
||||
"Facultative options :\n"
|
||||
"-h\t\t\tPrint this help\n"
|
||||
"-s\t\t\tShare the same L2 cache or not\n"
|
||||
"-c calculation_lib\tLibrary to use for calculation\n"
|
||||
"\t\t\tThis library must implement functions in calc.h\n";
|
||||
printf("Usage : %s %s\n", argv[0], format);
|
||||
printf("Options :\n");
|
||||
printf("%s\n", options);
|
||||
}
|
||||
|
||||
int analyse_options(int argc, char *argv[])
|
||||
{
|
||||
int opt;
|
||||
|
||||
opterr = 0;
|
||||
while ((opt = getopt(argc, argv, ":hsc:n:p:b:")) != -1)
|
||||
{
|
||||
switch (opt)
|
||||
{
|
||||
case 'b' :
|
||||
{
|
||||
char *inval;
|
||||
size_buf = strtol(optarg, &inval, 10);
|
||||
if ((*optarg == '\0') || (*inval != '\0'))
|
||||
{
|
||||
fprintf(stderr, "Option '-b' needs an integer argument\n");
|
||||
return -1;
|
||||
}
|
||||
if ((nb_cache_lines <= 0) || ((nb_cache_lines == LONG_MAX) && errno == ERANGE))
|
||||
{
|
||||
fprintf(stderr, "Number of cache lines for each buffer must be between 1 and %ld, both inclusive\n", LONG_MAX);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 'c' :
|
||||
calculation_lib = optarg;
|
||||
{
|
||||
struct stat file_stat;
|
||||
if (stat(calculation_lib, &file_stat))
|
||||
{
|
||||
printf("%s: %s\n", optarg, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 'h' :
|
||||
usage(argv);
|
||||
exit(EXIT_SUCCESS);
|
||||
case 'n' :
|
||||
{
|
||||
char *inval;
|
||||
nb_cache_lines = strtol(optarg, &inval, 10);
|
||||
if ((*optarg == '\0') || (*inval != '\0'))
|
||||
{
|
||||
fprintf(stderr, "Option '-n' needs an integer argument\n");
|
||||
return -1;
|
||||
}
|
||||
if ((nb_cache_lines <= 0) || ((nb_cache_lines == LONG_MAX) && errno == ERANGE))
|
||||
{
|
||||
fprintf(stderr, "Number of cache lines to be sent must be between 1 and %ld, both inclusive\n", LONG_MAX);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 'p' :
|
||||
{
|
||||
char *inval;
|
||||
nb_prod = strtol(optarg, &inval, 10);
|
||||
if ((*optarg == '\0') || (*inval != '\0'))
|
||||
{
|
||||
fprintf(stderr, "Option '-p' needs an integer argument\n");
|
||||
return -1;
|
||||
}
|
||||
if ((nb_cache_lines <= 0) || ((nb_cache_lines == LONG_MAX) && errno == ERANGE))
|
||||
{
|
||||
fprintf(stderr, "Number of producers must be between 1 and %ld, both inclusive\n", LONG_MAX);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 's' :
|
||||
/* TODO: shared L2 cache */
|
||||
break;
|
||||
case '?' :
|
||||
fprintf(stderr, "Option inconnue\n");
|
||||
return -1;
|
||||
case ':' :
|
||||
fprintf(stderr, "Option %s needs an argument\n", argv[optind]);
|
||||
return -1;
|
||||
default :
|
||||
fprintf(stderr, "Error while analysing command line options\n");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
if (!nb_cache_lines)
|
||||
{
|
||||
fprintf(stderr, "You must give the number of cache lines to be sent\n");
|
||||
return -1;
|
||||
}
|
||||
if (!nb_prod)
|
||||
{
|
||||
fprintf(stderr, "You must give the number of producers\n");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void *producer(void *unused)
|
||||
{
|
||||
int i, j, k;
|
||||
|
||||
printf("Registering: %p !\n", (void*) pthread_self());
|
||||
add_sender();
|
||||
k = (uintptr_t) pthread_self();
|
||||
if (initialize_papi() != -1)
|
||||
{
|
||||
for(i = 0; i < nb_cache_lines; i++) {
|
||||
//printf("[%p] Send a new CACHE_LINE\n", (void *) pthread_self());
|
||||
for(j = 0; j < (CACHE_LINE_SIZE / sizeof(uintptr_t)); j++)
|
||||
send(k++);
|
||||
}
|
||||
print_results();
|
||||
}
|
||||
printf("[%p] Producer finished !\n", (void*) pthread_self());
|
||||
remove_sender();
|
||||
/* Threads must wait the consumer because of thread-local storages */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void onMessage(uintptr_t val)
|
||||
{
|
||||
//printf("Receive value: %p\n", (void *) val);
|
||||
}
|
||||
|
||||
void *receptor(void *a)
|
||||
{
|
||||
reception(onMessage);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
int i;
|
||||
volatile int *cont;
|
||||
void *return_value;
|
||||
pthread_t *tid;
|
||||
|
||||
if (analyse_options(argc, argv))
|
||||
return EXIT_FAILURE;
|
||||
tid = (pthread_t *) malloc((nb_prod + 1) * sizeof(pthread_t));
|
||||
cont = init_comm();
|
||||
for(i = 0; i < nb_prod; i++)
|
||||
pthread_create(&tid[i], NULL, producer, NULL);
|
||||
pthread_create(&tid[i], NULL, receptor, NULL);
|
||||
for(i = 0; i < nb_prod; i++)
|
||||
pthread_join(tid[i], &return_value);
|
||||
*cont = 0;
|
||||
pthread_join(tid[i], &return_value);
|
||||
free(tid);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/* Non standard include */
|
||||
#include <common_comm.h>
|
||||
#include <specific_comm.h>
|
||||
|
||||
|
||||
__thread int pipefd[2];
|
||||
|
||||
struct communication_assoc *create_comm_assoc(void)
|
||||
{
|
||||
struct communication_assoc *assoc;
|
||||
|
||||
pipe(pipefd);
|
||||
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
|
||||
assoc->tid = pthread_self();
|
||||
assoc->pipefd = pipefd;
|
||||
return assoc;
|
||||
}
|
||||
|
||||
void reception(void (*on_receive)(uintptr_t))
|
||||
{
|
||||
wait_initialization();
|
||||
/* printf("Activate the consumer...\n"); */
|
||||
while(cont)
|
||||
{
|
||||
struct communication_assoc *cur;
|
||||
|
||||
discover_new_producers();
|
||||
cur = assoc_root.next;
|
||||
while(cur != &assoc_root)
|
||||
{
|
||||
int i;
|
||||
|
||||
for(i = 0; i < BUF_SIZE / sizeof(uintptr_t); i++)
|
||||
{
|
||||
uintptr_t tmp;
|
||||
read(cur->pipefd[READ_IDX], &tmp, sizeof(uintptr_t));
|
||||
on_receive(tmp);
|
||||
}
|
||||
cur = cur->next;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
|
||||
/* Non standard include */
|
||||
#include <common_comm.h>
|
||||
#include <specific_comm.h>
|
||||
|
||||
|
||||
__thread uintptr_t *shared_space;
|
||||
__thread int cons_idx = 0;
|
||||
__thread int prod_idx = 0;
|
||||
|
||||
struct communication_assoc *create_comm_assoc(void)
|
||||
{
|
||||
struct communication_assoc *assoc;
|
||||
|
||||
shared_space = (uintptr_t *) malloc(SHARED_SPACE_SIZE * sizeof(uintptr_t));
|
||||
assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc));
|
||||
assoc->tid = pthread_self();
|
||||
assoc->shared_space = shared_space;
|
||||
assoc->cons_idx = &cons_idx;
|
||||
assoc->prod_idx = &prod_idx;
|
||||
return assoc;
|
||||
}
|
||||
|
||||
void reception(void (*on_receive)(uintptr_t))
|
||||
{
|
||||
wait_initialization();
|
||||
/* printf("Activate the consumer...\n"); */
|
||||
while(cont)
|
||||
{
|
||||
struct communication_assoc *cur;
|
||||
|
||||
discover_new_producers();
|
||||
cur = assoc_root.next;
|
||||
while(cur != &assoc_root)
|
||||
{
|
||||
int cons_idx, prod_idx;
|
||||
|
||||
cons_idx = *cur->cons_idx;
|
||||
do
|
||||
{
|
||||
prod_idx = *cur->prod_idx;
|
||||
for(; cons_idx != prod_idx; cons_idx = (cons_idx + 1) % SHARED_SPACE_SIZE)
|
||||
{
|
||||
uintptr_t tmp;
|
||||
tmp = cur->shared_space[cons_idx];
|
||||
on_receive(tmp);
|
||||
}
|
||||
} while (prod_idx != *cur->prod_idx);
|
||||
*cur->cons_idx = cons_idx;
|
||||
cur = cur->next;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue