Last active
November 7, 2025 22:17
-
-
Save sparrc/cbf7882c9b2479d062fdeb116719a3ab to your computer and use it in GitHub Desktop.
Fluentd socket log generator (requires `pip install msgpack` to run). Ex: `python ./main.py -s /var/run/fluent.sock --log-size 4096`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Fluent Bit Forward Protocol Log Generator | |
| Generates high-volume logs in msgpack format and sends them to a Unix socket | |
| using the Fluent Bit forward protocol. | |
| Usage: | |
| python fluent_log_generator.py [options] | |
| """ | |
| import argparse | |
| import socket | |
| import time | |
| import msgpack | |
| import random | |
| import string | |
| import sys | |
| from datetime import datetime | |
class FluentLogGenerator:
    """Generate synthetic log records and stream them over a Unix domain
    socket using the Fluent forward protocol.

    Each batch is msgpack-encoded as ``[tag, [[time, record], ...]]`` and
    written to the socket; :meth:`run` paces batches so the byte rate
    approximates ``target_throughput_mb`` MB/s.
    """

    # Message pool sampled by _generate_message() to vary the payload.
    _MESSAGES = [
        'Request processed successfully',
        'Database query executed',
        'Cache hit for user data',
        'Authentication successful',
        'API endpoint called',
        'File uploaded to storage',
        'Email sent to user',
        'Payment processed',
        'User session started',
        'Background job completed',
        'Metrics collected and stored',
        'Configuration reloaded',
        'Health check passed',
        'Service discovery updated',
        'Rate limit enforced'
    ]

    def __init__(self, socket_path, tag="test.logs", target_throughput_mb=8,
                 batch_size=100, log_size=500):
        """
        Initialize the log generator.

        Args:
            socket_path: Path to the Unix socket to write to.
            tag: Fluent tag attached to every batch.
            target_throughput_mb: Target throughput in MB/s.
            batch_size: Number of log entries per batch.
            log_size: Approximate size of each log entry in bytes.
        """
        self.socket_path = socket_path
        self.tag = tag
        # The pacing math in run() works in bytes per second.
        self.target_throughput = target_throughput_mb * 1024 * 1024
        self.batch_size = batch_size
        self.log_size = log_size
        self.sock = None
        self.stats = {
            'bytes_sent': 0,
            'messages_sent': 0,
            'batches_sent': 0,
            'start_time': None,
            'errors': 0
        }

    def connect(self):
        """Open (or re-open) the Unix socket connection.

        Returns:
            bool: True on success, False otherwise.
        """
        # Fix: close any stale socket before creating a new one, so repeated
        # reconnect attempts do not leak file descriptors (the previous
        # version left every broken socket open).
        if self.sock is not None:
            try:
                self.sock.close()
            except OSError:
                pass
            self.sock = None
        try:
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.sock.connect(self.socket_path)
            print(f"Connected to Unix socket: {self.socket_path}")
            return True
        except Exception as e:
            print(f"Failed to connect to {self.socket_path}: {e}")
            # Don't keep a half-initialized socket object around.
            if self.sock is not None:
                self.sock.close()
                self.sock = None
            return False

    def disconnect(self):
        """Close the socket connection (no-op when not connected)."""
        if self.sock:
            self.sock.close()
            self.sock = None
            print("Disconnected from Unix socket")

    def generate_log_entry(self):
        """
        Generate a single log entry with realistic-looking random data.

        Returns:
            tuple: (timestamp, record_dict)
        """
        timestamp = time.time()
        # Varied fields to simulate real application logs.
        record = {
            'level': random.choice(['INFO', 'WARN', 'ERROR', 'DEBUG']),
            'message': self._generate_message(),
            'host': f'server-{random.randint(1, 100)}',
            'service': random.choice(['api', 'web', 'worker', 'database']),
            'request_id': ''.join(random.choices(string.ascii_letters + string.digits, k=32)),
            'duration_ms': random.randint(1, 1000),
            'status_code': random.choice([200, 201, 400, 404, 500]),
            'user_id': random.randint(1000, 9999),
            'ip_address': f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}',
            'bytes_sent': random.randint(100, 10000),
            'success': random.choice([True, False]),
            'metadata': {
                'version': '1.0.0',
                'environment': random.choice(['production', 'staging', 'development'])
            }
        }
        # Pad with filler so str(record) is roughly self.log_size bytes.
        # NOTE: this sizes the repr, not the msgpack encoding — it is an
        # approximation by design.
        padding_needed = max(0, self.log_size - len(str(record)))
        if padding_needed > 0:
            record['padding'] = ''.join(random.choices(string.ascii_letters, k=padding_needed))
        return timestamp, record

    def _generate_message(self):
        """Return a random message from the shared pool."""
        return random.choice(self._MESSAGES)

    def create_forward_message(self, entries):
        """
        Encode entries in Fluent forward protocol format.

        Format: [tag, [[time, record], [time, record], ...]]
        NOTE(review): the timestamp is packed as a float; the forward spec
        prefers an integer or EventTime ext value — confirm the receiving
        end accepts doubles.

        Args:
            entries: List of (timestamp, record) tuples.

        Returns:
            bytes: msgpack-encoded message.
        """
        event_array = [[ts, rec] for ts, rec in entries]
        message = [self.tag, event_array]
        return msgpack.packb(message)

    def send_batch(self):
        """Generate and send one batch of log entries.

        On a broken pipe, reconnect and re-send the *same* payload (the
        previous version recursed without bound and regenerated — i.e.
        silently dropped — the failed batch).

        Returns:
            int: Bytes sent, or 0 on failure.
        """
        try:
            entries = [self.generate_log_entry() for _ in range(self.batch_size)]
            message = self.create_forward_message(entries)
        except Exception as e:
            print(f"Error sending batch: {e}")
            self.stats['errors'] += 1
            return 0
        while True:
            try:
                self.sock.sendall(message)
            except BrokenPipeError:
                print("Connection broken - attempting to reconnect...")
                self.stats['errors'] += 1
                if self.connect():
                    continue  # retry the same payload on the fresh socket
                return 0
            except Exception as e:
                print(f"Error sending batch: {e}")
                self.stats['errors'] += 1
                return 0
            self.stats['bytes_sent'] += len(message)
            self.stats['messages_sent'] += len(entries)
            self.stats['batches_sent'] += 1
            return len(message)

    def print_stats(self):
        """Print a one-line, carriage-return-updated stats summary."""
        if self.stats['start_time'] is None:
            return
        elapsed = time.time() - self.stats['start_time']
        if elapsed == 0:
            return
        throughput_mb = (self.stats['bytes_sent'] / elapsed) / (1024 * 1024)
        msg_per_sec = self.stats['messages_sent'] / elapsed
        print(f"\rStats: {throughput_mb:.2f} MB/s | "
              f"{msg_per_sec:.0f} msgs/s | "
              f"{self.stats['batches_sent']} batches | "
              f"{self.stats['bytes_sent'] / (1024*1024):.2f} MB total | "
              f"Errors: {self.stats['errors']}", end='', flush=True)

    def run(self, duration=None):
        """
        Run the generator loop until Ctrl+C or `duration` elapses.

        Args:
            duration: Optional duration in seconds (None for infinite).
        """
        if not self.connect():
            return
        self.stats['start_time'] = time.time()
        last_stat_time = time.time()
        try:
            while True:
                batch_start = time.time()
                bytes_sent = self.send_batch()
                if bytes_sent == 0:
                    # Failed send (including a failed reconnect): back off
                    # briefly instead of busy-spinning at full speed.
                    time.sleep(0.1)
                else:
                    # Sleep just long enough that this batch's wall time
                    # matches its share of the target throughput.
                    batch_time = time.time() - batch_start
                    target_batch_time = bytes_sent / self.target_throughput
                    sleep_time = max(0, target_batch_time - batch_time)
                    if sleep_time > 0:
                        time.sleep(sleep_time)
                # Refresh the stats line roughly once per second.
                if time.time() - last_stat_time >= 1.0:
                    self.print_stats()
                    last_stat_time = time.time()
                if duration and (time.time() - self.stats['start_time']) >= duration:
                    break
        except KeyboardInterrupt:
            print("\n\nShutting down gracefully...")
        finally:
            self.print_stats()
            print("\n")
            self.disconnect()
            self.print_final_stats()

    def print_final_stats(self):
        """Print the final summary; safe even for near-instant runs."""
        if self.stats['start_time'] is None:
            return
        # Guard the divisions below against a zero elapsed time.
        elapsed = max(time.time() - self.stats['start_time'], 1e-9)
        avg_throughput = (self.stats['bytes_sent'] / elapsed) / (1024 * 1024)
        print("\n" + "="*60)
        print("FINAL STATISTICS")
        print("="*60)
        print(f"Duration: {elapsed:.2f} seconds")
        print(f"Total bytes sent: {self.stats['bytes_sent'] / (1024*1024):.2f} MB")
        print(f"Total messages: {self.stats['messages_sent']:,}")
        print(f"Total batches: {self.stats['batches_sent']:,}")
        print(f"Average throughput: {avg_throughput:.2f} MB/s")
        print(f"Messages per sec: {self.stats['messages_sent'] / elapsed:.0f}")
        print(f"Errors: {self.stats['errors']}")
        print("="*60)
def main():
    """Command-line entry point: parse options, print the run banner, and
    drive a FluentLogGenerator until Ctrl+C or the requested duration."""
    cli = argparse.ArgumentParser(
        description='Generate high-volume logs in Fluent Bit forward protocol format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Send logs to /tmp/fluent.sock at 8 MB/s
  python fluent_log_generator.py -s /tmp/fluent.sock
  # Send logs at 10 MB/s with custom tag
  python fluent_log_generator.py -s /tmp/fluent.sock -t app.logs -r 10
  # Run for 60 seconds with larger batches
  python fluent_log_generator.py -s /tmp/fluent.sock -d 60 -b 200
"""
    )
    # Declare every CLI option in one table, then register them in a loop.
    option_table = [
        (('-s', '--socket'), {'required': True,
                              'help': 'Path to Unix socket'}),
        (('-t', '--tag'), {'default': 'test.logs',
                           'help': 'Tag for log entries (default: test.logs)'}),
        (('-r', '--rate'), {'type': float, 'default': 8.0,
                            'help': 'Target throughput in MB/s (default: 8)'}),
        (('-b', '--batch-size'), {'type': int, 'default': 100,
                                  'help': 'Number of log entries per batch (default: 100)'}),
        (('-l', '--log-size'), {'type': int, 'default': 500,
                                'help': 'Approximate size of each log entry in bytes (default: 500)'}),
        (('-d', '--duration'), {'type': float, 'default': None,
                                'help': 'Run duration in seconds (default: infinite)'}),
    ]
    for flags, spec in option_table:
        cli.add_argument(*flags, **spec)
    opts = cli.parse_args()

    # Echo the effective configuration before starting.
    banner = "=" * 60
    duration_text = 'Infinite' if opts.duration is None else f'{opts.duration} seconds'
    print(banner)
    print("Fluent Bit Forward Protocol Log Generator")
    print(banner)
    print(f"Socket: {opts.socket}")
    print(f"Tag: {opts.tag}")
    print(f"Target Rate: {opts.rate} MB/s")
    print(f"Batch Size: {opts.batch_size}")
    print(f"Log Size: {opts.log_size} bytes")
    print(f"Duration: {duration_text}")
    print(banner)
    print("\nPress Ctrl+C to stop\n")

    gen = FluentLogGenerator(
        socket_path=opts.socket,
        tag=opts.tag,
        target_throughput_mb=opts.rate,
        batch_size=opts.batch_size,
        log_size=opts.log_size
    )
    gen.run(duration=opts.duration)
# Run as a script only; importing this module has no side effects.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment