Impl AliveCheck server on compute node
This commit is contained in:
parent
a63cbae570
commit
7fc00dcbb0
14
Cargo.lock
generated
14
Cargo.lock
generated
|
@ -1339,6 +1339,19 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tonic-reflection"
|
||||||
|
version = "0.12.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7b56b874eedb04f89907573b408eab1e87c1c1dce43aac6ad63742f57faa99ff"
|
||||||
|
dependencies = [
|
||||||
|
"prost",
|
||||||
|
"prost-types",
|
||||||
|
"tokio",
|
||||||
|
"tokio-stream",
|
||||||
|
"tonic",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower"
|
name = "tower"
|
||||||
version = "0.4.13"
|
version = "0.4.13"
|
||||||
|
@ -1460,6 +1473,7 @@ dependencies = [
|
||||||
"toml",
|
"toml",
|
||||||
"tonic",
|
"tonic",
|
||||||
"tonic-build",
|
"tonic-build",
|
||||||
|
"tonic-reflection",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -40,3 +40,15 @@ message LoginResponse {
|
||||||
REJECTED = 1;
|
REJECTED = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
service AliveCheck {
|
||||||
|
rpc AliveCheck (AliveCheckRequest) returns (AliveCheckResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
message AliveCheckRequest {
|
||||||
|
string node_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message AliveCheckResponse {
|
||||||
|
string node_id = 1;
|
||||||
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ bincode = "1.3.3"
|
||||||
clap = { version = "4.5.16", features = ["derive"] }
|
clap = { version = "4.5.16", features = ["derive"] }
|
||||||
bson = "2.11.0"
|
bson = "2.11.0"
|
||||||
tonic = "0.12.2"
|
tonic = "0.12.2"
|
||||||
|
tonic-reflection = "0.12.2"
|
||||||
prost = "0.13.2"
|
prost = "0.13.2"
|
||||||
prost-types = "0.13.2"
|
prost-types = "0.13.2"
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,11 @@
|
||||||
|
use std::{env, path::PathBuf};
|
||||||
|
|
||||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
|
||||||
|
|
||||||
|
tonic_build::configure()
|
||||||
|
.file_descriptor_set_path(out_dir.join("protocol_descriptor.bin"))
|
||||||
|
.compile(&["../protos/protocol.proto"], &["../protos"])?;
|
||||||
tonic_build::compile_protos("../protos/protocol.proto")?;
|
tonic_build::compile_protos("../protos/protocol.proto")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1 +1,47 @@
|
||||||
fn main() {}
|
use std::{net::SocketAddr, thread};
|
||||||
|
mod grpc;
|
||||||
|
|
||||||
|
mod config;
|
||||||
|
use crate::config::CONFIG;
|
||||||
|
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
pub mod sched {
|
||||||
|
tonic::include_proto!("sched");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
std::env::set_var("RUST_LOG", CONFIG.log.level.clone());
|
||||||
|
pretty_env_logger::init();
|
||||||
|
let url: SocketAddr = "[::1]:50051".parse()?;
|
||||||
|
// let mut data_client = sched::data_client::DataClient::connect(url.clone()).await?;
|
||||||
|
// let mut auth_client = sched::auth_client::AuthClient::connect(url.clone()).await?;
|
||||||
|
alive_check_serve(url.clone()).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn alive_check_serve(url: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
use grpc::AliveCheckService;
|
||||||
|
use sched::alive_check_server::AliveCheckServer;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::task;
|
||||||
|
use tonic::transport::Server;
|
||||||
|
|
||||||
|
let (tx, mut rx) = mpsc::channel::<Server>(100);
|
||||||
|
|
||||||
|
task::spawn(async move {
|
||||||
|
info!("Starting Alive Check gRPC server");
|
||||||
|
let alive_check_service = AliveCheckService::default();
|
||||||
|
Server::builder()
|
||||||
|
.add_service(AliveCheckServer::new(alive_check_service))
|
||||||
|
.serve(url)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
while let Some(msg) = rx.recv().await {
|
||||||
|
println!("REC: {:?}", msg);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
|
@ -1,4 +1,8 @@
|
||||||
use crate::sched::{DataResponse, DataRequest, data_server};
|
use crate::sched::{
|
||||||
|
alive_check_client::{self, AliveCheckClient},
|
||||||
|
alive_check_server, auth_server, data_server, AliveCheckRequest, AliveCheckResponse,
|
||||||
|
DataRequest, DataResponse, LoginRequest, LoginResponse,
|
||||||
|
};
|
||||||
use log::{debug, info};
|
use log::{debug, info};
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
|
@ -6,8 +10,11 @@ pub struct DataService {}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl data_server::Data for DataService {
|
impl data_server::Data for DataService {
|
||||||
async fn data(&self, request: tonic::Request<DataRequest>) -> Result<tonic::Response<DataResponse>, tonic::Status> {
|
async fn data(
|
||||||
info!("Got a request: {:?}", request);
|
&self,
|
||||||
|
request: tonic::Request<DataRequest>,
|
||||||
|
) -> Result<tonic::Response<DataResponse>, tonic::Status> {
|
||||||
|
info!("Got a request: {:#?}", request);
|
||||||
let input = request.get_ref();
|
let input = request.get_ref();
|
||||||
let data = vec![];
|
let data = vec![];
|
||||||
|
|
||||||
|
@ -22,3 +29,51 @@ impl data_server::Data for DataService {
|
||||||
Ok(tonic::Response::new(res))
|
Ok(tonic::Response::new(res))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct AuthService {}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl auth_server::Auth for AuthService {
|
||||||
|
async fn auth(
|
||||||
|
&self,
|
||||||
|
request: tonic::Request<LoginRequest>,
|
||||||
|
) -> Result<tonic::Response<LoginResponse>, tonic::Status> {
|
||||||
|
info!("Got a login request: {:#?}", request);
|
||||||
|
let input = request.get_ref();
|
||||||
|
let res = LoginResponse {
|
||||||
|
node_id: input.node_id.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(tonic::Response::new(res))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default, Clone)]
|
||||||
|
pub struct AliveCheckService {}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl alive_check_server::AliveCheck for AliveCheckService {
|
||||||
|
async fn alive_check(
|
||||||
|
&self,
|
||||||
|
request: tonic::Request<AliveCheckRequest>,
|
||||||
|
) -> Result<tonic::Response<AliveCheckResponse>, tonic::Status> {
|
||||||
|
info!("Got an Alive Check request: {:#?}", request);
|
||||||
|
let input = request.get_ref();
|
||||||
|
let res = AliveCheckResponse {
|
||||||
|
node_id: input.node_id.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(tonic::Response::new(res))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// In this case the head node operates as a grpc client
|
||||||
|
async fn alive_check(node_id: String) -> Result<AliveCheckResponse, Box<dyn std::error::Error>> {
|
||||||
|
let mut client = AliveCheckClient::connect("http://[::1]:50051").await?;
|
||||||
|
|
||||||
|
let req = tonic::Request::new(AliveCheckRequest { node_id });
|
||||||
|
|
||||||
|
let response = client.alive_check(req).await?;
|
||||||
|
|
||||||
|
Ok(response.get_ref().clone())
|
||||||
|
}
|
||||||
|
|
|
@ -7,7 +7,12 @@ use log::{debug, info};
|
||||||
use tonic::transport::Server;
|
use tonic::transport::Server;
|
||||||
// Import generated protobuf
|
// Import generated protobuf
|
||||||
mod sched {
|
mod sched {
|
||||||
|
use prost_types::FileDescriptorSet;
|
||||||
|
|
||||||
tonic::include_proto!("sched");
|
tonic::include_proto!("sched");
|
||||||
|
|
||||||
|
// pub(crate) const FILE_DESCRIPTOR_SET: FileDescriptorSet =
|
||||||
|
// tonic::include_file_descriptor_set!("protocol_descriptor").into();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
|
@ -39,7 +44,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let addr = "[::1]:50051".parse()?;
|
let addr = "[::1]:50051".parse()?;
|
||||||
info!("Starting gRPC server on {}", addr);
|
info!("Starting gRPC server on {}", addr);
|
||||||
let data = grpc::DataService::default();
|
let data = grpc::DataService::default();
|
||||||
Server::builder().add_service(sched::data_server::DataServer::new(data)).serve(addr).await?;
|
let auth = grpc::AuthService::default();
|
||||||
|
// let reflection = tonic_reflection::server::Builder::configure()
|
||||||
|
// .register_file_descriptor_set(sched::FILE_DESCRIPTOR_SET)
|
||||||
|
// .build_v1()?;
|
||||||
|
Server::builder()
|
||||||
|
.add_service(sched::data_server::DataServer::new(data))
|
||||||
|
.add_service(sched::auth_server::AuthServer::new(auth))
|
||||||
|
.serve(addr)
|
||||||
|
.await?;
|
||||||
debug!("{:?}", CONFIG.node);
|
debug!("{:?}", CONFIG.node);
|
||||||
// let cli = Cli::parse();
|
// let cli = Cli::parse();
|
||||||
// match &cli.command {
|
// match &cli.command {
|
||||||
|
|
Loading…
Reference in a new issue