Published on

Very basic socket server and data parsing in Rust

Just notes so I can come back and use it next time without getting surprised by 100% CPU usage.

Many errors are not handled, since we don't need that in this 'tool' that I use for internal business needs. All of the errors are just printed to stdout (the println! macro).

I used ChatGPT to make comments on code.

Data example:

56;index:syscalls;data:[brk(NULL), close(3)];zalupa:3;empt;

Where 56 is the length of the data, index is a "table" used later in export processing, and the rest of the data is just random junk trying to break the parser.

I highly recommend replacing this Berr<T> with tokio::io::Result.

rust
/// Crate-wide convenience alias: any error, boxed and type-erased.
/// NOTE(review): the author suggests swapping this for `tokio::io::Result`.
pub type Berr<T> = Result<T, Box<dyn std::error::Error>>;
rust
use crate::Berr;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::{timeout, Duration};

// Struct representing a TCP listener that communicates through a channel
/// A TCP listener that forwards every received payload, as a `String`,
/// through an unbounded mpsc channel to a downstream consumer.
pub struct Listener {
    pub listener: TcpListener,        // accepts incoming connections
    channel: UnboundedSender<String>, // sink for parsed payloads
}

impl Listener {
    /// Binds a TCP listener on `ip:port`.
    ///
    /// Accepted connections have their payloads forwarded through `channel`
    /// once [`Listener::start`] is running.
    ///
    /// `&str` instead of `&String`: callers passing `&String` still work via
    /// deref coercion, and string literals are now accepted too.
    pub async fn new(ip: &str, port: u16, channel: UnboundedSender<String>) -> Berr<Self> {
        let addr = format!("{}:{}", ip, port);
        let listener = TcpListener::bind(&addr).await?;
        println!("Listening at: {}", &addr);
        Ok(Self { listener, channel })
    }

    /// Sends one payload through the channel; failures are only logged,
    /// because a dead consumer should not kill the reader task.
    fn send_to_channel(data: String, channel: &UnboundedSender<String>) {
        if let Err(e) = channel.send(data) {
            println!("Something wrong with UnboundedSender : {}", e);
        }
    }

    /// Accept loop: spawns one task per incoming connection.
    /// Never returns normally; accept errors are logged and the loop goes on.
    pub async fn start(&self) -> Berr<()> {
        loop {
            match self.listener.accept().await {
                Ok((mut socket, _)) => {
                    // Each connection gets its own task and sender clone.
                    let channel = self.channel.clone();
                    tokio::spawn(async move {
                        if let Err(e) = Self::process_connection(&channel, &mut socket).await {
                            println!("Error occurred while inserting/parsing data : {}", e);
                        }
                    });
                }
                Err(e) => {
                    println!("Error occurred while accepting connection : {}", e);
                }
            }
        }
    }

    /// Reads length-prefixed frames (`<len>;<payload>`) from the socket and
    /// forwards each UTF-8 payload through `channel`. Stops on EOF, on a
    /// timed-out or failed length read, or on a non-numeric length prefix.
    pub async fn process_connection(
        channel: &UnboundedSender<String>,
        socket: &mut TcpStream,
    ) -> Berr<()> {
        const TIMEOUT: u64 = 1; // per-read timeout, in seconds
        let mut reader = BufReader::new(socket);
        loop {
            // The original had `&reader.get_ref().readable().await;` here: a
            // reference to a temporary whose result was silently discarded.
            // `read_until` already waits for readiness, so it is removed.
            let mut length_buffer = Vec::new();

            // Read the length prefix (up to and including ';').
            match timeout(
                Duration::from_secs(TIMEOUT),
                reader.read_until(b';', &mut length_buffer),
            )
            .await
            {
                // 0 bytes read means EOF: the peer closed the connection.
                // Previously this fell through to the number parser and only
                // exited because "" failed to parse.
                Ok(Ok(0)) => {
                    println!("Client disconnected");
                    return Ok(());
                }
                Ok(Ok(_)) => {}
                Ok(Err(e)) => {
                    println!("Error reading length prefix: {:?}", e);
                    return Err("Error reading length prefix".into());
                }
                Err(_) => {
                    println!("Timeout while reading length prefix");
                    return Err("Read time out".into());
                }
            }

            // Parse the prefix; a non-numeric prefix means the stream is
            // garbage, so drop the connection (return, not continue — the
            // original comment claimed "continue" but always returned).
            let length_str = String::from_utf8_lossy(&length_buffer);
            let content_length: usize = match length_str.trim_end_matches(';').parse() {
                Ok(length) => length,
                Err(_) => {
                    return Ok(());
                }
            };

            let mut content_buffer = vec![0; content_length];

            // Read exactly `content_length` bytes of payload.
            match timeout(
                Duration::from_secs(TIMEOUT),
                reader.read_exact(&mut content_buffer),
            )
            .await
            {
                Ok(Ok(0)) => {
                    // read_exact only yields 0 for a zero-length frame.
                    println!("Client disconnected");
                    break;
                }
                Ok(Ok(_)) => {
                    // Forward only valid UTF-8 payloads.
                    if let Ok(payload) = String::from_utf8(content_buffer) {
                        Self::send_to_channel(payload, channel);
                    } else {
                        println!("Invalid UTF-8 data");
                    }
                }
                Ok(Err(e)) => {
                    // NOTE(review): continuing here can desynchronize the
                    // framing, since part of the payload may already have
                    // been consumed — confirm this is acceptable.
                    println!("I/O error during read: {:?}", e);
                    continue;
                }
                Err(_) => {
                    println!("Timeout while reading data");
                    continue;
                }
            };
        }
        Ok(())
    }
}

