#!/usr/bin/env python3
"""
Fluent Bit Forward Protocol Log Generator

Generates high-volume logs in msgpack format and sends them to a Unix socket
using the Fluent Bit forward protocol.

Usage:
    python fluent_log_generator.py [options]
"""

import argparse
import socket
import time
import msgpack
import random
import string
import sys
from datetime import datetime


class FluentLogGenerator:
    """Generate synthetic log records and stream them over a Unix socket in
    Fluent Bit forward protocol format: ``[tag, [[time, record], ...]]``.

    Throughput is rate-limited toward ``target_throughput_mb`` by sleeping
    between batches; running statistics are kept in ``self.stats``.
    """

    # Fixed pool of log message templates. Hoisted to class level so the list
    # is built once instead of on every _generate_message() call (hot path).
    _MESSAGES = [
        'Request processed successfully',
        'Database query executed',
        'Cache hit for user data',
        'Authentication successful',
        'API endpoint called',
        'File uploaded to storage',
        'Email sent to user',
        'Payment processed',
        'User session started',
        'Background job completed',
        'Metrics collected and stored',
        'Configuration reloaded',
        'Health check passed',
        'Service discovery updated',
        'Rate limit enforced',
    ]

    def __init__(self, socket_path, tag="test.logs", target_throughput_mb=8,
                 batch_size=100, log_size=500):
        """
        Initialize the log generator.

        Args:
            socket_path: Path to Unix socket
            tag: Tag for log entries
            target_throughput_mb: Target throughput in MB/s
            batch_size: Number of log entries per batch
            log_size: Approximate size of each log entry in bytes
        """
        self.socket_path = socket_path
        self.tag = tag
        self.target_throughput = target_throughput_mb * 1024 * 1024  # bytes/s
        self.batch_size = batch_size
        self.log_size = log_size
        self.sock = None  # socket.socket while connected, else None
        self.stats = {
            'bytes_sent': 0,
            'messages_sent': 0,
            'batches_sent': 0,
            'start_time': None,  # set by run(); None until then
            'errors': 0
        }

    def connect(self):
        """Connect to the Unix socket.

        Returns:
            bool: True on success, False on any connection failure.
        """
        try:
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.sock.connect(self.socket_path)
            print(f"Connected to Unix socket: {self.socket_path}")
            return True
        except Exception as e:
            print(f"Failed to connect to {self.socket_path}: {e}")
            return False

    def disconnect(self):
        """Close the socket connection. Safe to call when not connected."""
        if self.sock:
            self.sock.close()
            self.sock = None
            print("Disconnected from Unix socket")

    def generate_log_entry(self):
        """
        Generate a single log entry with realistic data.

        Returns:
            tuple: (timestamp, record_dict)
        """
        # NOTE(review): a float epoch timestamp is non-standard for the
        # forward protocol (the spec uses integer seconds or the EventTime
        # ext type), but Fluent Bit's in_forward accepts msgpack doubles —
        # confirm against the target collector if switching backends.
        timestamp = time.time()

        # Generate varied log data to simulate real logs
        record = {
            'level': random.choice(['INFO', 'WARN', 'ERROR', 'DEBUG']),
            'message': self._generate_message(),
            'host': f'server-{random.randint(1, 100)}',
            'service': random.choice(['api', 'web', 'worker', 'database']),
            'request_id': ''.join(random.choices(string.ascii_letters + string.digits, k=32)),
            'duration_ms': random.randint(1, 1000),
            'status_code': random.choice([200, 201, 400, 404, 500]),
            'user_id': random.randint(1000, 9999),
            'ip_address': f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}',
            'bytes_sent': random.randint(100, 10000),
            'success': random.choice([True, False]),
            'metadata': {
                'version': '1.0.0',
                'environment': random.choice(['production', 'staging', 'development'])
            }
        }

        # Pad with extra data to reach the (approximate) target log size.
        # len(str(record)) is only an estimate of the serialized size.
        padding_needed = max(0, self.log_size - len(str(record)))
        if padding_needed > 0:
            record['padding'] = ''.join(random.choices(string.ascii_letters, k=padding_needed))

        return timestamp, record

    def _generate_message(self):
        """Generate a random log message drawn from the template pool."""
        return random.choice(self._MESSAGES)

    def create_forward_message(self, entries):
        """
        Create a message in Fluent Bit forward protocol format.

        Format: [tag, [[time, record], [time, record], ...]]

        Args:
            entries: List of (timestamp, record) tuples

        Returns:
            bytes: msgpack-encoded message
        """
        # Build the array of [time, record] pairs
        event_array = [[ts, rec] for ts, rec in entries]

        # Create the forward protocol message: [tag, events]
        message = [self.tag, event_array]

        # Encode with msgpack
        return msgpack.packb(message)

    def send_batch(self):
        """Generate and send a batch of log entries.

        Returns:
            int: number of bytes sent (0 on failure).
        """
        try:
            # Generate batch of log entries
            entries = [self.generate_log_entry() for _ in range(self.batch_size)]

            # Create forward protocol message
            message = self.create_forward_message(entries)

            # Send the message
            self.sock.sendall(message)

            # Update statistics
            self.stats['bytes_sent'] += len(message)
            self.stats['messages_sent'] += len(entries)
            self.stats['batches_sent'] += 1

            return len(message)

        except BrokenPipeError:
            print("Connection broken - attempting to reconnect...")
            self.stats['errors'] += 1
            # Recursive retry: keeps retrying as long as reconnects succeed.
            # Depth is bounded in practice by consecutive broken pipes; gives
            # up (returns 0) the first time reconnection fails.
            if self.connect():
                return self.send_batch()  # Retry
            return 0
        except Exception as e:
            print(f"Error sending batch: {e}")
            self.stats['errors'] += 1
            return 0

    def print_stats(self):
        """Print current statistics as a single overwriting status line."""
        if self.stats['start_time'] is None:
            return

        elapsed = time.time() - self.stats['start_time']
        if elapsed == 0:
            return

        throughput_mb = (self.stats['bytes_sent'] / elapsed) / (1024 * 1024)
        msg_per_sec = self.stats['messages_sent'] / elapsed

        print(f"\rStats: {throughput_mb:.2f} MB/s | "
              f"{msg_per_sec:.0f} msgs/s | "
              f"{self.stats['batches_sent']} batches | "
              f"{self.stats['bytes_sent'] / (1024*1024):.2f} MB total | "
              f"Errors: {self.stats['errors']}", end='', flush=True)

    def run(self, duration=None):
        """
        Run the log generator.

        Args:
            duration: Optional duration in seconds (None for infinite)
        """
        if not self.connect():
            return

        self.stats['start_time'] = time.time()
        last_stat_time = time.time()

        try:
            while True:
                batch_start = time.time()

                # Send batch
                bytes_sent = self.send_batch()

                # Calculate sleep time to maintain target throughput
                batch_time = time.time() - batch_start
                target_batch_time = bytes_sent / self.target_throughput
                sleep_time = max(0, target_batch_time - batch_time)

                if sleep_time > 0:
                    time.sleep(sleep_time)

                # Print stats every second
                if time.time() - last_stat_time >= 1.0:
                    self.print_stats()
                    last_stat_time = time.time()

                # Check duration. BUG FIX: the original tested `if duration`,
                # which treats `-d 0` as "run forever"; an explicit None check
                # makes a zero duration stop after the first batch as expected.
                if duration is not None and (time.time() - self.stats['start_time']) >= duration:
                    break

        except KeyboardInterrupt:
            print("\n\nShutting down gracefully...")
        finally:
            self.print_stats()
            print("\n")
            self.disconnect()
            self.print_final_stats()

    def print_final_stats(self):
        """Print final statistics summary."""
        # Guard: nothing to report if the run never started.
        if self.stats['start_time'] is None:
            return

        elapsed = time.time() - self.stats['start_time']
        # BUG FIX: avoid ZeroDivisionError on ultra-short runs by clamping
        # elapsed to a tiny positive value.
        elapsed = max(elapsed, 1e-9)
        avg_throughput = (self.stats['bytes_sent'] / elapsed) / (1024 * 1024)

        print("\n" + "="*60)
        print("FINAL STATISTICS")
        print("="*60)
        print(f"Duration: {elapsed:.2f} seconds")
        print(f"Total bytes sent: {self.stats['bytes_sent'] / (1024*1024):.2f} MB")
        print(f"Total messages: {self.stats['messages_sent']:,}")
        print(f"Total batches: {self.stats['batches_sent']:,}")
        print(f"Average throughput: {avg_throughput:.2f} MB/s")
        print(f"Messages per sec: {self.stats['messages_sent'] / elapsed:.0f}")
        print(f"Errors: {self.stats['errors']}")
        print("="*60)


