commit 53a46578349972ae25c1648615f7c2bfc2f3d922 Author: JesusPerez Date: Mon Sep 20 13:06:08 2021 +0100 init repo diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..6515696 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "connectors" +version = "0.1.0" +authors = ["JesusPerez "] +edition = "2018" + +[dependencies] +anyhow = "1.0.40" +async-trait = "0.1.51" + +envmnt = "0.9.0" +serde = { version = "1.0", features = ["derive"] } +serde_derive = "1.0" +tokio = { version = "1.5.0", features = ["full"] } + +redis = { version = "0.21.2", features = [ "tokio-comp", "cluster"] } +slab = "0.4.4" +sqlx = {version = "0.5.7", default-features = false, features = ["macros","runtime-tokio-rustls","sqlite", "mysql", "postgres", "decimal", "chrono"]} +sthash = "0.2.11" +tempfile = "3.2.0" +thiserror = "1.0.29" +# tikv-client = { git = "https://github.com/tikv/client-rust.git" } + +datastores = { version = "0.1.0", path = "../defs" } \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7b4f123 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020-2021 Jesús Pérez Lorenzo + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..80b5bb2 --- /dev/null +++ b/README.md @@ -0,0 +1,41 @@ +# Data Stores Connections Library + +## Rust Library to connect and manage [Data Stores](https://en.wikipedia.org/wiki/Data_store) for **LibreClouds** [Klouds](https://rlung.librecloud.online/LibreCloud/Klouds#klouds) + +Part of the following developments: + +- [CloudMandala](https://rlung.librecloud.online/LibreCloud/CloudMandala#cloudmandala) +- [Zteron](https://rlung.librecloud.online/LibreCloud/CloudMandala#cloudmandala) + +It includes definitions and functions for the following [Data Stores](https://en.wikipedia.org/wiki/Data_store): + +- Redis +- MySQL +- Postgres +- Sqlite +- Slab + + +### How to use + +1 - Clone or download this lib in a path, better outside of target development + +2 - Get current version from Cargo.toml + +3 - Include a line like the one below in target development Cargo.toml (adjust version & path) and **use** whatever is need. + +```toml +connectors = { version = "0.1.0", path = "../lib/datastores/connectors" } +``` + +### Connectors + +[Data Store Connectors](LibreCloud/lib_datastores_connectors) + +## Author + +- [Jesús Pérez](https://info.jesusperez.pro). + +## License + +MIT diff --git a/src/defs.rs b/src/defs.rs new file mode 100644 index 0000000..c873dc7 --- /dev/null +++ b/src/defs.rs @@ -0,0 +1,300 @@ +use serde::{Serialize,Deserialize}; +use std::collections::{HashMap}; +use anyhow::{anyhow,Result}; +use datastores::defs::DataStore; + +use crate::redis::RedisPool; +use crate::mysql::MysqlPool; +use crate::postgres::PostgresPool; +use crate::sqlite::SqlitePool; +//use crate::tikv::TikvPool; + +use async_trait::async_trait; + +//use tkdr::crypt_lib::decrypt; // FIXME +fn decrypt(data: &str, _key: &str) -> String { + data.to_owned() +} + +#[async_trait] +pub trait PoolHandle { + async fn connect(&self) -> Self; +} + +#[derive(Debug)] +pub enum DataPool { + Redis(RedisPool), + Mysql(MysqlPool), + Postgres(PostgresPool), + Sqlite(SqlitePool), + //Tikv(TikvPool), + File(String), + Slab(String), + NoPool, +} + +impl PartialEq for DataPool { + fn eq(&self, other: &Self) -> bool { + match self { + DataPool::Redis(source) => + match other { + DataPool::Redis(target) => source.id == target.id, + _ => false, + }, + DataPool::Mysql(source) => + match other { + DataPool::Mysql(target) => source.id == target.id, + _ => false, + }, + DataPool::Postgres(source) => + match other { + DataPool::Postgres(target) => source.id == target.id, + _ => false, + }, + DataPool::Sqlite(source) => + match other { + DataPool::Sqlite(target) => source.id == target.id, + _ => false, + }, + DataPool::File(source) => + match other { + DataPool::File(target) => source == target, + _ => false, + }, + DataPool::Slab(source) => + match other { + DataPool::Slab(target) => source == target, + _ => false, + }, + DataPool::NoPool => + match other { + DataPool::NoPool => true, + _ => false, + }, + } + } +} +impl Default for DataPool { + fn default() -> Self { + DataPool::NoPool + } +} + +#[derive(Debug, Default)] +pub struct AppDataConn { + name: String, + datastores: HashMap, +} + +impl AppDataConn { + pub async fn new(name: String, stores_settings: Vec, key: &str) -> Self { + let mut datastores: HashMap = HashMap::new(); + for settings in stores_settings.iter() { + match settings.datastore { + DataStore::Redis => { + datastores.insert( + settings.id.to_owned(), + DataPool::Redis(RedisPool::new(settings.to_owned()).connect_pool().await) + ); + }, + DataStore::Mysql => { + datastores.insert( + settings.id.to_owned(), + DataPool::Mysql(MysqlPool::new(settings.to_owned(),key).await.connect_pool().await) + ); + }, + DataStore::Postgres => { + datastores.insert( + settings.id.to_owned(), + DataPool::Postgres(PostgresPool::new(settings.to_owned(),key).await.connect_pool().await) + ); + }, + DataStore::Sqlite => { + datastores.insert( + settings.id.to_owned(), + DataPool::Sqlite(SqlitePool::new(settings.to_owned()).connect_pool().await) + ); + }, + _ => continue, + }; + } + Self { + name, + datastores, + } + } + pub fn get_conn(&self,key: &str) -> &DataPool { + self.datastores.get(key).unwrap_or(&DataPool::NoPool) + } + pub async fn get_redis(&self,key: &str) -> Result<&redis::aio::Connection> { + match self.datastores.get(key).unwrap_or(&DataPool::NoPool) { + DataPool::Redis(redis_conn) => + if let Some(pool) = &redis_conn.conn { + Ok(pool) + } else { + Err(anyhow!("Redis pool not available")) + }, + _ => Err(anyhow!("{} is not a Redis connection",key)), + } + } + pub async fn get_mysql(&self,key: &str) -> Result<&sqlx::pool::PoolConnection> { + match self.datastores.get(key).unwrap_or(&DataPool::NoPool) { + DataPool::Mysql(mysql_conn) => + if let Some(pool) = &mysql_conn.conn { + Ok(pool) + } else { + Err(anyhow!("Mysql pool not available")) + }, + _ => Err(anyhow!("{} is not a Mysql connection",key)), + } + } + pub async fn get_postgres(&self,key: &str) -> Result<&sqlx::pool::PoolConnection> { + match self.datastores.get(key).unwrap_or(&DataPool::NoPool) { + DataPool::Postgres(postgres_conn) => + if let Some(pool) = &postgres_conn.conn { + Ok(pool) + } else { + Err(anyhow!("Postgres pool not available")) + }, + _ => Err(anyhow!("{} is not a Postgres connection",key)), + } + } + pub async fn get_sqlite(&self,key: &str) -> Result<&sqlx::Pool> { + match self.datastores.get(key).unwrap_or(&DataPool::NoPool) { + DataPool::Sqlite(sqlite_conn) => + if let Some(pool) = &sqlite_conn.conn { + Ok(pool) + } else { + Err(anyhow!("Sqlite pool not available")) + }, + _ => Err(anyhow!("{} is not a Sqlite connection",key)), + } + } + pub async fn check_connections(&self, datastores_settings: Vec) -> bool { + let debug = envmnt::get_isize("DEBUG",0); + let mut status = false; + for con in &datastores_settings { + match con.datastore { + DataStore::Redis => { + status = match self.get_redis(&con.id).await { + Ok(_pool) => { + if debug > 0 { + println!("app_data_conn found redis pool"); + } + true + }, + Err(e) => { + if StoreSettings::check_required_id(datastores_settings.to_owned(),&con.id) { + panic!("Error app_data_conn required: {}",e); + } else { + println!("Error app_data_conn: {}",e); + } + false + }, + } + } + _ => { + continue; + } + }; + } + status + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, Default)] +pub struct StoreSettings { + pub id: String, + pub host: String, + pub port: u32, + pub user: String, + pub pass: String, + pub datastore: DataStore, + pub database: String, + pub prefix: String, + pub max_conn: u32, + pub required: bool, +} +impl StoreSettings { + pub fn get_credentials(&self,key: &str) -> (String,String) { + let user: String; + let pass: String; + if key.is_empty() { + user = self.user.to_owned(); + pass = self.pass.to_owned(); + } else { + user = decrypt(&self.user, key); + pass = decrypt(&self.pass, key); + } + (user,pass) + } + pub fn url_db(&self, key: &str) -> String { + let store = self.get_store(); + let (user,pass) = self.get_credentials(key); + let user_pass: String; + if ! user.is_empty() || ! pass.is_empty() { + user_pass = format!("{}:{}", &user, &pass); + } else { + user_pass = String::from(""); + } + let host = &self.host; + let port = &self.port; + let database = &self.database; + format!( + "{}://{}@{}:{}/{}", + store, &user_pass, &host, &port, &database + ) + } + pub fn url_keyval(&self) -> String { + let store = self.get_store(); + let host = &self.host; + let port = &self.port; + format!("{}://{}:{}", store, &host, &port) + } + pub fn url_local(&self) -> String { + let store = self.get_store(); + format!("{}.{}", store, &self.database) + } + pub fn get_store(&self) -> String { + match self.datastore { + DataStore::File => String::from("file"), + DataStore::Mysql => String::from("mysql"), + DataStore::Postgres => String::from("postgres"), + DataStore::Sqlite => String::from("sqlite"), + DataStore::Redis => String::from("redis"), + // DataStore::Tikv => String::from("tikv"), + DataStore::Slab => String::from("slab"), + DataStore::Unknown => String::from("Unknown"), + } + } + pub fn find_storesetting_id(cfg_store_settings: Vec, id: &str) -> Result> { + let datastore_found: Vec = cfg_store_settings.iter().filter(|itm| itm.id == id).cloned().collect(); + if datastore_found.len() > 0 { + Ok(datastore_found) + } else { + Err(anyhow!("No DataStore Settings found for: {}",&id)) + } + } + pub fn check_required_id(cfg_store_settings: Vec, id: &str) -> bool { + match StoreSettings::find_storesetting_id(cfg_store_settings.to_owned(),id) { + Ok(ds_found) => + if let Some(item) = ds_found.get(0) { + item.required + } else { + false + }, + Err(e) => { + eprintln!("Error check required: {}",e); + false + }, + } + } +} + +#[derive(Default)] +pub struct DataStorePool + where T : PoolHandle + Default +{ + pub pool: T, + pub typ: DataStore, +} \ No newline at end of file diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..b44ff36 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,39 @@ +/// Error definition +// +use thiserror::Error; +use std::fmt; + +// #[derive(Debug, Fail)] + +#[derive(Error, Eq, PartialEq, Clone, Debug)] +pub enum DatabaseErrors { +// #[fail(display = "{}. Reason: {}", 0, 1)] + ConnectionFailed(String, String), +// #[fail(display = "{}. Reason: {}", 0, 1)] + PoolCreationFailed(String, String), +} +impl fmt::Display for DatabaseErrors { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + DatabaseErrors::ConnectionFailed(s,t) => write!(f,"connection {}: {} ",s,t), + DatabaseErrors::PoolCreationFailed(s,t) => write!(f,"pool {}: {} ",s,t), + } + //write!(f, "{:?}", self) + } +} + +/// Database Store Errors +#[derive(Error, Debug)] +pub enum DataStoreError { + #[error("data store disconnected")] + Disconnect(#[from] std::io::Error), + #[error("the data for key `{0}` is not available")] + Redaction(String), + #[error("invalid header (expected {expected:?}, found {found:?})")] + InvalidHeader { + expected: String, + found: String, + }, + #[error("unknown data store error")] + Unknown, +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..28534d8 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,7 @@ +pub mod defs; +pub mod errors; +pub mod mysql; +pub mod postgres; +pub mod redis; +pub mod sqlite; +// pub mod tikv; diff --git a/src/mysql.rs b/src/mysql.rs new file mode 100644 index 0000000..b728279 --- /dev/null +++ b/src/mysql.rs @@ -0,0 +1,82 @@ +/// `MySql` Connector +// +use sqlx::mysql::MySqlPoolOptions; +use anyhow::{Result,anyhow}; +use async_trait::async_trait; + +use crate::defs::{StoreSettings,PoolHandle}; + +#[derive(Default)] +pub struct MysqlPool { + pub id: String, + pub client: Option>, + pub conn: Option>, +} + +impl std::fmt::Debug for MysqlPool { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "MysqlPool client: {:?}", self.client) + } +} + +impl MysqlPool { + pub async fn new(pool_settings: StoreSettings,key: &str) -> Self { + let url = pool_settings.url_db(key); + let debug = envmnt::get_isize("DEBUG",0); + if debug > 0 { + println!("Open Mysql {}: {}",pool_settings.id, &url); + } + match MySqlPoolOptions::new().max_connections(pool_settings.max_conn) + // .connect_lazy(&url); + .connect(&url).await { + Ok(cli) => Self { + id: pool_settings.id, + client: Some(cli), + conn: None, + }, + Err(e) => { + eprintln!("Error Mysql new pool: {}",e); + Self::default() + }, + } + } + pub async fn connect_pool(self) -> Self { + self.connect().await + } + pub async fn get_conn(self) -> Result> { + if let Some(pool) = self.conn { + Ok(pool) + } else { + let cli=self.connect_pool().await; + if let Some(pool) = cli.conn { + Ok(pool) + } else { + Err(anyhow!("Mysql pool not available")) + } + } + } +} +#[async_trait] +impl PoolHandle for MysqlPool { + async fn connect(&self) -> Self { + let cli = self; + let mut conn = None; // DataPool::NoPool; + if let Some(client) = &cli.client { + if let Some(cli_pool) = client.try_acquire() { + let debug = envmnt::get_isize("DEBUG",0); + if debug > 0 { + println!("Got Mysql connection"); + } + conn = Some(cli_pool); + } else { + eprintln!("Error Mysql connection "); + } + } + Self { + id: cli.id.to_owned(), + client: cli.client.to_owned(), + conn, + } + } + +} \ No newline at end of file diff --git a/src/postgres.rs b/src/postgres.rs new file mode 100644 index 0000000..7de13f7 --- /dev/null +++ b/src/postgres.rs @@ -0,0 +1,82 @@ +/// Postgres Connector +// +use sqlx::postgres::PgPoolOptions; +use anyhow::{Result,anyhow}; +use async_trait::async_trait; + +use crate::defs::{StoreSettings,PoolHandle}; + +#[derive(Default)] +pub struct PostgresPool { + pub id: String, + pub client: Option>, + pub conn: Option>, +} + +impl std::fmt::Debug for PostgresPool { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "PostgresPool client: {:?}", self.client) + } +} + +impl PostgresPool { + pub async fn new(pool_settings: StoreSettings,key: &str) -> Self { + let url = pool_settings.url_db(key); + let debug = envmnt::get_isize("DEBUG",0); + if debug > 0 { + println!("Open Postgres {}: {}",pool_settings.id, &url); + } + match PgPoolOptions::new().max_connections(pool_settings.max_conn) + // .connect_lazy(&url); + .connect(&url).await { + Ok(cli) => Self { + id: pool_settings.id, + client: Some(cli), + conn: None, + }, + Err(e) => { + eprintln!("Error Postgres new pool: {}",e); + Self::default() + }, + } + } + pub async fn connect_pool(self) -> Self { + self.connect().await + } + pub async fn get_conn(self) -> Result> { + if let Some(pool) = self.conn { + Ok(pool) + } else { + let cli=self.connect_pool().await; + if let Some(pool) = cli.conn { + Ok(pool) + } else { + Err(anyhow!("Postgres pool not available")) + } + } + } +} +#[async_trait] +impl PoolHandle for PostgresPool { + async fn connect(&self) -> Self { + let cli = self; + let mut conn = None; // DataPool::NoPool; + if let Some(client) = &cli.client { + if let Some(cli_pool) = client.try_acquire() { + let debug = envmnt::get_isize("DEBUG",0); + if debug > 0 { + println!("Got Postgres connection"); + } + conn = Some(cli_pool); + } else { + eprintln!("Error Postgres connection "); + } + } + Self { + id: cli.id.to_owned(), + client: cli.client.to_owned(), + conn, + } + } + +} diff --git a/src/redis.rs b/src/redis.rs new file mode 100644 index 0000000..5e1b1ac --- /dev/null +++ b/src/redis.rs @@ -0,0 +1,82 @@ +/// Redis Connector +// +use async_trait::async_trait; +use anyhow::{anyhow,Result}; + +use crate::defs::{StoreSettings,PoolHandle}; + +#[derive(Default)] +pub struct RedisPool { + pub id: String, + pub client: Option, + pub conn: Option, +} + +impl std::fmt::Debug for RedisPool { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "RedisPool client: {:?}", self.client) + } +} + +impl RedisPool { + pub fn new(pool_settings: StoreSettings) -> Self { + let url = pool_settings.url_keyval(); + let debug = envmnt::get_isize("DEBUG",0); + if debug > 0 { + println!("Open Redis {}: {}",pool_settings.id, &url); + } + match redis::Client::open(url) { + Ok(cli) => Self { + id: pool_settings.id, + client: Some(cli.to_owned()), + conn: None, + }, + Err(e) => { + eprintln!("Error Redis new pool: {}",e); + Self::default() + }, + } + } + pub async fn connect_pool(self) -> Self { + self.connect().await + } + pub async fn get_conn(self) -> Result { + if let Some(pool) = self.conn { + Ok(pool) + } else { + let cli=self.connect_pool().await; + if let Some(pool) = cli.conn { + Ok(pool) + } else { + Err(anyhow!("Redis pool not available")) + } + } + } +} +#[async_trait] +impl PoolHandle for RedisPool { + async fn connect(&self) -> Self { + let cli = self; + let mut conn = None; + if let Some(client) = &cli.client { + match client.get_async_connection().await { + Ok(cli_pool) => { + let debug = envmnt::get_isize("DEBUG",0); + if debug > 0 { + println!("Got Redis connection"); + } + conn = Some(cli_pool); + }, + Err(e) => { + eprintln!("Error Redis connection: {}",e); + } + }; + } + Self { + id: cli.id.to_owned(), + client: cli.client.to_owned(), + conn, + } + } + +} diff --git a/src/sqlite.rs b/src/sqlite.rs new file mode 100644 index 0000000..6714e2c --- /dev/null +++ b/src/sqlite.rs @@ -0,0 +1,80 @@ +/// Sqlite Connection +// +use anyhow::{Result,anyhow}; +use async_trait::async_trait; + +use crate::defs::{StoreSettings,PoolHandle}; + +#[derive(Default)] +pub struct SqlitePool { + pub id: String, + // TODO Pool with Sqlite not working + // pub client: Option>, + // pub conn: Option>, + pub client: Option, + pub conn: Option>, +} + +impl std::fmt::Debug for SqlitePool { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "SqlitePool client: {:?}", self.client) + } +} + +impl SqlitePool { + pub fn new(pool_settings: StoreSettings) -> Self { + let url = pool_settings.url_local(); + let debug = envmnt::get_isize("DEBUG",0); + if debug > 0 { + println!("Open Sqlite {}: {}",pool_settings.id, &url); + } + Self { + id: pool_settings.id.to_owned(), + client: Some(url), + conn: None, + } + } + pub async fn connect_pool(self) -> Self { + self.connect().await + } + pub async fn get_conn(self) -> Result> { + if let Some(pool) = self.conn { + Ok(pool) + } else { + let cli=self.connect_pool().await; + if let Some(pool) = cli.conn { + Ok(pool) + } else { + Err(anyhow!("Sqlite pool not available")) + } + } + } +} +#[async_trait] +impl PoolHandle for SqlitePool { + async fn connect(&self) -> Self { + let cli = self; + let mut conn = None; // DataPool::NoPool; + if let Some(client) = &cli.client { + // TODO Pool with Sqlite not working + // match client.acquire().await { + match sqlx::sqlite::SqlitePool::connect(&client).await { + Ok(pool) => { + let debug = envmnt::get_isize("DEBUG",0); + if debug > 0 { + println!("Got Sqlite connection"); + } + conn = Some(pool); + }, + Err(e) => { + eprintln!("Error Sqlite new pool: {}",e); + }, + }; + } + Self { + id: cli.id.to_owned(), + client: cli.client.to_owned(), + conn, + } + } +} \ No newline at end of file diff --git a/src/tikv.rs b/src/tikv.rs new file mode 100644 index 0000000..546f2f8 --- /dev/null +++ b/src/tikv.rs @@ -0,0 +1,31 @@ +/// Tikv Connection +// +// Copyright 2020, Jesús Pérez Lorenzo +// +use crate::models::Tikv; +use anyhow::Context as AnyContext; +use std::path::PathBuf; +use tikv_client::{Config, Key, KvPair, RawClient, Result, TransactionClient, ToOwnedRange, Value}; +use crate::models::config::StoreKeyValue; + +/// Tikv pool connection +pub async fn get_tikv_pool(tikv: &Tikv) -> anyhow::Result { + let tikv_url = config.url("tikv"); + // Optionally encrypt the traffic. + let config = if let (Some(ca), Some(cert), Some(key)) = (tikv.ca, tikv.cert, tikv.key) { + Config::new(tikv.pd).with_security(ca, cert, key) + } else { + Config::new(tikv.pd) + }; + // When we first create a client we receive a `Connect` structure which must be resolved before + // the client is actually connected and usable. + // let client = if tikv.trans { + // TransactionClient::new(config).await? + // //let mut txn = txn_client.begin().await?; + // } else { + let client = RawClient::new(config) + .await + .context(format!("get tikv new tikv pool {}", redis_url))?; + //} + Ok(client) +}