From dfdc3faa91a415fc0f082cb5737d75eb38817556 Mon Sep 17 00:00:00 2001
From: Sebastian Huber
Date: Sun, 25 Aug 2019 12:14:11 +0200
Subject: record: Improve overflow handling

Signal the accumulated item overflow count with the time of the first
new item.
---
 cpukit/include/rtems/recordclient.h    |  5 ++
 cpukit/libtrace/record/record-client.c | 83 ++++++++++++++++++++++++----------
 2 files changed, 63 insertions(+), 25 deletions(-)

diff --git a/cpukit/include/rtems/recordclient.h b/cpukit/include/rtems/recordclient.h
index 2509ba5cc8..7f21d90a0e 100644
--- a/cpukit/include/rtems/recordclient.h
+++ b/cpukit/include/rtems/recordclient.h
@@ -105,6 +105,11 @@ typedef struct {
    */
  size_t tail_head_index;
 
+  /**
+   * @brief Count of lost items due to ring buffer overflows.
+   */
+  uint32_t overflow;
+
   /**
    * @brief If true, then hold back items for overflow or initial ramp up
    * processing.
diff --git a/cpukit/libtrace/record/record-client.c b/cpukit/libtrace/record/record-client.c
index cfe05fd43b..c26893ec4c 100644
--- a/cpukit/libtrace/record/record-client.c
+++ b/cpukit/libtrace/record/record-client.c
@@ -98,14 +98,13 @@ static rtems_record_client_status call_handler(
 }
 
 static void signal_overflow(
-  const rtems_record_client_context *ctx,
-  rtems_record_client_per_cpu       *per_cpu,
-  uint32_t                           data
+  rtems_record_client_per_cpu *per_cpu,
+  uint32_t                     data
 )
 {
   per_cpu->hold_back = true;
   per_cpu->item_index = 0;
-  call_handler( ctx, 0, RTEMS_RECORD_PER_CPU_OVERFLOW, data );
+  per_cpu->overflow += data;
 }
 
 static void resolve_hold_back(
@@ -114,35 +113,44 @@
 )
 {
   if ( per_cpu->hold_back ) {
+    uint32_t last_tail;
     uint32_t last_head;
+    uint32_t last_capacity;
     uint32_t new_head;
-    uint32_t overwritten;
+    uint32_t new_content;
+    uint32_t begin_index;
     uint32_t index;
     uint32_t first;
     uint32_t last;
     uint32_t delta;
     uint64_t uptime;
 
-    per_cpu->hold_back = false;
-
     last_head = per_cpu->head[ per_cpu->tail_head_index ];
+    last_tail = per_cpu->tail[ per_cpu->tail_head_index ];
     new_head = per_cpu->head[ per_cpu->tail_head_index ^ 1 ];
-    overwritten = new_head - last_head;
+    last_capacity = ( last_tail - last_head - 1 ) & ( ctx->count - 1 );
+    new_content = new_head - last_head;
 
-    if ( overwritten >= per_cpu->item_index ) {
-      return;
+    if ( new_content > last_capacity ) {
+      begin_index = new_content - last_capacity;
+      per_cpu->overflow += begin_index;
+    } else {
+      begin_index = 0;
     }
 
-    if ( overwritten > 0 ) {
-      call_handler( ctx, 0, RTEMS_RECORD_PER_CPU_OVERFLOW, overwritten );
+    if ( begin_index >= per_cpu->item_index ) {
+      per_cpu->item_index = 0;
+      return;
     }
 
-    first = RTEMS_RECORD_GET_TIME( per_cpu->items[ overwritten ].event );
+    per_cpu->hold_back = false;
+
+    first = RTEMS_RECORD_GET_TIME( per_cpu->items[ begin_index ].event );
     last = first;
     delta = 0;
     uptime = 0;
 
-    for ( index = overwritten; index < per_cpu->item_index; ++index ) {
+    for ( index = begin_index; index < per_cpu->item_index; ++index ) {
       const rtems_record_item_64 *item;
       rtems_record_event event;
       uint32_t time;
@@ -170,7 +178,17 @@ static void resolve_hold_back(
     per_cpu->uptime.time_last = first;
     per_cpu->uptime.time_accumulated = 0;
 
-    for ( index = overwritten; index < per_cpu->item_index; ++index ) {
+    if ( per_cpu->overflow > 0 ) {
+      call_handler(
+        ctx,
+        per_cpu->uptime.bt,
+        RTEMS_RECORD_PER_CPU_OVERFLOW,
+        per_cpu->overflow
+      );
+      per_cpu->overflow = 0;
+    }
+
+    for ( index = begin_index; index < per_cpu->item_index; ++index ) {
       const rtems_record_item_64 *item;
 
       item = &per_cpu->items[ index ];
@@ -185,20 +203,31 @@ static void process_per_cpu_head(
   uint64_t                     data
 )
 {
-  uint32_t new_head;
   uint32_t last_tail;
   uint32_t last_head;
-  uint32_t capacity;
+  uint32_t last_capacity;
+  uint32_t new_tail;
+  uint32_t new_head;
   uint32_t new_content;
-  uint32_t produced;
+  uint32_t content;
 
+  new_tail = per_cpu->tail[ per_cpu->tail_head_index ];
   new_head = (uint32_t) data;
-  produced = new_head - per_cpu->tail[ per_cpu->tail_head_index ];
-  per_cpu->head[ per_cpu->tail_head_index ]= new_head;
+  content = new_head - new_tail;
+
+  per_cpu->head[ per_cpu->tail_head_index ] = new_head;
   per_cpu->tail_head_index ^= 1;
 
-  if ( produced >= ctx->count ) {
-    signal_overflow( ctx, per_cpu, produced - ctx->count + 1 );
+  if ( content >= ctx->count ) {
+    /*
+     * This is a complete ring buffer overflow; the server will detect this
+     * as well.  It sets the tail to the head plus one and sends us all the
+     * content.  This reduces the ring buffer capacity to zero.  So, during
+     * the transfer, new events will overwrite items that are still being
+     * transferred.  This is handled by resolve_hold_back().
+     */
+    per_cpu->tail[ per_cpu->tail_head_index ^ 1 ] = new_head + 1;
+    signal_overflow( per_cpu, content - ctx->count + 1 );
     return;
   }
 
@@ -207,6 +236,10 @@ static void process_per_cpu_head(
 
   if ( last_tail == last_head ) {
     if ( per_cpu->uptime.bt == 0 ) {
+      /*
+       * This is a special case during initial ramp up.  We hold back the items
+       * to deduce the uptime of the first item via resolve_hold_back().
+       */
       per_cpu->hold_back = true;
     } else {
       resolve_hold_back( ctx, per_cpu );
@@ -215,15 +248,15 @@ static void process_per_cpu_head(
     return;
   }
 
-  capacity = ( last_tail - last_head - 1 ) & ( ctx->count - 1 );
+  last_capacity = ( last_tail - last_head - 1 ) & ( ctx->count - 1 );
   new_content = new_head - last_head;
 
-  if ( new_content <= capacity ) {
+  if ( new_content <= last_capacity || per_cpu->hold_back ) {
     resolve_hold_back( ctx, per_cpu );
     return;
   }
 
-  signal_overflow( ctx, per_cpu, new_content - capacity );
+  signal_overflow( per_cpu, new_content - last_capacity );
 }
 
 static rtems_record_client_status process_per_cpu_count(
-- 
cgit v1.2.3
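
Editor's note: the patch leans on two pieces of arithmetic that are easy to
misread.  The head and tail values are free-running 32-bit counters and
ctx->count is a power of two, so the remaining capacity is computed with a
plain subtraction and a mask, and stays correct even after the counters wrap.
The sketch below is a minimal standalone illustration of that arithmetic plus
the deferred overflow accounting this commit introduces; it is not part of
the patch, and all names here (RING_COUNT, ring_capacity, account_overflow,
overflow_accumulated) are invented for the example.

#include <assert.h>
#include <stdint.h>

#define RING_COUNT 8u /* must be a power of two, like ctx->count */

/*
 * Free slots left before the producer overwrites unread items.  One slot
 * is sacrificed to distinguish a full buffer from an empty one, hence the
 * "- 1".  Unsigned wrap-around keeps this correct across counter overflow.
 */
static uint32_t ring_capacity( uint32_t tail, uint32_t head )
{
  return ( tail - head - 1 ) & ( RING_COUNT - 1 );
}

/*
 * Deferred overflow accounting: instead of reporting every overflow
 * immediately, add the number of lost items to an accumulator and report
 * the sum later, once a trustworthy timestamp is available.
 */
static uint32_t overflow_accumulated;

static void account_overflow( uint32_t new_content, uint32_t capacity )
{
  if ( new_content > capacity ) {
    overflow_accumulated += new_content - capacity;
  }
}

int main( void )
{
  /* Empty buffer (tail == head): capacity is RING_COUNT - 1, not RING_COUNT */
  assert( ring_capacity( 0, 0 ) == RING_COUNT - 1 );

  /* Five items produced, none consumed: two slots remain */
  assert( ring_capacity( 0, 5 ) == 2 );

  /* Counters wrapped past UINT32_MAX: four items unread, three slots free */
  assert( ring_capacity( UINT32_MAX - 1, 2 ) == 3 );

  /* Four new items arrive while only two fit, then five more: 2 + 3 lost */
  account_overflow( 4, 2 );
  account_overflow( 5, 2 );
  assert( overflow_accumulated == 5 );

  return 0;
}

In the patch itself the accumulator is the new per_cpu->overflow member:
signal_overflow() only adds to it, and resolve_hold_back() later flushes it
as a single RTEMS_RECORD_PER_CPU_OVERFLOW event stamped with
per_cpu->uptime.bt, the uptime deduced from the first item that survived the
overflow, which is exactly what the commit message means by signaling the
accumulated count "with the time of the first new item".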