Skip to content

Instantly share code, notes, and snippets.

@Darksonn
Last active November 10, 2022 20:50
Show Gist options
  • Save Darksonn/39eaf3132eca5738848385cbb837484f to your computer and use it in GitHub Desktop.
Save Darksonn/39eaf3132eca5738848385cbb837484f to your computer and use it in GitHub Desktop.

Revisions

  1. Darksonn revised this gist Jan 16, 2019. 1 changed file with 5 additions and 0 deletions.
    5 changes: 5 additions & 0 deletions killswitch.rs
    Original file line number Diff line number Diff line change
    @@ -61,6 +61,8 @@ impl Future for ConnWrapper {
    }
    if !self.kill_on_timeout {
    if self.timeouts > 1000 || self.kill.is_killed() {
    /// We should kill this connection. Start a 100 ms timeout and kill
    /// it if it doesn't finish before the timeout.
    self.timeout = Interval::new_interval(Duration::from_millis(100));
    self.kill_on_timeout = true;
    if let Some(conn) = &mut self.conn {
    @@ -76,14 +78,17 @@ impl Future for ConnWrapper {
    Ok(Async::Ready(())) => Ok(Async::Ready(())),
    Err(err) => {
    if err.is_closed() || err.is_canceled() || err.is_connect() {
    /// Just ignore it if the client closes the connection.
    Ok(Async::Ready(()))
    } else {
    if let Some(inner) = err.cause2() {
    if let Some(ioerr) = inner.downcast_ref::<std::io::Error>() {
    let kind = ioerr.kind();
    if kind == ErrorKind::ConnectionReset {
    /// Just ignore it if the client closes the connection.
    return Ok(Async::Ready(()));
    } else if kind == ErrorKind::ConnectionAborted {
    /// Just ignore it if the client closes the connection.
    return Ok(Async::Ready(()));
    } else {
    error!("socket error: {} (kind: {:?})", err, kind);
  2. Darksonn revised this gist Jan 16, 2019. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions killswitch.rs
    Original file line number Diff line number Diff line change
    @@ -8,6 +8,9 @@ impl KillSwitch {
    kill: Arc::new(AtomicBool::new(false)),
    }
    }
    /// Kill all clones of this KillSwitch. Note that this replaces self
    /// with a new KillSwitch, meaning that futher calls to `clone` won't
    /// contain a killed connection.
    pub fn kill(&mut self) {
    self.kill.store(true, Ordering::Relaxed);
    self.kill = Arc::new(AtomicBool::new(false));
  3. Darksonn created this gist Jan 16, 2019.
    101 changes: 101 additions & 0 deletions killswitch.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,101 @@
    #[derive(Clone)]
    pub struct KillSwitch {
    kill: Arc<AtomicBool>,
    }
    impl KillSwitch {
    pub fn new() -> Self {
    KillSwitch {
    kill: Arc::new(AtomicBool::new(false)),
    }
    }
    pub fn kill(&mut self) {
    self.kill.store(true, Ordering::Relaxed);
    self.kill = Arc::new(AtomicBool::new(false));
    }
    pub fn is_killed(&self) -> bool {
    self.kill.load(Ordering::Relaxed)
    }
    }

    pub struct ConnWrapper {
    conn: Option<Connection<TcpStream, Service>>,
    kill: KillSwitch,
    timeout: Interval,
    kill_on_timeout: bool,
    timeouts: usize,
    }
    impl ConnWrapper {
    pub fn wrap(conn: Connection<TcpStream, Service>, kill: &KillSwitch) -> Self {
    ConnWrapper {
    conn: Some(conn),
    kill: kill.clone(),
    kill_on_timeout: false,
    timeout: Interval::new_interval(Duration::from_millis(1000)),
    timeouts: 0,
    }
    }
    }
    impl Future for ConnWrapper {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Result<Async<()>, ()> {
    match self.timeout.poll() {
    Ok(Async::NotReady) => {},
    Ok(Async::Ready(_)) => {
    if self.kill_on_timeout {
    self.conn = None;
    return Ok(Async::Ready(()));
    } else {
    task::current().notify();
    self.timeouts += 1;
    }
    },
    Err(err) => {
    error!("timer error: {}", err);
    self.conn = None;
    return Err(());
    },
    }
    if !self.kill_on_timeout {
    if self.timeouts > 1000 || self.kill.is_killed() {
    self.timeout = Interval::new_interval(Duration::from_millis(100));
    self.kill_on_timeout = true;
    if let Some(conn) = &mut self.conn {
    conn.graceful_shutdown();
    }
    task::current().notify();
    return Ok(Async::NotReady);
    }
    }
    match &mut self.conn {
    Some(conn) => match conn.poll() {
    Ok(Async::NotReady) => Ok(Async::NotReady),
    Ok(Async::Ready(())) => Ok(Async::Ready(())),
    Err(err) => {
    if err.is_closed() || err.is_canceled() || err.is_connect() {
    Ok(Async::Ready(()))
    } else {
    if let Some(inner) = err.cause2() {
    if let Some(ioerr) = inner.downcast_ref::<std::io::Error>() {
    let kind = ioerr.kind();
    if kind == ErrorKind::ConnectionReset {
    return Ok(Async::Ready(()));
    } else if kind == ErrorKind::ConnectionAborted {
    return Ok(Async::Ready(()));
    } else {
    error!("socket error: {} (kind: {:?})", err, kind);
    return Err(());
    }
    }
    }
    error!("socket error: {}", err);
    Err(())
    }
    },
    },
    None => {
    Ok(Async::Ready(()))
    },
    }
    }
    }