Skip to content

Instantly share code, notes, and snippets.

@sparrc
Last active November 7, 2025 22:17
Show Gist options
  • Select an option

  • Save sparrc/cbf7882c9b2479d062fdeb116719a3ab to your computer and use it in GitHub Desktop.

Select an option

Save sparrc/cbf7882c9b2479d062fdeb116719a3ab to your computer and use it in GitHub Desktop.
Fluentd socket log generator (requires `pip install msgpack` to run). Ex: `python ./main.py -s /var/run/fluent.sock --log-size 4096`
#!/usr/bin/env python3
"""
Fluent Bit Forward Protocol Log Generator
Generates high-volume logs in msgpack format and sends them to a Unix socket
using the Fluent Bit forward protocol.
Usage:
python fluent_log_generator.py [options]
"""
import argparse
import socket
import time
import msgpack
import random
import string
import sys
from datetime import datetime
class FluentLogGenerator:
    """
    Generate synthetic log records and stream them to a Unix socket.

    Records are grouped into batches, encoded with msgpack as
    ``[tag, [[time, record], ...]]`` and written to a ``SOCK_STREAM`` Unix
    socket. The send loop sleeps between batches to approximate a target
    throughput and keeps running counters in ``self.stats``.
    """

    # Reconnect attempts allowed per batch after the connection breaks.
    # Bounded so that a flapping endpoint cannot cause unbounded retries.
    MAX_SEND_RETRIES = 3

    # Pause (seconds) after a batch that could not be sent, so a dead
    # endpoint does not turn the main loop into a busy loop.
    ERROR_BACKOFF_SECONDS = 0.5

    # Pool of canned messages used for the 'message' field of each record.
    _MESSAGES = (
        'Request processed successfully',
        'Database query executed',
        'Cache hit for user data',
        'Authentication successful',
        'API endpoint called',
        'File uploaded to storage',
        'Email sent to user',
        'Payment processed',
        'User session started',
        'Background job completed',
        'Metrics collected and stored',
        'Configuration reloaded',
        'Health check passed',
        'Service discovery updated',
        'Rate limit enforced',
    )

    def __init__(self, socket_path, tag="test.logs", target_throughput_mb=8,
                 batch_size=100, log_size=500):
        """
        Initialize the log generator.

        Args:
            socket_path: Path to Unix socket
            tag: Tag for log entries
            target_throughput_mb: Target throughput in MB/s. A value of 0 or
                less disables throttling in run().
            batch_size: Number of log entries per batch
            log_size: Approximate size of each log entry in bytes
        """
        self.socket_path = socket_path
        self.tag = tag
        self.target_throughput = target_throughput_mb * 1024 * 1024  # bytes per second
        self.batch_size = batch_size
        self.log_size = log_size
        self.sock = None
        self.stats = {
            'bytes_sent': 0,
            'messages_sent': 0,
            'batches_sent': 0,
            'start_time': None,
            'errors': 0
        }

    def _close_socket(self):
        """Close the current socket, if any, without printing anything."""
        if self.sock is not None:
            try:
                self.sock.close()
            finally:
                self.sock = None

    def connect(self):
        """
        Connect to the Unix socket.

        Any socket already held by this instance is closed first. On failure
        the half-created socket is closed and ``self.sock`` is left as None.

        Returns:
            bool: True if the connection was established, False otherwise.
        """
        self._close_socket()
        sock = None
        try:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(self.socket_path)
        except Exception as e:
            # Deliberately broad: any failure here is reported as "could not
            # connect" rather than propagated to the caller.
            if sock is not None:
                sock.close()
            print(f"Failed to connect to {self.socket_path}: {e}")
            return False
        self.sock = sock
        print(f"Connected to Unix socket: {self.socket_path}")
        return True

    def disconnect(self):
        """Close the socket connection."""
        if self.sock:
            self._close_socket()
            print("Disconnected from Unix socket")

    def generate_log_entry(self):
        """
        Generate a single log entry with randomized field values.

        Returns:
            tuple: (timestamp, record_dict), where timestamp is time.time()
            as a float and record_dict is padded so that its ``str()`` form
            is at least ``self.log_size`` characters long.
        """
        timestamp = time.time()
        # Generate varied log data to simulate real logs
        record = {
            'level': random.choice(['INFO', 'WARN', 'ERROR', 'DEBUG']),
            'message': self._generate_message(),
            'host': f'server-{random.randint(1, 100)}',
            'service': random.choice(['api', 'web', 'worker', 'database']),
            'request_id': ''.join(random.choices(string.ascii_letters + string.digits, k=32)),
            'duration_ms': random.randint(1, 1000),
            'status_code': random.choice([200, 201, 400, 404, 500]),
            'user_id': random.randint(1000, 9999),
            'ip_address': f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}',
            'bytes_sent': random.randint(100, 10000),
            'success': random.choice([True, False]),
            'metadata': {
                'version': '1.0.0',
                'environment': random.choice(['production', 'staging', 'development'])
            }
        }
        # Pad with extra data to reach target log size. The size is estimated
        # from the dict's str() form, not from the msgpack encoding.
        padding_needed = max(0, self.log_size - len(str(record)))
        if padding_needed > 0:
            record['padding'] = ''.join(random.choices(string.ascii_letters, k=padding_needed))
        return timestamp, record

    def _generate_message(self):
        """Return a random log message from the fixed message pool."""
        return random.choice(self._MESSAGES)

    def create_forward_message(self, entries):
        """
        Encode a batch of entries as a single msgpack message.

        Format: [tag, [[time, record], [time, record], ...]]

        Args:
            entries: List of (timestamp, record) tuples

        Returns:
            bytes: msgpack-encoded message
        """
        # Build the array of [time, record] pairs
        event_array = [[ts, rec] for ts, rec in entries]
        # Create the forward protocol message: [tag, events]
        message = [self.tag, event_array]
        # Encode with msgpack
        return msgpack.packb(message)

    def _send_with_reconnect(self, payload):
        """
        Write payload to the socket, reconnecting if the connection breaks.

        The same payload is re-sent in full on the new connection. At most
        MAX_SEND_RETRIES reconnects are attempted; each broken connection and
        each failed connect increments ``stats['errors']``.

        Returns:
            bool: True if the whole payload was sent, False otherwise.
        """
        for _ in range(self.MAX_SEND_RETRIES + 1):
            if self.sock is None and not self.connect():
                self.stats['errors'] += 1
                return False
            try:
                self.sock.sendall(payload)
                return True
            except (BrokenPipeError, ConnectionResetError):
                print("Connection broken - attempting to reconnect...")
                self.stats['errors'] += 1
                # Drop the dead socket so the next iteration reconnects.
                self._close_socket()
        return False

    def send_batch(self):
        """
        Generate and send a batch of log entries.

        Returns:
            int: Number of bytes sent, or 0 if the batch could not be sent.
        """
        try:
            # Generate batch of log entries
            entries = [self.generate_log_entry() for _ in range(self.batch_size)]
            # Create forward protocol message
            message = self.create_forward_message(entries)
            # Send the message (with bounded reconnect attempts)
            if not self._send_with_reconnect(message):
                return 0
            # Update statistics
            self.stats['bytes_sent'] += len(message)
            self.stats['messages_sent'] += len(entries)
            self.stats['batches_sent'] += 1
            return len(message)
        except Exception as e:
            # Best effort: report, count, and let the caller decide to go on.
            print(f"Error sending batch: {e}")
            self.stats['errors'] += 1
            return 0

    def print_stats(self):
        """Print current statistics on a single, overwritten console line."""
        if self.stats['start_time'] is None:
            return
        elapsed = time.time() - self.stats['start_time']
        if elapsed <= 0:
            return
        throughput_mb = (self.stats['bytes_sent'] / elapsed) / (1024 * 1024)
        msg_per_sec = self.stats['messages_sent'] / elapsed
        print(f"\rStats: {throughput_mb:.2f} MB/s | "
              f"{msg_per_sec:.0f} msgs/s | "
              f"{self.stats['batches_sent']} batches | "
              f"{self.stats['bytes_sent'] / (1024*1024):.2f} MB total | "
              f"Errors: {self.stats['errors']}", end='', flush=True)

    def run(self, duration=None):
        """
        Run the log generator until interrupted or the duration elapses.

        Returns immediately if the initial connection fails. On exit the
        socket is closed and the final statistics are printed.

        Args:
            duration: Optional duration in seconds. None or 0 means run
                until interrupted with Ctrl+C.
        """
        if not self.connect():
            return
        self.stats['start_time'] = time.time()
        last_stat_time = time.time()
        try:
            while True:
                batch_start = time.time()
                # Send batch
                bytes_sent = self.send_batch()
                if bytes_sent == 0:
                    # Nothing went out; back off instead of spinning.
                    time.sleep(self.ERROR_BACKOFF_SECONDS)
                elif self.target_throughput > 0:
                    # Sleep for whatever is left of this batch's time budget
                    # to maintain the target throughput.
                    batch_time = time.time() - batch_start
                    target_batch_time = bytes_sent / self.target_throughput
                    sleep_time = target_batch_time - batch_time
                    if sleep_time > 0:
                        time.sleep(sleep_time)
                # Print stats every second
                if time.time() - last_stat_time >= 1.0:
                    self.print_stats()
                    last_stat_time = time.time()
                # Check duration
                if duration and (time.time() - self.stats['start_time']) >= duration:
                    break
        except KeyboardInterrupt:
            print("\n\nShutting down gracefully...")
        finally:
            self.print_stats()
            print("\n")
            self.disconnect()
            self.print_final_stats()

    def print_final_stats(self):
        """
        Print final statistics summary.

        Safe to call before run(): with no start time, or with zero elapsed
        time, the duration and per-second rates are reported as 0.
        """
        start_time = self.stats['start_time']
        elapsed = 0.0 if start_time is None else time.time() - start_time
        if elapsed > 0:
            avg_throughput = (self.stats['bytes_sent'] / elapsed) / (1024 * 1024)
            msgs_per_sec = self.stats['messages_sent'] / elapsed
        else:
            avg_throughput = 0.0
            msgs_per_sec = 0.0
        print("\n" + "="*60)
        print("FINAL STATISTICS")
        print("="*60)
        print(f"Duration: {elapsed:.2f} seconds")
        print(f"Total bytes sent: {self.stats['bytes_sent'] / (1024*1024):.2f} MB")
        print(f"Total messages: {self.stats['messages_sent']:,}")
        print(f"Total batches: {self.stats['batches_sent']:,}")
        print(f"Average throughput: {avg_throughput:.2f} MB/s")
        print(f"Messages per sec: {msgs_per_sec:.0f}")
        print(f"Errors: {self.stats['errors']}")
        print("="*60)
