rt_benchs/communication_techniques/src/communication/pipe.c

61 lines
1.2 KiB
C

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdint.h>
/* Non-standard includes */
#include <commtech.h>
#include <private_common.h>
#include <specific_comm.h>
/*
 * Thread-local pipe descriptor pair. Each thread has its own copy, which
 * init_thread_comm() fills with pipe() and then exposes through
 * comm->pipefd. The read end is indexed with READ_IDX.
 */
__thread int pipefd[2];
/*
 * Create the calling thread's pipe and publish it through `comm`.
 *
 * The read end is switched to non-blocking mode, so a read() on an empty
 * pipe returns immediately instead of stalling. On success `comm->pipefd`
 * points at the thread-local descriptor pair.
 *
 * Returns 0 on success. Returns -1 if the pipe could not be created or
 * configured; the failing call is reported with perror(), both descriptors
 * are closed again and `comm` is left untouched.
 */
int init_thread_comm(struct thread_comm *comm)
{
    int flags;

    if (pipe(pipefd) == -1)
    {
        perror("pipe");
        return -1;
    }

    flags = fcntl(pipefd[READ_IDX], F_GETFL);
    if (flags == -1 || fcntl(pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK) == -1)
    {
        /* Report first: close() may overwrite errno. */
        perror("fcntl");
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    comm->pipefd = pipefd;
    return 0;
}
/*
 * Per-thread teardown hook for the pipe technique.
 *
 * Performs no work: nothing is released here, including the descriptors
 * opened by init_thread_comm(). Always returns 0.
 */
int end_thread_comm(void)
{
    const int status = 0;

    return status;
}
/*
 * Consumer loop: poll each producer's pipe in turn and pass every pointer
 * received to `on_receive`.
 *
 * Runs until the global `cont` flag becomes false. For each of the
 * `nb_prod` producers, data is read from tcomms[i].pipefd[READ_IDX] into a
 * local buffer until either the buffer is full or read() returns nothing
 * (empty non-blocking pipe, end of file, or error); the loop then moves on
 * to the next producer.
 *
 * Byte counts and pointer counts are tracked separately: `nb_read` is the
 * number of bytes stored in the buffer, `next` is the index of the first
 * pointer not yet delivered. A pointer is delivered only once all of its
 * bytes have arrived.
 *
 * NOTE(review): bytes of a pointer that is still incomplete when the pipe
 * runs dry are dropped. This assumes producers always write whole pointers
 * in a single write() -- confirm against the sending side.
 */
void reception(void (*on_receive)(void *))
{
    wait_initialization(); /* Not needed for pipes; kept for fairness with the other techniques */

    while (cont)
    {
        int i;

        for (i = 0; i < nb_prod; i++)
        {
            void *tmp_buf[BUF_SIZE / sizeof(void *)];
            size_t nb_read = 0; /* bytes stored in tmp_buf so far */
            size_t next = 0;    /* first pointer not yet handed to on_receive */

            /* sizeof(tmp_buf) rather than BUF_SIZE: never read past the array,
             * even if BUF_SIZE is not a multiple of the pointer size. */
            while (nb_read < sizeof(tmp_buf))
            {
                ssize_t n = read(tcomms[i].pipefd[READ_IDX],
                                 (char *) tmp_buf + nb_read,
                                 sizeof(tmp_buf) - nb_read);

                if (n <= 0)
                    break; /* nothing available right now: poll the next producer */

                nb_read += (size_t) n;
                for (; next < nb_read / sizeof(void *); next++)
                    on_receive(tmp_buf[next]);
            }
        }
    }
}