mithril_aggregator/database/repository/
buffered_single_signature_repository.rs1use anyhow::Context;
2use async_trait::async_trait;
3use std::sync::Arc;
4
5use mithril_common::entities::{SignedEntityTypeDiscriminants, SingleSignatures};
6use mithril_common::{StdError, StdResult};
7use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
8
9use crate::database::query::{
10 DeleteBufferedSingleSignatureQuery, GetBufferedSingleSignatureQuery,
11 InsertOrReplaceBufferedSingleSignatureRecordQuery,
12};
13use crate::database::record::BufferedSingleSignatureRecord;
14use crate::services::BufferedSingleSignatureStore;
15
16pub struct BufferedSingleSignatureRepository {
18 connection: Arc<SqliteConnection>,
19}
20
21impl BufferedSingleSignatureRepository {
22 pub fn new(connection_pool: Arc<SqliteConnection>) -> Self {
24 Self {
25 connection: connection_pool,
26 }
27 }
28
29 #[cfg(test)]
30 fn get_all(&self) -> StdResult<Vec<BufferedSingleSignatureRecord>> {
31 self.connection
32 .fetch_collect(GetBufferedSingleSignatureQuery::all())
33 }
34
35 fn get_by_discriminant<T>(
36 &self,
37 signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
38 ) -> StdResult<Vec<T>>
39 where
40 T: TryFrom<BufferedSingleSignatureRecord>,
41 StdError: From<T::Error>,
42 {
43 let records: Vec<BufferedSingleSignatureRecord> =
44 self.connection
45 .fetch_collect(GetBufferedSingleSignatureQuery::by_discriminant(
46 signed_entity_type_discriminant,
47 ))?;
48
49 let mut entities: Vec<T> = Vec::with_capacity(records.len());
50 for record in records {
51 entities.push(record.try_into()?);
52 }
53
54 Ok(entities)
55 }
56}
57
58#[async_trait]
59impl BufferedSingleSignatureStore for BufferedSingleSignatureRepository {
60 async fn buffer_signature(
61 &self,
62 signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
63 signature: &SingleSignatures,
64 ) -> StdResult<()> {
65 let record = BufferedSingleSignatureRecord::try_from_single_signatures(
66 signature,
67 signed_entity_type_discriminant,
68 )
69 .with_context(|| "Failed to convert SingleSignatures to BufferedSingleSignatureRecord")?;
70
71 self.connection
72 .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one(
73 record,
74 ))?;
75
76 Ok(())
77 }
78
79 async fn get_buffered_signatures(
80 &self,
81 signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
82 ) -> StdResult<Vec<SingleSignatures>> {
83 self.get_by_discriminant(signed_entity_type_discriminant)
84 }
85
86 async fn remove_buffered_signatures(
87 &self,
88 signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
89 single_signatures: Vec<SingleSignatures>,
90 ) -> StdResult<()> {
91 let signatures_party_ids = single_signatures.into_iter().map(|s| s.party_id).collect();
92 self.connection.fetch_first(
93 DeleteBufferedSingleSignatureQuery::by_discriminant_and_party_ids(
94 signed_entity_type_discriminant,
95 signatures_party_ids,
96 ),
97 )?;
98
99 Ok(())
100 }
101}
102
103#[cfg(test)]
104mod tests {
105 use mithril_common::entities::SignedEntityTypeDiscriminants::{
106 CardanoTransactions, MithrilStakeDistribution,
107 };
108 use mithril_common::test_utils::fake_keys;
109
110 use crate::database::record::{strip_buffered_sigs_date, BufferedSingleSignatureRecord};
111 use crate::database::test_helper::{insert_buffered_single_signatures, main_db_connection};
112
113 use super::*;
114
115 #[test]
116 fn retrieve_all() {
117 let connection = main_db_connection().unwrap();
118 insert_buffered_single_signatures(
119 &connection,
120 BufferedSingleSignatureRecord::fakes(&[
121 ("party1", CardanoTransactions),
122 ("party2", CardanoTransactions),
123 ("party3", MithrilStakeDistribution),
124 ]),
125 )
126 .unwrap();
127
128 let store = BufferedSingleSignatureRepository::new(Arc::new(connection));
129
130 let buffered_signatures_ctx = store.get_all().unwrap();
131 assert_eq!(
132 strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[
133 ("party3", MithrilStakeDistribution),
134 ("party2", CardanoTransactions),
135 ("party1", CardanoTransactions),
136 ])),
137 strip_buffered_sigs_date(&buffered_signatures_ctx)
138 );
139 }
140
141 #[tokio::test]
142 async fn retrieve_signatures_by_discriminant() {
143 let connection = main_db_connection().unwrap();
144 insert_buffered_single_signatures(
145 &connection,
146 BufferedSingleSignatureRecord::fakes(&[
147 ("party1", CardanoTransactions),
148 ("party2", CardanoTransactions),
149 ("party3", MithrilStakeDistribution),
150 ]),
151 )
152 .unwrap();
153
154 let store = BufferedSingleSignatureRepository::new(Arc::new(connection));
155
156 let buffered_signatures_ctx = store
157 .get_by_discriminant::<BufferedSingleSignatureRecord>(CardanoTransactions)
158 .unwrap();
159 assert_eq!(
160 strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[
161 ("party2", CardanoTransactions),
162 ("party1", CardanoTransactions),
163 ])),
164 strip_buffered_sigs_date(&buffered_signatures_ctx)
165 );
166
167 let buffered_signatures_msd = store
168 .get_by_discriminant::<BufferedSingleSignatureRecord>(MithrilStakeDistribution)
169 .unwrap();
170 assert_eq!(
171 strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[(
172 "party3",
173 MithrilStakeDistribution
174 ),])),
175 strip_buffered_sigs_date(&buffered_signatures_msd)
176 );
177 }
178
179 #[tokio::test]
180 async fn store_signatures() {
181 let connection = main_db_connection().unwrap();
182 let store = BufferedSingleSignatureRepository::new(Arc::new(connection));
183
184 {
186 store
187 .buffer_signature(
188 CardanoTransactions,
189 &SingleSignatures::new(
190 "party1",
191 fake_keys::single_signature()[0].try_into().unwrap(),
192 vec![1],
193 ),
194 )
195 .await
196 .unwrap();
197 store
198 .buffer_signature(
199 CardanoTransactions,
200 &SingleSignatures::new(
201 "party2",
202 fake_keys::single_signature()[1].try_into().unwrap(),
203 vec![2],
204 ),
205 )
206 .await
207 .unwrap();
208
209 let buffered_signatures = store
210 .get_buffered_signatures(CardanoTransactions)
211 .await
212 .unwrap();
213 assert_eq!(
214 vec![
215 SingleSignatures::new(
216 "party2",
217 fake_keys::single_signature()[1].try_into().unwrap(),
218 vec![2],
219 ),
220 SingleSignatures::new(
221 "party1",
222 fake_keys::single_signature()[0].try_into().unwrap(),
223 vec![1],
224 ),
225 ],
226 buffered_signatures
227 );
228 }
229 {
231 store
232 .buffer_signature(
233 MithrilStakeDistribution,
234 &SingleSignatures::new(
235 "party3",
236 fake_keys::single_signature()[2].try_into().unwrap(),
237 vec![3],
238 ),
239 )
240 .await
241 .unwrap();
242
243 let buffered_signatures = store
244 .get_buffered_signatures(MithrilStakeDistribution)
245 .await
246 .unwrap();
247 assert_eq!(
248 vec![SingleSignatures::new(
249 "party3",
250 fake_keys::single_signature()[2].try_into().unwrap(),
251 vec![3],
252 )],
253 buffered_signatures
254 );
255 }
256 }
257
258 #[tokio::test]
259 async fn remove_buffered_signatures() {
260 let connection = main_db_connection().unwrap();
261 insert_buffered_single_signatures(
262 &connection,
263 BufferedSingleSignatureRecord::fakes(&[
264 ("party1", MithrilStakeDistribution),
265 ("party2", MithrilStakeDistribution),
266 ("party3", MithrilStakeDistribution),
267 ("party4", CardanoTransactions),
268 ]),
269 )
270 .unwrap();
271
272 let store = BufferedSingleSignatureRepository::new(Arc::new(connection));
273
274 store
275 .remove_buffered_signatures(
276 MithrilStakeDistribution,
277 vec![
278 BufferedSingleSignatureRecord::fake("party1", MithrilStakeDistribution)
279 .try_into()
280 .unwrap(),
281 BufferedSingleSignatureRecord::fake("party3", MithrilStakeDistribution)
282 .try_into()
283 .unwrap(),
284 ],
285 )
286 .await
287 .unwrap();
288
289 let remaining_msd_sigs = store.get_all().unwrap();
290 assert_eq!(
291 strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[
292 ("party4", CardanoTransactions),
293 ("party2", MithrilStakeDistribution),
294 ])),
295 strip_buffered_sigs_date(&remaining_msd_sigs)
296 );
297 }
298}