1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use std::sync::Arc;

use async_trait::async_trait;
use slog::{debug, Logger};

use mithril_common::entities::BlockNumber;
use mithril_common::logging::LoggerExtensions;
use mithril_common::signable_builder::TransactionsImporter;
use mithril_common::StdResult;
use mithril_persistence::sqlite::{SqliteCleaner, SqliteCleaningTask, SqliteConnectionPool};

/// A decorator of [TransactionsImporter] that vacuums the database after running the import.
pub struct TransactionsImporterWithVacuum {
    connection_pool: Arc<SqliteConnectionPool>,
    wrapped_importer: Arc<dyn TransactionsImporter>,
    logger: Logger,
}

impl TransactionsImporterWithVacuum {
    /// Create a new instance of [TransactionsImporterWithVacuum].
    pub fn new(
        connection_pool: Arc<SqliteConnectionPool>,
        wrapped_importer: Arc<dyn TransactionsImporter>,
        logger: Logger,
    ) -> Self {
        Self {
            connection_pool,
            wrapped_importer,
            logger: logger.new_with_component_name::<Self>(),
        }
    }
}

#[async_trait]
impl TransactionsImporter for TransactionsImporterWithVacuum {
    async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()> {
        self.wrapped_importer.import(up_to_beacon).await?;

        debug!(
            self.logger,
            "Transaction Import finished - Vacuuming database to reclaim disk space"
        );
        let connection = self.connection_pool.connection()?;

        SqliteCleaner::new(&connection)
            .with_tasks(&[SqliteCleaningTask::Vacuum])
            .run()?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use mockall::mock;
    use sqlite::Connection;

    use mithril_common::test_utils::TempDir;
    use mithril_persistence::sqlite::SqliteConnection;

    use crate::test_tools::TestLogger;

    use super::*;

    mock! {
        pub TransactionImporterImpl {}

        #[async_trait]
        impl TransactionsImporter for TransactionImporterImpl {
            async fn import(&self, up_to_beacon: BlockNumber) -> StdResult<()>;
        }
    }

    impl TransactionsImporterWithVacuum {
        pub fn new_with_mock<I>(
            connection_pool: Arc<SqliteConnectionPool>,
            importer_mock_config: I,
        ) -> Self
        where
            I: FnOnce(&mut MockTransactionImporterImpl),
        {
            let mut transaction_importer = MockTransactionImporterImpl::new();
            importer_mock_config(&mut transaction_importer);

            Self::new(
                connection_pool,
                Arc::new(transaction_importer),
                TestLogger::stdout(),
            )
        }
    }

    /// Create a table, insert a lot of data and drop the table to make the database size grow.
    fn mangle_db(connection: &SqliteConnection) {
        connection
            .execute("CREATE TABLE test (id INTEGER PRIMARY KEY, text TEXT);")
            .unwrap();
        connection
            .execute(format!(
                "INSERT INTO test (id, text) VALUES {}",
                (0..10_000)
                    .map(|i| format!("({}, 'some text to fill the db')", i))
                    .collect::<Vec<String>>()
                    .join(", ")
            ))
            .unwrap();
        connection.execute("DROP TABLE test").unwrap();
    }

    #[tokio::test]
    async fn test_database_size_shrink_after_import() {
        let db_path = TempDir::create("mithril-persistence", "test_vacuum").join("test.db");
        let connection = Connection::open_thread_safe(&db_path).unwrap();
        // make the database size grow
        mangle_db(&connection);

        let importer = TransactionsImporterWithVacuum::new_with_mock(
            Arc::new(SqliteConnectionPool::build_from_connection(connection)),
            |mock| {
                mock.expect_import().once().returning(|_| Ok(()));
            },
        );

        let initial_size = db_path.metadata().unwrap().len();

        importer
            .import(BlockNumber(100))
            .await
            .expect("Import should not fail");

        let after_import_size = db_path.metadata().unwrap().len();

        assert!(
            initial_size > after_import_size,
            "Database size did not shrink after import: \
            initial_size: {initial_size} -> after_import_size: {after_import_size}"
        );
    }
}