Very basic socket server and data parsing in Rust
Just notes so I can come back and reuse this next time without shitting my pants over 100% CPU usage.
Many errors are not handled since we don't need that in this 'tool' that I use for internal business needs, and all errors are just printed to stdout (the println! macro).
I used ChatGPT to write the comments in the code.
Data example:
56;index:syscalls;data:[brk(NULL), close(3)];zalupa:3;empt;
Here 56 is the length of the data, index is a "table" name used later in the export processing, and the rest of the data is just random shit trying to break the parser.
I highly recommend replacing this shit, Berr<T>, with tokio::io::Result.
pub type Berr<T> = Result<T, Box<dyn std::error::Error>>;
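That swap would boil down to something like this (just a sketch; the ad-hoc string errors used further down would then have to become real std::io::Error values):

// tokio::io::Result<T> is just an alias for std::io::Result<T>
pub type Berr<T> = tokio::io::Result<T>;
// ...and errors like "Read time out".into() would turn into e.g.
// std::io::Error::new(std::io::ErrorKind::TimedOut, "Read time out")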
use crate::Berr;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::{timeout, Duration};
// Struct representing a TCP listener that communicates through a channel
pub struct Listener {
pub listener: TcpListener, // TCP listener to accept incoming connections
channel: UnboundedSender<String>, // Channel to send received data
}
impl Listener {
// Constructor for Listener
pub async fn new(ip: &String, port: u16, channel: UnboundedSender<String>) -> Berr<Self> {
let addr = format!("{}:{}", ip, port); // Format the address string
let listener = TcpListener::bind(&addr).await?; // Bind the listener to the address
println!("Listening at: {}", &addr); // Log the address
Ok(Self { listener, channel }) // Return a new Listener instance
}
// Helper function to send data through the channel
fn send_to_channel(data: String, channel: &UnboundedSender<String>) {
match channel.send(data) {
Ok(_) => {}
Err(e) => {
println!("Something wrong with UnboundedSender : {}", e.to_string()); // Log error if sending fails
}
}
}
// Method to start accepting connections and processing them
pub async fn start(&self) -> Berr<()> {
loop {
let res = self.listener.accept().await; // Accept an incoming connection
match res {
Ok((mut socket, _)) => {
let channel = self.channel.clone(); // Clone the channel for the new task
tokio::spawn(async move {
// Spawn a new task to handle the connection
match Self::process_connection(&channel, &mut socket).await {
Ok(_) => {}
Err(e) => {
println!(
"Error occurred while inserting/parsing data : {}",
e.to_string()
); // Log error if processing fails
}
}
});
}
Err(e) => {
println!(
"Error occurred while accepting connection : {}",
e.to_string()
); // Log error if accepting the connection fails
}
}
}
}
// Method to process an accepted connection
pub async fn process_connection(
channel: &UnboundedSender<String>,
socket: &mut TcpStream,
) -> Berr<()> {
const TIMEOUT: u64 = 1; // Timeout duration for reading operations
let mut reader = BufReader::new(socket); // Wrap the socket in a buffered reader
loop {
            reader.get_ref().readable().await?; // Wait until the socket is ready to read
let mut length_buffer = Vec::new();
// Read the length prefix (up to the ';' character)
match timeout(
Duration::from_secs(TIMEOUT),
reader.read_until(b';', &mut length_buffer),
)
.await
{
Ok(Ok(_)) => {}
Ok(Err(e)) => {
println!("Error reading length prefix: {:?}", e); // Log error if reading the length fails
return Err("Error reading length prefix".into());
}
Err(_) => {
println!("Timeout while reading length prefix"); // Log timeout if reading the length takes too long
return Err("Read time out".into());
}
}
// Convert the length prefix to a string and parse it as an integer
let length_str = String::from_utf8_lossy(&length_buffer);
let content_length: usize = match length_str.trim_end_matches(';').parse() {
Ok(length) => length,
Err(_) => {
                    return Ok(()); // If the prefix is not a number (e.g. an empty read after the client disconnected), stop handling this connection
}
};
let mut content_buffer = vec![0; content_length]; // Allocate buffer for the incoming data
// Read the actual data from the socket
match timeout(
Duration::from_secs(TIMEOUT),
reader.read_exact(&mut content_buffer),
)
.await
{
Ok(Ok(0)) => {
println!("Client disconnected"); // Log message if the client disconnects
break;
}
Ok(Ok(_)) => {
// If the data is valid UTF-8, send it through the channel
if let Ok(str) = String::from_utf8(content_buffer) {
Self::send_to_channel(str, channel);
} else {
dbg!("Invalid UTF-8 data"); // Log message if the data is not valid UTF-8
}
}
Ok(Err(e)) => {
println!("I/O error during read: {:?}", e); // Log I/O error if reading fails
continue;
}
Err(_) => {
println!("Timeout while reading data"); // Log timeout if reading the data takes too long
continue;
}
};
}
return Ok(()); // Return OK after the loop ends
}
}
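To poke the listener by hand, a throwaway client along these lines works (a sketch; the address and port are placeholders and have to match whatever the listener was bound to):

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let body = "index:syscalls;data:[brk(NULL), close(3)];zalupa:3;empt;";
    let frame = format!("{};{}", body.len(), body); // length prefix + ';' + payload
    let mut stream = TcpStream::connect("127.0.0.1:9000").await?;
    stream.write_all(frame.as_bytes()).await?;
    Ok(())
}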
In terms of performance, the listener above is bounded by what a single unbounded channel can push through, so if you need to process a decent amount of data (lol, use a better tool) you will need multiple UnboundedSender instances for sure.
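In practice that just means cloning the sender once per listener; a hypothetical helper (the name and ports are made up) could look like this:

async fn spawn_listeners(host: String, tx: tokio::sync::mpsc::UnboundedSender<String>) -> Berr<()> {
    for port in [9001u16, 9002] {
        // Each Listener owns its own clone of the sender side of the channel
        let listener = Listener::new(&host, port, tx.clone()).await?;
        tokio::spawn(async move {
            if let Err(e) = listener.start().await {
                println!("Listener on port {} died : {}", port, e);
            }
        });
    }
    Ok(())
}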
Since I'm receiving dynamic data and the exporters (files, MongoDB, etc., you name it) all differ, I just parse everything into JSON.
It's very unoptimized, but it won't be the bottleneck since we are already doing I/O.
use super::json_extractor::BinaryToJson;
pub struct ExportData {
pub index: String,
pub data: serde_json::Value,
}
impl ExportData {
pub fn from_raw_string(input: &str) -> Option<Self> {
        let mut parts = input.split(';');
        // find() consumes the iterator up to and including the "index:..." part,
        // so the collect() below only picks up what comes after it
        let index_value = parts
            .find(|part| part.starts_with("index:"))
            .map(|part| part["index:".len()..].to_string())?;
        let data_value = parts.collect::<Vec<&str>>().join(";");
let data = BinaryToJson::convert(&data_value).ok()?;
Some(ExportData {
index: index_value,
data,
})
}
}
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;
use crate::models::berr::Berr;
pub struct BinaryToJson;
impl BinaryToJson {
pub fn convert(input: &str) -> Berr<Value> {
let parts: Vec<&str> = input.split(';').collect();
let mut map: HashMap<&str, Value> = HashMap::new();
for item in parts.into_iter() {
let entity_parts = item.splitn(2, ':').collect::<Vec<&str>>();
match entity_parts.len() {
0 => {}
1 => {
if entity_parts[0].len() > 0 {
map.insert(entity_parts[0], Value::Null);
}
}
2 => {
map.insert(entity_parts[0], Self::parse_entity(entity_parts[1]));
}
3.. => {
dbg!("What the fuck?");
}
}
}
let json = json!(map);
Ok(json)
}
    fn is_array(input: &str) -> Option<Vec<Value>> {
        if input.starts_with('[') && input.ends_with(']') {
            Some(
                input
                    .trim_matches(&['[', ']'][..])
                    .split(", ")
                    .map(|x| Value::String(x.to_string()))
                    .collect::<Vec<Value>>(),
            )
        } else {
            None
        }
    }
    fn parse_entity(entity: &str) -> Value {
        if let Some(x) = Self::is_array(entity) {
            Value::Array(x)
        } else {
            Value::String(entity.to_string())
        }
    }
}
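Fed the sample payload from the top (with the 56; length prefix already stripped by the listener), this is roughly what comes out:

let raw = "index:syscalls;data:[brk(NULL), close(3)];zalupa:3;empt;";
let parsed = ExportData::from_raw_string(raw).unwrap();
assert_eq!(parsed.index, "syscalls");
// parsed.data is now:
// {"data": ["brk(NULL)", "close(3)"], "empt": null, "zalupa": "3"}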
Exporter trait:
use crate::Berr;
use crate::InitArgs;
pub trait Exporter {
async fn new(cfg: &InitArgs) -> Berr<Self>
where
Self: Sized;
async fn insert_bulk(&self, index: &str, data: Vec<String>) -> Berr<()>;
async fn insert(&self, index: &str, data: &serde_json::Value) -> Berr<()>;
}
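As a point of reference, a do-nothing exporter that only prints to stdout could look like this (hypothetical, not part of the tool, but handy for testing the pipeline without a database):

pub struct StdoutExporter;

impl Exporter for StdoutExporter {
    async fn new(_cfg: &InitArgs) -> Berr<Self> {
        Ok(Self)
    }

    async fn insert_bulk(&self, index: &str, data: Vec<String>) -> Berr<()> {
        for item in data {
            println!("{} <- {}", index, item);
        }
        Ok(())
    }

    async fn insert(&self, index: &str, data: &serde_json::Value) -> Berr<()> {
        println!("{} <- {}", index, data);
        Ok(())
    }
}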
MongoDB exporter:
use mongodb::{bson::doc, options::ClientOptions, Client};
use crate::exporter::Exporter;
pub struct MongoExporter {
client: Client,
}
impl Exporter for MongoExporter {
async fn new(cfg: &crate::models::init_config::InitArgs) -> crate::models::berr::Berr<Self>
where
Self: Sized,
{
let uri = format!("mongodb://{}:{}", &cfg.export_host, &cfg.export_port);
let mut client_options = ClientOptions::parse(uri).await?;
client_options.app_name = Some("DEBUG LOGGER".to_string());
let client = Client::with_options(client_options)?;
client
.database("admin")
.run_command(doc! { "ping" : 1})
.await?;
Ok(Self { client })
}
async fn insert(&self, index: &str, data: &serde_json::Value) -> crate::models::berr::Berr<()> {
let mut doc = bson::to_document(&data)?;
doc.insert("date", bson::DateTime::now());
self.client
.database("debugger_data")
.collection(index)
.insert_one(doc)
.await?;
Ok(())
}
async fn insert_bulk(&self, index: &str, data: Vec<String>) -> crate::models::berr::Berr<()> {
todo!()
}
}
TBH, I had many questions about the mongodb crate.
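insert_bulk is still a todo!(); a hypothetical version, assuming the bulk payloads are JSON strings and that insert_many follows the same no-options call style as insert_one above, could look roughly like this:

    async fn insert_bulk(&self, index: &str, data: Vec<String>) -> crate::models::berr::Berr<()> {
        let mut docs = Vec::with_capacity(data.len());
        for raw in &data {
            // Parse each raw JSON string and convert it into a BSON document
            let value: serde_json::Value = serde_json::from_str(raw)?;
            docs.push(bson::to_document(&value)?);
        }
        self.client
            .database("debugger_data")
            .collection::<bson::Document>(index)
            .insert_many(docs)
            .await?;
        Ok(())
    }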
And on the other end, just an UnboundedReceiver. Very basic; I didn't implement backlog processing, since record-breaking performance and stability were never the goal.
use crate::exporter::Exporter;
use crate::parsers::data_extractor::ExportData;
use tokio::sync::mpsc::UnboundedReceiver;
pub struct Queue {
backlog: Vec<ExportData>,
}
impl Default for Queue {
fn default() -> Self {
Self { backlog: vec![] }
}
}
impl Queue {
pub async fn start_consumer<T: Exporter>(
&mut self,
mut rx: UnboundedReceiver<String>,
exporter: T,
) {
loop {
match rx.recv().await {
Some(msg) => match ExportData::from_raw_string(&msg) {
Some(ed) => {
if let Err(e) = exporter.insert(&ed.index, &ed.data).await {
println!(
"Error from Exporter, sending to backlog : {}",
e.to_string()
);
if self.backlog.len() < 100_000 {
self.backlog.push(ed);
} else {
println!("Backlog is pretty full now, data is lost");
}
}
}
None => {
println!("Invalid raw data : {}", msg);
}
},
                None => {
                    // Channel closed (all senders dropped): stop instead of spinning at 100% CPU
                    println!("Channel closed, stopping consumer");
                    break;
                }
}
}
}
}
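For completeness, this is roughly how the pieces glue together. A minimal sketch, assuming InitArgs is already parsed somewhere else and that the types above are visible from one module; the run name, address and port are made up:

use tokio::sync::mpsc::unbounded_channel;

async fn run(cfg: InitArgs) -> Berr<()> {
    // One unbounded channel between the listener side and the consumer side
    let (tx, rx) = unbounded_channel::<String>();

    // Accept connections and push raw payloads into the channel
    let listener = Listener::new(&"0.0.0.0".to_string(), 9000, tx).await?;
    tokio::spawn(async move {
        if let Err(e) = listener.start().await {
            println!("Listener died : {}", e);
        }
    });

    // Pull payloads, parse them into ExportData and hand them to the exporter
    let exporter = MongoExporter::new(&cfg).await?;
    let mut queue = Queue::default();
    queue.start_consumer(rx, exporter).await;
    Ok(())
}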