diff --git a/.gitignore b/.gitignore index ea8c4bf..d81f12e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +/.idea diff --git a/flake.nix b/flake.nix index cfae12c..028ecff 100644 --- a/flake.nix +++ b/flake.nix @@ -27,12 +27,13 @@ system, ... }: let - cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml); + cargoToml = builtins.fromTOML (builtins.readFile ./yotei-nodes/Cargo.toml); nonRustDeps = [ pkgs.libiconv pkgs.pkg-config pkgs.protobuf pkgs.grpcurl + pkgs.tmate ]; rust-toolchain = pkgs.symlinkJoin { name = "rust-toolchain"; @@ -45,7 +46,7 @@ nativeBuildInputs = nonRustDeps; buildInputs = nonRustDeps; src = ./.; - cargoLock.lockFile = ./Cargo.lock; + cargoLock.lockFile = ./yotei-nodes/Cargo.lock; }; # Rust dev environment diff --git a/yotei-nodes/Cargo.toml b/yotei-nodes/Cargo.toml index db76623..67e1c46 100644 --- a/yotei-nodes/Cargo.toml +++ b/yotei-nodes/Cargo.toml @@ -31,10 +31,16 @@ colorful = "0.3.2" [build-dependencies] tonic-build = "0.12.2" +[features] +head = [] +worker = [] + [[bin]] name = "head-node" path = "src/head.rs" +required-features = [ "head" ] [[bin]] name = "worker-node" path = "src/worker.rs" +required-features = [ "worker" ] diff --git a/yotei-nodes/src/head.rs b/yotei-nodes/src/head.rs index 38d6466..118801e 100644 --- a/yotei-nodes/src/head.rs +++ b/yotei-nodes/src/head.rs @@ -1,7 +1,6 @@ -mod config; -mod grpc; +mod util; -use std::collections::HashMap; +use std::{collections::HashMap, sync::RwLock}; use crate::config::CONFIG; use clap::{Parser, Subcommand}; @@ -17,33 +16,6 @@ mod sched { // tonic::include_file_descriptor_set!("protocol_descriptor").into(); } -struct Node { - id: String, - specs: NodeSpecs, - curr_job: Job, - job_queue: JobQueue, -} - -struct Job { - id: uuid::Uuid, - data: Vec, -} - -struct JobQueue { - jobs: Vec, - size: u128, -} - -struct NodeSpecs { - threads: u64, - mem: u64, - net_speed: f64, -} -struct State { - nodes: Vec, - delegation_efficiency: f32, - tasks: HashMap, -} #[derive(Parser)] #[command(author, version, about)] @@ -62,7 +34,6 @@ enum Commands { /// Starts uwusched Start { role: Option }, } - fn get_type_of(_: &T) -> &'static str { std::any::type_name::() } diff --git a/yotei-nodes/src/config.rs b/yotei-nodes/src/util/config.rs similarity index 100% rename from yotei-nodes/src/config.rs rename to yotei-nodes/src/util/config.rs diff --git a/yotei-nodes/src/grpc/mod.rs b/yotei-nodes/src/util/grpc.rs similarity index 91% rename from yotei-nodes/src/grpc/mod.rs rename to yotei-nodes/src/util/grpc.rs index 920033e..f734620 100644 --- a/yotei-nodes/src/grpc/mod.rs +++ b/yotei-nodes/src/util/grpc.rs @@ -5,6 +5,11 @@ use crate::sched::{ }; use log::{debug, info}; + + +#[cfg(feature = "head")] +use crate::state::STATE; + #[derive(Debug, Default)] pub struct DataService {} @@ -30,8 +35,16 @@ impl data_server::Data for DataService { } } -async fn data_client(node_id: String, hashmap_id: String, uuid: String) -> tonic::Request{ - let req = DataRequest {node_id, uuid, hashmap_id }; +async fn data_client( + node_id: String, + hashmap_id: String, + uuid: String, +) -> tonic::Request { + let req = DataRequest { + node_id, + uuid, + hashmap_id, + }; tonic::Request::new(req) } @@ -65,7 +78,6 @@ impl auth_server::Auth for AuthService { } } - // async fn logout(node_id: String) -> tonic::Request {} #[derive(Debug, Default, Clone)] diff --git a/yotei-nodes/src/util/mod.rs b/yotei-nodes/src/util/mod.rs new file mode 100644 index 0000000..13f0cb9 --- /dev/null +++ b/yotei-nodes/src/util/mod.rs @@ -0,0 +1,3 @@ +pub mod state; +pub mod config; +pub mod grpc; diff --git a/yotei-nodes/src/util/state.rs b/yotei-nodes/src/util/state.rs new file mode 100644 index 0000000..6191671 --- /dev/null +++ b/yotei-nodes/src/util/state.rs @@ -0,0 +1,35 @@ + +lazy_static::lazy_static! { + pub static ref STATE: RwLock = RwLock::new(State { + nodes: vec![], + tasks: HashMap::new(), + }); +} + +pub struct Node { + pub id: String, + pub specs: NodeSpecs, + pub curr_job: Task, + pub job_queue: JobQueue, +} + +pub struct Task { + pub id: uuid::Uuid, + pub data: Vec, +} + +pub struct JobQueue { + pub jobs: Vec, + pub size: u128, +} + +pub struct NodeSpecs { + pub threads: u64, + pub mem: u64, + pub net_speed: f64, +} + +pub struct State { + pub nodes: Vec, + pub tasks: HashMap, +} diff --git a/yotei-nodes/src/worker.rs b/yotei-nodes/src/worker.rs index 0c36631..9eb9172 100644 --- a/yotei-nodes/src/worker.rs +++ b/yotei-nodes/src/worker.rs @@ -9,6 +9,7 @@ use crate::config::CONFIG; use log::{debug, error, info}; use sched::{DataRequest, LoginRequest, LogoutRequest}; +use tonic::transport::Channel; pub mod sched { tonic::include_proto!("sched"); @@ -27,6 +28,30 @@ fn greet() { println!(" {}", " WORKER NODE\n".color(Color::LightBlue)); } +async fn alive_task( + mut alive: &mut sched::alive_check_client::AliveCheckClient, + shutdown: &Arc, + recv_shutdown: &mut tokio::sync::oneshot::Receiver<()>, +) -> Result<(), tonic::Status> { + let mut alive_interval = tokio::time::interval(std::time::Duration::from_secs(10)); + loop { + tokio::select! { + msg = &mut *recv_shutdown => { info!("Shutdown received {:#?}", msg); break; } + _ = alive_interval.tick() => { + debug!( + "{:#?}", + alive + .alive_check(tonic::Request::new(sched::AliveCheckRequest { + node_id: format!("compute-{}", 1), + })) + .await? + ); + }, + } + } + Ok::<(), tonic::Status>(()) +} + #[tokio::main] async fn main() -> Result<(), Box> { std::env::set_var("RUST_LOG", CONFIG.log.level.clone()); @@ -41,25 +66,7 @@ async fn main() -> Result<(), Box> { let mut alive = sched::alive_check_client::AliveCheckClient::connect(server_addr).await?; let mut shutdown = Arc::new(AtomicBool::new(false)); let (send_shutdown, mut recv_shutdown) = tokio::sync::oneshot::channel::<()>(); - let alive_thr = tokio::spawn(async move { - let mut alive_interval = tokio::time::interval(std::time::Duration::from_secs(10)); - loop { - tokio::select! { - msg = &mut recv_shutdown => { break; } - _ = alive_interval.tick() => { - debug!( - "{:#?}", - alive - .alive_check(tonic::Request::new(sched::AliveCheckRequest { - node_id: format!("compute-{}", 1), - })) - .await? - ); - }, - } - } - Ok::<(), tonic::Status>(()) - }); + let alive_hand = alive_task(&mut alive, &shutdown, &mut recv_shutdown); debug!( "{:#?}", auth.login(tonic::Request::new(LoginRequest { @@ -76,14 +83,14 @@ async fn main() -> Result<(), Box> { })) .await? ); - debug!( - "{:#?}", - auth.logout(tonic::Request::new(LogoutRequest { - node_id: "compute-1".to_string(), - })) - .await? - ); - let mut debug_iter = 0; - alive_thr.await?; + // debug!( + // "{:#?}", + // auth.logout(tonic::Request::new(LogoutRequest { + // node_id: "compute-1".to_string(), + // })) + // .await? + // ); + // let _ = send_shutdown.send(()); + alive_hand.await?; Ok(()) }