Restructuring; Moving from JSON to gRPC
Some checks failed
/ build (push) Failing after 1m42s
/ clippy (push) Failing after 1m18s

This commit is contained in:
xqtc 2024-09-05 16:12:58 +02:00
parent e7b3be5108
commit c6d745d25f
17 changed files with 1077 additions and 301 deletions

1065
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,4 +1,4 @@
[workspace]
members = ["./uwusched", "./lib-uwusched"]
members = ["./uwusched-head-node"]
resolver = "2"

View file

@ -26,6 +26,7 @@
nonRustDeps = [
pkgs.libiconv
pkgs.pkg-config
pkgs.protobuf
];
rust-toolchain = pkgs.symlinkJoin {
name = "rust-toolchain";

View file

@ -1,83 +0,0 @@
#![allow(non_camel_case_types, unused)]
use std::str::FromStr;
#[derive(Debug, serde::Serialize)]
pub enum DataStatus {
OK,
/// An Error while looking up a hash
HASH_ERR,
/// Timeout Error
TIMEOUT_ERR,
}
#[derive(Debug, serde::Serialize)]
pub enum NodeStatusLogin {
/// Accepted Login
ACCEPTED,
/// Rejected Login
REJECTED,
}
#[derive(Debug, serde::Serialize)]
pub enum NodeStatusLogout {
/// Head-Node ACKs the Logout request from compute node
ACK,
/// Head-Node restructured and rebalanced Task-Queue, allowing the compute node to log out
OK,
/// Head-Node failed to restructure and rebalance Task-Queue, preventing compute node to log out
ERR,
}
#[derive(Debug, serde::Serialize)]
pub enum ProtocolId {
/// Identifier indicating one node pushing data to another
DATA_PUSH,
/// Identifier indicating one node pulling to another
DATA_PULL,
/// Identifier indicating the success of a `DATA_PUSH` or `DATA_PULL`
DATA_RESP,
HASH_PUSH,
HASH_PULL,
HASH_RESP,
/// Identifier indicating a node wants to join the cluster
NODE_LOGIN,
/// Identifier indicating a node being online
NODE_ALIVE,
/// Identifier indicating a node wants to leave the cluster
NODE_LOGOUT,
}
pub struct Login {
protocol_id: ProtocolId,
node_id: String,
transaction_uuid: uuid::Uuid,
status: NodeStatusLogin,
}
pub struct AliveCheck {
protocol_id: ProtocolId,
node_id: String,
transaction_uuid: uuid::Uuid,
}
pub struct Logout {
protocol_id: ProtocolId,
node_id: String,
transaction_uuid: uuid::Uuid,
status: NodeStatusLogout,
}
fn str_to_uuid(uuid: String) -> uuid::Uuid {
uuid::Uuid::from_str(&uuid).unwrap()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn uuid() {
let res = str_to_uuid("a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8".to_string());
assert_eq!(uuid::uuid!("a1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8"), res);
}
}

View file

@ -1,60 +0,0 @@
#![allow(non_camel_case_types, unused)]
use crate::control::ProtocolId;
pub struct Pull {
protocol_id: ProtocolId,
node_id: String,
uuid: String,
hashmap_id: String,
}
#[derive(Debug)]
pub struct Push<T> {
protocol_id: ProtocolId,
node_id: String,
uuid: String,
hashmap_id: String,
length_decomp: i64,
length_comp: i64,
comp_format: String,
attributes: Option<T>,
data: Vec<u8>,
}
pub struct Response {
protocol_id: ProtocolId,
node_id: String,
uuid: String,
hashmap_id: String,
status: String,
}
impl<T> Push<T> {
fn new(
protocol_id: ProtocolId,
node_id: String,
uuid: String,
hashmap_id: String,
length_decomp: i64,
length_comp: i64,
comp_format: Option<String>,
attributes: Option<T>,
data: Vec<u8>,
) -> Self {
Push {
protocol_id,
node_id,
uuid,
hashmap_id,
length_decomp,
length_comp,
comp_format: match comp_format {
Some(c) => c,
None => String::from("lz4"),
},
attributes,
data,
}
}
}

View file

@ -1,3 +0,0 @@
pub mod data;
pub mod control;

32
protos/protocol.proto Normal file
View file

@ -0,0 +1,32 @@
syntax = "proto3";
package sched;
service Data {
rpc Data (DataRequest) returns (DataResponse);
}
message DataRequest {
string node_id = 1;
string uuid = 2;
string hashmap_id = 3;
}
message DataResponse {
string node_id = 1;
string uuid = 2;
string hashmap_id = 3;
string length = 4;
bytes data = 5;
}
service Auth {
rpc Auth (LoginRequest) returns (LoginResponse);
}
message LoginRequest {
string node_id = 1;
}
message LoginResponse {
}

View file

@ -1,14 +1,13 @@
[package]
authors = ["xqtc"]
description = "Distributed computing with abstract data"
name = "lib-uwusched"
name = "uwusched-head-node"
version = "0.1.0"
edition = "2021"
[dependencies]
pretty_env_logger = "0.5"
uuid = "1.10"
sha256 = "1.5"
tokio = { version = "1.40", features = ["macros", "rt-multi-thread"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
@ -18,3 +17,18 @@ lazy_static = "1.5.0"
log = "0.4.22"
bincode = "1.3.3"
clap = { version = "4.5.16", features = ["derive"] }
bson = "2.11.0"
tonic = "0.12.2"
prost = "0.13.2"
prost-types = "0.13.2"
[build-dependencies]
tonic-build = "0.12.2"
[[bin]]
name = "head-node"
path = "src/head.rs"
[[bin]]
name = "compute-node"
path = "src/compute.rs"

View file

@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("../protos/protocol.proto")?;
Ok(())
}

View file

View file

@ -0,0 +1,38 @@
mod config;
use crate::config::CONFIG;
use bson::{bson, Bson};
use clap::{Parser, Subcommand};
use log::debug;
use lz4_flex::block::{compress_prepend_size, decompress_size_prepended};
use tonic::*;
#[derive(Parser)]
#[command(author, version, about)]
#[command(propagate_version = true)]
struct Cli {
#[command(subcommand)]
command: Option<Commands>,
}
#[derive(Subcommand)]
enum Commands {
/// Starts uwusched
Start { role: Option<String> },
}
fn get_type_of<T>(_: &T) -> &'static str {
std::any::type_name::<T>()
}
fn main() {
std::env::set_var("RUST_LOG", CONFIG.log.level.clone());
pretty_env_logger::init();
// debug!("{:?}", CONFIG.node);
let cli = Cli::parse();
match &cli.command {
Some(Commands::Start { role }) => {
debug!("{:#?}", role)
}
None => {}
}
}

1
uwusched/.gitignore vendored
View file

@ -1 +0,0 @@
/target

View file

@ -1,18 +0,0 @@
[package]
authors = ["xqtc"]
description = "Distributed computing with abstract data"
name = "uwusched"
version = "0.1.0"
edition = "2021"
[dependencies]
pretty_env_logger = "0.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
lz4_flex = "0.11.3"
toml = "0.8.19"
lazy_static = "1.5.0"
log = "0.4.22"
bincode = "1.3.3"
clap = { version = "4.5.16", features = ["derive"] }

View file

@ -1,50 +0,0 @@
mod config;
use crate::config::CONFIG;
use clap::{Parser, Subcommand};
use log::debug;
#[derive(Parser)]
#[command(author, version, about)]
#[command(propagate_version = true)]
struct Cli {
#[command(subcommand)]
command: Option<Commands>,
}
#[derive(Subcommand)]
enum Commands {
/// Starts uwusched
Start { role: Option<String> },
}
fn get_type_of<T>(_: &T) -> &'static str {
std::any::type_name::<T>()
}
fn main() {
std::env::set_var("RUST_LOG", CONFIG.log.level.clone());
pretty_env_logger::init();
// debug!("{:?}", CONFIG.node);
let cli = Cli::parse();
let target = Some(b"uwuaaaaaaaaaaazzzaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaazzzodwiahjfowahodifwahjdiowahoidhawkjfsdkjfhshj3qu420749238749287fjksnfskaaa".to_vec());
let encoded = bincode::serialize(&target).unwrap();
let decoded: Option<Vec<u8>> = bincode::deserialize(&encoded[..]).unwrap();
dbg!(&encoded);
// dbg!(&decoded == &Some(String::from("uwu")));
//
// let compr = lz4_flex::compress(&encoded);
// dbg!(&decoded == lz4_flex::
use lz4_flex::block::{compress_prepend_size, decompress_size_prepended};
let compressed = compress_prepend_size(&encoded);
let uncompressed = decompress_size_prepended(&compressed).unwrap();
dbg!(compressed.len(), encoded.len());
dbg!(compressed.len() < encoded.len());
dbg!(uncompressed == encoded);
match &cli.command {
Some(Commands::Start { role }) => {
debug!("{:#?}", role)
}
None => {}
}
}