#include "clog.h"

#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>

#define LOGMETA_BUFFER 4096
#define LOGDATA_BUFFER (64 * 1024)

// Data ring buffer: raw bytes of the log records.
//
// [..........................]
//               ^
//               |
//              ptr % LOGDATA_BUFFER
//
// N producers:
//    ptr <- atomic_fetch_add ptr, size   (reserves [ptr, ptr + size))
//    copy : input -> buffer + ptr % LOGDATA_BUFFER
// consumer:
//    copy : buffer + offset % LOGDATA_BUFFER -> output
//    check valid ( offset + LOGDATA_BUFFER >= ptr ), otherwise the bytes
//    may already have been overwritten by newer records.

struct log_data_ringbuffer {
	atomic_size_t ptr;	// total number of bytes ever reserved (never wrapped by hand)
	char buffer[LOGDATA_BUFFER];
};

// Meta ring buffer: one (offset, size) slot per record.
//
// N producers:
//    push data into log_data_ringbuffer
//    index <- atomic_fetch_add tail, 1
//    offset[index] = data offset
//    size[index] = size   (written last: this publishes the slot)
// 1 consumer:
//    if head == tail then return empty
//    if size[head] < 0 then return empty   (slot reserved, not published yet)
//    index = head ++
//    offset[index] -> output
//    size[index] = -1 (invalid size)

struct log_meta_ringbuffer {
	atomic_size_t tail;	// next slot to reserve; advanced by producers
	size_t head;		// next slot to read; touched by the consumer only
	size_t offset[LOGMETA_BUFFER];
	atomic_int size[LOGMETA_BUFFER];	// < 0 means "no published record in this slot"
};

struct log_buffer {
	struct log_meta_ringbuffer meta;
	struct log_data_ringbuffer data;
};

// Allocate an empty log buffer.
// Returns NULL when the allocation fails; release with log_delete().
struct log_buffer *
log_new(void) {
	struct log_buffer *obj = malloc(sizeof *obj);
	if (obj == NULL)
		return NULL;
	obj->meta.head = 0;
	atomic_init(&obj->meta.tail, 0);
	atomic_init(&obj->data.ptr, 0);
	// Every slot must start out invalid: get_index() relies on size < 0 to
	// recognise a slot that was reserved but whose size is not stored yet.
	for (size_t i = 0; i < LOGMETA_BUFFER; i++) {
		atomic_init(&obj->meta.size[i], -1);
	}
	return obj;
}

// Free a buffer obtained from log_new(). NULL is accepted.
void
log_delete(struct log_buffer *obj) {
	free(obj);
}

// Reserve n bytes in the data ring and copy `data` into them, wrapping around
// the end of the buffer if necessary.
// The caller guarantees 0 < n <= LOGDATA_BUFFER.
// Returns the (unwrapped) offset at which the record starts.
static size_t
push_data(struct log_data_ringbuffer *buffer, int n, const char *data) {
	size_t ptr = atomic_fetch_add(&buffer->ptr, (size_t)n);
	int offset = ptr % LOGDATA_BUFFER;
	if (offset + n <= LOGDATA_BUFFER) {
		memcpy(buffer->buffer + offset, data, n);
	} else {
		// The record crosses the end of the buffer: split it in two.
		int second_part = offset + n - LOGDATA_BUFFER;
		int first_part = n - second_part;
		memcpy(buffer->buffer + offset, data, first_part);
		memcpy(buffer->buffer, data + first_part, second_part);
	}
	return ptr;
}

// Append a record of n bytes. May be called from several producers.
// Records with n <= 0 (or data == NULL) are ignored; records longer than
// LOGDATA_BUFFER are truncated to LOGDATA_BUFFER bytes, because a longer copy
// would run past the end of the data ring.
void
log_push(struct log_buffer *self, int n, const char *data) {
	if (n <= 0 || data == NULL)
		return;
	if (n > LOGDATA_BUFFER)
		n = LOGDATA_BUFFER;
	size_t offset = push_data(&self->data, n, data);
	struct log_meta_ringbuffer *meta_buffer = &self->meta;
	size_t index = atomic_fetch_add(&meta_buffer->tail, 1);
	index %= LOGMETA_BUFFER;
	meta_buffer->offset[index] = offset;
	// The offset must be visible before the size, since a non-negative size
	// is what tells the consumer the slot is ready.
	atomic_thread_fence(memory_order_release);
	atomic_store_explicit(&meta_buffer->size[index], n, memory_order_relaxed);
}

// Take the next meta slot (consumer side).
// >0 : size , 0 empty ; -1 : drop
// On a positive return *offset holds the record's data offset.
static int
get_index(struct log_buffer *self, size_t *offset) {
	struct log_meta_ringbuffer *meta = &self->meta;
	size_t t = atomic_load_explicit(&meta->tail, memory_order_relaxed);
	if (t == meta->head)
		return 0;
	if (meta->head + LOGMETA_BUFFER < t) {
		// Producers are more than one full lap ahead: this slot was reused.
		++meta->head;
		return -1;
	}
	int index = meta->head % LOGMETA_BUFFER;
	int size = atomic_load_explicit(&meta->size[index], memory_order_relaxed);
	if (size < 0)
		return 0;	// reserved by a producer, size not stored yet
	atomic_thread_fence(memory_order_acquire);
	meta->head++;
	*offset = meta->offset[index];
	atomic_thread_fence(memory_order_release);
	// Mark the slot invalid again so it is not read twice.
	atomic_store_explicit(&meta->size[index], -1, memory_order_relaxed);
	return size;
}

// Pop the oldest record into `output`, copying at most n bytes.
// >0 : length ; 0 empty ; -1 : drop
// A negative n is treated as 0. Note that a record is consumed even when it
// is truncated to fit n (including n == 0, which then returns 0).
int
log_pop(struct log_buffer *self, int n, char *output) {
	size_t queue_offset;
	int size = get_index(self, &queue_offset);
	if (size <= 0)
		return size;
	struct log_data_ringbuffer *buffer = &self->data;
	if (n < 0)
		n = 0;	// a negative length would turn into a huge memcpy size
	if (size > n)
		size = n;
	int offset = queue_offset % LOGDATA_BUFFER;
	if (offset + size <= LOGDATA_BUFFER) {
		memcpy(output, buffer->buffer + offset, size);
	} else {
		// The record wraps around the end of the data ring.
		int second_part = offset + size - LOGDATA_BUFFER;
		int first_part = size - second_part;
		memcpy(output, buffer->buffer + offset, first_part);
		memcpy(output + first_part, buffer->buffer, second_part);
	}
	// Validate after copying: if producers have advanced more than one full
	// buffer past the record's start, the copied bytes may be overwritten.
	size_t ptr = atomic_load(&buffer->ptr);
	if (queue_offset + LOGDATA_BUFFER >= ptr)
		return size;
	return -1;
}