From 150df72e00cc6a0ad49b5d4171941f3235ab9a09 Mon Sep 17 00:00:00 2001 From: xqtc Date: Thu, 12 Sep 2024 02:34:52 +0200 Subject: [PATCH] [WORKER] Make Alive Check run in parallel --- .forgejo/workflows/{uwu.yaml => .uwu.yaml} | 0 yotei-nodes/src/head.rs | 10 ++- yotei-nodes/src/worker.rs | 91 ++++++++++++---------- 3 files changed, 55 insertions(+), 46 deletions(-) rename .forgejo/workflows/{uwu.yaml => .uwu.yaml} (100%) diff --git a/.forgejo/workflows/uwu.yaml b/.forgejo/workflows/.uwu.yaml similarity index 100% rename from .forgejo/workflows/uwu.yaml rename to .forgejo/workflows/.uwu.yaml diff --git a/yotei-nodes/src/head.rs b/yotei-nodes/src/head.rs index 24cab73..38d6466 100644 --- a/yotei-nodes/src/head.rs +++ b/yotei-nodes/src/head.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; use crate::config::CONFIG; use clap::{Parser, Subcommand}; use log::{debug, info}; +use sched::AliveCheckRequest; use tonic::transport::Server; // Import generated protobuf mod sched { @@ -92,11 +93,12 @@ async fn main() -> Result<(), Box> { info!("Starting gRPC server on {}", addr); let data = grpc::DataService::default(); let auth = grpc::AuthService::default(); - Server::builder() + let alive = grpc::AliveCheckService::default(); + let grpc_server = Server::builder() .add_service(sched::data_server::DataServer::new(data)) .add_service(sched::auth_server::AuthServer::new(auth)) - .serve(addr) - .await?; - debug!("{:?}", CONFIG.node); + .add_service(sched::alive_check_server::AliveCheckServer::new(alive)) + .serve(addr); + grpc_server.await?; Ok(()) } diff --git a/yotei-nodes/src/worker.rs b/yotei-nodes/src/worker.rs index 728a94f..0c36631 100644 --- a/yotei-nodes/src/worker.rs +++ b/yotei-nodes/src/worker.rs @@ -1,11 +1,14 @@ -use std::net::SocketAddr; +use std::{ + net::SocketAddr, + sync::{atomic::AtomicBool, Arc}, +}; mod grpc; mod config; use crate::config::CONFIG; -use log::{debug, info}; -use sched::{LoginRequest, LogoutRequest}; +use log::{debug, error, info}; +use sched::{DataRequest, LoginRequest, LogoutRequest}; pub mod sched { tonic::include_proto!("sched"); @@ -29,20 +32,34 @@ async fn main() -> Result<(), Box> { std::env::set_var("RUST_LOG", CONFIG.log.level.clone()); pretty_env_logger::init(); greet(); - // println!(" {}", "+-------------+".color(Color::LightBlue)); - // println!(" {}", "| 予 定 |".color(Color::LightPink1)); - // println!( - // " {}", - // "+-------------+".color(colorful::RGB::new(252, 252, 252)) - // ); - // println!(" {}", "YOTEI SCHEDULER".color(Color::LightPink1)); - // println!(" {}", " WORKER NODE\n".color(Color::LightBlue)); let server_addr = "http://[::1]:50051"; let addr: SocketAddr = "[::1]:50053".parse()?; - let data = sched::data_client::DataClient::connect(server_addr).await?; - let mut auth = sched::auth_client::AuthClient::connect(server_addr).await?; // Start server here - let alive_check_server = alive_check_serve(addr); + std::thread::sleep(std::time::Duration::from_secs(4)); + let mut data = sched::data_client::DataClient::connect(server_addr).await?; + let mut auth = sched::auth_client::AuthClient::connect(server_addr).await?; + let mut alive = sched::alive_check_client::AliveCheckClient::connect(server_addr).await?; + let mut shutdown = Arc::new(AtomicBool::new(false)); + let (send_shutdown, mut recv_shutdown) = tokio::sync::oneshot::channel::<()>(); + let alive_thr = tokio::spawn(async move { + let mut alive_interval = tokio::time::interval(std::time::Duration::from_secs(10)); + loop { + tokio::select! { + msg = &mut recv_shutdown => { break; } + _ = alive_interval.tick() => { + debug!( + "{:#?}", + alive + .alive_check(tonic::Request::new(sched::AliveCheckRequest { + node_id: format!("compute-{}", 1), + })) + .await? + ); + }, + } + } + Ok::<(), tonic::Status>(()) + }); debug!( "{:#?}", auth.login(tonic::Request::new(LoginRequest { @@ -50,33 +67,23 @@ async fn main() -> Result<(), Box> { })) .await? ); - debug!("{:#?}", auth.logout(tonic::Request::new(LogoutRequest { - node_id: "compute-1".to_string(), - })).await?); - // Await server at end - alive_check_server.await?; - Ok(()) -} - -async fn alive_check_serve(addr: SocketAddr) -> Result<(), Box> { - use grpc::AliveCheckService; - use sched::alive_check_server::AliveCheckServer; - use tokio::task; - use tonic::transport::Server; - - - task::spawn(async move { - info!( - "Starting Alive Check gRPC server on: http://{}", - addr.to_string() - ); - let alive_check_service = AliveCheckService::default(); - Server::builder() - .add_service(AliveCheckServer::new(alive_check_service)) - .serve(addr) - .await - .unwrap(); - }); - + debug!( + "{:#?}", + data.data(tonic::Request::new(DataRequest { + node_id: "compute-1".to_string(), + uuid: "foo".to_string(), + hashmap_id: "bar".to_string(), + })) + .await? + ); + debug!( + "{:#?}", + auth.logout(tonic::Request::new(LogoutRequest { + node_id: "compute-1".to_string(), + })) + .await? + ); + let mut debug_iter = 0; + alive_thr.await?; Ok(()) }