Diffstat (limited to 'freebsd/sys/sys/buf_ring.h')
-rw-r--r--  freebsd/sys/sys/buf_ring.h  127
1 file changed, 82 insertions(+), 45 deletions(-)
diff --git a/freebsd/sys/sys/buf_ring.h b/freebsd/sys/sys/buf_ring.h
index ee7a48ce..4fa72824 100644
--- a/freebsd/sys/sys/buf_ring.h
+++ b/freebsd/sys/sys/buf_ring.h
@@ -47,25 +47,14 @@ struct buf_ring {
int br_prod_size;
int br_prod_mask;
uint64_t br_drops;
- uint64_t br_prod_bufs;
- /*
- * Pad out to next L2 cache line
- */
- uint64_t _pad0[11];
-
- volatile uint32_t br_cons_head;
+ volatile uint32_t br_cons_head __aligned(CACHE_LINE_SIZE);
volatile uint32_t br_cons_tail;
int br_cons_size;
int br_cons_mask;
-
- /*
- * Pad out to next L2 cache line
- */
- uint64_t _pad1[14];
#ifdef DEBUG_BUFRING
struct mtx *br_lock;
#endif
- void *br_ring[0];
+ void *br_ring[0] __aligned(CACHE_LINE_SIZE);
};
/*
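The struct hunk above replaces the hand-counted _pad arrays with __aligned(CACHE_LINE_SIZE), so the producer fields, the consumer fields, and the ring storage each start on their own cache line regardless of the actual line size, avoiding false sharing between producer and consumer cores. A minimal standalone sketch of the same pattern in GNU C, where the 64-byte line is an assumption standing in for the kernel's CACHE_LINE_SIZE macro:

/* Minimal false-sharing sketch in GNU C; TOY_CACHE_LINE is an
 * assumed 64-byte line standing in for CACHE_LINE_SIZE. */
#include <stdint.h>

#define TOY_CACHE_LINE 64

struct toy_padded_ring {
	/* Producer-side fields share the first cache line... */
	volatile uint32_t prod_head;
	volatile uint32_t prod_tail;
	/* ...consumer fields start on the next line, so a consumer
	 * spinning on cons_tail never invalidates the producer's
	 * line... */
	volatile uint32_t cons_head __attribute__((aligned(TOY_CACHE_LINE)));
	volatile uint32_t cons_tail;
	/* ...and the ring storage begins on a line of its own. */
	void *ring[0] __attribute__((aligned(TOY_CACHE_LINE)));
};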
@@ -75,9 +64,7 @@ struct buf_ring {
static __inline int
buf_ring_enqueue(struct buf_ring *br, void *buf)
{
- uint32_t prod_head, prod_next;
- uint32_t cons_tail;
- int success;
+ uint32_t prod_head, prod_next, cons_tail;
#ifdef DEBUG_BUFRING
int i;
for (i = br->br_cons_head; i != br->br_prod_head;
@@ -89,35 +76,34 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
critical_enter();
do {
prod_head = br->br_prod_head;
+ prod_next = (prod_head + 1) & br->br_prod_mask;
cons_tail = br->br_cons_tail;
- prod_next = (prod_head + 1) & br->br_prod_mask;
-
if (prod_next == cons_tail) {
- br->br_drops++;
- critical_exit();
- return (ENOBUFS);
+ rmb();
+ if (prod_head == br->br_prod_head &&
+ cons_tail == br->br_cons_tail) {
+ br->br_drops++;
+ critical_exit();
+ return (ENOBUFS);
+ }
+ continue;
}
-
- success = atomic_cmpset_int((volatile int *)&br->br_prod_head, prod_head,
- prod_next);
- } while (success == 0);
+ } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
#endif
br->br_ring[prod_head] = buf;
- wmb();
/*
* If there are other enqueues in progress
- * that preceeded us, we need to wait for them
+ * that preceded us, we need to wait for them
* to complete
*/
while (br->br_prod_tail != prod_head)
cpu_spinwait();
- br->br_prod_bufs++;
- br->br_prod_tail = prod_next;
+ atomic_store_rel_int(&br->br_prod_tail, prod_next);
critical_exit();
return (0);
}
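The enqueue hunk swaps the plain cmpset plus explicit wmb() for an acquire CAS on br_prod_head paired with a release store on br_prod_tail. The following is a standalone C11 sketch of that acquire/release publish pattern; the toy_* names are illustrative, not the kernel API, and unlike the patch this sketch does not recheck the indices under rmb() before declaring the ring full, it simply fails:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define TOY_RING_SIZE 1024	/* must be a power of two */

struct toy_ring {
	_Atomic uint32_t prod_head;
	_Atomic uint32_t prod_tail;
	_Atomic uint32_t cons_head;
	_Atomic uint32_t cons_tail;
	uint32_t mask;		/* TOY_RING_SIZE - 1 */
	void *ring[TOY_RING_SIZE];
};

static bool
toy_enqueue(struct toy_ring *r, void *buf)
{
	uint32_t head, next;

	do {
		head = atomic_load_explicit(&r->prod_head, memory_order_relaxed);
		next = (head + 1) & r->mask;
		/* Full if the next producer slot would hit the consumer tail. */
		if (next == atomic_load_explicit(&r->cons_tail, memory_order_acquire))
			return (false);
		/* Acquire CAS claims the slot, like atomic_cmpset_acq_int(). */
	} while (!atomic_compare_exchange_weak_explicit(&r->prod_head, &head,
	    next, memory_order_acquire, memory_order_relaxed));

	r->ring[head] = buf;

	/* Wait for earlier in-flight enqueues, then publish the slot with
	 * a release store, mirroring atomic_store_rel_int() in the patch. */
	while (atomic_load_explicit(&r->prod_tail, memory_order_relaxed) != head)
		;	/* spin; the kernel uses cpu_spinwait() here */
	atomic_store_explicit(&r->prod_tail, next, memory_order_release);
	return (true);
}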
@@ -130,41 +116,32 @@ static __inline void *
buf_ring_dequeue_mc(struct buf_ring *br)
{
uint32_t cons_head, cons_next;
- uint32_t prod_tail;
void *buf;
- int success;
critical_enter();
do {
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
-
cons_next = (cons_head + 1) & br->br_cons_mask;
-
- if (cons_head == prod_tail) {
+
+ if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
-
- success = atomic_cmpset_int((volatile int *)&br->br_cons_head, cons_head,
- cons_next);
- } while (success == 0);
+ } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
- rmb();
-
/*
* If there are other dequeues in progress
- * that preceeded us, we need to wait for them
+ * that preceded us, we need to wait for them
* to complete
*/
while (br->br_cons_tail != cons_head)
cpu_spinwait();
- br->br_cons_tail = cons_next;
+ atomic_store_rel_int(&br->br_cons_tail, cons_next);
critical_exit();
return (buf);
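The multi-consumer dequeue gets the mirror-image treatment: an acquire CAS on br_cons_head and a release store on br_cons_tail, which lets the patch drop the explicit rmb() before reading the slot. A C11 sketch of the same idea, reusing struct toy_ring from the enqueue sketch above:

static void *
toy_dequeue_mc(struct toy_ring *r)
{
	uint32_t head, next;
	void *buf;

	do {
		head = atomic_load_explicit(&r->cons_head, memory_order_relaxed);
		next = (head + 1) & r->mask;
		/* Empty if the consumer head has caught the producer tail.
		 * The acquire load pairs with the producer's release store,
		 * making the slot contents visible before we read them. */
		if (head == atomic_load_explicit(&r->prod_tail, memory_order_acquire))
			return (NULL);
	} while (!atomic_compare_exchange_weak_explicit(&r->cons_head, &head,
	    next, memory_order_acquire, memory_order_relaxed));

	/* The acquire operations above order this read after the winning
	 * claim, which is why the explicit rmb() is no longer needed. */
	buf = r->ring[head];

	/* Wait for earlier in-flight dequeues, then release the slot. */
	while (atomic_load_explicit(&r->cons_tail, memory_order_relaxed) != head)
		;
	atomic_store_explicit(&r->cons_tail, next, memory_order_release);
	return (buf);
}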
@@ -184,9 +161,38 @@ buf_ring_dequeue_sc(struct buf_ring *br)
#endif
uint32_t prod_tail;
void *buf;
-
+
+ /*
+ * This is a workaround to allow using buf_ring on ARM and ARM64.
+ * ARM64TODO: Fix buf_ring in a generic way.
+ * REMARKS: It is suspected that br_cons_head does not require a
+ * load_acq operation, but this change was extensively tested and
+ * confirmed to work. To be reviewed once again in FreeBSD-12.
+ *
+ * Preventing the following situation:
+ *
+ * Core(0) - buf_ring_enqueue()                    Core(1) - buf_ring_dequeue_sc()
+ * --------------------------------------------    --------------------------------------------
+ *
+ *                                                 cons_head = br->br_cons_head;
+ * atomic_cmpset_acq_32(&br->br_prod_head, ...));
+ *                                                 buf = br->br_ring[cons_head];   <see <1>>
+ * br->br_ring[prod_head] = buf;
+ * atomic_store_rel_32(&br->br_prod_tail, ...);
+ *                                                 prod_tail = br->br_prod_tail;
+ *                                                 if (cons_head == prod_tail)
+ *                                                         return (NULL);
+ *                                                 <condition is false and code uses invalid(old) buf>
+ *
+ * <1> The load (on core 1) from br->br_ring[cons_head] can be reordered (speculatively read) by the CPU.
+ */
+#if defined(__arm__) || defined(__aarch64__)
+ cons_head = atomic_load_acq_32(&br->br_cons_head);
+#else
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
+#endif
+ prod_tail = atomic_load_acq_32(&br->br_prod_tail);
cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
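In C11 terms the fix is simply that the load of br_prod_tail must be an acquire: it pairs with the producer's release store, so the slot read cannot be speculated ahead of the emptiness check. A sketch of the single-consumer dequeue with struct toy_ring from above; note that a relaxed load of cons_head suffices here since only this consumer writes it, whereas the patch's extra load_acq on ARM/ARM64 is the belt-and-braces measure its own comment flags for review:

static void *
toy_dequeue_sc(struct toy_ring *r)
{
	uint32_t head, next;
	void *buf;

	head = atomic_load_explicit(&r->cons_head, memory_order_relaxed);
	/* The acquire load of prod_tail pairs with the producer's release
	 * store, so the read of ring[head] below cannot be hoisted above
	 * the emptiness check -- exactly the reordering described in the
	 * comment in the patch. */
	if (head == atomic_load_explicit(&r->prod_tail, memory_order_acquire))
		return (NULL);

	next = (head + 1) & r->mask;
	buf = r->ring[head];
	/* Single consumer: plain ordering is enough for our own head... */
	atomic_store_explicit(&r->cons_head, next, memory_order_relaxed);
	/* ...but releasing cons_tail publishes the freed slot to producers. */
	atomic_store_explicit(&r->cons_tail, next, memory_order_release);
	return (buf);
}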
@@ -291,6 +297,37 @@ buf_ring_peek(struct buf_ring *br)
return (br->br_ring[br->br_cons_head]);
}
+static __inline void *
+buf_ring_peek_clear_sc(struct buf_ring *br)
+{
+#ifdef DEBUG_BUFRING
+ void *ret;
+
+ if (!mtx_owned(br->br_lock))
+ panic("lock not held on single consumer dequeue");
+#endif
+	/*
+	 * I believe it is safe to not have a memory barrier here
+	 * because we control cons, and tail is at worst a lagging
+	 * indicator, so in the worst case we might return NULL
+	 * immediately after a buffer has been enqueued.
+	 */
+ if (br->br_cons_head == br->br_prod_tail)
+ return (NULL);
+
+#ifdef DEBUG_BUFRING
+ /*
+ * Single consumer, i.e. cons_head will not move while we are
+ * running, so atomic_swap_ptr() is not necessary here.
+ */
+ ret = br->br_ring[br->br_cons_head];
+ br->br_ring[br->br_cons_head] = NULL;
+ return (ret);
+#else
+ return (br->br_ring[br->br_cons_head]);
+#endif
+}
+
static __inline int
buf_ring_full(struct buf_ring *br)
{
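The new buf_ring_peek_clear_sc() returns the head buffer without advancing the consumer tail, NULLing the slot under DEBUG_BUFRING to catch double-use. A hypothetical drain loop in the style of FreeBSD network drivers, assuming the companion buf_ring_advance_sc()/buf_ring_putback_sc() helpers found elsewhere in this header; hw_tx_one() is an invented stand-in for a driver transmit attempt:

static void
toy_drain(struct buf_ring *br, struct mtx *txlock)
{
	void *buf;

	mtx_lock(txlock);	/* satisfies the DEBUG_BUFRING owner check */
	while ((buf = buf_ring_peek_clear_sc(br)) != NULL) {
		if (hw_tx_one(buf) != 0) {
			/* Transmit failed: restore the cleared slot and
			 * leave the buffer at the ring head for later. */
			buf_ring_putback_sc(br, buf);
			break;
		}
		/* Consumed: advance past the peeked slot. */
		buf_ring_advance_sc(br);
	}
	mtx_unlock(txlock);
}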