#include #include #include #include #include /* Non standard include */ #include #include void *create_comm_channel(void) { struct channel *channel; if (!posix_memalign((void *) &channel, CACHE_LINE_SIZE, sizeof(struct channel))) { int i; for (i = 0; i < SLOTS; i++) channel->queue[i].flag = 0; channel->head = 0; channel->tail = 0; return channel; } return NULL; } int destroy_comm_channel(void *channel) { free(channel); return 0; } void *recv_one_data(struct channel *channel) { static __thread int i; void *result; if (__builtin_expect(!i, 0)) while (!channel->queue[channel->head].flag); result = channel->queue[channel->head].chunk[i++]; if (i % SUB_SLOTS) { i = 0; channel->queue[channel->head].flag = 0; channel->head = (channel->head + 1) % SLOTS; } return result; } ssize_t recv_some_data(struct channel *channel, void **buf, size_t count) { int n; n = 0; // If all slots are empty, spin while (channel->queue[channel->head].flag) { // Dequeue a chunk of data items memcpy(buf, (const void *) channel->queue[channel->head].chunk, SUB_SLOTS * sizeof(*buf)); n += SUB_SLOTS; channel->queue[channel->head].flag = 0; channel->head = (channel->head + 1) % SLOTS; if (n == count) break; } return n; }