The Fail Injection Tool fail-rs from PingCAP

Earlier this year, I shared some insights on using Go failpoint. If you’re interested, you can check out this article.

Failpoints is a tool that allows for the injection of errors or other behaviors at runtime, primarily for testing purposes, including unit tests, integration stress tests, and more. The types of tests include state machine errors, disk errors, and network IO delays.

Injectable behaviors include: panic, early returns, sleeping, etc. The injected behaviors can be controlled via environment variables or code. It is generally recommended to use HTTP or integrate with the company’s configuration platform, with triggering rules based on counts, probabilities, or a combination of both.

Getting Started Example

First, configure the dependencies in Cargo.toml

[dependencies]
fail = "0.4"

We depend on version 0.4

use fail::{fail_point, FailScenario};

fn do_fallible_work() {
    fail_point!("read-dir");
    println!("mock working now");
}

fn main() {
    let scenario = FailScenario::setup();
    do_fallible_work();
    scenario.teardown();
    println!("done");
}

The do_fallible_work function does two things: it executes the read-dir injection point and prints a message to simulate processing a request.

$ FAILPOINTS=read-dir="panic" cargo run
mock working now
done

By injecting a panic statement through the environment variable, the conditional compilation is not enabled by default, so the output is normal.

$ FAILPOINTS=read-dir="panic" cargo run --features fail/failpoints
mock working now
thread 'main' panicked at 'failpoint read-dir panic', /Users/zerun.dong/.cargo/registry/src/github.com-1ecc6299db9ec823/fail-0.4.0/src/lib.rs:488:25
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

By specifying --features fail/failpoints in cargo, the panic occurs as expected.

FAILPOINTS=read-dir="sleep(2000)" cargo run --features fail/failpoints

Of course, we can also specify other behaviors, such as sleep(2000) to sleep for 2 seconds.

use fail::{fail_point, FailScenario};
use std::io;

fn do_fallible_work() -> io::Result<()> {
    println!("mock working now");
    fail_point!("read-dir", |_| {
        Err(io::Error::new(io::ErrorKind::PermissionDenied, "error"))
    });
    Ok(())
}

fn main() -> io::Result<()> {
    let scenario = FailScenario::setup();
    do_fallible_work()?;
    do_fallible_work()?;
    scenario.teardown();
    println!("done");
    Ok(())
}

This is a test case for early return, which requires using a closure to encapsulate the error.

$ FAILPOINTS=read-dir=return cargo run --features fail/failpoints
mock working now
Error: Custom { kind: PermissionDenied, error: "error" }

The above is the normal usage, and we can also specify multiple actions.

$ FAILPOINTS=read-dir="1*sleep(2000)->return" cargo run --features fail/failpoints
mock working now
mock working now
Error: Custom { kind: PermissionDenied, error: "error" }

"1*sleep(2000)->return" indicates that the first time it sleeps for 2 seconds, and then returns early the second time. For more advanced usage, please refer to the official documentation at https://docs.rs/fail

Zero Performance Overhead

The most important requirement is: the code integrating Failpoint must have zero performance overhead when running in a production environment

func test() {
    failpoint.Inject("testValue", func(v failpoint.Value) {
        fmt.Println(v)
    })
}

This is Go test code, where failpoint.Inject is a marker function, with parameters being the name and closure.

// failpoint.Inject("fail-point-name", func(_ failpoint.Value) {...}
func Inject(fpname string, fpbody interface{}) {}

Since Inject has an empty function body, it will be optimized away at compile time, resulting in zero performance overhead at runtime. When testing offline, it is necessary to execute failpoint-ctl to convert all marker functions into injection functions.

func test() {
 if v, _err_ := failpoint.Eval(_curpkg_("testValue")); _err_ == nil {
  fmt.Println(v)
 }
}

The above is the converted code, and the principle is not difficult; it parses the AST and replaces the syntax tree. So how does Rust achieve this? The answer is macro + conditional compilation

