
Buffered UnboundedChannel consumer

Some small changes to reduce MongoDB load and speed up inserts through the exporter

The previous version called insert() for EVERY entity

not cool
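
Roughly, the old loop looked something like this (a sketch from memory; the exact insert() signature is an assumption, but the idea was one MongoDB round-trip per entity):

```rust
// Old approach (sketch): one exporter call, and thus one MongoDB round-trip,
// per incoming entity. insert()'s signature here is assumed for illustration.
while let Some(data) = rx.recv().await {
    if let Some(ed) = ExportData::from_raw_string(&data) {
        if let Err(e) = exporter.insert(&ed.index, &ed).await {
            dbg!(e.to_string());
        }
    }
}
```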

So we are going to stash incoming data for a while (1-10 seconds, whatever you need; just be careful, since the date is generated at the exporter)
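
For context, here is a rough sketch of what the Exporter trait presumably looks like, inferred from the insert_bulk() call in the consumer below; the real trait lives in crate::exporter and its signature may differ (this sketch assumes the async-trait crate):

```rust
// Sketch only: the trait shape is inferred from how the consumer uses it below.
use async_trait::async_trait;

use crate::parsers::data_extractor::ExportData;

#[async_trait]
pub trait Exporter {
    /// Insert a whole batch of entities for one index (collection) in a single
    /// call (e.g. MongoDB's insert_many) instead of one insert() per entity.
    async fn insert_bulk(
        &self,
        index: &str,
        items: &[ExportData],
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
```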

Once again I ran my code through ChatGPT to add comments, so they should explain it well enough

```rust
use crate::exporter::Exporter;
use crate::parsers::data_extractor::ExportData;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedReceiver;

/// A struct representing a queue. The implementation is left empty for now.
#[derive(Default)]
pub struct Queue;

/// A buffer to store and manage `ExportData` items.
pub struct ExportDataBuffer {
    map: HashMap<String, Vec<ExportData>>,
}

impl ExportDataBuffer {
    /// Creates a new `ExportDataBuffer` instance with an empty map.
    pub fn new() -> Self {
        Self {
            map: HashMap::new(),
        }
    }

    /// Adds an `ExportData` item to the buffer. Items are grouped by their `index`.
    pub fn push(&mut self, item: ExportData) {
        // Get or create the vector for the item's index and push the item into it.
        self.map.entry(item.index.clone()).or_default().push(item);
    }

    /// Extracts and removes data from the buffer to be inserted.
    /// Returns `None` if there's no data to insert, otherwise returns a map of data to insert.
    pub fn data_to_insert(&mut self) -> Option<HashMap<String, Vec<ExportData>>> {
        let mut keys_to_remove = vec![];

        // Collect keys with non-empty values to be removed later.
        for (k, v) in self.map.iter() {
            if !v.is_empty() {
                keys_to_remove.push(k.clone());
            }
        }

        let mut result = HashMap::new();

        // Remove collected keys and add their values to the result map.
        for k in keys_to_remove {
            if let Some(v) = self.map.remove(&k) {
                result.insert(k, v);
            }
        }

        if result.is_empty() {
            None
        } else {
            Some(result)
        }
    }
}

impl Queue {
    /// Starts a consumer that processes messages from `UnboundedReceiver` and periodically inserts data.
    /// 
    /// # Arguments
    ///
    /// * `rx` - The `UnboundedReceiver` from which messages are received.
    /// * `exporter` - The `Exporter` instance used to insert data.
    pub async fn start_consumer<T: Exporter + Sync + Send>(
        &mut self,
        mut rx: UnboundedReceiver<String>,
        exporter: Arc<T>,
    ) {
        let mut buffer: ExportDataBuffer = ExportDataBuffer::new();
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
        loop {
            tokio::select! {
                // Handle incoming data from the receiver.
                Some(data) = rx.recv() => {
                    // Convert raw string to `ExportData` and add to buffer if valid.
                    if let Some(ed) = ExportData::from_raw_string(&data) {
                        buffer.push(ed);
                    }
                },
                // Periodically check the buffer and insert data.
                _ = interval.tick() => {
                    if let Some(d) = buffer.data_to_insert() {
                        for (key, val) in d.iter() {
                            // Insert bulk data and handle any errors.
                            match exporter.insert_bulk(key, val).await {
                                Ok(_) => {
                                    // Handle success if needed (currently does nothing).
                                }
                                Err(e) => {
                                    dbg!(e.to_string()); // Log error message.
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
```

Now we don't call MongoDB for every single item; we just buffer them and insert in batches
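
For completeness, wiring it up looks roughly like this. MongoExporter is a placeholder name for whatever type implements Exporter in your project; the rest is standard tokio plumbing:

```rust
use std::sync::Arc;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // The channel carries raw strings from the parsers to the consumer.
    let (tx, rx) = mpsc::unbounded_channel::<String>();

    // Placeholder: any type implementing Exporter (e.g. a MongoDB-backed one).
    let exporter = Arc::new(MongoExporter::new(/* connection config */));

    // The consumer owns the receiver and flushes the buffer once per second.
    tokio::spawn(async move {
        let mut queue = Queue::default();
        queue.start_consumer(rx, exporter).await;
    });

    // Producers just push raw lines into the channel; the channel is unbounded,
    // so there is no backpressure on the parser side.
    tx.send("raw entity line".to_string()).unwrap();
}
```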