In terms of performance, this code is bounded by the unbounded channel's capability, so if you need to process a decent amount of data (lol, use a better tool) you will need multiple UnboundedSender instances for sure.

Since I'm receiving dynamic data, and exporters (files, MongoDB, etc. — you name it) are all different, I will just parse it into JSON.

It's very unoptimized, but it won't be the bottleneck since we are already working with I/O.

rust
use super::json_extractor::BinaryToJson;

/// One parsed message, ready for export: the target "table" (`index`) plus
/// the remaining fields converted into a JSON object.
pub struct ExportData {
    pub index: String,           // export destination, e.g. a collection name
    pub data: serde_json::Value, // remaining key/value fields as JSON
}

impl ExportData {
    /// Parses a raw `key:value;...` payload into an index plus a JSON body.
    ///
    /// Returns `None` when no `index:` field is present, or when the
    /// remaining fields cannot be converted to JSON. Everything before and
    /// including the `index:` field is dropped from the JSON body.
    pub fn from_raw_string(input: &str) -> Option<Self> {
        let mut parts = input.split(';');

        // Scan forward until the `index:` field; bail out if it never shows.
        let index = loop {
            match parts.next() {
                Some(part) if part.starts_with("index:") => {
                    break part["index:".len()..].to_string();
                }
                Some(_) => continue,
                None => return None,
            }
        };

        // Whatever is left of the iterator is the payload; re-join it so the
        // JSON converter sees the original ';'-separated form.
        let remainder = parts.collect::<Vec<&str>>().join(";");
        let data = BinaryToJson::convert(&remainder).ok()?;

        Some(Self { index, data })
    }
}
rust
use serde_json::json;
use serde_json::Value;
use std::collections::HashMap;

use crate::models::berr::Berr;

/// Stateless converter from the ad-hoc `key:value;key2;...` wire format
/// into a `serde_json::Value` object.
pub struct BinaryToJson;

impl BinaryToJson {
    /// Converts a `key:value;...` payload into a JSON object.
    ///
    /// * `key:value` tokens become string (or array) members,
    /// * bare `key` tokens become null members,
    /// * empty tokens (e.g. from a trailing ';') are skipped.
    pub fn convert(input: &str) -> Berr<Value> {
        let mut map: HashMap<&str, Value> = HashMap::new();

        for item in input.split(';') {
            // `split_once` replaces the old `splitn(2, ':')` + length match:
            // splitn(2, ..) always yields 1 or 2 parts, so the original `0`
            // and `3..` arms were unreachable dead code.
            match item.split_once(':') {
                Some((key, value)) => {
                    map.insert(key, Self::parse_entity(value));
                }
                None if !item.is_empty() => {
                    map.insert(item, Value::Null);
                }
                None => {} // empty token, ignore
            }
        }

        Ok(json!(map))
    }

    /// Returns the elements of a `[a, b, c]`-style value, or `None` when the
    /// input is not bracket-delimited. Elements are split on ", " and kept
    /// as plain strings.
    /// NOTE(review): `trim_matches` strips *all* leading/trailing brackets,
    /// so nested arrays lose their outer brackets — fine for the flat format
    /// documented above, but confirm if nesting ever appears.
    fn is_array(input: &str) -> Option<Vec<Value>> {
        if input.starts_with('[') && input.ends_with(']') {
            Some(
                input
                    .trim_matches(&['[', ']'][..])
                    .split(", ")
                    .map(|x| Value::String(x.to_string()))
                    .collect(),
            )
        } else {
            None
        }
    }

