Let's write fault-tolerant and type-safe workflows in Rust using Temporal



I have been developing in Rust a lot and am a long-time user of Temporal.

They are powerful together.

Temporal is a platform for executing code in a durable, resilient, and scalable way. We define stateful workflows in a high-level programming language like Rust or Go. Then we create a worker program, deployed independently of the platform, which executes the code while Temporal handles state persistence and queue semantics. The main advantage is that our workflow code reads almost like regular code.
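To make the "durable" part concrete, here is a toy model of replay, using only the standard library. It is not how Temporal is implemented; `History`, `Ctx`, and `greet_workflow` are hypothetical names invented for this sketch. The idea it illustrates: activity results are recorded, so when a workflow is re-run after a crash, recorded steps are replayed from history instead of being executed again.

```rust
/// Toy stand-in for the persisted event history of one workflow execution.
#[derive(Default)]
struct History {
    results: Vec<String>,
}

/// Hypothetical context: replays recorded results first, then records new ones.
struct Ctx<'a> {
    history: &'a mut History,
    cursor: usize,   // position of the next activity call in the history
    executed: usize, // how many activities actually ran in this attempt
}

impl<'a> Ctx<'a> {
    fn new(history: &'a mut History) -> Self {
        Ctx { history, cursor: 0, executed: 0 }
    }

    /// Run an activity, or return its recorded result when replaying.
    fn activity(&mut self, f: impl FnOnce() -> String) -> String {
        let recorded = self.history.results.get(self.cursor).cloned();
        self.cursor += 1;
        match recorded {
            Some(result) => result, // replay: do not execute again
            None => {
                self.executed += 1;
                let fresh = f();
                self.history.results.push(fresh.clone());
                fresh
            }
        }
    }
}

/// The workflow itself reads like ordinary sequential code.
fn greet_workflow(ctx: &mut Ctx, crash_after_first: bool) -> Option<String> {
    let a = ctx.activity(|| "hello".to_string());
    if crash_after_first {
        return None; // simulate the worker dying mid-workflow
    }
    let b = ctx.activity(|| "world".to_string());
    Some(format!("{a} {b}"))
}

fn main() {
    let mut history = History::default();

    // First attempt: one activity completes, then the worker "crashes".
    let mut first = Ctx::new(&mut history);
    assert!(greet_workflow(&mut first, true).is_none());
    assert_eq!(first.executed, 1);

    // Second attempt: the first activity is replayed, only the second one runs.
    let mut second = Ctx::new(&mut history);
    let result = greet_workflow(&mut second, false);
    assert_eq!(second.executed, 1);
    assert_eq!(result, Some("hello world".to_string()));
}
```

In the real platform the history lives in the Temporal server rather than in worker memory, which is what lets a different worker pick up where the previous one stopped.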

Temporal provides SDKs for developing workflow code in the supported languages. I have been a prolific user of the Go and Java SDKs.

As I develop in Rust, I want to avoid context switches into the Go or Java ecosystem just to write workflow code. But the Rust SDK that sits on top of the Temporal Core SDK, which powers the SDKs of several other languages (Python, TypeScript, etc.), is still under development. It's just a prototype for now.

So I have been working on adding an ergonomic API to the Rust SDK similar to the other production-ready ones.

NOTE: It's highly experimental and just for use in my projects.

This post is inspired by the .NET SDK post. I am quoting here from that post with a small change :)

Different language runtimes have different trade-offs for writing workflows. Go is very fast and resource-efficient due to runtime-supported coroutines, but that comes at the expense of type safety (even generics as implemented in Go are limited for this use). Java is also very fast and type-safe, but a bit less resource-efficient due to the lack of runtime-supported coroutines (but virtual threads are coming). It might sound weird to say, but our dynamic languages of JS/TypeScript and Python are probably the most type-safe SDKs when used properly; however, as can be expected, they are not the most resource efficient. Rust provides the best of all worlds: high performance like Go/Java, good resource utilization like Go, and high-quality type-safe APIs.


Creating an API for registering and executing arbitrary user-defined functions (workflow code) is not easy in Rust. Rust has no runtime reflection like Go or Java, and it has neither variadic arguments nor variadic generics. We need compile-time support and macros to achieve these features. There are many attempts in the wild at such an API.
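As a rough illustration of that constraint, the sketch below erases typed functions behind one common handler signature and uses a macro to stamp out one adapter per arity. It is a simplified stand-in, not the SDK's implementation: `Handler`, `Registry`, `define_adapter!`, and the `into_handler_N_args` functions are hypothetical names, and arguments are plain strings parsed with `FromStr` instead of serialized payloads.

```rust
use std::collections::HashMap;
use std::fmt::Display;
use std::str::FromStr;

/// Type-erased handler: every registered function ends up with this signature.
type Handler = Box<dyn Fn(&[&str]) -> Result<String, String>>;

/// Decode one positional argument, reporting its index on failure.
fn arg<T: FromStr>(args: &[&str], i: usize) -> Result<T, String> {
    let raw = args.get(i).ok_or_else(|| format!("missing argument {i}"))?;
    raw.parse::<T>().map_err(|_| format!("argument {i} has the wrong type"))
}

/// Generate one adapter per arity, since a single generic cannot be variadic.
macro_rules! define_adapter {
    ($name:ident, $($ty:ident => $idx:expr),+) => {
        fn $name<F, R, $($ty),+>(f: F) -> Handler
        where
            F: Fn($($ty),+) -> R + 'static,
            R: Display + 'static,
            $($ty: FromStr + 'static),+
        {
            Box::new(move |args: &[&str]| -> Result<String, String> {
                Ok(f($(arg::<$ty>(args, $idx)?),+).to_string())
            })
        }
    };
}

define_adapter!(into_handler_1_args, A => 0);
define_adapter!(into_handler_2_args, A => 0, B => 1);

/// Name-to-handler table, looked up at runtime in place of reflection.
#[derive(Default)]
struct Registry {
    handlers: HashMap<String, Handler>,
}

impl Registry {
    fn register(&mut self, name: &str, handler: Handler) {
        self.handlers.insert(name.to_string(), handler);
    }

    fn call(&self, name: &str, args: &[&str]) -> Result<String, String> {
        let handler = self
            .handlers
            .get(name)
            .ok_or_else(|| format!("unknown function {name}"))?;
        handler(args)
    }
}

fn double(x: i64) -> i64 {
    x * 2
}

fn repeat(s: String, n: usize) -> String {
    s.repeat(n)
}

fn main() {
    let mut registry = Registry::default();
    registry.register("double", into_handler_1_args(double));
    registry.register("repeat", into_handler_2_args(repeat));

    assert_eq!(registry.call("double", &["21"]), Ok("42".to_string()));
    assert_eq!(registry.call("repeat", &["ab", "3"]), Ok("ababab".to_string()));
    assert!(registry.call("double", &["not a number"]).is_err());
}
```

The argument and return types are checked at compile time when the adapter is applied; only the lookup by name and the decoding of arguments happen at runtime.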

An in-depth discussion of how I am trying to solve this will follow in a design and overview post. For now, let's see how workflow code can be written in type-safe async Rust. It feels just like normal async code: you simply 'await' the workflow and activities, thanks to the core SDK design by the Temporal SDK team.

Implementing an Activity

use temporal_sdk::prelude::activity::*;

#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    Any(anyhow::Error),
}

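// Convert a Temporal `Failure` (e.g. a failed activity) into our error type,
// keeping only the failure message.
impl FromFailureExt for Error {
    fn from_failure(failure: Failure) -> Error {
        Error::Any(anyhow::anyhow!("{:?}", failure.message))
    }
}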

#[derive(Default, Deserialize, Serialize, Debug, Clone)]
pub struct ActivityInput {
    pub language: String,
    pub kind: String,
}

#[derive(Default, Deserialize, Serialize, Debug, Clone)]
pub struct ActivityOutput {
    pub kind: String,
    pub platform: String,
    pub features: Vec<String>,
}

pub async fn sdk_example_activity(
    _ctx: ActContext,
    input: ActivityInput,
) -> Result<(String, ActivityOutput), Error> {
    Ok((
        format!("Workflow written in {} {}", input.kind, input.language),
        ActivityOutput {
            kind: "worker".to_string(),
            platform: "temporal".to_string(),
            features: vec![
                "performance".to_string(),
                "async".to_string(),
                "type-safe".to_string(),
                "resource-efficient".to_string(),
            ],
        },
    ))
}

Implementing a Workflow

use super::activity::*;
use temporal_sdk::prelude::workflow::*;

#[derive(Default, Deserialize, Serialize, Debug, Clone)]
pub struct WorkflowInput {
    pub code: String,
    pub kind: String,
}

pub async fn sdk_example_workflow(
    ctx: WfContext,
    input: WorkflowInput,
) -> Result<WfExitValue<ActivityOutput>, anyhow::Error> {
    let output = execute_activity_1_args_with_errors(
        &ctx,
        ActivityOptions {
            activity_id: Some("sdk_example_activity".to_string()),
            activity_type: "sdk_example_activity".to_string(),
            schedule_to_close_timeout: Some(Duration::from_secs(5)),
            ..Default::default()
        },
        sdk_example_activity,
        ActivityInput {
            language: input.code,
            kind: input.kind,
        },
    )
    .await;
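    // The activity returns a (message, ActivityOutput) tuple; the workflow
    // exits with the ActivityOutput and drops the message.
    match output {
        Ok(output) => Ok(WfExitValue::Normal(output.1)),
        Err(e) => Err(anyhow::Error::from(e)),
    }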
}

Running a Worker

use std::{str::FromStr, sync::Arc};
use temporal_sdk::prelude::worker::*;
use temporal_sdk::prelude::registry::*;

pub(crate) async fn worker() -> Result<Worker, Box<dyn std::error::Error>> {
    let server_options = sdk_client_options(Url::from_str("http://localhost:7233")?).build()?;
    let client = server_options.connect("default", None, None).await?;
    let telemetry_options = TelemetryOptionsBuilder::default().build()?;
    let runtime = CoreRuntime::new_assume_tokio(telemetry_options)?;
    let task_queue = "example-task-queue";
    let worker_config = WorkerConfigBuilder::default()
        .namespace("default")
        .task_queue(task_queue)
        .worker_build_id("example-rust-worker")
        .build()?;
    let core_worker = init_worker(&runtime, worker_config, client)?;
    Ok(Worker::new_from_core(Arc::new(core_worker), task_queue))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Propagate connection and configuration errors instead of panicking.
    let mut worker = worker::worker().await?;

    worker.register_activity(
        "sdk_example_activity",
        into_activity_1_args_with_errors(activity::sdk_example_activity),
    );

    worker.register_wf(
        "sdk_example_workflow",
        into_workflow_1_args(workflow::sdk_example_workflow),
    );

    worker.run().await?;

    Ok(())
}

Executing a Workflow

Build

cargo build

Run

temporal server start-dev
cargo run

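The dev server and the worker both keep running in the foreground, so give each its own terminal. Then, in a separate terminal, start the workflow: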

temporal workflow start --task-queue="example-task-queue" --type="sdk_example_workflow" --workflow-id="example_id" --input='{"code":"rust","kind":"typesafe"}'

Check the execution in the Temporal Web UI: http://localhost:8233/namespaces/default/workflows
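To see what that input turns into, here is a dependency-free trace of the data flow from the code above. The structs mirror `WorkflowInput` and `ActivityInput` from the post but drop serde, and `to_activity_input` and `activity_message` are hypothetical helpers that copy the field mapping in the workflow and the `format!` call in the activity.

```rust
/// Mirrors the post's `WorkflowInput`, without serde derives.
struct WorkflowInput {
    code: String,
    kind: String,
}

/// Mirrors the post's `ActivityInput`, without serde derives.
struct ActivityInput {
    language: String,
    kind: String,
}

/// Same field mapping the workflow uses when it calls the activity.
fn to_activity_input(input: WorkflowInput) -> ActivityInput {
    ActivityInput {
        language: input.code,
        kind: input.kind,
    }
}

/// Same format string the activity uses for its message.
fn activity_message(input: &ActivityInput) -> String {
    format!("Workflow written in {} {}", input.kind, input.language)
}

fn main() {
    // Equivalent of --input='{"code":"rust","kind":"typesafe"}'
    let wf_input = WorkflowInput {
        code: "rust".to_string(),
        kind: "typesafe".to_string(),
    };
    let act_input = to_activity_input(wf_input);
    assert_eq!(
        activity_message(&act_input),
        "Workflow written in typesafe rust"
    );
    println!("{}", activity_message(&act_input));
}
```

Note that the workflow exits with only the second element of the activity's tuple, so the result of the execution should be the `ActivityOutput` value, not this message.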

The full code is available at https://github.com/h7kanna/temporal-rust-worker

Check out https://www.ekalavya.dev/ for more upcoming posts on the Temporal Rust SDK.