Skip to content

Instantly share code, notes, and snippets.

@overvenus
Last active July 7, 2016 03:34
Show Gist options
  • Save overvenus/31b3adba8d60fbc81325a958a0e9fc9d to your computer and use it in GitHub Desktop.
BlockUdpMetric VS NonBlockUdpMetric
# Manifest for the metric-sink benchmark crate
# (BlockUdpMetric vs NonBlockUdpMetric).
[package]
name = "bench_metric"
version = "0.1.0"
authors = ["overvenus <[email protected]>"]
[dependencies]
# statsd client crate providing the MetricSink trait implemented below
cadence = "0.5.0"
# logging facade (warn!/info!/debug!/trace! macros)
log = "0.3"
# macro used to declare the worker module's Error enum
quick-error = "0.2"
#![feature(test)]
#![feature(const_fn)]
extern crate test;
extern crate cadence;
#[macro_use]
extern crate log;
#[macro_use]
extern crate quick_error;
use std::io;
use std::mem;
use std::fmt;
use std::str::from_utf8;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use std::sync::atomic::{AtomicBool, Ordering};
use std::net::{ToSocketAddrs, SocketAddr, UdpSocket};
use cadence::{MetricSink, MetricResult, ErrorKind};
pub mod worker;
use worker::{Worker, Runnable};
// Global metric client read through `__client()`.  Starts as a pointer to
// the no-op sink and is replaced at most once by `set_metric`.
static mut CLIENT: *const Metric = &NopMetric;
// IS_INITIALIZED indicates the state of CLIENT,
// `false` for uninitialized, `true` for initialized.
static IS_INITIALIZED: AtomicBool = AtomicBool::new(false);
/// Marker trait for sinks that may be installed as the global client:
/// any `MetricSink` that is also safe to share across threads.
pub trait Metric: MetricSink + Sync + Send {}
/// A sink that silently discards every metric.
pub struct NopMetric;

impl MetricSink for NopMetric {
    /// Drop the metric and report zero bytes written.
    fn emit(&self, _metric: &str) -> io::Result<usize> {
        Ok(0)
    }
}

impl Metric for NopMetric {}
pub fn set_metric(metric: Box<Metric + Sync + Send>) -> Result<(), ()> {
unsafe {
if IS_INITIALIZED.compare_and_swap(false, true, Ordering::SeqCst) != false {
return Err(());
}
CLIENT = mem::transmute(metric);
Ok(())
}
}
#[doc(hidden)]
pub fn __client() -> Option<&'static Metric> {
    // Hand out the global client only once initialization has finished.
    match IS_INITIALIZED.load(Ordering::SeqCst) {
        true => Some(unsafe { &*CLIENT }),
        false => None,
    }
}
/// =======================
/// NonblockUdpMetricSink
/// =======================
/// A UDP sink whose socket is in non-blocking mode, so `emit` never
/// stalls the caller (a full OS send buffer surfaces as `WouldBlock`).
pub struct NonblockUdpMetricSink {
    sink_addr: SocketAddr,
    socket: UdpSocket,
}

impl NonblockUdpMetricSink {
    /// Build a sink that sends to the first address `sink_addr` resolves to.
    ///
    /// Errors from address resolution or from switching the socket to
    /// non-blocking mode are propagated instead of panicking (the original
    /// `let _ = ….unwrap()` aborted the process on failure).
    pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<NonblockUdpMetricSink>
        where A: ToSocketAddrs
    {
        let mut addr_iter = try!(sink_addr.to_socket_addrs());
        let addr = try!(addr_iter.next()
            .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));
        // Moves this UDP stream into nonblocking mode.
        try!(socket.set_nonblocking(true));
        Ok(NonblockUdpMetricSink {
            sink_addr: addr,
            socket: socket,
        })
    }
}
impl MetricSink for NonblockUdpMetricSink {
    /// Fire the metric as a single datagram; never blocks the caller.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        let payload = metric.as_bytes();
        self.socket.send_to(payload, &self.sink_addr)
    }
}

impl Metric for NonblockUdpMetricSink {}
/// ===============================================
/// BlockUdpMetricSink AKA cadence::UdpMetricSink
/// ===============================================
/// A plain blocking UDP sink; `emit` waits until the OS accepts the datagram.
pub struct BlockUdpMetricSink {
    sink_addr: SocketAddr,
    socket: UdpSocket,
}

