#include #include #include #include #include #include /* Non standard include */ #include #include #include __thread int pipefd[2]; int init_thread_comm(struct thread_comm *comm) { int flags; if (pipe(pipefd)) { fprintf(stderr, "Unable to create a pipe for pipe communication\n"); return -1; } flags = fcntl(pipefd[READ_IDX], F_GETFL); fcntl(pipefd[READ_IDX], F_SETFL, flags | O_NONBLOCK); comm->pipefd = pipefd; return 0; } int end_thread_comm(void) { return 0; } void reception(void (*on_receive)(void *)) { wait_initialization(); /* Not needed but here for equity with others techniques */ /* printf("Activate the consumer...\n"); */ while(cont) { int i; for (i = 0; i < nb_prod; i++) { int nb_read; for(nb_read = 0; nb_read < BUF_SIZE / sizeof(void *); nb_read++) { int j, n; void *tmp_buf[BUF_SIZE / sizeof(void *)]; j = nb_read / sizeof(void *); n = read(tcomms[i].pipefd[READ_IDX], (void *) ((uintptr_t) tmp_buf + nb_read), BUF_SIZE - nb_read); if (n > 0) { nb_read += n; for (; j + sizeof(void *) <= nb_read / sizeof(void *); j += sizeof(void *)) on_receive(tmp_buf[j]); } } } } }