1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
/* Redis Glue provides abstractions over single and cluster mode Redis interactions
 * Copyright 2021 Aravinth Manivannan <realaravinth@batsense.net>
 *
 * Licensed under the Apache License, Version 2.0 (the "License") or MIT
 */

//! Redis Client/Connection manager that can handle both single and clustered Redis Instances
use std::cell::RefCell;
use std::rc::Rc;

use redis::cluster::ClusterClient;
use redis::Client;
use redis::FromRedisValue;
use redis::RedisResult;
use redis::{aio::Connection, cluster::ClusterConnection};

pub use redis;

/// Client configuration
///
/// Describes which Redis deployment to talk to. The value only holds the
/// URL(s); no client or connection exists until one of the connect methods
/// is called on it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RedisConfig {
    /// Redis server URL
    Single(String),
    /// List of URLs of the Redis nodes in cluster mode
    Cluster(Vec<String>),
}

impl RedisConfig {
    /// Build the [`RedisClient`] that matches this configuration.
    ///
    /// `Single` is handed to `Client::open`, `Cluster` to
    /// `ClusterClient::open` (which receives its own copy of the node list).
    ///
    /// # Panics
    ///
    /// Panics if the underlying `open` call returns an error, because the
    /// result is unwrapped here rather than returned to the caller.
    pub fn connect(&self) -> RedisClient {
        match self {
            Self::Single(url) => Client::open(url.as_str())
                .map(RedisClient::Single)
                .unwrap(),
            Self::Cluster(nodes) => ClusterClient::open(nodes.clone())
                .map(RedisClient::Cluster)
                .unwrap(),
        }
    }
}

/// Redis connection - manages both single and clustered deployments
///
/// Each variant keeps its connection behind `Rc<RefCell<..>>`, so cloning a
/// `RedisConnection` yields another handle to the *same* underlying
/// connection rather than opening a new one. Mutable access is checked at
/// runtime by the `RefCell`.
#[derive(Clone)]
pub enum RedisConnection {
    /// Shared handle to an async connection (`redis::aio::Connection`)
    Single(Rc<RefCell<Connection>>),
    /// Shared handle to a cluster connection (`redis::cluster::ClusterConnection`)
    Cluster(Rc<RefCell<ClusterConnection>>),
}

impl RedisConnection {
    #[inline]
    /// Get client. Uses interior mutability, so lookout for panics
    pub fn get_client(&self) -> Self {
        match self {
            Self::Single(con) => Self::Single(Rc::clone(&con)),
            Self::Cluster(con) => Self::Cluster(Rc::clone(&con)),
        }
    }
    #[inline]
    /// execute a redis command against a [Self]
    pub async fn exec<T: FromRedisValue>(&self, cmd: &mut redis::Cmd) -> redis::RedisResult<T> {
        match self {
            RedisConnection::Single(con) => cmd.query_async(&mut *con.borrow_mut()).await,
            RedisConnection::Cluster(con) => cmd.query(&mut *con.borrow_mut()),
        }
    }

    pub async fn ping(&self) -> bool {
        if let Ok(redis::Value::Status(v)) = self.exec(&mut redis::cmd("PING")).await {
            v == "PONG"
        } else {
            false
        }
    }
}

/// Client configuration that can be used to get a new connection should [RedisConnection] fail
#[derive(Clone)]
pub enum RedisClient {
    /// Client for a single Redis server (`redis::Client`)
    Single(Client),
    /// Client for a Redis cluster (`redis::cluster::ClusterClient`)
    Cluster(ClusterClient),
}

/// A Redis Client Object that encapsulates [RedisClient] and [RedisConnection].
/// Use this when you need a Redis Client
///
/// Cloning a `Redis` clones both fields; the cloned [RedisConnection] shares
/// the same underlying connection as the original.
#[derive(Clone)]
pub struct Redis {
    /// Client the connection was created from. It is stored but not read
    /// anywhere in this file (hence the leading underscore).
    _client: RedisClient,
    /// Connection handle that [`Redis::get_client`] hands out copies of.
    connection: RedisConnection,
}

impl Redis {
    /// create new [Redis]. Will try to connect to Redis instance specified in [RedisConfig]
    pub async fn new(redis: RedisConfig) -> RedisResult<Self> {
        let (_client, connection) = Self::connect(redis).await?;
        let master = Self {
            _client,
            connection,
        };
        Ok(master)
    }

    /// Get client to do interact with Redis server.
    ///
    /// Uses Interior mutability so look out for panics
    pub fn get_client(&self) -> RedisConnection {
        self.connection.get_client()
    }

    async fn connect(redis: RedisConfig) -> RedisResult<(RedisClient, RedisConnection)> {
        let redis = redis.connect();
        let client = match &redis {
            RedisClient::Single(c) => {
                let con = c.get_async_connection().await?;
                RedisConnection::Single(Rc::new(RefCell::new(con)))
            }
            RedisClient::Cluster(c) => {
                let con = c.get_connection()?;
                RedisConnection::Cluster(Rc::new(RefCell::new(con)))
            }
        };
        Ok((redis, client))
    }
}

// These tests talk to a real server: they expect Redis to be reachable at
// `redis://127.0.0.1`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Connect to the local single-node Redis instance used by every test.
    async fn local_instance() -> Redis {
        let config = RedisConfig::Single("redis://127.0.0.1".into());
        Redis::new(config).await.unwrap()
    }

    /// `PING` against the local instance must be answered with `PONG`.
    #[actix_rt::test]
    async fn ping_works() {
        let glue = local_instance().await;
        let alive = glue.get_client().ping().await;
        assert!(alive);
    }

    /// A value written with `SET` must be read back unchanged with `GET`.
    #[actix_rt::test]
    async fn exec_works() {
        const KEY: &str = "testval";
        const VALUE: &str = "4";

        let glue = local_instance().await;

        let writer = glue.get_client();
        let _: () = writer
            .exec(redis::cmd("SET").arg(&[KEY, VALUE]))
            .await
            .unwrap();

        let reader = glue.get_client();
        let fetched: String = reader
            .exec(redis::cmd("GET").arg(&[KEY]))
            .await
            .unwrap();

        assert_eq!(fetched.as_str(), VALUE);
    }
}