impl BlockUdpMetricSink {
    /// Construct a blocking sink targeting the first resolved address.
    pub fn from<A>(sink_addr: A, socket: UdpSocket) -> MetricResult<BlockUdpMetricSink>
        where A: ToSocketAddrs
    {
        let mut addrs = try!(sink_addr.to_socket_addrs());
        let first = try!(addrs.next()
            .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));
        Ok(BlockUdpMetricSink {
            sink_addr: first,
            socket: socket,
        })
    }
}
impl MetricSink for BlockUdpMetricSink {
    /// Send the metric as one datagram over the blocking socket.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        let bytes = metric.as_bytes();
        self.socket.send_to(bytes, &self.sink_addr)
    }
}

impl Metric for BlockUdpMetricSink {}
/// ================================
/// Buffered NonblockUdpMetricSink
/// ================================
pub struct BufferedUdpMetricSink {
// underlying non-blocking sink that actually touches the network
sink: NonblockUdpMetricSink,
// pending newline-separated metric bytes; flushed when full or stale
buffer: Mutex<Vec<u8>>,
// instant of the most recent flush (guarded separately from `buffer`)
last_flush_time: Mutex<Instant>,
// maximum age of buffered data before a flush is forced
flush_period: Duration,
}
impl BufferedUdpMetricSink {
pub fn from<A>(sink_addr: A,
socket: UdpSocket,
flush_period: Duration)
-> MetricResult<BufferedUdpMetricSink>
where A: ToSocketAddrs
{
Ok(BufferedUdpMetricSink {
sink: NonblockUdpMetricSink::from(sink_addr, socket).unwrap(),
buffer: Mutex::new(Vec::with_capacity(1024)),
flush_period: flush_period,
last_flush_time: Mutex::new(Instant::now()),
})
}
fn append_to_buffer(&self, metric: &str) -> io::Result<usize> {
let mut buffer = self.buffer.lock().unwrap();
let mut last_flush_time = self.last_flush_time.lock().unwrap();
// +1 for '\n'
let elapse = last_flush_time.elapsed();
if (buffer.len() + metric.len() + 1) > buffer.capacity() ||
(elapse >= self.flush_period) {
self.flush(&mut buffer);
*last_flush_time = Instant::now();
}
buffer.extend_from_slice(metric.as_bytes());
buffer.push('\n' as u8);
Ok(metric.len())
}
fn flush(&self, buffer: &mut Vec<u8>) {
let _ = self.sink.emit(from_utf8(buffer.as_slice()).unwrap());
buffer.clear();
}
}
impl MetricSink for BufferedUdpMetricSink {
    /// Buffered emit: the metric is queued and sent in a later batch.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        self.append_to_buffer(metric)
    }
}

impl Metric for BufferedUdpMetricSink {}
// ========================================
/// AsyncUdpMetricSink
// ========================================
// Capacity of the background sender's batch buffer, in bytes.
const BUFFER_SIZE: usize = 1024;
// Runs on the worker thread: accumulates newline-separated metrics and
// flushes them over UDP when the buffer fills or the flush period elapses.
struct BufferSender {
sink_addr: SocketAddr,
socket: UdpSocket,
buffer: Vec<u8>,
flush_period: Duration,
last_flush_time: Instant,
}
impl Runnable<String> for BufferSender {
    /// Append one metric to the batch, first flushing the batch when it
    /// is full or too old.  Runs only on the worker thread, so no locking
    /// is needed.
    fn run(&mut self, metric: String) {
        // +1 for '\n'
        let would_overflow = self.buffer.len() + metric.len() + 1 > self.buffer.capacity();
        let too_old = self.last_flush_time.elapsed() >= self.flush_period;
        if would_overflow || too_old {
            self.last_flush_time = Instant::now();
            if let Err(e) = self.socket.send_to(self.buffer.as_slice(), &self.sink_addr) {
                warn!("send metric failed {:?}", e);
            }
            self.buffer.clear()
        }
        self.buffer.extend_from_slice(metric.as_bytes());
        self.buffer.push(b'\n');
    }
}
// Sink that hands every metric to a background worker thread, so `emit`
// never performs network I/O on the caller's thread.
pub struct AsyncUdpMetricSink {
// Mutex because MetricSink::emit takes &self but Worker methods need
// serialized access from multiple emitters
worker: Mutex<Worker<String>>,
}
impl AsyncUdpMetricSink {
    /// Build an async sink: resolve the target address, then spawn a
    /// background worker that owns the socket and batches sends.
    pub fn from<A>(sink_addr: A,
                   socket: UdpSocket,
                   flush_period: Duration)
                   -> MetricResult<AsyncUdpMetricSink>
        where A: ToSocketAddrs
    {
        let mut addr_iter = try!(sink_addr.to_socket_addrs());
        let addr = try!(addr_iter.next()
            .ok_or((ErrorKind::InvalidInput, "No socket addresses yielded")));
        let sender = BufferSender {
            sink_addr: addr,
            socket: socket,
            buffer: Vec::with_capacity(BUFFER_SIZE),
            flush_period: flush_period,
            last_flush_time: Instant::now(),
        };
        // Thread name fixed from the original's typo "mteric-worker".
        let mut worker = Worker::new("metric-worker");
        // Spawning can fail (OS resources); state why if it panics.
        worker.start(sender).expect("failed to start metric worker thread");
        Ok(AsyncUdpMetricSink {
            worker: Mutex::new(worker),
        })
    }