/// Define a fail point (disabled, see `failpoints` feature).
#[macro_export]
#[cfg(not(feature = "failpoints"))]
macro_rules! fail_point {
    ($name:expr, $e:expr) => {{}};
    ($name:expr) => {{}};
    ($name:expr, $cond:expr, $e:expr) => {{}};
}

When compiling with cargo build without specifying the failpoints feature, the fail_point macro corresponds to an empty implementation.

#[cfg(feature = "failpoints")]
macro_rules! fail_point {
    ($name:expr) => {{
        $crate::eval($name, |_| {
            panic!("Return is not supported for the fail point \"{}\"", $name);
        });
    }};
    ($name:expr, $e:expr) => {{
        if let Some(res) = $crate::eval($name, $e) {
            return res;
        }
    }};
    ($name:expr, $cond:expr, $e:expr) => {{
        if $cond {
            fail_point!($name, $e);
        }
    }};
}

When the feature is specified, the corresponding macro implementation expands into the appropriate logical code at compile time. The fail_point macro has three forms, matching different parameter expressions (designators) to corresponding code blocks

  • Single parameter name string, which can execute panic, print, sleep, pause four behaviors
  • Two parameters name, e where e is a closure, allowing for early return
  • Three parameters name, cond, e, where cond is a conditional expression that should return a boolean value, and e is a closure. The corresponding injection is executed based on the condition.

Implementation Principles

1. Registration Center

/// Registry with failpoints configuration.
type Registry = HashMap<String, Arc<FailPoint>>;

#[derive(Debug, Default)]
struct FailPointRegistry {
    // TODO: remove rwlock or store *mut FailPoint
    registry: RwLock<Registry>,
}

lazy_static::lazy_static! {
    static ref REGISTRY: FailPointRegistry = FailPointRegistry::default();
    static ref SCENARIO: Mutex<&'static FailPointRegistry> = Mutex::new(&REGISTRY);
}

The registration center Registry is of type HashMap, where the key is the name from the above test example, and the value is of type Arc<Failpoint>, with Arc used for shared ownership in concurrent environments.

struct FailPoint {
    pause: Mutex<bool>,
    pause_notifier: Condvar,
    actions: RwLock<Vec<Action>>,
    actions_str: RwLock<String>,
}

pause indicates whether to pause, pause_notifier is used for pause notifications, actions is an array because a fail_point injection can have multiple actions, and actions_str is a string representing the task, converted into the action structure via from_str.

2. Generating Tasks

FailScenario::setup() initializes the injection actions by obtaining the FAILPOINTS environment variable, and currently does not support HTTP methods.

After parsing, it registers multiple injection actions into the aforementioned Registry using the set function.

fn set(
    registry: &mut HashMap<String, Arc<FailPoint>>,
    name: String,
    actions: &str,
) -> Result<(), String> {
    let actions_str = actions;
    // `actions` are in the format of `failpoint[->failpoint...].
    let actions = actions
        .split("->")
        .map(Action::from_str)
        .collect::<result<_, _>="">()?;
    // Please note that we can't figure out whether there is a failpoint named `name`,
    // so we may insert a failpoint that doesn't exist at all.
    let p = registry
        .entry(name)
        .or_insert_with(|| Arc::new(FailPoint::new()));
    p.set_actions(actions_str, actions);
    Ok(())
}
</result<_,>

Here, Action::from_str is used to parse the string into an Action.

#[derive(Clone, Debug, PartialEq)]
enum Task {
    /// Do nothing.
    Off,
    /// Return the value.
    Return(Option<String>),
    /// Sleep for some milliseconds.
    Sleep(u64),
    /// Panic with the message.
    Panic(Option<String>),
    /// Print the message.
    Print(Option<String>),
    /// Sleep until other action is set.
    Pause,
    /// Yield the CPU.
    Yield,
    /// Busy waiting for some milliseconds.
    Delay(u64),
    /// Call callback function.
    Callback(SyncCallback),
}