    /// Parses a single value: an array when bracketed, otherwise a string.
    fn parse_entity(entity: &str) -> Value {
        match Self::is_array(entity) {
            Some(items) => Value::Array(items),
            None => Value::String(entity.to_string()),
        }
    }
}

Exporter trait:

rust
use crate::Berr;
use crate::InitArgs;

/// Common interface for export backends (MongoDB, files, ...).
/// NOTE(review): bare `async fn` in a trait requires Rust 1.75+ and makes
/// the trait not object-safe — fine here, since it is used via generics.
pub trait Exporter {
    /// Builds the exporter from startup configuration.
    async fn new(cfg: &InitArgs) -> Berr<Self>
    where
        Self: Sized;
    /// Inserts many raw documents under `index` in one call.
    async fn insert_bulk(&self, index: &str, data: Vec<String>) -> Berr<()>;
    /// Inserts a single JSON document under `index`.
    async fn insert(&self, index: &str, data: &serde_json::Value) -> Berr<()>;
}

MongoDB exporter:

rust
use mongodb::{bson::doc, options::ClientOptions, Client};

use crate::exporter::Exporter;

/// `Exporter` backed by MongoDB; documents are written into collections of
/// the `debugger_data` database (see `insert`).
pub struct MongoExporter {
    client: Client, // shared MongoDB client handle
}

impl Exporter for MongoExporter {
    // Connects to mongodb://<export_host>:<export_port> and verifies the
    // connection by pinging the `admin` database before returning, so
    // connection problems surface at startup instead of on the first insert.
    async fn new(cfg: &crate::models::init_config::InitArgs) -> crate::models::berr::Berr<Self>
    where
        Self: Sized,
    {
        let uri = format!("mongodb://{}:{}", &cfg.export_host, &cfg.export_port);
        let mut client_options = ClientOptions::parse(uri).await?;

        client_options.app_name = Some("DEBUG LOGGER".to_string());
        let client = Client::with_options(client_options)?;
        client
            .database("admin")
            .run_command(doc! { "ping" : 1})
            .await?;
        Ok(Self { client })
    }
    // Inserts one JSON document into `debugger_data.<index>`, adding a
    // "date" field with the current time before writing.
    async fn insert(&self, index: &str, data: &serde_json::Value) -> crate::models::berr::Berr<()> {
        let mut doc = bson::to_document(&data)?;
        doc.insert("date", bson::DateTime::now());
        self.client
            .database("debugger_data")
            .collection(index)
            .insert_one(doc)
            .await?;
        Ok(())
    }
    // Not implemented: calling this panics via `todo!()`.
    async fn insert_bulk(&self, index: &str, data: Vec<String>) -> crate::models::berr::Berr<()> {
        todo!()
    }
}

I had many questions about the mongodb crate, TBH.

And the UnboundedReceiver side is very basic; I didn't implement backlog processing, since breaking performance and stability was never a goal.

rust
use crate::exporter::Exporter;
use crate::parsers::data_extractor::ExportData;
use tokio::sync::mpsc::UnboundedReceiver;
/// Consumer-side queue: holds messages whose export failed, so they are not
/// lost immediately (the backlog is capped inside `start_consumer`).
///
/// The hand-written `Default` impl is replaced by `#[derive(Default)]`,
/// which produces the identical empty-`Vec` value.
#[derive(Default)]
pub struct Queue {
    backlog: Vec<ExportData>, // failed exports awaiting a retry mechanism
}

impl Queue {
    /// Consumes messages from `rx`, parses each one, and forwards it to
    /// `exporter`. Failed inserts are pushed into a bounded backlog
    /// (100_000 entries max); beyond that, data is dropped with a log line.
    ///
    /// Returns once every sender for the channel has been dropped. The
    /// original matched `None => {}` inside the loop, which busy-spins at
    /// 100% CPU after the channel closes (`recv` on a closed, drained
    /// channel returns `None` immediately, forever).
    pub async fn start_consumer<T: Exporter>(
        &mut self,
        mut rx: UnboundedReceiver<String>,
        exporter: T,
    ) {
        while let Some(msg) = rx.recv().await {
            // Unparseable payloads are logged and skipped.
            let Some(ed) = ExportData::from_raw_string(&msg) else {
                println!("Invalid raw data : {}", msg);
                continue;
            };

            if let Err(e) = exporter.insert(&ed.index, &ed.data).await {
                println!("Error from Exporter, sending to backlog : {}", e);
                if self.backlog.len() < 100_000 {
                    self.backlog.push(ed);
                } else {
                    println!("Backlog is pretty full now, data is lost");
                }
            }
        }
    }
}