    /// Forward the metric to the worker thread.  Scheduling failures are
    /// logged and swallowed, and the returned length pretends success:
    /// metric delivery is best-effort.
    fn handle_to_sender(&self, metric: &str) -> io::Result<usize> {
        let worker = self.worker.lock().unwrap();
        if let Err(e) = worker.schedule(metric.to_owned()) {
            warn!("{}", e);
        }
        Ok(metric.len())
    }
}
impl MetricSink for AsyncUdpMetricSink {
    /// Non-blocking emit: enqueue the metric for the background worker.
    fn emit(&self, metric: &str) -> io::Result<usize> {
        self.handle_to_sender(metric)
    }
}

impl Metric for AsyncUdpMetricSink {}
#[cfg(test)]
mod tests {
// Benchmarks comparing the four sink flavours, single- and multi-threaded.
// Each sink targets a statsd-style address on localhost; because the
// transport is UDP, sends succeed whether or not a server is listening.
use super::*;
use test::Bencher;
use std::time::Duration;
use std::net::UdpSocket;
use std::thread::{self, JoinHandle};
use std::sync::mpsc::{channel, Sender, Receiver};
use cadence::MetricSink;
// Sample statsd metric strings used by the worker threads.
const MTABLE: &'static [ &'static str ] = &[ "foo:1|c", "bar:1|d", "foo:2|t" ];
// Each `new_*_metric` helper binds an ephemeral local UDP port and points
// the sink at the conventional statsd port 8125.
fn new_nonblock_metric() -> NonblockUdpMetricSink {
let host = "127.0.0.1:8125";
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
NonblockUdpMetricSink::from(host, socket).unwrap()
}
fn new_block_metric() -> BlockUdpMetricSink {
let host = "127.0.0.1:8125";
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
BlockUdpMetricSink::from(host, socket).unwrap()
}
fn new_buffer_metric() -> BufferedUdpMetricSink {
let host = "127.0.0.1:8125";
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
BufferedUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap()
}
fn new_async_metric() -> AsyncUdpMetricSink {
let host = "127.0.0.1:8125";
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
AsyncUdpMetricSink::from(host, socket, Duration::from_millis(10)).unwrap()
}
// Single-threaded benches: cost of one `emit` call per sink flavour.
#[bench]
fn bench_single_thread_block_udp(b: &mut Bencher) {
let sink = new_block_metric();
b.iter(|| sink.emit("foo:1|c"));
}
#[bench]
fn bench_single_thread_nonblock_udp(b: &mut Bencher) {
let sink = new_nonblock_metric();
b.iter(|| sink.emit("foo:1|c"));
}
#[bench]
fn bench_single_thread_buffer_udp(b: &mut Bencher) {
let sink = new_buffer_metric();
b.iter(|| sink.emit("foo:1|c"));
}
#[bench]
fn bench_single_thread_async_udp(b: &mut Bencher) {
let sink = new_async_metric();
b.iter(|| sink.emit("foo:1|c"));
}
// Spawns a thread that drives the global client installed by `set_metric`.
// Protocol: send a positive count to emit that many metrics (a `1` is sent
// back on the done channel afterwards); send 0 to shut the thread down.
fn thread_worker() -> (JoinHandle<()>, Sender<usize>, Receiver<i64>) {
let (tx, rx) = channel();
let (tx1, done) = channel();
let t = thread::spawn(move || {
// Panics if `set_metric` has not installed a client yet.
let sink = __client().unwrap();
let length = MTABLE.len();
loop {
let n = rx.recv().unwrap();
if n == 0 { return }
for _ in 0 .. n {
// NOTE(review): `n` is constant within this loop, so every
// iteration emits the same MTABLE entry; the loop index was
// probably intended here — confirm before relying on variety.
let s = MTABLE[n%length];
let _ = sink.emit(s).unwrap();
}
tx1.send(1).unwrap()
}
});
(t, tx, done)
}
// Multi-threaded benches: three worker threads emit 5000 metrics each per
// iteration through the shared global client.  NOTE(review): `set_metric`
// only succeeds for the FIRST bench that runs in the process, so later
// benches actually measure whichever sink was installed first — confirm.
#[bench]
fn bench_multi_thread_block_udp(b: &mut Bencher) {
let sink = new_block_metric();
let _ = set_metric(Box::new(sink));
let (t0, tx0, done0) = thread_worker();
let (t1, tx1, done1) = thread_worker();
let (t2, tx2, done2) = thread_worker();
b.iter(|| {
let _ = tx0.send(5000).unwrap();
let _ = tx1.send(5000).unwrap();
let _ = tx2.send(5000).unwrap();
let _ = done0.recv().unwrap();
let _ = done1.recv().unwrap();
let _ = done2.recv().unwrap();
});
// 0 tells each worker thread to exit; then join them.
let _ = tx0.send(0).unwrap();
let _ = tx1.send(0).unwrap();
let _ = tx2.send(0).unwrap();
let _ = t0.join().unwrap();
let _ = t1.join().unwrap();
let _ = t2.join().unwrap();
}
#[bench]
fn bench_multi_thread_nonblock_udp(b: &mut Bencher) {
let sink = new_nonblock_metric();
let _ = set_metric(Box::new(sink));
let (t0, tx0, done0) = thread_worker();
let (t1, tx1, done1) = thread_worker();
let (t2, tx2, done2) = thread_worker();
b.iter(|| {
let _ = tx0.send(5000).unwrap();
let _ = tx1.send(5000).unwrap();
let _ = tx2.send(5000).unwrap();
let _ = done0.recv().unwrap();
let _ = done1.recv().unwrap();
let _ = done2.recv().unwrap();
});
let _ = tx0.send(0).unwrap();
let _ = tx1.send(0).unwrap();
let _ = tx2.send(0).unwrap();
let _ = t0.join().unwrap();
let _ = t1.join().unwrap();
let _ = t2.join().unwrap();
}
#[bench]
fn bench_multi_thread_buffer_udp(b: &mut Bencher) {
let sink = new_buffer_metric();
let _ = set_metric(Box::new(sink));
let (t0, tx0, done0) = thread_worker();
let (t1, tx1, done1) = thread_worker();
let (t2, tx2, done2) = thread_worker();
b.iter(|| {
let _ = tx0.send(5000).unwrap();
let _ = tx1.send(5000).unwrap();
let _ = tx2.send(5000).unwrap();
let _ = done0.recv().unwrap();
let _ = done1.recv().unwrap();
let _ = done2.recv().unwrap();
});
let _ = tx0.send(0).unwrap();
let _ = tx1.send(0).unwrap();
let _ = tx2.send(0).unwrap();
let _ = t0.join().unwrap();
let _ = t1.join().unwrap();
let _ = t2.join().unwrap();
}
#[bench]
fn bench_multi_thread_async_udp(b: &mut Bencher) {
let sink = new_async_metric();
let _ = set_metric(Box::new(sink));
let (t0, tx0, done0) = thread_worker();
let (t1, tx1, done1) = thread_worker();
let (t2, tx2, done2) = thread_worker();
b.iter(|| {
let _ = tx0.send(5000).unwrap();
let _ = tx1.send(5000).unwrap();
let _ = tx2.send(5000).unwrap();
let _ = done0.recv().unwrap();
let _ = done1.recv().unwrap();
let _ = done2.recv().unwrap();
});
let _ = tx0.send(0).unwrap();
let _ = tx1.send(0).unwrap();
let _ = tx2.send(0).unwrap();
let _ = t0.join().unwrap();
let _ = t1.join().unwrap();
let _ = t2.join().unwrap();
}
}
/// Worker contains all workers that do the expensive job in background.
use std::io;
use std::sync::Arc;
use std::fmt::Display;
use std::time::{Instant, Duration};
use std::thread::{self, JoinHandle, Builder};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Sender, Receiver};
use std::result;
// Log the formatted message at `warn!` level when the given timer reports
// slow, otherwise at `trace!` level.  `$t` must have an `is_slow()` method
// (see SlowTimer below).
macro_rules! slow_log {
($t:expr, $($arg:tt)*) => {{
if $t.is_slow() {
warn!($($arg)*);
} else {
trace!($($arg)*);
}
}}
}
/// Measures elapsed time and reports whether it crossed a slowness threshold.
pub struct SlowTimer {
    slow_time: Duration,
    t: Instant,
}

