Practical Implementation of Database Connection Pooling with Rust and PostgreSQL: deadpool and diesel

Practical Implementation of Database Connection Pooling with Rust and PostgreSQL: deadpool and diesel (Part 4)

The idea behind developing RTMate in Rust is to create a platform that provides WebSocket connection services, eliminating the need for users to set up their own WebSocket services. In this process, it is essential to manage different tenants and the clients under each tenant, which requires authentication, connection registration, and statistics. Therefore, I chose PostgreSQL as the underlying database. Operating the database requires a database connection, and creating database connections is resource-intensive, so a connection pool is necessary to manage these connections. Rust supports both synchronous and asynchronous programming; how can we build a PostgreSQL database connection pool in Rust? How can we use the database connections from the connection pool with an ORM framework in the project? This article will explore these questions.

GitHub Repository:RTMate

During the development of RTMate, I realized that establishing a new database connection for each query would lead to performance issues, so I began researching database connection pool solutions in Rust. After trying several libraries, I ultimately chose the combination of deadpool and diesel. Here, I will document the problems encountered during usage and their solutions.

Review of Basic diesel Usage

I chose diesel mainly for its type safety, which allows SQL field type errors to be detected at compile time, which is much better than runtime errors. Moreover, it is widely used in crates.io, making it a reliable choice.

I won’t go into detail about the basic usage, as the official documentation is quite clear:https://diesel.rs/guides/getting-started.html

Let’s briefly review the core concepts:

1. Schema Definition (Automatically Generated)

Generate the schema using the diesel CLI, which reads the database table structure:

diesel print-schema

For example, this simple posts table:

CREATE TABLE posts (
  id SERIAL PRIMARY KEY,
  title VARCHAR NOT NULL,
  body TEXT NOT NULL,
  published BOOLEAN NOT NULL DEFAULT FALSE
)

The generated schema looks like this:

// @generated automatically by Diesel CLI.

diesel::table! {
    posts (id) {
        id -> Int4,
        title -> Varchar,
        body -> Text,
        published -> Bool,
    }
}

2. Model Definition

use diesel::prelude::*;

#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::posts)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Post {
    pub id: i32,
    pub title: String,
    pub body: String,
    pub published: bool,
}

3. Basic Connection and Query

The most basic usage is to create a connection each time:

use diesel::prelude::*;
use dotenvy::dotenv;
use std::env;

pub fn establish_connection() -> PgConnection {
    dotenv().ok();

    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    PgConnection::establish(&database_url)
        .unwrap_or_else(|_| panic!("Error connecting to {}", database_url))
}

Then you can query directly:

use self::models::*;
use diesel::prelude::*;
use diesel_demo::*;

fn main() {
    use self::schema::posts::dsl::*;

    let connection = &mut establish_connection();
    let results = posts
        .filter(published.eq(true))
        .limit(5)
        .select(Post::as_select())
        .load(connection)
        .expect("Error loading posts");

    println!("Displaying {} posts", results.len());
    for post in results {
        println!("{}", post.title);
        println!("-----------\n");
        println!("{}", post.body);
    }
}

The Necessity of Connection Pools

The method of creating a connection each time is acceptable during development, but it is absolutely not feasible in a production environment. The overhead of creating connections in PostgreSQL is significant, and there is a limit to the number of connections.

Each time a connection is established, it requires TCP handshake, authentication, parameter negotiation, and other steps, which can become a performance bottleneck in high-concurrency scenarios. Moreover, the maximum number of connections on the database server is limited; if each request creates a new connection, it can easily exhaust connection resources.

Therefore, it is essential to use a connection pool to allow connections to be reused and avoid the overhead of repeatedly creating connections.

Practical Implementation of deadpool + diesel Connection Pool

After researching several connection pool solutions, I ultimately chose deadpool. The main reasons are:

  1. Native support for asynchronous operations

  2. There is a dedicated deadpool_diesel package

  3. Simple configuration, and the documentation is relatively clear

Let’s take a look at the actual code in my project, which defines the client connection record table:

Schema:

diesel::table! {
    rt_client_connection (id) {
        id -> Int8,
        app_id -> Int8,
        #[max_length = 100]
        rt_app -> Varchar,
        #[max_length = 100]
        client_id -> Varchar,
        #[max_length = 100]
        connect_token -> Varchar,
        used -> Bool,
        created_time -> Nullable<Timestamptz>,
        expire_time -> Nullable<Timestamptz>,
    }
}

Model:

#[derive(Queryable, Selectable, Deserialize, Serialize, Debug)]
#[diesel(table_name = rt_client_connection)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct RtClientConnection {
    pub id: i64,
    pub app_id: i64,
    pub rt_app: String,
    pub client_id: String,
    pub connect_token: String,
    pub used: bool,
    pub created_time: Option<DateTime<Utc>>,
    pub expire_time: Option<DateTime<Utc>>,
}

Connection Pool Encapsulation:

use deadpool_diesel::Runtime;
use deadpool_diesel::postgres::BuildError;
use deadpool_diesel::postgres::Manager;
use deadpool_diesel::postgres::Pool;
use deadpool_diesel::Timeouts;
use deadpool_diesel::postgres::Object;

#[derive(Clone)]
pub struct DataSource {
    pool: deadpool_diesel::Pool<deadpool_diesel::Manager<diesel::PgConnection>>,
}

impl DataSource {
    pub async fn new() -> anyhow::Result<Self> {
        dotenvy::dotenv().map_err(PoolError::Env)?;

        let db_config = DbConfig::from_env()
            .map_err(|e| anyhow::anyhow!("Failed to load database configuration: {}", e))?;

        let manager = Manager::new(db_config.database_url, Runtime::Tokio1);
        let pool: deadpool_diesel::Pool<deadpool_diesel::Manager<diesel::PgConnection>> = Pool::builder(manager)
            .max_size(db_config.max_connections)
            .timeouts(db_config.timeouts)
            .build()
            .map_err(PoolError::PoolBuildError)?;

        Ok(DataSource { pool })
    }

    pub async fn get_connection(&self) -> anyhow::Result<Object> {
        let conn = self.pool.get().await?;
        Ok(conn)
    }
}

Insert Operation:

async fn save_connect_token(&self, new_connection: NewRtClientConnection) -> anyhow::Result<()> {
    let pg_connection = self.data_source.get_connection().await?;
    pg_connection
        .interact(move |conn: &mut diesel::PgConnection| {
            diesel::insert_into(conn_dsl::rt_client_connection)
                .values(&new_connection)
                .execute(conn)
        })
        .await
        .map_err(|e| anyhow::anyhow!("Insert rt app new_connection failed: {}", e))??;
    Ok(())
}

Key Point Analysis

Initially, when I used the <span>interact</span> method, I completely didn’t understand why it was so convoluted. Why not just use the connection directly?

Later, I understood the reason:

  1. <span>get_connection()</span> retrieves a connection from the pool asynchronously

  2. diesel operations are synchronous and cannot be used directly in an asynchronous context

  3. <span>interact()</span> offloads synchronous operations to a dedicated thread pool, returning a Future

  4. Using <span>move</span> to transfer data into the thread to avoid lifetime issues

The double <span>?</span> also caused confusion: the first <span>?</span> handles errors from <span>interact</span> itself (e.g., if the thread pool fails), and the second <span>?</span> handles errors from diesel operations (e.g., if SQL execution fails).

Query Operation

/// Query for RtClientConnection token that has not been successfully created based on connect_token
async fn get_rt_client_connection_by_token(&self, query_connect_token: &str) -> anyhow::Result<Option<RtClientConnection>> {
    let pg_connection = self.get_connection().await?;
    let connect_token_query = query_connect_token.to_owned();
    use rtmate_common::schema::rt_client_connection::dsl::*;
    let result = pg_connection.interact(move |conn: &mut diesel::PgConnection| {
        rt_client_connection
            .filter(connect_token.eq(connect_token_query))
            .filter(used.eq(false))
            .select(RtClientConnection::as_select())
            .first::<RtClientConnection>(conn)
            .optional()
    }).await.map_err(|e| anyhow::anyhow!("Query failed: {}", e))??;
    Ok(result)
}

Note that here I used <span>query_connect_token.to_owned()</span>, because I need to move the data to another thread and cannot use a reference.

Connection Pool Configuration

I spent a lot of time configuring this part, mainly loading configurations through environment variables:

#[derive(Debug, Deserialize)]
struct DbConfig {
    pub database_url: String,
    #[serde(default = "default_max_connections")]
    pub max_connections: usize,
    #[serde(default = "default_connect_timeout")]
    pub connect_timeout: u64,
    #[serde(rename = "test_query", default = "default_test_query")]
    pub test_query: String,
    #[serde(flatten)]
    pub timeouts: Timeouts,
}

fn default_max_connections() -> usize {
    5
}

fn default_connect_timeout() -> u64 {
    10
}
}

fn default_test_query() -> String {
    "SELECT 1".to_string()
}

impl DbConfig {
    pub fn from_env() -> anyhow::Result<Self, ConfigError> {
        config::Config::builder()
            .add_source(config::Environment::default())
            .build()
            .unwrap()
            .try_deserialize()
    }
}

// Create connection pool
let manager = Manager::new(db_config.database_url, Runtime::Tokio1);
let pool: deadpool_diesel::Pool<deadpool_diesel::Manager<diesel::PgConnection>> = Pool::builder(manager)
    .max_size(db_config.max_connections)
    .timeouts(db_config.timeouts)
    .build()
    .map_err(PoolError::PoolBuildError)?;

Several important parameters:

  • <span>max_size</span>: Maximum number of connections, adjusted based on database configuration and concurrency (default is 5 in the project)

  • <span>timeouts</span>: A structure containing various timeout configurations, set through environment variables

  • <span>database_url</span>: Database connection URL, read from environment variables

  • <span>Runtime::Tokio1</span>: Specifies the asynchronous runtime as Tokio

Note that the project uses configuration structures and environment variables instead of hardcoded parameter values. This makes it more flexible and allows for configuration adjustments based on different environments.

Conclusion

The combination of deadpool and diesel is indeed excellent:

  • Significant performance improvement, especially in high-concurrency scenarios

  • diesel’s type safety is very useful, allowing SQL issues to be detected at compile time

  • deadpool’s asynchronous support modernizes the entire architecture

  • Relatively simple configuration, and the documentation is decent

If you are also using Rust to write services that require a database, you might want to try this combination. Although the learning curve is slightly higher than using a direct connection, the performance benefits are definitely worth it.

Leave a Comment