1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use async_trait::async_trait;
use mithril_common::StdResult;
use tokio::sync::RwLock;

use super::adapter::StoreAdapter;

/// Implementing this trait will make store able to limit the number of the
/// stored records by pruning them if a limit is set.
#[async_trait]
pub trait StorePruner {
    /// The key type
    type Key: Sync + Send;

    /// The record type
    type Record: Sync + Send;

    /// This trait requires a way to get the internal adapter.
    fn get_adapter(&self)
        -> &RwLock<Box<dyn StoreAdapter<Key = Self::Key, Record = Self::Record>>>;

    /// Return the maximum number of elements that can exist in this store. If None, there is no limit.
    fn get_max_records(&self) -> Option<usize>;

    /// Prune elements exceeding the specified limit.
    async fn prune(&self) -> StdResult<()> {
        let retention_len = self.get_max_records().unwrap_or(usize::MAX);
        let lock = self.get_adapter();
        let mut adapter = lock.write().await;

        for (epoch, _record) in adapter
            .get_last_n_records(usize::MAX)
            .await?
            .into_iter()
            .skip(retention_len)
        {
            adapter.remove(&epoch).await?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{cmp::min, sync::Arc};

    use sqlite::Connection;

    use crate::store::adapter::SQLiteAdapter;

    use super::*;

    struct TestStore {
        adapter: RwLock<Box<dyn StoreAdapter<Key = u64, Record = String>>>,
        record_limit: Option<usize>,
    }

    impl StorePruner for TestStore {
        type Key = u64;
        type Record = String;

        fn get_adapter(
            &self,
        ) -> &RwLock<Box<dyn StoreAdapter<Key = Self::Key, Record = Self::Record>>> {
            &self.adapter
        }

        fn get_max_records(&self) -> Option<usize> {
            self.record_limit
        }
    }

    fn get_data(n: u64) -> Vec<(u64, String)> {
        let n = min(n, 6);
        let words = ["one", "two", "three", "four", "five", "six"];
        let mut values: Vec<(u64, String)> = Vec::new();

        for index in 0..n {
            values.push((index, words[index as usize].to_string()));
        }

        values
    }

    async fn get_adapter(data_len: u64) -> SQLiteAdapter<u64, String> {
        let connection = Connection::open_thread_safe(":memory:").unwrap();
        let mut adapter: SQLiteAdapter<u64, String> =
            SQLiteAdapter::new("whatever", Arc::new(connection)).unwrap();

        for (key, record) in get_data(data_len) {
            adapter.store_record(&key, &record).await.unwrap();
        }

        adapter
    }

    #[tokio::test]
    async fn test_no_prune() {
        for data_len in 1_u64..=6 {
            let store = TestStore {
                adapter: RwLock::new(Box::new(get_adapter(data_len).await)),
                record_limit: None,
            };

            store.prune().await.unwrap();
            assert_eq!(
                data_len as usize,
                store.adapter.read().await.get_iter().await.unwrap().count(),
                "test no pruning with dataset length = {data_len}"
            );
        }
    }
    #[tokio::test]
    async fn test_with_pruning() {
        for data_len in 1_u64..=6 {
            let store = TestStore {
                adapter: RwLock::new(Box::new(get_adapter(6).await)),
                record_limit: Some(data_len as usize),
            };

            store.prune().await.unwrap();
            assert_eq!(
                data_len as usize,
                store.adapter.read().await.get_iter().await.unwrap().count(),
                "test pruning with retention limit = {data_len}"
            );
        }
    }
}