impl SlowTimer {
    /// Timer with the default threshold of `DEFAULT_SLOW_SECS` seconds.
    pub fn new() -> SlowTimer {
        Default::default()
    }

    /// Timer that counts as slow once `slow_time` has passed.
    pub fn from(slow_time: Duration) -> SlowTimer {
        SlowTimer {
            t: Instant::now(),
            slow_time: slow_time,
        }
    }

    /// Convenience constructor taking whole seconds.
    pub fn from_secs(secs: u64) -> SlowTimer {
        SlowTimer::from(Duration::from_secs(secs))
    }

    /// Convenience constructor taking milliseconds.
    pub fn from_millis(millis: u64) -> SlowTimer {
        SlowTimer::from(Duration::from_millis(millis))
    }

    /// Time elapsed since the timer was created.
    pub fn elapsed(&self) -> Duration {
        self.t.elapsed()
    }

    /// Whether at least `slow_time` has elapsed.
    pub fn is_slow(&self) -> bool {
        self.elapsed() >= self.slow_time
    }
}

/// Default slowness threshold, in seconds.
const DEFAULT_SLOW_SECS: u64 = 1;

impl Default for SlowTimer {
    fn default() -> SlowTimer {
        SlowTimer::from_secs(DEFAULT_SLOW_SECS)
    }
}
// Error type for the worker module, generated by quick-error.
quick_error! {
#[derive(Debug)]
pub enum Error {
// The worker thread has stopped (its channel is closed).
Stopped
// Underlying I/O failure, e.g. from spawning the worker thread.
IoError(e: io::Error) {
from()
display("{}", e)
}
}
}
impl<T> From<mpsc::SendError<T>> for Error {
    /// A failed channel send means the receiving worker is gone.
    fn from(_: mpsc::SendError<T>) -> Error {
        // The payload inside the SendError is dropped: callers only learn
        // that the worker stopped, not which task was lost.
        Error::Stopped
    }
}
// Result alias using this module's `Error` type.
pub type Result<T> = result::Result<T, Error>;
/// A task handler that processes one task at a time.
pub trait Runnable<T: Display> {
fn run(&mut self, t: T);
}
/// A task handler that processes tasks in batches.
pub trait BatchRunnable<T: Display> {
/// run a batch of tasks.
///
/// Please note that ts will be clear after invoking this method.
fn run_batch(&mut self, ts: &mut Vec<T>);
}
/// Every `Runnable` is trivially a `BatchRunnable`: handle the batch by
/// running its tasks one at a time, logging any task that runs slowly.
impl<T: Display, R: Runnable<T>> BatchRunnable<T> for R {
    fn run_batch(&mut self, ts: &mut Vec<T>) {
        for task in ts.drain(..) {
            // Render the task up front; `run` consumes it.
            let desc = format!("{}", task);
            let watch = SlowTimer::new();
            self.run(task);
            slow_log!(watch,
                      "task {} takes {:?} to finish.",
                      desc,
                      watch.elapsed());
        }
    }
}
/// Scheduler provides interface to schedule task to underlying workers.
pub struct Scheduler<T> {
// number of scheduled-but-unfinished tasks, shared with the worker thread
counter: Arc<AtomicUsize>,
// sending `Some(task)` schedules it; `None` asks the worker to stop
sender: Sender<Option<T>>,
}
impl<T: Display> Scheduler<T> {
fn new(counter: AtomicUsize, sender: Sender<Option<T>>) -> Scheduler<T> {
Scheduler {
counter: Arc::new(counter),
sender: sender,
}
}
/// Schedule a task to run.
///
/// If the worker is stopped, an error will return.
pub fn schedule(&self, task: T) -> Result<()> {
debug!("scheduling task {}", task);
try!(self.sender.send(Some(task)));
self.counter.fetch_add(1, Ordering::SeqCst);
Ok(())
}
/// Check if underlying worker can't handle task immediately.
pub fn is_busy(&self) -> bool {
self.counter.load(Ordering::SeqCst) > 0
}
}
impl<T: Display> Clone for Scheduler<T> {
    /// Cloning shares the counter and the sending end of the channel.
    fn clone(&self) -> Scheduler<T> {
        let Scheduler { ref counter, ref sender } = *self;
        Scheduler {
            counter: counter.clone(),
            sender: sender.clone(),
        }
    }
}
/// Create a scheduler that can't be scheduled any task.
///
/// Useful for test purpose.
#[cfg(test)]
pub fn dummy_scheduler<T: Display>() -> Scheduler<T> {
    // The receiving end is dropped before the scheduler escapes, so every
    // call to `schedule` fails with `Error::Stopped`.
    let (sender, _receiver) = mpsc::channel();
    Scheduler::new(AtomicUsize::new(0), sender)
}
/// A worker that can schedule time consuming tasks.
pub struct Worker<T: Display> {
// thread name, used for logging and as the OS thread name
name: String,
// shared handle used to enqueue tasks
scheduler: Scheduler<T>,
// receiving end of the task channel; `take`n when the thread starts
receiver: Option<Receiver<Option<T>>>,
// join handle of the background thread once started
handle: Option<JoinHandle<()>>,
}
// Worker thread main loop: receive tasks, opportunistically batch up to
// `batch_size` of them, run the batch, repeat until the channel closes or
// an explicit `None` stop message arrives.
fn poll<R, T>(mut runner: R, rx: Receiver<Option<T>>, counter: Arc<AtomicUsize>, batch_size: usize)
where R: BatchRunnable<T> + Send + 'static,
T: Display + Send + 'static
{
let mut keep_going = true;
let mut buffer = Vec::with_capacity(batch_size);
while keep_going {
// Block for the first task of a batch.
let t = rx.recv();
match t {
Ok(Some(t)) => buffer.push(t),
// `Ok(None)` is the stop message; `Err` means all senders dropped.
_ => return,
}
// Drain whatever is immediately available, up to batch_size total.
while buffer.len() < batch_size {
match rx.try_recv() {
Ok(None) => {
// Stop requested: run what we already collected, then exit.
keep_going = false;
break;
}
Ok(Some(t)) => buffer.push(t),
// Channel empty (or disconnected): run the partial batch now.
_ => break,
}
}
// Subtract the batch from the pending count before running it.
counter.fetch_sub(buffer.len(), Ordering::SeqCst);
runner.run_batch(&mut buffer);
buffer.clear();
}
}
impl<T: Display + Send + 'static> Worker<T> {
    /// Create a worker.
    pub fn new<S: Into<String>>(name: S) -> Worker<T> {
        let (tx, rx) = mpsc::channel();
        Worker {
            name: name.into(),
            scheduler: Scheduler::new(AtomicUsize::new(0), tx),
            receiver: Some(rx),
            handle: None,
        }
    }

    /// Start the worker with a one-task-at-a-time runner.
    pub fn start<R: Runnable<T> + Send + 'static>(&mut self, runner: R) -> Result<()> {
        self.start_batch(runner, 1)
    }

    /// Start the worker with a batching runner handling up to `batch_size`
    /// tasks per call.
    ///
    /// Starting an already-started worker logs a warning and returns
    /// `Ok(())` without spawning a second thread.
    pub fn start_batch<R>(&mut self, runner: R, batch_size: usize) -> Result<()>
        where R: BatchRunnable<T> + Send + 'static
    {
        info!("starting working thread: {}", self.name);
        if self.receiver.is_none() {
            warn!("worker {} has been started.", self.name);
            return Ok(());
        }
        let rx = self.receiver.take().unwrap();
        let counter = self.scheduler.counter.clone();
        // Spawn failures (an io::Error) propagate via From<io::Error>.
        let h = try!(Builder::new()
            .name(self.name.clone())
            .spawn(move || poll(runner, rx, counter, batch_size)));
        self.handle = Some(h);
        Ok(())
    }

    /// Get a scheduler to schedule task.
    pub fn scheduler(&self) -> Scheduler<T> {
        self.scheduler.clone()
    }

    /// Schedule a task to run.
    ///
    /// If the worker is stopped, an error will return.
    pub fn schedule(&self, task: T) -> Result<()> {
        self.scheduler.schedule(task)
    }

    /// Check if underlying worker can't handle task immediately.
    /// A never-started or stopped worker always reports busy.
    pub fn is_busy(&self) -> bool {
        self.handle.is_none() || self.scheduler.is_busy()
    }

    /// Stop the worker thread and wait for it to exit.
    pub fn stop(&mut self) -> thread::Result<()> {
        let handler = match self.handle.take() {
            Some(h) => h,
            None => return Ok(()),
        };
        // Send the explicit stop message so the background thread exits
        // even while cloned schedulers keep the channel's senders alive.
        info!("stopping {}", self.name); // fixed log typo: "stoping"
        if let Err(e) = self.scheduler.sender.send(None) {
            warn!("failed to stop worker thread: {:?}", e);
        }
        handler.join()
    }
}
#[cfg(test)]
mod test {
use std::thread;
use std::sync::Arc;
use std::sync::atomic::*;
use std::cmp;
use std::time::Duration;
use super::*;
// Adds each task's value to a shared counter; sleeps 10ms per task so the
// worker stays observably busy while tasks are pending.
struct CountRunner {
count: Arc<AtomicUsize>,
}
impl Runnable<u64> for CountRunner {
fn run(&mut self, step: u64) {
self.count.fetch_add(step as usize, Ordering::SeqCst);
thread::sleep(Duration::from_millis(10));
}
}
// Sums a whole batch at once and sleeps only for the largest task value,
// exercising the batching path of `poll`.
struct BatchRunner {
count: Arc<AtomicUsize>,
}
impl BatchRunnable<u64> for BatchRunner {
fn run_batch(&mut self, ms: &mut Vec<u64>) {
let total = ms.iter().fold(0, |l, &r| l + r);
self.count.fetch_add(total as usize, Ordering::SeqCst);
let max_sleep = ms.iter().fold(0, |l, &r| cmp::max(l, r));
thread::sleep(Duration::from_millis(max_sleep));
}
}
#[test]
fn test_worker() {
let mut worker = Worker::new("test-worker");
let count = Arc::new(AtomicUsize::new(0));
worker.start(CountRunner { count: count.clone() }).unwrap();
assert!(!worker.is_busy());
worker.schedule(50).unwrap();
worker.schedule(50).unwrap();
worker.schedule(50).unwrap();
assert!(worker.is_busy());
// Poll for up to ~1s while the worker drains its queue.
for _ in 0..100 {
if !worker.is_busy() {
break;
}
thread::sleep(Duration::from_millis(10));
}
assert!(!worker.is_busy());
worker.stop().unwrap();
assert_eq!(count.load(Ordering::SeqCst), 150);
// now worker can't handle any task
assert!(worker.is_busy());
}
#[test]
fn test_threaded() {
let mut worker = Worker::new("test-worker-threaded");
let count = Arc::new(AtomicUsize::new(0));
worker.start(CountRunner { count: count.clone() }).unwrap();
let scheduler = worker.scheduler();
// Schedule from another thread through a cloned scheduler.
thread::spawn(move || {
scheduler.schedule(100).unwrap();
scheduler.schedule(100).unwrap();
});
// Wait until the worker has picked up at least one task.
// NOTE(review): this test looks racy — if `stop()`'s None message lands
// on the channel between the two schedules above, the second task is
// never run and the assertion below fails; confirm intended behavior.
for _ in 1..1000 {
if worker.is_busy() {
break;
}
thread::sleep(Duration::from_millis(1));
}
worker.stop().unwrap();
assert_eq!(count.load(Ordering::SeqCst), 200);
}
#[test]
fn test_batch() {
let mut worker = Worker::new("test-worker-batch");
let count = Arc::new(AtomicUsize::new(0));
worker.start_batch(BatchRunner { count: count.clone() }, 10).unwrap();
for _ in 0..20 {
worker.schedule(50).unwrap();
}
// `stop` joins the worker thread, so all 20 tasks have run by now.
worker.stop().unwrap();
assert_eq!(count.load(Ordering::SeqCst), 20 * 50);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment