restructuring
All checks were successful
/ build (push) Successful in 1m24s
/ clippy (push) Successful in 1m21s

This commit is contained in:
xqtc 2024-09-13 13:17:41 +02:00
parent 150df72e00
commit a6524106b8
9 changed files with 100 additions and 64 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
/target /target
/.idea

View file

@ -27,12 +27,13 @@
system, system,
... ...
}: let }: let
cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml); cargoToml = builtins.fromTOML (builtins.readFile ./yotei-nodes/Cargo.toml);
nonRustDeps = [ nonRustDeps = [
pkgs.libiconv pkgs.libiconv
pkgs.pkg-config pkgs.pkg-config
pkgs.protobuf pkgs.protobuf
pkgs.grpcurl pkgs.grpcurl
pkgs.tmate
]; ];
rust-toolchain = pkgs.symlinkJoin { rust-toolchain = pkgs.symlinkJoin {
name = "rust-toolchain"; name = "rust-toolchain";
@ -45,7 +46,7 @@
nativeBuildInputs = nonRustDeps; nativeBuildInputs = nonRustDeps;
buildInputs = nonRustDeps; buildInputs = nonRustDeps;
src = ./.; src = ./.;
cargoLock.lockFile = ./Cargo.lock; cargoLock.lockFile = ./yotei-nodes/Cargo.lock;
}; };
# Rust dev environment # Rust dev environment

View file

@ -31,10 +31,16 @@ colorful = "0.3.2"
[build-dependencies] [build-dependencies]
tonic-build = "0.12.2" tonic-build = "0.12.2"
[features]
head = []
worker = []
[[bin]] [[bin]]
name = "head-node" name = "head-node"
path = "src/head.rs" path = "src/head.rs"
required-features = [ "head" ]
[[bin]] [[bin]]
name = "worker-node" name = "worker-node"
path = "src/worker.rs" path = "src/worker.rs"
required-features = [ "worker" ]

View file

@ -1,7 +1,6 @@
mod config; mod util;
mod grpc;
use std::collections::HashMap; use std::{collections::HashMap, sync::RwLock};
use crate::config::CONFIG; use crate::config::CONFIG;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
@ -17,33 +16,6 @@ mod sched {
// tonic::include_file_descriptor_set!("protocol_descriptor").into(); // tonic::include_file_descriptor_set!("protocol_descriptor").into();
} }
struct Node {
id: String,
specs: NodeSpecs,
curr_job: Job,
job_queue: JobQueue,
}
struct Job {
id: uuid::Uuid,
data: Vec<u8>,
}
struct JobQueue {
jobs: Vec<Job>,
size: u128,
}
struct NodeSpecs {
threads: u64,
mem: u64,
net_speed: f64,
}
struct State {
nodes: Vec<Node>,
delegation_efficiency: f32,
tasks: HashMap<uuid::Uuid, Job>,
}
#[derive(Parser)] #[derive(Parser)]
#[command(author, version, about)] #[command(author, version, about)]
@ -62,7 +34,6 @@ enum Commands {
/// Starts uwusched /// Starts uwusched
Start { role: Option<String> }, Start { role: Option<String> },
} }
fn get_type_of<T>(_: &T) -> &'static str { fn get_type_of<T>(_: &T) -> &'static str {
std::any::type_name::<T>() std::any::type_name::<T>()
} }

View file

@ -5,6 +5,11 @@ use crate::sched::{
}; };
use log::{debug, info}; use log::{debug, info};
#[cfg(feature = "head")]
use crate::state::STATE;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct DataService {} pub struct DataService {}
@ -30,8 +35,16 @@ impl data_server::Data for DataService {
} }
} }
async fn data_client(node_id: String, hashmap_id: String, uuid: String) -> tonic::Request<DataRequest>{ async fn data_client(
let req = DataRequest {node_id, uuid, hashmap_id }; node_id: String,
hashmap_id: String,
uuid: String,
) -> tonic::Request<DataRequest> {
let req = DataRequest {
node_id,
uuid,
hashmap_id,
};
tonic::Request::new(req) tonic::Request::new(req)
} }
@ -65,7 +78,6 @@ impl auth_server::Auth for AuthService {
} }
} }
// async fn logout(node_id: String) -> tonic::Request<LoginRequest> {} // async fn logout(node_id: String) -> tonic::Request<LoginRequest> {}
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]

