diff --git a/Cargo.lock b/Cargo.lock index 6d16c6c..c80aa58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,6 +340,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +[[package]] +name = "colorful" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb474a9c3219a8254ead020421ecf1b90427f29b55f6aae9a2471fa62c126ef" + [[package]] name = "deranged" version = "0.3.11" @@ -1454,13 +1460,14 @@ dependencies = [ ] [[package]] -name = "uwusched-head-node" +name = "uwusched" version = "0.1.0" dependencies = [ "anyhow", "bincode", "bson", "clap", + "colorful", "lazy_static", "log", "lz4_flex", diff --git a/protos/protocol.proto b/protos/protocol.proto index 274ae68..bbc2bae 100644 --- a/protos/protocol.proto +++ b/protos/protocol.proto @@ -20,7 +20,8 @@ message DataResponse { } service Auth { - rpc Auth (LoginRequest) returns (LoginResponse); + rpc Login (LoginRequest) returns (LoginResponse); + rpc Logout (LogoutRequest) returns (LogoutResponse); } message LoginRequest { @@ -41,6 +42,14 @@ message LoginResponse { } } +message LogoutRequest { + string node_id = 1; +} + +message LogoutResponse { + string node_id = 1; +} + service AliveCheck { rpc AliveCheck (AliveCheckRequest) returns (AliveCheckResponse); } diff --git a/uwusched-nodes/Cargo.toml b/uwusched-nodes/Cargo.toml index 8905c21..598564b 100644 --- a/uwusched-nodes/Cargo.toml +++ b/uwusched-nodes/Cargo.toml @@ -1,7 +1,7 @@ [package] authors = ["xqtc"] description = "Distributed computing with abstract data" -name = "uwusched-head-node" +name = "uwusched" version = "0.1.0" edition = "2021" @@ -23,6 +23,7 @@ tonic = "0.12.2" tonic-reflection = "0.12.2" prost = "0.13.2" prost-types = "0.13.2" +colorful = "0.3.2" [build-dependencies] tonic-build = "0.12.2" diff --git a/uwusched-nodes/src/compute.rs b/uwusched-nodes/src/compute.rs index 2e120d5..43dfa89 100644 --- a/uwusched-nodes/src/compute.rs +++ b/uwusched-nodes/src/compute.rs @@ -5,6 +5,7 @@ mod config; use crate::config::CONFIG; use log::{debug, info}; +use sched::LoginRequest; pub mod sched { tonic::include_proto!("sched"); @@ -14,12 +15,34 @@ pub mod sched { async fn main() -> Result<(), Box> { std::env::set_var("RUST_LOG", CONFIG.log.level.clone()); pretty_env_logger::init(); + use colorful::Colorful; + use colorful::HSL; + println!( + "{}", + " +,--. ,--. ,--. ,--. ,--. ,--. +| | | | ,--. ,--. | | | | ,---. ,---. | ,---. ,---. ,-| | +| | | | | |.'.| | | | | | ( .-' | .--' | .-. | | .-. : ' .-. | +' '-' ' | .'. | ' '-' ' .-' `) \\ `--. | | | | \\ --. \\ `-' | + `-----' '--' '--' `-----' `----' `---' `--' `--' `----' `---' + Compute Node + " + .gradient_with_color(HSL::new(0.0, 1.0, 0.5), HSL::new(0.833, 1.0, 0.5)) + ); let server_addr = "http://[::1]:50051"; let addr: SocketAddr = "[::1]:50052".parse()?; - let data_client = sched::data_client::DataClient::connect(server_addr).await?; - debug!("{:#?}", &data_client); - // let mut auth_client = sched::auth_client::AuthClient::connect(server_addr.clone()).await?; - alive_check_serve(addr).await?; + let mut data = sched::data_client::DataClient::connect(server_addr).await?; + let mut auth = sched::auth_client::AuthClient::connect(server_addr).await?; + debug!("{:#?}", &data); + debug!("{:#?}", &auth); + // alive_check_serve(addr).await?; + debug!( + "{:#?}", + auth.login(tonic::Request::new(LoginRequest { + node_id: "compute-1".to_string(), + })) + .await? + ); Ok(()) } @@ -32,7 +55,6 @@ async fn alive_check_serve(addr: SocketAddr) -> Result<(), Box(100); - // Spawn gRPC server in own async task task::spawn(async move { info!( "Starting Alive Check gRPC server on: http://{}", diff --git a/uwusched-nodes/src/grpc/mod.rs b/uwusched-nodes/src/grpc/mod.rs index e97942d..311b2ea 100644 --- a/uwusched-nodes/src/grpc/mod.rs +++ b/uwusched-nodes/src/grpc/mod.rs @@ -1,6 +1,7 @@ use crate::sched::{ alive_check_client::AliveCheckClient, alive_check_server, auth_server, data_server, AliveCheckRequest, AliveCheckResponse, DataRequest, DataResponse, LoginRequest, LoginResponse, + LogoutRequest, LogoutResponse, }; use log::{debug, info}; @@ -38,7 +39,7 @@ pub struct AuthService {} #[tonic::async_trait] impl auth_server::Auth for AuthService { - async fn auth( + async fn login( &self, request: tonic::Request, ) -> Result, tonic::Status> { @@ -50,12 +51,26 @@ impl auth_server::Auth for AuthService { Ok(tonic::Response::new(res)) } + async fn logout( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + info!("Got a logout request: {:#?}", request); + let input = request.get_ref(); + let res = LogoutResponse { + node_id: input.node_id.clone(), + }; + Ok(tonic::Response::new(res)) + } } -async fn auth_client() { - todo!() +async fn login(node_id: String) -> tonic::Request { + let req = LoginRequest { node_id }; + tonic::Request::new(req) } +// async fn logout(node_id: String) -> tonic::Request {} + #[derive(Debug, Default, Clone)] pub struct AliveCheckService {} diff --git a/uwusched-nodes/src/head.rs b/uwusched-nodes/src/head.rs index ed3781c..bd63a10 100644 --- a/uwusched-nodes/src/head.rs +++ b/uwusched-nodes/src/head.rs @@ -20,7 +20,7 @@ struct Node { id: String, specs: NodeSpecs, curr_job: Job, - job_queue: Vec, + job_queue: JobQueue, } struct Job { @@ -28,6 +28,11 @@ struct Job { data: Vec, } +struct JobQueue { + jobs: Vec, + size: u128, +} + struct NodeSpecs { threads: u64, mem: u64, @@ -65,25 +70,22 @@ fn get_type_of(_: &T) -> &'static str { async fn main() -> Result<(), Box> { std::env::set_var("RUST_LOG", CONFIG.log.level.clone()); pretty_env_logger::init(); + let cli = Cli::parse(); + match &cli.command { + Some(Commands::Start { role }) => { + debug!("{:#?}", role) + } + None => {} + } let addr = "[::1]:50051".parse()?; info!("Starting gRPC server on {}", addr); let data = grpc::DataService::default(); let auth = grpc::AuthService::default(); - // let reflection = tonic_reflection::server::Builder::configure() - // .register_file_descriptor_set(sched::FILE_DESCRIPTOR_SET) - // .build_v1()?; Server::builder() .add_service(sched::data_server::DataServer::new(data)) .add_service(sched::auth_server::AuthServer::new(auth)) .serve(addr) .await?; debug!("{:?}", CONFIG.node); - // let cli = Cli::parse(); - // match &cli.command { - // Some(Commands::Start { role }) => { - // debug!("{:#?}", role) - // } - // None => {} - // } Ok(()) }