diff --git a/Cargo.lock b/Cargo.lock index 15a0c81..5252510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1339,6 +1339,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tonic-reflection" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b56b874eedb04f89907573b408eab1e87c1c1dce43aac6ad63742f57faa99ff" +dependencies = [ + "prost", + "prost-types", + "tokio", + "tokio-stream", + "tonic", +] + [[package]] name = "tower" version = "0.4.13" @@ -1460,6 +1473,7 @@ dependencies = [ "toml", "tonic", "tonic-build", + "tonic-reflection", ] [[package]] diff --git a/protos/protocol.proto b/protos/protocol.proto index e9ffdd6..274ae68 100644 --- a/protos/protocol.proto +++ b/protos/protocol.proto @@ -40,3 +40,15 @@ message LoginResponse { REJECTED = 1; } } + +service AliveCheck { + rpc AliveCheck (AliveCheckRequest) returns (AliveCheckResponse); +} + +message AliveCheckRequest { + string node_id = 1; +} + +message AliveCheckResponse { + string node_id = 1; +} diff --git a/uwusched-nodes/Cargo.toml b/uwusched-nodes/Cargo.toml index e8ef807..4cee6f6 100644 --- a/uwusched-nodes/Cargo.toml +++ b/uwusched-nodes/Cargo.toml @@ -19,6 +19,7 @@ bincode = "1.3.3" clap = { version = "4.5.16", features = ["derive"] } bson = "2.11.0" tonic = "0.12.2" +tonic-reflection = "0.12.2" prost = "0.13.2" prost-types = "0.13.2" diff --git a/uwusched-nodes/build.rs b/uwusched-nodes/build.rs index d03f1a8..e4d0498 100644 --- a/uwusched-nodes/build.rs +++ b/uwusched-nodes/build.rs @@ -1,5 +1,11 @@ +use std::{env, path::PathBuf}; + fn main() -> Result<(), Box> { -tonic_build::compile_protos("../protos/protocol.proto")?; + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + + tonic_build::configure() + .file_descriptor_set_path(out_dir.join("protocol_descriptor.bin")) + .compile(&["../protos/protocol.proto"], &["../protos"])?; + tonic_build::compile_protos("../protos/protocol.proto")?; Ok(()) } - diff --git a/uwusched-nodes/src/compute.rs b/uwusched-nodes/src/compute.rs index f328e4d..955a722 100644 --- a/uwusched-nodes/src/compute.rs +++ b/uwusched-nodes/src/compute.rs @@ -1 +1,47 @@ -fn main() {} +use std::{net::SocketAddr, thread}; +mod grpc; + +mod config; +use crate::config::CONFIG; + +use log::info; + +pub mod sched { + tonic::include_proto!("sched"); +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + std::env::set_var("RUST_LOG", CONFIG.log.level.clone()); + pretty_env_logger::init(); + let url: SocketAddr = "[::1]:50051".parse()?; + // let mut data_client = sched::data_client::DataClient::connect(url.clone()).await?; + // let mut auth_client = sched::auth_client::AuthClient::connect(url.clone()).await?; + alive_check_serve(url.clone()).await?; + Ok(()) +} + +async fn alive_check_serve(url: SocketAddr) -> Result<(), Box> { + use grpc::AliveCheckService; + use sched::alive_check_server::AliveCheckServer; + use tokio::sync::mpsc; + use tokio::task; + use tonic::transport::Server; + + let (tx, mut rx) = mpsc::channel::(100); + + task::spawn(async move { + info!("Starting Alive Check gRPC server"); + let alive_check_service = AliveCheckService::default(); + Server::builder() + .add_service(AliveCheckServer::new(alive_check_service)) + .serve(url) + .await + .unwrap(); + }); + + while let Some(msg) = rx.recv().await { + println!("REC: {:?}", msg); + } + Ok(()) +} diff --git a/uwusched-nodes/src/grpc/mod.rs b/uwusched-nodes/src/grpc/mod.rs index a0a99fa..d5753e7 100644 --- a/uwusched-nodes/src/grpc/mod.rs +++ b/uwusched-nodes/src/grpc/mod.rs @@ -1,4 +1,8 @@ -use crate::sched::{DataResponse, DataRequest, data_server}; +use crate::sched::{ + alive_check_client::{self, AliveCheckClient}, + alive_check_server, auth_server, data_server, AliveCheckRequest, AliveCheckResponse, + DataRequest, DataResponse, LoginRequest, LoginResponse, +}; use log::{debug, info}; #[derive(Debug, Default)] @@ -6,8 +10,11 @@ pub struct DataService {} #[tonic::async_trait] impl data_server::Data for DataService { - async fn data(&self, request: tonic::Request) -> Result, tonic::Status> { - info!("Got a request: {:?}", request); + async fn data( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + info!("Got a request: {:#?}", request); let input = request.get_ref(); let data = vec![]; @@ -15,10 +22,58 @@ impl data_server::Data for DataService { node_id: input.node_id.clone(), hashmap_id: input.hashmap_id.clone(), uuid: input.uuid.clone(), - length: data.len().to_string(), - data, + length: data.len().to_string(), + data, }; debug!("{:?}", &res); Ok(tonic::Response::new(res)) } } + +#[derive(Debug, Default)] +pub struct AuthService {} + +#[tonic::async_trait] +impl auth_server::Auth for AuthService { + async fn auth( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + info!("Got a login request: {:#?}", request); + let input = request.get_ref(); + let res = LoginResponse { + node_id: input.node_id.clone(), + }; + + Ok(tonic::Response::new(res)) + } +} + +#[derive(Debug, Default, Clone)] +pub struct AliveCheckService {} + +#[tonic::async_trait] +impl alive_check_server::AliveCheck for AliveCheckService { + async fn alive_check( + &self, + request: tonic::Request, + ) -> Result, tonic::Status> { + info!("Got an Alive Check request: {:#?}", request); + let input = request.get_ref(); + let res = AliveCheckResponse { + node_id: input.node_id.clone(), + }; + + Ok(tonic::Response::new(res)) + } +} +// In this case the head node operates as a grpc client +async fn alive_check(node_id: String) -> Result> { + let mut client = AliveCheckClient::connect("http://[::1]:50051").await?; + + let req = tonic::Request::new(AliveCheckRequest { node_id }); + + let response = client.alive_check(req).await?; + + Ok(response.get_ref().clone()) +} diff --git a/uwusched-nodes/src/head.rs b/uwusched-nodes/src/head.rs index 19b8cbb..0115547 100644 --- a/uwusched-nodes/src/head.rs +++ b/uwusched-nodes/src/head.rs @@ -7,7 +7,12 @@ use log::{debug, info}; use tonic::transport::Server; // Import generated protobuf mod sched { + use prost_types::FileDescriptorSet; + tonic::include_proto!("sched"); + + // pub(crate) const FILE_DESCRIPTOR_SET: FileDescriptorSet = + // tonic::include_file_descriptor_set!("protocol_descriptor").into(); } #[derive(Parser)] @@ -39,7 +44,15 @@ async fn main() -> Result<(), Box> { let addr = "[::1]:50051".parse()?; info!("Starting gRPC server on {}", addr); let data = grpc::DataService::default(); - Server::builder().add_service(sched::data_server::DataServer::new(data)).serve(addr).await?; + let auth = grpc::AuthService::default(); + // let reflection = tonic_reflection::server::Builder::configure() + // .register_file_descriptor_set(sched::FILE_DESCRIPTOR_SET) + // .build_v1()?; + Server::builder() + .add_service(sched::data_server::DataServer::new(data)) + .add_service(sched::auth_server::AuthServer::new(auth)) + .serve(addr) + .await?; debug!("{:?}", CONFIG.node); // let cli = Cli::parse(); // match &cli.command {