#include #include #include /* Non standard include */ #include #include __thread struct communication_channel channel; struct communication_assoc *create_comm_assoc(void) { struct communication_assoc *assoc; assoc = (struct communication_assoc *) malloc(sizeof(struct communication_assoc)); assoc->tid = pthread_self(); assoc->receiver_idx = 0; assoc->channel = &channel; return assoc; } char *dstr="buffer transition\n"; void _swap_buffer() { asm volatile(".globl swap_buffer\n\t" "swap_buffer:\n\t" "1:\n\t" "testl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t" "jnz 1b\n\t" "movl $1, %%gs:channel@NTPOFF + 2 *" toString(BUF_SIZE) "\n\t" "jmp *%%eax\n\t" : : "m"(dstr)); } void reception(void (*on_receive)(void *)) { wait_initialization(); /* printf("Activate the consumer...\n"); */ while(cont) { struct communication_assoc *cur; discover_new_producers(); cur = assoc_root.next; while(cur != &assoc_root) { struct communication_channel *channel = cur->channel; if(channel->state) { /* * cur->receiver_idx point to the last cache * line we have read. We go to the next cache * line "+ (CACHE_LINE_SIZE >> 2)" (because * the line is full of integer (2^2 octets) * and then if we are after the second cache * line we correct the pointer to point to * the first one (this is done by the modulo) */ int i = cur->receiver_idx; int n = cur->receiver_idx + (BUF_SIZE / sizeof(void *)); cur->receiver_idx = n % ((2 * BUF_SIZE) / sizeof(void *)); for(; ibuf[i]); channel->state = 0; } cur = cur->next; } } }