init repo

This commit is contained in:
Jesús Pérez Lorenzo 2021-09-20 13:06:08 +01:00
commit 53a4657834
11 changed files with 789 additions and 0 deletions

24
Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "connectors"
version = "0.1.0"
authors = ["JesusPerez <jpl@jesusperez.pro>"]
edition = "2018"
[dependencies]
anyhow = "1.0.40"
async-trait = "0.1.51"
envmnt = "0.9.0"
serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
tokio = { version = "1.5.0", features = ["full"] }
redis = { version = "0.21.2", features = [ "tokio-comp", "cluster"] }
slab = "0.4.4"
sqlx = {version = "0.5.7", default-features = false, features = ["macros","runtime-tokio-rustls","sqlite", "mysql", "postgres", "decimal", "chrono"]}
sthash = "0.2.11"
tempfile = "3.2.0"
thiserror = "1.0.29"
# tikv-client = { git = "https://github.com/tikv/client-rust.git" }
datastores = { version = "0.1.0", path = "../defs" }

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020-2021 Jesús Pérez Lorenzo
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

41
README.md Normal file
View File

@ -0,0 +1,41 @@
# Data Stores Connections Library
## Rust Library to connect and manage [Data Stores](https://en.wikipedia.org/wiki/Data_store) for **LibreClouds** [Klouds](https://rlung.librecloud.online/LibreCloud/Klouds#klouds)
Part of the following developments:
- [CloudMandala](https://rlung.librecloud.online/LibreCloud/CloudMandala#cloudmandala)
- [Zteron](https://rlung.librecloud.online/LibreCloud/CloudMandala#cloudmandala)
It includes definitions and functions for the following [Data Stores](https://en.wikipedia.org/wiki/Data_store):
- Redis
- MySQL
- Postgres
- Sqlite
- Slab
### How to use
1 - Clone or download this lib in a path, better outside of target development
2 - Get current version from <u>Cargo.toml</u>
3 - Include a line like the one below in <u>target development Cargo.toml</u> (adjust version & path) and **use** whatever is need.
```toml
connectors = { version = "0.1.0", path = "../lib/datastores/connectors" }
```
### Connectors
[Data Store Connectors](LibreCloud/lib_datastores_connectors)
## Author
- [Jesús Pérez](https://info.jesusperez.pro).
## License
MIT

300
src/defs.rs Normal file
View File

@ -0,0 +1,300 @@
use serde::{Serialize,Deserialize};
use std::collections::{HashMap};
use anyhow::{anyhow,Result};
use datastores::defs::DataStore;
use crate::redis::RedisPool;
use crate::mysql::MysqlPool;
use crate::postgres::PostgresPool;
use crate::sqlite::SqlitePool;
//use crate::tikv::TikvPool;
use async_trait::async_trait;
//use tkdr::crypt_lib::decrypt; // FIXME
fn decrypt(data: &str, _key: &str) -> String {
data.to_owned()
}
#[async_trait]
pub trait PoolHandle {
async fn connect(&self) -> Self;
}
#[derive(Debug)]
pub enum DataPool {
Redis(RedisPool),
Mysql(MysqlPool),
Postgres(PostgresPool),
Sqlite(SqlitePool),
//Tikv(TikvPool),
File(String),
Slab(String),
NoPool,
}
impl PartialEq for DataPool {
fn eq(&self, other: &Self) -> bool {
match self {
DataPool::Redis(source) =>
match other {
DataPool::Redis(target) => source.id == target.id,
_ => false,
},
DataPool::Mysql(source) =>
match other {
DataPool::Mysql(target) => source.id == target.id,
_ => false,
},
DataPool::Postgres(source) =>
match other {
DataPool::Postgres(target) => source.id == target.id,
_ => false,
},
DataPool::Sqlite(source) =>
match other {
DataPool::Sqlite(target) => source.id == target.id,
_ => false,
},
DataPool::File(source) =>
match other {
DataPool::File(target) => source == target,
_ => false,
},
DataPool::Slab(source) =>
match other {
DataPool::Slab(target) => source == target,
_ => false,
},
DataPool::NoPool =>
match other {
DataPool::NoPool => true,
_ => false,
},
}
}
}
impl Default for DataPool {
fn default() -> Self {
DataPool::NoPool
}
}
#[derive(Debug, Default)]
pub struct AppDataConn {
name: String,
datastores: HashMap<String,DataPool>,
}
impl AppDataConn {
pub async fn new(name: String, stores_settings: Vec<StoreSettings>, key: &str) -> Self {
let mut datastores: HashMap<String,DataPool> = HashMap::new();
for settings in stores_settings.iter() {
match settings.datastore {
DataStore::Redis => {
datastores.insert(
settings.id.to_owned(),
DataPool::Redis(RedisPool::new(settings.to_owned()).connect_pool().await)
);
},
DataStore::Mysql => {
datastores.insert(
settings.id.to_owned(),
DataPool::Mysql(MysqlPool::new(settings.to_owned(),key).await.connect_pool().await)
);
},
DataStore::Postgres => {
datastores.insert(
settings.id.to_owned(),
DataPool::Postgres(PostgresPool::new(settings.to_owned(),key).await.connect_pool().await)
);
},
DataStore::Sqlite => {
datastores.insert(
settings.id.to_owned(),
DataPool::Sqlite(SqlitePool::new(settings.to_owned()).connect_pool().await)
);
},
_ => continue,
};
}
Self {
name,
datastores,
}
}
pub fn get_conn(&self,key: &str) -> &DataPool {
self.datastores.get(key).unwrap_or(&DataPool::NoPool)
}
pub async fn get_redis(&self,key: &str) -> Result<&redis::aio::Connection> {
match self.datastores.get(key).unwrap_or(&DataPool::NoPool) {
DataPool::Redis(redis_conn) =>
if let Some(pool) = &redis_conn.conn {
Ok(pool)
} else {
Err(anyhow!("Redis pool not available"))
},
_ => Err(anyhow!("{} is not a Redis connection",key)),
}
}
pub async fn get_mysql(&self,key: &str) -> Result<&sqlx::pool::PoolConnection<sqlx::MySql>> {
match self.datastores.get(key).unwrap_or(&DataPool::NoPool) {
DataPool::Mysql(mysql_conn) =>
if let Some(pool) = &mysql_conn.conn {
Ok(pool)
} else {
Err(anyhow!("Mysql pool not available"))
},
_ => Err(anyhow!("{} is not a Mysql connection",key)),
}
}
pub async fn get_postgres(&self,key: &str) -> Result<&sqlx::pool::PoolConnection<sqlx::Postgres>> {
match self.datastores.get(key).unwrap_or(&DataPool::NoPool) {
DataPool::Postgres(postgres_conn) =>
if let Some(pool) = &postgres_conn.conn {
Ok(pool)
} else {
Err(anyhow!("Postgres pool not available"))
},
_ => Err(anyhow!("{} is not a Postgres connection",key)),
}
}
pub async fn get_sqlite(&self,key: &str) -> Result<&sqlx::Pool<sqlx::Sqlite>> {
match self.datastores.get(key).unwrap_or(&DataPool::NoPool) {
DataPool::Sqlite(sqlite_conn) =>
if let Some(pool) = &sqlite_conn.conn {
Ok(pool)
} else {
Err(anyhow!("Sqlite pool not available"))
},
_ => Err(anyhow!("{} is not a Sqlite connection",key)),
}
}
pub async fn check_connections(&self, datastores_settings: Vec<StoreSettings>) -> bool {
let debug = envmnt::get_isize("DEBUG",0);
let mut status = false;
for con in &datastores_settings {
match con.datastore {
DataStore::Redis => {
status = match self.get_redis(&con.id).await {
Ok(_pool) => {
if debug > 0 {
println!("app_data_conn found redis pool");
}
true
},
Err(e) => {
if StoreSettings::check_required_id(datastores_settings.to_owned(),&con.id) {
panic!("Error app_data_conn required: {}",e);
} else {
println!("Error app_data_conn: {}",e);
}
false
},
}
}
_ => {
continue;
}
};
}
status
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct StoreSettings {
pub id: String,
pub host: String,
pub port: u32,
pub user: String,
pub pass: String,
pub datastore: DataStore,
pub database: String,
pub prefix: String,
pub max_conn: u32,
pub required: bool,
}
impl StoreSettings {
pub fn get_credentials(&self,key: &str) -> (String,String) {
let user: String;
let pass: String;
if key.is_empty() {
user = self.user.to_owned();
pass = self.pass.to_owned();
} else {
user = decrypt(&self.user, key);
pass = decrypt(&self.pass, key);
}
(user,pass)
}
pub fn url_db(&self, key: &str) -> String {
let store = self.get_store();
let (user,pass) = self.get_credentials(key);
let user_pass: String;
if ! user.is_empty() || ! pass.is_empty() {
user_pass = format!("{}:{}", &user, &pass);
} else {
user_pass = String::from("");
}
let host = &self.host;
let port = &self.port;
let database = &self.database;
format!(
"{}://{}@{}:{}/{}",
store, &user_pass, &host, &port, &database
)
}
pub fn url_keyval(&self) -> String {
let store = self.get_store();
let host = &self.host;
let port = &self.port;
format!("{}://{}:{}", store, &host, &port)
}
pub fn url_local(&self) -> String {
let store = self.get_store();
format!("{}.{}", store, &self.database)
}
pub fn get_store(&self) -> String {
match self.datastore {
DataStore::File => String::from("file"),
DataStore::Mysql => String::from("mysql"),
DataStore::Postgres => String::from("postgres"),
DataStore::Sqlite => String::from("sqlite"),
DataStore::Redis => String::from("redis"),
// DataStore::Tikv => String::from("tikv"),
DataStore::Slab => String::from("slab"),
DataStore::Unknown => String::from("Unknown"),
}
}
pub fn find_storesetting_id(cfg_store_settings: Vec<StoreSettings>, id: &str) -> Result<Vec<StoreSettings>> {
let datastore_found: Vec<StoreSettings> = cfg_store_settings.iter().filter(|itm| itm.id == id).cloned().collect();
if datastore_found.len() > 0 {
Ok(datastore_found)
} else {
Err(anyhow!("No DataStore Settings found for: {}",&id))
}
}
pub fn check_required_id(cfg_store_settings: Vec<StoreSettings>, id: &str) -> bool {
match StoreSettings::find_storesetting_id(cfg_store_settings.to_owned(),id) {
Ok(ds_found) =>
if let Some(item) = ds_found.get(0) {
item.required
} else {
false
},
Err(e) => {
eprintln!("Error check required: {}",e);
false
},
}
}
}
#[derive(Default)]
pub struct DataStorePool<T>
where T : PoolHandle + Default
{
pub pool: T,
pub typ: DataStore,
}

39
src/errors.rs Normal file
View File

@ -0,0 +1,39 @@
/// Error definition
//
use thiserror::Error;
use std::fmt;
// #[derive(Debug, Fail)]
#[derive(Error, Eq, PartialEq, Clone, Debug)]
pub enum DatabaseErrors {
// #[fail(display = "{}. Reason: {}", 0, 1)]
ConnectionFailed(String, String),
// #[fail(display = "{}. Reason: {}", 0, 1)]
PoolCreationFailed(String, String),
}
impl fmt::Display for DatabaseErrors {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DatabaseErrors::ConnectionFailed(s,t) => write!(f,"connection {}: {} ",s,t),
DatabaseErrors::PoolCreationFailed(s,t) => write!(f,"pool {}: {} ",s,t),
}
//write!(f, "{:?}", self)
}
}
/// Database Store Errors
#[derive(Error, Debug)]
pub enum DataStoreError {
#[error("data store disconnected")]
Disconnect(#[from] std::io::Error),
#[error("the data for key `{0}` is not available")]
Redaction(String),
#[error("invalid header (expected {expected:?}, found {found:?})")]
InvalidHeader {
expected: String,
found: String,
},
#[error("unknown data store error")]
Unknown,
}

7
src/lib.rs Normal file
View File

@ -0,0 +1,7 @@
pub mod defs;
pub mod errors;
pub mod mysql;
pub mod postgres;
pub mod redis;
pub mod sqlite;
// pub mod tikv;

82
src/mysql.rs Normal file
View File

@ -0,0 +1,82 @@
/// `MySql` Connector
//
use sqlx::mysql::MySqlPoolOptions;
use anyhow::{Result,anyhow};
use async_trait::async_trait;
use crate::defs::{StoreSettings,PoolHandle};
#[derive(Default)]
pub struct MysqlPool {
pub id: String,
pub client: Option<sqlx::Pool<sqlx::MySql>>,
pub conn: Option<sqlx::pool::PoolConnection<sqlx::MySql>>,
}
impl std::fmt::Debug for MysqlPool {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(fmt, "MysqlPool client: {:?}", self.client)
}
}
impl MysqlPool {
pub async fn new(pool_settings: StoreSettings,key: &str) -> Self {
let url = pool_settings.url_db(key);
let debug = envmnt::get_isize("DEBUG",0);
if debug > 0 {
println!("Open Mysql {}: {}",pool_settings.id, &url);
}
match MySqlPoolOptions::new().max_connections(pool_settings.max_conn)
// .connect_lazy(&url);
.connect(&url).await {
Ok(cli) => Self {
id: pool_settings.id,
client: Some(cli),
conn: None,
},
Err(e) => {
eprintln!("Error Mysql new pool: {}",e);
Self::default()
},
}
}
pub async fn connect_pool(self) -> Self {
self.connect().await
}
pub async fn get_conn(self) -> Result<sqlx::pool::PoolConnection<sqlx::MySql>> {
if let Some(pool) = self.conn {
Ok(pool)
} else {
let cli=self.connect_pool().await;
if let Some(pool) = cli.conn {
Ok(pool)
} else {
Err(anyhow!("Mysql pool not available"))
}
}
}
}
#[async_trait]
impl PoolHandle for MysqlPool {
async fn connect(&self) -> Self {
let cli = self;
let mut conn = None; // DataPool::NoPool;
if let Some(client) = &cli.client {
if let Some(cli_pool) = client.try_acquire() {
let debug = envmnt::get_isize("DEBUG",0);
if debug > 0 {
println!("Got Mysql connection");
}
conn = Some(cli_pool);
} else {
eprintln!("Error Mysql connection ");
}
}
Self {
id: cli.id.to_owned(),
client: cli.client.to_owned(),
conn,
}
}
}

82
src/postgres.rs Normal file
View File

@ -0,0 +1,82 @@
/// Postgres Connector
//
use sqlx::postgres::PgPoolOptions;
use anyhow::{Result,anyhow};
use async_trait::async_trait;
use crate::defs::{StoreSettings,PoolHandle};
#[derive(Default)]
pub struct PostgresPool {
pub id: String,
pub client: Option<sqlx::Pool<sqlx::Postgres>>,
pub conn: Option<sqlx::pool::PoolConnection<sqlx::Postgres>>,
}
impl std::fmt::Debug for PostgresPool {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(fmt, "PostgresPool client: {:?}", self.client)
}
}
impl PostgresPool {
pub async fn new(pool_settings: StoreSettings,key: &str) -> Self {
let url = pool_settings.url_db(key);
let debug = envmnt::get_isize("DEBUG",0);
if debug > 0 {
println!("Open Postgres {}: {}",pool_settings.id, &url);
}
match PgPoolOptions::new().max_connections(pool_settings.max_conn)
// .connect_lazy(&url);
.connect(&url).await {
Ok(cli) => Self {
id: pool_settings.id,
client: Some(cli),
conn: None,
},
Err(e) => {
eprintln!("Error Postgres new pool: {}",e);
Self::default()
},
}
}
pub async fn connect_pool(self) -> Self {
self.connect().await
}
pub async fn get_conn(self) -> Result<sqlx::pool::PoolConnection<sqlx::Postgres>> {
if let Some(pool) = self.conn {
Ok(pool)
} else {
let cli=self.connect_pool().await;
if let Some(pool) = cli.conn {
Ok(pool)
} else {
Err(anyhow!("Postgres pool not available"))
}
}
}
}
#[async_trait]
impl PoolHandle for PostgresPool {
async fn connect(&self) -> Self {
let cli = self;
let mut conn = None; // DataPool::NoPool;
if let Some(client) = &cli.client {
if let Some(cli_pool) = client.try_acquire() {
let debug = envmnt::get_isize("DEBUG",0);
if debug > 0 {
println!("Got Postgres connection");
}
conn = Some(cli_pool);
} else {
eprintln!("Error Postgres connection ");
}
}
Self {
id: cli.id.to_owned(),
client: cli.client.to_owned(),
conn,
}
}
}

82
src/redis.rs Normal file
View File

@ -0,0 +1,82 @@
/// Redis Connector
//
use async_trait::async_trait;
use anyhow::{anyhow,Result};
use crate::defs::{StoreSettings,PoolHandle};
#[derive(Default)]
pub struct RedisPool {
pub id: String,
pub client: Option<redis::Client>,
pub conn: Option<redis::aio::Connection>,
}
impl std::fmt::Debug for RedisPool {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(fmt, "RedisPool client: {:?}", self.client)
}
}
impl RedisPool {
pub fn new(pool_settings: StoreSettings) -> Self {
let url = pool_settings.url_keyval();
let debug = envmnt::get_isize("DEBUG",0);
if debug > 0 {
println!("Open Redis {}: {}",pool_settings.id, &url);
}
match redis::Client::open(url) {
Ok(cli) => Self {
id: pool_settings.id,
client: Some(cli.to_owned()),
conn: None,
},
Err(e) => {
eprintln!("Error Redis new pool: {}",e);
Self::default()
},
}
}
pub async fn connect_pool(self) -> Self {
self.connect().await
}
pub async fn get_conn(self) -> Result<redis::aio::Connection> {
if let Some(pool) = self.conn {
Ok(pool)
} else {
let cli=self.connect_pool().await;
if let Some(pool) = cli.conn {
Ok(pool)
} else {
Err(anyhow!("Redis pool not available"))
}
}
}
}
#[async_trait]
impl PoolHandle for RedisPool {
async fn connect(&self) -> Self {
let cli = self;
let mut conn = None;
if let Some(client) = &cli.client {
match client.get_async_connection().await {
Ok(cli_pool) => {
let debug = envmnt::get_isize("DEBUG",0);
if debug > 0 {
println!("Got Redis connection");
}
conn = Some(cli_pool);
},
Err(e) => {
eprintln!("Error Redis connection: {}",e);
}
};
}
Self {
id: cli.id.to_owned(),
client: cli.client.to_owned(),
conn,
}
}
}

80
src/sqlite.rs Normal file
View File

@ -0,0 +1,80 @@
/// Sqlite Connection
//
use anyhow::{Result,anyhow};
use async_trait::async_trait;
use crate::defs::{StoreSettings,PoolHandle};
#[derive(Default)]
pub struct SqlitePool {
pub id: String,
// TODO Pool with Sqlite not working
// pub client: Option<sqlx::Pool<sqlx::Sqlite>>,
// pub conn: Option<sqlx::pool::PoolConnection<sqlx::Sqlite>>,
pub client: Option<String>,
pub conn: Option<sqlx::Pool<sqlx::Sqlite>>,
}
impl std::fmt::Debug for SqlitePool {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(fmt, "SqlitePool client: {:?}", self.client)
}
}
impl SqlitePool {
pub fn new(pool_settings: StoreSettings) -> Self {
let url = pool_settings.url_local();
let debug = envmnt::get_isize("DEBUG",0);
if debug > 0 {
println!("Open Sqlite {}: {}",pool_settings.id, &url);
}
Self {
id: pool_settings.id.to_owned(),
client: Some(url),
conn: None,
}
}
pub async fn connect_pool(self) -> Self {
self.connect().await
}
pub async fn get_conn(self) -> Result<sqlx::Pool<sqlx::Sqlite>> {
if let Some(pool) = self.conn {
Ok(pool)
} else {
let cli=self.connect_pool().await;
if let Some(pool) = cli.conn {
Ok(pool)
} else {
Err(anyhow!("Sqlite pool not available"))
}
}
}
}
#[async_trait]
impl PoolHandle for SqlitePool {
async fn connect(&self) -> Self {
let cli = self;
let mut conn = None; // DataPool::NoPool;
if let Some(client) = &cli.client {
// TODO Pool with Sqlite not working
// match client.acquire().await {
match sqlx::sqlite::SqlitePool::connect(&client).await {
Ok(pool) => {
let debug = envmnt::get_isize("DEBUG",0);
if debug > 0 {
println!("Got Sqlite connection");
}
conn = Some(pool);
},
Err(e) => {
eprintln!("Error Sqlite new pool: {}",e);
},
};
}
Self {
id: cli.id.to_owned(),
client: cli.client.to_owned(),
conn,
}
}
}

31
src/tikv.rs Normal file
View File

@ -0,0 +1,31 @@
/// Tikv Connection
//
// Copyright 2020, Jesús Pérez Lorenzo
//
use crate::models::Tikv;
use anyhow::Context as AnyContext;
use std::path::PathBuf;
use tikv_client::{Config, Key, KvPair, RawClient, Result, TransactionClient, ToOwnedRange, Value};
use crate::models::config::StoreKeyValue;
/// Tikv pool connection
pub async fn get_tikv_pool(tikv: &Tikv) -> anyhow::Result<RawClient> {
let tikv_url = config.url("tikv");
// Optionally encrypt the traffic.
let config = if let (Some(ca), Some(cert), Some(key)) = (tikv.ca, tikv.cert, tikv.key) {
Config::new(tikv.pd).with_security(ca, cert, key)
} else {
Config::new(tikv.pd)
};
// When we first create a client we receive a `Connect` structure which must be resolved before
// the client is actually connected and usable.
// let client = if tikv.trans {
// TransactionClient::new(config).await?
// //let mut txn = txn_client.begin().await?;
// } else {
let client = RawClient::new(config)
.await
.context(format!("get tikv new tikv pool {}", redis_url))?;
//}
Ok(client)
}