[WORKER] Make Alive Check run in parallel
This commit is contained in:
parent
bc84e9164f
commit
150df72e00
|
@ -6,6 +6,7 @@ use std::collections::HashMap;
|
||||||
use crate::config::CONFIG;
|
use crate::config::CONFIG;
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
use log::{debug, info};
|
use log::{debug, info};
|
||||||
|
use sched::AliveCheckRequest;
|
||||||
use tonic::transport::Server;
|
use tonic::transport::Server;
|
||||||
// Import generated protobuf
|
// Import generated protobuf
|
||||||
mod sched {
|
mod sched {
|
||||||
|
@ -92,11 +93,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
info!("Starting gRPC server on {}", addr);
|
info!("Starting gRPC server on {}", addr);
|
||||||
let data = grpc::DataService::default();
|
let data = grpc::DataService::default();
|
||||||
let auth = grpc::AuthService::default();
|
let auth = grpc::AuthService::default();
|
||||||
Server::builder()
|
let alive = grpc::AliveCheckService::default();
|
||||||
|
let grpc_server = Server::builder()
|
||||||
.add_service(sched::data_server::DataServer::new(data))
|
.add_service(sched::data_server::DataServer::new(data))
|
||||||
.add_service(sched::auth_server::AuthServer::new(auth))
|
.add_service(sched::auth_server::AuthServer::new(auth))
|
||||||
.serve(addr)
|
.add_service(sched::alive_check_server::AliveCheckServer::new(alive))
|
||||||
.await?;
|
.serve(addr);
|
||||||
debug!("{:?}", CONFIG.node);
|
grpc_server.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,14 @@
|
||||||
use std::net::SocketAddr;
|
use std::{
|
||||||
|
net::SocketAddr,
|
||||||
|
sync::{atomic::AtomicBool, Arc},
|
||||||
|
};
|
||||||
mod grpc;
|
mod grpc;
|
||||||
|
|
||||||
mod config;
|
mod config;
|
||||||
use crate::config::CONFIG;
|
use crate::config::CONFIG;
|
||||||
|
|
||||||
use log::{debug, info};
|
use log::{debug, error, info};
|
||||||
use sched::{LoginRequest, LogoutRequest};
|
use sched::{DataRequest, LoginRequest, LogoutRequest};
|
||||||
|
|
||||||
pub mod sched {
|
pub mod sched {
|
||||||
tonic::include_proto!("sched");
|
tonic::include_proto!("sched");
|
||||||
|
@ -29,20 +32,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
std::env::set_var("RUST_LOG", CONFIG.log.level.clone());
|
std::env::set_var("RUST_LOG", CONFIG.log.level.clone());
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
greet();
|
greet();
|
||||||
// println!(" {}", "+-------------+".color(Color::LightBlue));
|
|
||||||
// println!(" {}", "| 予 定 |".color(Color::LightPink1));
|
|
||||||
// println!(
|
|
||||||
// " {}",
|
|
||||||
// "+-------------+".color(colorful::RGB::new(252, 252, 252))
|
|
||||||
// );
|
|
||||||
// println!(" {}", "YOTEI SCHEDULER".color(Color::LightPink1));
|
|
||||||
// println!(" {}", " WORKER NODE\n".color(Color::LightBlue));
|
|
||||||
let server_addr = "http://[::1]:50051";
|
let server_addr = "http://[::1]:50051";
|
||||||
let addr: SocketAddr = "[::1]:50053".parse()?;
|
let addr: SocketAddr = "[::1]:50053".parse()?;
|
||||||
let data = sched::data_client::DataClient::connect(server_addr).await?;
|
|
||||||
let mut auth = sched::auth_client::AuthClient::connect(server_addr).await?;
|
|
||||||
// Start server here
|
// Start server here
|
||||||
let alive_check_server = alive_check_serve(addr);
|
std::thread::sleep(std::time::Duration::from_secs(4));
|
||||||
|
let mut data = sched::data_client::DataClient::connect(server_addr).await?;
|
||||||
|
let mut auth = sched::auth_client::AuthClient::connect(server_addr).await?;
|
||||||
|
let mut alive = sched::alive_check_client::AliveCheckClient::connect(server_addr).await?;
|
||||||
|
let mut shutdown = Arc::new(AtomicBool::new(false));
|
||||||
|
let (send_shutdown, mut recv_shutdown) = tokio::sync::oneshot::channel::<()>();
|
||||||
|
let alive_thr = tokio::spawn(async move {
|
||||||
|
let mut alive_interval = tokio::time::interval(std::time::Duration::from_secs(10));
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
msg = &mut recv_shutdown => { break; }
|
||||||
|
_ = alive_interval.tick() => {
|
||||||
|
debug!(
|
||||||
|
"{:#?}",
|
||||||
|
alive
|
||||||
|
.alive_check(tonic::Request::new(sched::AliveCheckRequest {
|
||||||
|
node_id: format!("compute-{}", 1),
|
||||||
|
}))
|
||||||
|
.await?
|
||||||
|
);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok::<(), tonic::Status>(())
|
||||||
|
});
|
||||||
debug!(
|
debug!(
|
||||||
"{:#?}",
|
"{:#?}",
|
||||||
auth.login(tonic::Request::new(LoginRequest {
|
auth.login(tonic::Request::new(LoginRequest {
|
||||||
|
@ -50,33 +67,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
}))
|
}))
|
||||||
.await?
|
.await?
|
||||||
);
|
);
|
||||||
debug!("{:#?}", auth.logout(tonic::Request::new(LogoutRequest {
|
debug!(
|
||||||
|
"{:#?}",
|
||||||
|
data.data(tonic::Request::new(DataRequest {
|
||||||
node_id: "compute-1".to_string(),
|
node_id: "compute-1".to_string(),
|
||||||
})).await?);
|
uuid: "foo".to_string(),
|
||||||
// Await server at end
|
hashmap_id: "bar".to_string(),
|
||||||
alive_check_server.await?;
|
}))
|
||||||
Ok(())
|
.await?
|
||||||
}
|
|
||||||
|
|
||||||
async fn alive_check_serve(addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
use grpc::AliveCheckService;
|
|
||||||
use sched::alive_check_server::AliveCheckServer;
|
|
||||||
use tokio::task;
|
|
||||||
use tonic::transport::Server;
|
|
||||||
|
|
||||||
|
|
||||||
task::spawn(async move {
|
|
||||||
info!(
|
|
||||||
"Starting Alive Check gRPC server on: http://{}",
|
|
||||||
addr.to_string()
|
|
||||||
);
|
);
|
||||||
let alive_check_service = AliveCheckService::default();
|
debug!(
|
||||||
Server::builder()
|
"{:#?}",
|
||||||
.add_service(AliveCheckServer::new(alive_check_service))
|
auth.logout(tonic::Request::new(LogoutRequest {
|
||||||
.serve(addr)
|
node_id: "compute-1".to_string(),
|
||||||
.await
|
}))
|
||||||
.unwrap();
|
.await?
|
||||||
});
|
);
|
||||||
|
let mut debug_iter = 0;
|
||||||
|
alive_thr.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue