From 6e3b797dd10dfb8824211843e956e1a6163e3e49 Mon Sep 17 00:00:00 2001 From: xqtc Date: Mon, 9 Sep 2024 18:19:19 +0200 Subject: [PATCH] Placeholder client logic; State Machine data structures --- Cargo.lock | 1 + uwusched-nodes/Cargo.toml | 1 + uwusched-nodes/src/compute.rs | 22 ++++++++++++++-------- uwusched-nodes/src/grpc/mod.rs | 13 ++++++++++--- uwusched-nodes/src/head.rs | 26 +++++++++++++++++++++++++- 5 files changed, 51 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5252510..6d16c6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1474,6 +1474,7 @@ dependencies = [ "tonic", "tonic-build", "tonic-reflection", + "uuid", ] [[package]] diff --git a/uwusched-nodes/Cargo.toml b/uwusched-nodes/Cargo.toml index 4cee6f6..8905c21 100644 --- a/uwusched-nodes/Cargo.toml +++ b/uwusched-nodes/Cargo.toml @@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" anyhow = "1.0" lz4_flex = "0.11.3" +uuid = "1.10" toml = "0.8.19" lazy_static = "1.5.0" log = "0.4.22" diff --git a/uwusched-nodes/src/compute.rs b/uwusched-nodes/src/compute.rs index 696110d..e43528b 100644 --- a/uwusched-nodes/src/compute.rs +++ b/uwusched-nodes/src/compute.rs @@ -4,7 +4,7 @@ mod grpc; mod config; use crate::config::CONFIG; -use log::info; +use log::{debug, info}; pub mod sched { tonic::include_proto!("sched"); @@ -14,14 +14,16 @@ pub mod sched { async fn main() -> Result<(), Box> { std::env::set_var("RUST_LOG", CONFIG.log.level.clone()); pretty_env_logger::init(); - let url: SocketAddr = "[::1]:50051".parse()?; - // let mut data_client = sched::data_client::DataClient::connect(url.clone()).await?; - // let mut auth_client = sched::auth_client::AuthClient::connect(url.clone()).await?; - alive_check_serve(url).await?; + let server_addr = "http://[::1]:50051"; + let addr: SocketAddr = "[::1]:50052".parse()?; + let mut data_client = sched::data_client::DataClient::connect(server_addr.clone()).await?; + debug!("{:#?}", &data_client); + // let mut auth_client = sched::auth_client::AuthClient::connect(server_addr.clone()).await?; + alive_check_serve(addr).await?; Ok(()) } -async fn alive_check_serve(url: SocketAddr) -> Result<(), Box> { +async fn alive_check_serve(addr: SocketAddr) -> Result<(), Box> { use grpc::AliveCheckService; use sched::alive_check_server::AliveCheckServer; use tokio::sync::mpsc; @@ -30,12 +32,16 @@ async fn alive_check_serve(url: SocketAddr) -> Result<(), Box(100); + // Spawn gRPC server in own async task task::spawn(async move { - info!("Starting Alive Check gRPC server"); + info!( + "Starting Alive Check gRPC server on: http://{}", + addr.to_string() + ); let alive_check_service = AliveCheckService::default(); Server::builder() .add_service(AliveCheckServer::new(alive_check_service)) - .serve(url) + .serve(addr) .await .unwrap(); }); diff --git a/uwusched-nodes/src/grpc/mod.rs b/uwusched-nodes/src/grpc/mod.rs index 3fb5132..e97942d 100644 --- a/uwusched-nodes/src/grpc/mod.rs +++ b/uwusched-nodes/src/grpc/mod.rs @@ -1,7 +1,6 @@ use crate::sched::{ - alive_check_client::{AliveCheckClient}, - alive_check_server, auth_server, data_server, AliveCheckRequest, AliveCheckResponse, - DataRequest, DataResponse, LoginRequest, LoginResponse, + alive_check_client::AliveCheckClient, alive_check_server, auth_server, data_server, + AliveCheckRequest, AliveCheckResponse, DataRequest, DataResponse, LoginRequest, LoginResponse, }; use log::{debug, info}; @@ -30,6 +29,10 @@ impl data_server::Data for DataService { } } +async fn data_client() { + todo!() +} + #[derive(Debug, Default)] pub struct AuthService {} @@ -49,6 +52,10 @@ impl auth_server::Auth for AuthService { } } +async fn auth_client() { + todo!() +} + #[derive(Debug, Default, Clone)] pub struct AliveCheckService {} diff --git a/uwusched-nodes/src/head.rs b/uwusched-nodes/src/head.rs index de91d8e..ed3781c 100644 --- a/uwusched-nodes/src/head.rs +++ b/uwusched-nodes/src/head.rs @@ -1,13 +1,14 @@ mod config; mod grpc; +use std::collections::HashMap; + use crate::config::CONFIG; use clap::{Parser, Subcommand}; use log::{debug, info}; use tonic::transport::Server; // Import generated protobuf mod sched { - tonic::include_proto!("sched"); @@ -15,6 +16,29 @@ mod sched { // tonic::include_file_descriptor_set!("protocol_descriptor").into(); } +struct Node { + id: String, + specs: NodeSpecs, + curr_job: Job, + job_queue: Vec, +} + +struct Job { + id: uuid::Uuid, + data: Vec, +} + +struct NodeSpecs { + threads: u64, + mem: u64, + net_speed: f64, +} +struct State { + nodes: Vec, + delegation_efficiency: f32, + tasks: HashMap, +} + #[derive(Parser)] #[command(author, version, about)] #[command(propagate_version = true)]