def main():
    """Command-line entry point: parse options, echo them, and run the generator."""
    usage_examples = """
Examples:
# Send logs to /tmp/fluent.sock at 8 MB/s
python fluent_log_generator.py -s /tmp/fluent.sock
# Send logs at 10 MB/s with custom tag
python fluent_log_generator.py -s /tmp/fluent.sock -t app.logs -r 10
# Run for 60 seconds with larger batches
python fluent_log_generator.py -s /tmp/fluent.sock -d 60 -b 200
"""
    cli = argparse.ArgumentParser(
        description='Generate high-volume logs in Fluent Bit forward protocol format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # (flags, keyword arguments) for each option, in help-output order.
    option_table = [
        (('-s', '--socket'),
         dict(required=True, help='Path to Unix socket')),
        (('-t', '--tag'),
         dict(default='test.logs',
              help='Tag for log entries (default: test.logs)')),
        (('-r', '--rate'),
         dict(type=float, default=8.0,
              help='Target throughput in MB/s (default: 8)')),
        (('-b', '--batch-size'),
         dict(type=int, default=100,
              help='Number of log entries per batch (default: 100)')),
        (('-l', '--log-size'),
         dict(type=int, default=500,
              help='Approximate size of each log entry in bytes (default: 500)')),
        (('-d', '--duration'),
         dict(type=float, default=None,
              help='Run duration in seconds (default: infinite)')),
    ]
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    opts = cli.parse_args()

    # Echo the effective configuration before starting.
    rule = "=" * 60
    if opts.duration is None:
        duration_label = 'Infinite'
    else:
        duration_label = f'{opts.duration} seconds'
    banner = (
        rule,
        "Fluent Bit Forward Protocol Log Generator",
        rule,
        f"Socket: {opts.socket}",
        f"Tag: {opts.tag}",
        f"Target Rate: {opts.rate} MB/s",
        f"Batch Size: {opts.batch_size}",
        f"Log Size: {opts.log_size} bytes",
        f"Duration: {duration_label}",
        rule,
        "\nPress Ctrl+C to stop\n",
    )
    for line in banner:
        print(line)

    FluentLogGenerator(
        socket_path=opts.socket,
        tag=opts.tag,
        target_throughput_mb=opts.rate,
        batch_size=opts.batch_size,
        log_size=opts.log_size,
    ).run(duration=opts.duration)
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment