diff options
Diffstat (limited to 'freebsd/sys/sys/buf_ring.h')
-rw-r--r-- | freebsd/sys/sys/buf_ring.h | 127 |
1 file changed, 82 insertions, 45 deletions
diff --git a/freebsd/sys/sys/buf_ring.h b/freebsd/sys/sys/buf_ring.h index ee7a48ce..4fa72824 100644 --- a/freebsd/sys/sys/buf_ring.h +++ b/freebsd/sys/sys/buf_ring.h @@ -47,25 +47,14 @@ struct buf_ring { int br_prod_size; int br_prod_mask; uint64_t br_drops; - uint64_t br_prod_bufs; - /* - * Pad out to next L2 cache line - */ - uint64_t _pad0[11]; - - volatile uint32_t br_cons_head; + volatile uint32_t br_cons_head __aligned(CACHE_LINE_SIZE); volatile uint32_t br_cons_tail; int br_cons_size; int br_cons_mask; - - /* - * Pad out to next L2 cache line - */ - uint64_t _pad1[14]; #ifdef DEBUG_BUFRING struct mtx *br_lock; #endif - void *br_ring[0]; + void *br_ring[0] __aligned(CACHE_LINE_SIZE); }; /* @@ -75,9 +64,7 @@ struct buf_ring { static __inline int buf_ring_enqueue(struct buf_ring *br, void *buf) { - uint32_t prod_head, prod_next; - uint32_t cons_tail; - int success; + uint32_t prod_head, prod_next, cons_tail; #ifdef DEBUG_BUFRING int i; for (i = br->br_cons_head; i != br->br_prod_head; @@ -89,35 +76,34 @@ buf_ring_enqueue(struct buf_ring *br, void *buf) critical_enter(); do { prod_head = br->br_prod_head; + prod_next = (prod_head + 1) & br->br_prod_mask; cons_tail = br->br_cons_tail; - prod_next = (prod_head + 1) & br->br_prod_mask; - if (prod_next == cons_tail) { - br->br_drops++; - critical_exit(); - return (ENOBUFS); + rmb(); + if (prod_head == br->br_prod_head && + cons_tail == br->br_cons_tail) { + br->br_drops++; + critical_exit(); + return (ENOBUFS); + } + continue; } - - success = atomic_cmpset_int((volatile int *)&br->br_prod_head, prod_head, - prod_next); - } while (success == 0); + } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next)); #ifdef DEBUG_BUFRING if (br->br_ring[prod_head] != NULL) panic("dangling value in enqueue"); #endif br->br_ring[prod_head] = buf; - wmb(); /* * If there are other enqueues in progress - * that preceeded us, we need to wait for them + * that preceded us, we need to wait for them * to complete */ 
while (br->br_prod_tail != prod_head) cpu_spinwait(); - br->br_prod_bufs++; - br->br_prod_tail = prod_next; + atomic_store_rel_int(&br->br_prod_tail, prod_next); critical_exit(); return (0); } @@ -130,41 +116,32 @@ static __inline void * buf_ring_dequeue_mc(struct buf_ring *br) { uint32_t cons_head, cons_next; - uint32_t prod_tail; void *buf; - int success; critical_enter(); do { cons_head = br->br_cons_head; - prod_tail = br->br_prod_tail; - cons_next = (cons_head + 1) & br->br_cons_mask; - - if (cons_head == prod_tail) { + + if (cons_head == br->br_prod_tail) { critical_exit(); return (NULL); } - - success = atomic_cmpset_int((volatile int *)&br->br_cons_head, cons_head, - cons_next); - } while (success == 0); + } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next)); buf = br->br_ring[cons_head]; #ifdef DEBUG_BUFRING br->br_ring[cons_head] = NULL; #endif - rmb(); - /* * If there are other dequeues in progress - * that preceeded us, we need to wait for them + * that preceded us, we need to wait for them * to complete */ while (br->br_cons_tail != cons_head) cpu_spinwait(); - br->br_cons_tail = cons_next; + atomic_store_rel_int(&br->br_cons_tail, cons_next); critical_exit(); return (buf); @@ -184,9 +161,38 @@ buf_ring_dequeue_sc(struct buf_ring *br) #endif uint32_t prod_tail; void *buf; - + + /* + * This is a workaround to allow using buf_ring on ARM and ARM64. + * ARM64TODO: Fix buf_ring in a generic way. + * REMARKS: It is suspected that br_cons_head does not require + * load_acq operation, but this change was extensively tested + * and confirmed it's working. To be reviewed once again in + * FreeBSD-12. 
+ * + * Preventing the following situation: + + * Core(0) - buf_ring_enqueue() Core(1) - buf_ring_dequeue_sc() + * ----------------------------------------- ---------------------------------------------- + * + * cons_head = br->br_cons_head; + * atomic_cmpset_acq_32(&br->br_prod_head, ...)); + * buf = br->br_ring[cons_head]; <see <1>> + * br->br_ring[prod_head] = buf; + * atomic_store_rel_32(&br->br_prod_tail, ...); + * prod_tail = br->br_prod_tail; + * if (cons_head == prod_tail) + * return (NULL); + * <condition is false and code uses invalid(old) buf> + * + * <1> Load (on core 1) from br->br_ring[cons_head] can be reordered (speculatively read) by CPU. + */ +#if defined(__arm__) || defined(__aarch64__) + cons_head = atomic_load_acq_32(&br->br_cons_head); +#else cons_head = br->br_cons_head; - prod_tail = br->br_prod_tail; +#endif + prod_tail = atomic_load_acq_32(&br->br_prod_tail); cons_next = (cons_head + 1) & br->br_cons_mask; #ifdef PREFETCH_DEFINED @@ -291,6 +297,37 @@ buf_ring_peek(struct buf_ring *br) return (br->br_ring[br->br_cons_head]); } +static __inline void * +buf_ring_peek_clear_sc(struct buf_ring *br) +{ +#ifdef DEBUG_BUFRING + void *ret; + + if (!mtx_owned(br->br_lock)) + panic("lock not held on single consumer dequeue"); +#endif + /* + * I believe it is safe to not have a memory barrier + * here because we control cons and tail is worst case + * a lagging indicator so we worst case we might + * return NULL immediately after a buffer has been enqueued + */ + if (br->br_cons_head == br->br_prod_tail) + return (NULL); + +#ifdef DEBUG_BUFRING + /* + * Single consumer, i.e. cons_head will not move while we are + * running, so atomic_swap_ptr() is not necessary here. + */ + ret = br->br_ring[br->br_cons_head]; + br->br_ring[br->br_cons_head] = NULL; + return (ret); +#else + return (br->br_ring[br->br_cons_head]); +#endif +} + static __inline int buf_ring_full(struct buf_ring *br) { |