// use std::sync::atomic::{AtomicUsize, Ordering}; use app_env::{ AppStore, appenv::AppEnv, appinfo::AppInfo, appdata::AppData, config::{Config,WebServer} }; use clds::clouds::{ monitor_rules::{MonitorRules}, }; use clds::monitor::run_monitor; use app_auth::AuthStore; use reject_filters::{handle_rejection}; // use zterton::models::{Terton}; //use std::fs; //, io}; // use std::fs::OpenOptions; // use std::io::Write; //use std::path::Path; // use serde_yaml::Value; use anyhow::{Result}; // use std::env; //use warp::{http::Response as HttpResponse, Filter, filters::BoxedFilter}; use warp::{ // http::{StatusCode}, http::{method::Method, HeaderMap}, Filter, }; // use warp::filters::header::headers_cloned; // use warp::path::FullPath; // use warp::http::{Uri, HeaderMap, HeaderValue}; //use crate::utils::set_ta_data; use crate::defs::{DataDBs,CollsData,load_cloud_env}; use clds::clouds::defs::{ Cloud, }; // use clds::app_env::config::Config; use clds::clouds::on_clouds::{make_cloud_cache,run_clouds_check}; use reqenv::ReqEnv; // #[macro_use] // extern crate kloud_entries_macro_derive; // static WEBSERVER: AtomicUsize = AtomicUsize::new(0); const PKG_VERSION: &'static str = env!("CARGO_PKG_VERSION"); // const PKG_VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION"); const PKG_NAME: &'static str = env!("CARGO_PKG_NAME"); const PKG_AUTHORS: &'static str = env!("CARGO_PKG_AUTHORS"); const PKG_DESCRIPTION: &'static str = env!("CARGO_PKG_DESCRIPTION"); pub mod defs; pub mod graphql; pub mod filters; pub mod handlers; // pub const MODEL_PATH: &String = String::from("./auth/auth_model.conf"); // pub const POLICY_PATH: &String = String::from("./auth/policy.csv"); async fn create_auth_store(app_env: &AppEnv,verbose: &str) -> AuthStore { let config = app_env.get_curr_websrvr_config(); let model_path = config.st_auth_model_path(); let policy_path = config.st_auth_policy_path(); AuthStore::new(&config,AuthStore::create_enforcer(model_path,policy_path).await,&verbose) } async fn up_web_server(webpos: usize) -> Result<()> { let mut app_env = AppEnv::default(); app_env.curr_web=webpos; println!("Web services: init {} ___________ ", chrono::Utc::now().timestamp()); app_env.info = AppInfo::new( "ZTerton", format!("web: {}",&webpos), format!("version: {}",PKG_VERSION), format!("authors: {}",PKG_AUTHORS), format!("{}",PKG_DESCRIPTION), ); zterton::init_app(&mut app_env,"").await.unwrap_or_else(|e| panic!("Error loadding app environment {}",e) ); let config = app_env.get_curr_websrvr_config(); // let webserver_status = WEBSERVER.load(Ordering::Relaxed); let zterton_env = envmnt::get_or(format!("ZTERTON_{}",&config.name).as_str(), "UNKNOWN"); let verbose = envmnt::get_or("WEB_SERVER_VERBOSE", ""); // if webserver_status != 0 { if zterton_env != "UNKNOWN" { if verbose != "quiet" { println!("ZTerton web services at {}",&zterton_env); } // envmnt::set("ZTERTON", "WEBSERVER"); return Ok(()); /* let app = Zterton::new( app_env.config.srv_protocol.to_owned(), app_env.config.srv_host.to_owned(), app_env.config.srv_port, ); let serverstring = format!("{}:{}",&srv_host,&srv_port); match std::net::TcpStream::connect(&serverstring) { Ok(_serverstream) => { Ok(()) Err(anyhow!("Source {}: Connection to '{}' for tsksrvc '{}' failed: {}",&source,&serverstring,&tsk_name,&e)) }, Err(e) => { // handle_input(serverstream); } } */ } // WEBSERVER.store(1,Ordering::Relaxed); // TODO pass root file-name frmt from AppEnv Config // if init at load // set_ta_data(&app_env).await?; println!("Loading webserver: {} ({})",&config.name,&app_env.curr_web); let (app, socket) = zterton::start_web(&mut app_env).await; println!("Load app store ..."); let app_store = AppStore::new(AppData::new(app_env.to_owned())); // As static casbin println!("Load auth store ..."); let auth_store = create_auth_store(&app_env,"").await; // dbg!(&auth_store.users.read().await); // dbg!(&auth_store.shadows.read().await); println!("Load data store ..."); let data_dbs = DataDBs { colls: CollsData::new(app_env.to_owned()), app: app_store.to_owned(), auth: auth_store.to_owned(), }; println!("Load web filters ..."); // let store = warp::any().map(move || ta_store.clone()); //let routes = warp::any().map(|| "Hello, World!"); // let us get some static boxes from config values: let log_name = app_env.config.st_log_name(); // Path for static files let html_path = config.st_html_path(); // If not graphQL comment/remove next line let gql_path = config.st_gql_req_path(); // If not graphiQL comment/remove next line Interface GiQL let giql_path = config.st_giql_req_path(); let origins: Vec<&str> = config.allow_origin.iter().map(AsRef::as_ref).collect(); let cors = warp::cors() //.allow_any_origin() .allow_origins(origins) //.allow_origins(vec![app_env.config.allow_origin.as_str(), "https://localhost:8000"]) .allow_credentials(true) .allow_header("content-type") .allow_header("Authorization") .allow_methods(&[Method::GET, Method::POST, Method::DELETE]); let auth_api = // Auth routes for login & logout REQUIRED app_auth_filters::auth(app_store.clone(),auth_store.clone(),cors.clone()); // .with(cors.clone()); let gqli_api = // // If not graphiQL comment/remove next line Interface GiQL MUST BEFORE graphql post with schema // app_api.to_owned() graphql::graphiql(gql_path, giql_path, data_dbs.clone()).await; if giql_path.len() > 0 { println!( "GraphiQL url: {}://{}:{}/{}", &app.protocol, &app.host, &app.port, &giql_path ); } let mut cloud = Cloud::default(); load_cloud_env(&mut cloud).await; // app_api.to_owned() // If not graphQL comment/remove next line let gql_api=graphql::graphql(gql_path, data_dbs.clone(),cors.clone()).await; //.with(cors.clone()); // // Add ALL ENTITIES to work with here let kloud_api = filters::CollFilters::new("kloud") .filters_config(data_dbs.clone(),cloud.clone(),cors.clone()); // let ta_api = // filters::CollFilters::new("ta").filters(&config, data_dbs.clone(),cors.clone()); // let tp_api = filters::CollFilters::new("tp").filters(&config, data_dbs.clone(),cors.clone()); // .or(tracking_point::load_tp_filters().filters(&config, data_dbs.clone())) // .or(topographic_anatomy::filters::ta(&config, data_dbs.clone())) // .or(tracking_point::filters::tp(&config, data_dbs.clone())) let file_api = app_file_filters::files(app_store.clone(),auth_store.clone()).with(cors.clone()); // Path for static files, better to be LAST let fs_api = warp::fs::dir(html_path).with(warp::compression::gzip()); // Recover and handle errors let app_api = auth_api .or(gqli_api).or(gql_api) .or(kloud_api) // .or(ta_api) // .or(tp_api) .or(file_api) .or(fs_api) .recover(move | error: warp::Rejection| handle_rejection(error, app_store.clone())) .boxed(); // Wrap routes with log to get info let routes = app_api.with(warp::log(log_name)); // let routes = app_api.with(cors).with(warp::log(log_name)); println!( "Starting http server: {}://{}:{}", &app.protocol, &app.host, &app.port ); envmnt::set("ZTERTON", format!("{}:{}",&app.host,&app.port)); println!("Web services: done {} __________ ",chrono::Utc::now().timestamp()); if app.protocol.clone().as_str() == "http" { warp::serve(routes.to_owned()) .run(socket) .await; } else { let cert_pem = format!("{}/ssl/{}", &config.resources_path, "cert.pem"); let key_pem = format!("{}/ssl/{}", &config.resources_path, "key.pem"); warp::serve(routes) .tls() .cert_path(cert_pem) .key_path(key_pem) .run(socket) .await; } Ok(()) } pub async fn run_cache_clouds() { let args: Vec = std::env::args().collect(); let mut arg_cfg_path = String::from(""); let mut arg_env_path = String::from(""); args.iter().enumerate().for_each(|(idx,arg)| { if arg == "-c" { arg_cfg_path=args[idx+1].to_owned(); } else if arg == "-e" { arg_env_path=args[idx+1].to_owned(); } }); println!("Cache service on Clouds: run {} __________ {} / {} ",chrono::Utc::now().timestamp(),&arg_cfg_path,&arg_env_path); let mut cloud = Cloud::default(); load_cloud_env(&mut cloud).await; let mut app_env = AppEnv::default(); let config_content = Config::load_file_content("quiet", &arg_cfg_path); if ! config_content.contains("run_mode") { return; } app_env.config = Config::new(config_content,"quiet"); let app_store = AppStore::new(AppData::new(app_env.to_owned())); let auth_store = create_auth_store(&app_env,"quiet").await; let mut headers = HeaderMap::new(); headers.insert(http::header::HOST, "localhost".parse().unwrap()); let reqenv = ReqEnv::new( app_store, auth_store, headers, Method::GET, "/config", "config", "kloud" ); let _ = make_cloud_cache(&reqenv,&cloud).await; println!("Cache service on Clouds: done {} __________ ",chrono::Utc::now().timestamp()); } pub async fn run_check_clouds() { let args: Vec = std::env::args().collect(); let mut arg_cfg_path = String::from(""); let mut arg_env_path = String::from(""); args.iter().enumerate().for_each(|(idx,arg)| { if arg == "-c" { arg_cfg_path=args[idx+1].to_owned(); } else if arg == "-e" { arg_env_path=args[idx+1].to_owned(); } }); println!("Check Cloud services: run {} __________ {} / {} ",chrono::Utc::now().timestamp(),&arg_cfg_path,&arg_env_path); let mut cloud = Cloud::default(); load_cloud_env(&mut cloud).await; let mut app_env = AppEnv::default(); let config_content = Config::load_file_content("quiet",&arg_cfg_path); if ! config_content.contains("run_mode") { return; } app_env.config = Config::new(config_content,"quiet"); let app_store = AppStore::new(AppData::new(app_env.to_owned())); let auth_store = create_auth_store(&app_env,"quiet").await; let mut headers = HeaderMap::new(); headers.insert(http::header::HOST, "localhost".parse().unwrap()); let reqenv = ReqEnv::new( app_store, auth_store, headers, Method::GET, "/config", "config", "kloud" ); let _ = run_clouds_check(&reqenv,&cloud).await; println!("Check Cloud service: done {} __________ ",chrono::Utc::now().timestamp()); } pub async fn run_clouds_monitor() { let args: Vec = std::env::args().collect(); let mut arg_cfg_path = String::from(""); let mut arg_env_path = String::from(""); args.iter().enumerate().for_each(|(idx,arg)| { if arg == "-c" { arg_cfg_path=args[idx+1].to_owned(); } else if arg == "-e" { arg_env_path=args[idx+1].to_owned(); } }); println!("Monitor Cloud: run {} __________ {} / {} ",chrono::Utc::now().timestamp(),&arg_cfg_path,&arg_env_path); let mut cloud = Cloud::default(); load_cloud_env(&mut cloud).await; let mut app_env = AppEnv::default(); let config_content = Config::load_file_content("quiet",&arg_cfg_path); if ! config_content.contains("run_mode") { return; } app_env.config = Config::new(config_content,"quiet"); let monitor_rules = MonitorRules::load(&app_env.config.monitor_rules_path,&app_env.config.monitor_rules_file,&app_env.config.monitor_rules_format); // monitor_rules.rules[0].context = RuleContext::Service(String::from("kubernetes")); if monitor_rules.rules.len() > 0 { let _ = run_monitor(monitor_rules,cloud,app_env).await; } println!("Monitor Cloud: done {} __________ ",chrono::Utc::now().timestamp()); } // for standalone server & async use // #[tokio::main] // pub async fn main() -> Result<()> { pub fn main() -> Result<()> { let args: Vec = std::env::args().collect(); // println!("I got {:?} arguments: {:?}.", args.len() - 1, &args[1..]); if args.len() > 1 { match args[1].as_str() { "-h" | "--help" => println!("{} USAGE: -c config-toml -e env.file",PKG_NAME), "-v" | "--version" => { println!("{} version: {}",PKG_NAME,PKG_VERSION); return Ok(()); }, _ => println!("{}",PKG_NAME), } } let mut arg_cfg_path = String::from(""); let mut arg_env_path = String::from(""); args.iter().enumerate().for_each(|(idx,arg)| { if arg == "-c" { arg_cfg_path=args[idx+1].to_owned(); } else if arg == "-e" { arg_env_path=args[idx+1].to_owned(); } }); if !arg_env_path.is_empty() { let env_path = std::path::Path::new(&arg_env_path); dotenv::from_path(env_path)?; } pretty_env_logger::init(); // assert!(output.is_ok()); let loop_duration: u64; let run_websrvrs: bool; let run_cache: bool; let run_check: bool; let run_monitor: bool; let websrvrs: Vec; { let config_content = Config::load_file_content("quiet", &arg_cfg_path); if config_content.contains("run_mode") { let config = Config::new(config_content,"quiet"); loop_duration = config.loop_duration; // loop_duration = 10; run_websrvrs = config.run_websrvrs; run_cache = config.run_cache; run_check = config.run_check; run_monitor = config.run_monitor; websrvrs = config.websrvrs; if run_cache { println!("Running 'cloud_cache' every {} seconds in LOOP",&loop_duration); } if run_check { println!("Running 'cloud_check' every {} seconds in LOOP",&loop_duration); } } else { loop_duration = 0; run_websrvrs = false; run_cache = false; run_check = false; run_monitor = false; websrvrs = Vec::new(); } } // println!("content: {}",&config_content); let rt = tokio::runtime::Runtime::new().unwrap_or_else(|e| panic!("Error create tokio runtime {}",e) ); if run_websrvrs { websrvrs.iter().enumerate().for_each(|(pos,it)| { rt.block_on(async move { println!("{} -> {}",it.name,pos); tokio::spawn(async move {up_web_server(pos).await}); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; }); }); } loop { rt.block_on(async move { if run_monitor { tokio::spawn(async {run_clouds_monitor().await }); // For async task } if run_check { tokio::spawn(async {run_check_clouds().await }); // For async task tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } // { // // For blocking task: // let join_handle = tokio::task::spawn_blocking(|| do_my_task()); // tokio::spawn(async { do_my_task().await; }); // join_handle.await; // TODO: should handle error here // } if run_cache { tokio::spawn(async {run_cache_clouds().await }); // For async task } println!("LOOP: {} __________",chrono::Utc::now().timestamp()); tokio::time::sleep(tokio::time::Duration::from_secs(loop_duration)).await; }); } }