def main():
    """Parse CLI arguments and run the generator until done or Ctrl+C."""
    parser = argparse.ArgumentParser(
        description='Generate high-volume logs in Fluent Bit forward protocol format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Send logs to /tmp/fluent.sock at 8 MB/s
  python fluent_log_generator.py -s /tmp/fluent.sock

  # Send logs at 10 MB/s with custom tag
  python fluent_log_generator.py -s /tmp/fluent.sock -t app.logs -r 10

  # Run for 60 seconds with larger batches
  python fluent_log_generator.py -s /tmp/fluent.sock -d 60 -b 200
        """
    )

    parser.add_argument('-s', '--socket', required=True,
                        help='Path to Unix socket')
    parser.add_argument('-t', '--tag', default='test.logs',
                        help='Tag for log entries (default: test.logs)')
    parser.add_argument('-r', '--rate', type=float, default=8.0,
                        help='Target throughput in MB/s (default: 8)')
    parser.add_argument('-b', '--batch-size', type=int, default=100,
                        help='Number of log entries per batch (default: 100)')
    parser.add_argument('-l', '--log-size', type=int, default=500,
                        help='Approximate size of each log entry in bytes (default: 500)')
    parser.add_argument('-d', '--duration', type=float, default=None,
                        help='Run duration in seconds (default: infinite)')

    args = parser.parse_args()

    print("="*60)
    print("Fluent Bit Forward Protocol Log Generator")
    print("="*60)
    print(f"Socket: {args.socket}")
    print(f"Tag: {args.tag}")
    print(f"Target Rate: {args.rate} MB/s")
    print(f"Batch Size: {args.batch_size}")
    print(f"Log Size: {args.log_size} bytes")
    print(f"Duration: {'Infinite' if args.duration is None else f'{args.duration} seconds'}")
    print("="*60)
    print("\nPress Ctrl+C to stop\n")

    generator = FluentLogGenerator(
        socket_path=args.socket,
        tag=args.tag,
        target_throughput_mb=args.rate,
        batch_size=args.batch_size,
        log_size=args.log_size
    )

    generator.run(duration=args.duration)


if __name__ == '__main__':
    main()