View file

@ -0,0 +1,3 @@
pub mod state;
pub mod config;
pub mod grpc;

View file

@ -0,0 +1,35 @@
lazy_static::lazy_static! {
pub static ref STATE: RwLock<State> = RwLock::new(State {
nodes: vec![],
tasks: HashMap::new(),
});
}
pub struct Node {
pub id: String,
pub specs: NodeSpecs,
pub curr_job: Task,
pub job_queue: JobQueue,
}
pub struct Task {
pub id: uuid::Uuid,
pub data: Vec<u8>,
}
pub struct JobQueue {
pub jobs: Vec<Task>,
pub size: u128,
}
pub struct NodeSpecs {
pub threads: u64,
pub mem: u64,
pub net_speed: f64,
}
pub struct State {
pub nodes: Vec<Node>,
pub tasks: HashMap<uuid::Uuid, Task>,
}

View file

@ -9,6 +9,7 @@ use crate::config::CONFIG;
use log::{debug, error, info}; use log::{debug, error, info};
use sched::{DataRequest, LoginRequest, LogoutRequest}; use sched::{DataRequest, LoginRequest, LogoutRequest};
use tonic::transport::Channel;
pub mod sched { pub mod sched {
tonic::include_proto!("sched"); tonic::include_proto!("sched");
@ -27,6 +28,30 @@ fn greet() {
println!(" {}", " WORKER NODE\n".color(Color::LightBlue)); println!(" {}", " WORKER NODE\n".color(Color::LightBlue));
} }
async fn alive_task(
mut alive: &mut sched::alive_check_client::AliveCheckClient<Channel>,
shutdown: &Arc<AtomicBool>,
recv_shutdown: &mut tokio::sync::oneshot::Receiver<()>,
) -> Result<(), tonic::Status> {
let mut alive_interval = tokio::time::interval(std::time::Duration::from_secs(10));
loop {
tokio::select! {
msg = &mut *recv_shutdown => { info!("Shutdown received {:#?}", msg); break; }
_ = alive_interval.tick() => {
debug!(
"{:#?}",
alive
.alive_check(tonic::Request::new(sched::AliveCheckRequest {
node_id: format!("compute-{}", 1),
}))
.await?
);
},
}
}
Ok::<(), tonic::Status>(())
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
std::env::set_var("RUST_LOG", CONFIG.log.level.clone()); std::env::set_var("RUST_LOG", CONFIG.log.level.clone());
@ -41,25 +66,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut alive = sched::alive_check_client::AliveCheckClient::connect(server_addr).await?; let mut alive = sched::alive_check_client::AliveCheckClient::connect(server_addr).await?;
let mut shutdown = Arc::new(AtomicBool::new(false)); let mut shutdown = Arc::new(AtomicBool::new(false));
let (send_shutdown, mut recv_shutdown) = tokio::sync::oneshot::channel::<()>(); let (send_shutdown, mut recv_shutdown) = tokio::sync::oneshot::channel::<()>();
let alive_thr = tokio::spawn(async move { let alive_hand = alive_task(&mut alive, &shutdown, &mut recv_shutdown);
let mut alive_interval = tokio::time::interval(std::time::Duration::from_secs(10));
loop {
tokio::select! {
msg = &mut recv_shutdown => { break; }
_ = alive_interval.tick() => {
debug!(
"{:#?}",
alive
.alive_check(tonic::Request::new(sched::AliveCheckRequest {
node_id: format!("compute-{}", 1),
}))
.await?
);
},
}
}
Ok::<(), tonic::Status>(())
});
debug!( debug!(
"{:#?}", "{:#?}",
auth.login(tonic::Request::new(LoginRequest { auth.login(tonic::Request::new(LoginRequest {
@ -76,14 +83,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
})) }))
.await? .await?
); );
debug!( // debug!(
"{:#?}", // "{:#?}",
auth.logout(tonic::Request::new(LogoutRequest { // auth.logout(tonic::Request::new(LogoutRequest {
node_id: "compute-1".to_string(), // node_id: "compute-1".to_string(),
})) // }))
.await? // .await?
); // );
let mut debug_iter = 0; // let _ = send_shutdown.send(());
alive_thr.await?; alive_hand.await?;
Ok(()) Ok(())
} }