#[derive(Debug)]
struct Action {
    task: Task,
    freq: f32,
    count: Option<AtomicUsize>,
}

The Action types are different, with freq controlling the frequency and count controlling the trigger count.

3. Triggering Tasks

The prerequisite is that the conditional compilation has enabled the failpoint; let’s look directly at the macro implementation.

pub fn eval<R, F: FnOnce(Option<String>) -> R>(name: &str, f: F) -> Option<R> {
    let p = {
        let registry = REGISTRY.registry.read().unwrap();
        match registry.get(name) {
            None => return None,
            Some(p) => p.clone(),
        }
    };
    p.eval(name).map(f)
}

The logic is quite simple; it finds the corresponding failpoint from the Registry registration center map, then calls the failpoint.eval function, and executes the closure f (if there is a value) for all return values.

#[cfg_attr(feature = "cargo-clippy", allow(clippy::option_option))]
fn eval(&self, name: &str) -> Option<Option<String>> {
    let task = {
        let actions = self.actions.read().unwrap();
        match actions.iter().filter_map(Action::get_task).next() {
            Some(Task::Pause) => {
                let mut guard = self.pause.lock().unwrap();
                *guard = true;
                loop {
                    guard = self.pause_notifier.wait(guard).unwrap();
                    if !*guard {
                        break;
                    }
                }
                return None;
            }
            Some(t) => t,
            None => return None,
        }
    };

    match task {
        Task::Off => {}
        Task::Return(s) => return Some(s),
        Task::Sleep(t) => thread::sleep(Duration::from_millis(t)),
        Task::Panic(msg) => match msg {
            Some(ref msg) => panic!("{}", msg),
            None => panic!("failpoint {} panic", name),
        },
        Task::Print(msg) => match msg {
            Some(ref msg) => log::info!("{}", msg),
            None => log::info!("failpoint {} executed.", name),
        },
        Task::Pause => unreachable!(),
        Task::Yield => thread::yield_now(),
        Task::Delay(t) => {
            let timer = Instant::now();
            let timeout = Duration::from_millis(t);
            while timer.elapsed() &lt; timeout {}
        }
        Task::Callback(f) => {
            f.run();
        }
    }
    None
}

The eval function is not difficult; it first calls get_task to get the Action to execute. Here, the Pause action is handled separately, while the others are matched using pattern matching. It can also be seen that if Return does not specify a closure f, the return value is Some(“”), triggering the default panic closure of the macro.

fn get_task(&self) -> Option<Task> {
  use rand::Rng;

  if let Some(ref cnt) = self.count {
      let c = cnt.load(Ordering::Acquire);
      if c == 0 {
          return None;
      }
  }
  if self.freq &lt; 1f32 &amp;&amp; !rand::thread_rng().gen_bool(f64::from(self.freq)) {
      return None;
  }
  if let Some(ref ref_cnt) = self.count {
      let mut cnt = ref_cnt.load(Ordering::Acquire);
      loop {
          if cnt == 0 {
              return None;
          }
          let new_cnt = cnt - 1;
          match ref_cnt.compare_exchange_weak(
              cnt,
              new_cnt,
              Ordering::AcqRel,
              Ordering::Acquire,
          ) {
              Ok(_) => break,
              Err(c) => cnt = c,
          }
      }
  }
  Some(self.task.clone())
}

The get_task first checks the execution count; if it is 0, it returns None. Then it checks the frequency; if it does not trigger, it returns None. Finally, it checks the count again and updates it using CAS. Here, the count field type is Option<AtomicUsize>, and if no count is specified, it defaults to unlimited.

Conclusion

Writing articles is not easy. If this has been helpful or inspiring to you, please help by clicking like, share, and follow.

If you have any thoughts on Failpoint, feel free to leave a comment for discussion. Experts are welcome to share their insights ^_^

Leave a Comment