mithril_aggregator/database/repository/
buffered_single_signature_repository.rs1use anyhow::Context;
2use async_trait::async_trait;
3use std::sync::Arc;
4
5use mithril_common::entities::{SignedEntityTypeDiscriminants, SingleSignature};
6use mithril_common::{StdError, StdResult};
7use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
8
9use crate::database::query::{
10 DeleteBufferedSingleSignatureQuery, GetBufferedSingleSignatureQuery,
11 InsertOrReplaceBufferedSingleSignatureRecordQuery,
12};
13use crate::database::record::BufferedSingleSignatureRecord;
14use crate::services::BufferedSingleSignatureStore;
15
16pub struct BufferedSingleSignatureRepository {
18 connection: Arc<SqliteConnection>,
19}
20
21impl BufferedSingleSignatureRepository {
22 pub fn new(connection_pool: Arc<SqliteConnection>) -> Self {
24 Self {
25 connection: connection_pool,
26 }
27 }
28
29 #[cfg(test)]
30 fn get_all(&self) -> StdResult<Vec<BufferedSingleSignatureRecord>> {
31 self.connection.fetch_collect(GetBufferedSingleSignatureQuery::all())
32 }
33
34 fn get_by_discriminant<T>(
35 &self,
36 signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
37 ) -> StdResult<Vec<T>>
38 where
39 T: TryFrom<BufferedSingleSignatureRecord>,
40 StdError: From<T::Error>,
41 {
42 let records: Vec<BufferedSingleSignatureRecord> =
43 self.connection
44 .fetch_collect(GetBufferedSingleSignatureQuery::by_discriminant(
45 signed_entity_type_discriminant,
46 ))?;
47
48 let mut entities: Vec<T> = Vec::with_capacity(records.len());
49 for record in records {
50 entities.push(record.try_into()?);
51 }
52
53 Ok(entities)
54 }
55}
56
57#[async_trait]
58impl BufferedSingleSignatureStore for BufferedSingleSignatureRepository {
59 async fn buffer_signature(
60 &self,
61 signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
62 signature: &SingleSignature,
63 ) -> StdResult<()> {
64 let record = BufferedSingleSignatureRecord::try_from_single_signature(
65 signature,
66 signed_entity_type_discriminant,
67 )
68 .with_context(|| "Failed to convert SingleSignature to BufferedSingleSignatureRecord")?;
69
70 self.connection
71 .fetch_first(InsertOrReplaceBufferedSingleSignatureRecordQuery::one(
72 record,
73 ))?;
74
75 Ok(())
76 }
77
78 async fn get_buffered_signatures(
79 &self,
80 signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
81 ) -> StdResult<Vec<SingleSignature>> {
82 self.get_by_discriminant(signed_entity_type_discriminant)
83 }
84
85 async fn remove_buffered_signatures(
86 &self,
87 signed_entity_type_discriminant: SignedEntityTypeDiscriminants,
88 single_signatures: Vec<SingleSignature>,
89 ) -> StdResult<()> {
90 let signatures_party_ids = single_signatures.into_iter().map(|s| s.party_id).collect();
91 self.connection.fetch_first(
92 DeleteBufferedSingleSignatureQuery::by_discriminant_and_party_ids(
93 signed_entity_type_discriminant,
94 signatures_party_ids,
95 ),
96 )?;
97
98 Ok(())
99 }
100}
101
102#[cfg(test)]
103mod tests {
104 use mithril_common::entities::SignedEntityTypeDiscriminants::{
105 CardanoTransactions, MithrilStakeDistribution,
106 };
107 use mithril_common::test_utils::fake_keys;
108
109 use crate::database::record::{BufferedSingleSignatureRecord, strip_buffered_sigs_date};
110 use crate::database::test_helper::{insert_buffered_single_signatures, main_db_connection};
111
112 use super::*;
113
114 #[test]
115 fn retrieve_all() {
116 let connection = main_db_connection().unwrap();
117 insert_buffered_single_signatures(
118 &connection,
119 BufferedSingleSignatureRecord::fakes(&[
120 ("party1", CardanoTransactions),
121 ("party2", CardanoTransactions),
122 ("party3", MithrilStakeDistribution),
123 ]),
124 )
125 .unwrap();
126
127 let store = BufferedSingleSignatureRepository::new(Arc::new(connection));
128
129 let buffered_signatures_ctx = store.get_all().unwrap();
130 assert_eq!(
131 strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[
132 ("party3", MithrilStakeDistribution),
133 ("party2", CardanoTransactions),
134 ("party1", CardanoTransactions),
135 ])),
136 strip_buffered_sigs_date(&buffered_signatures_ctx)
137 );
138 }
139
140 #[tokio::test]
141 async fn retrieve_signatures_by_discriminant() {
142 let connection = main_db_connection().unwrap();
143 insert_buffered_single_signatures(
144 &connection,
145 BufferedSingleSignatureRecord::fakes(&[
146 ("party1", CardanoTransactions),
147 ("party2", CardanoTransactions),
148 ("party3", MithrilStakeDistribution),
149 ]),
150 )
151 .unwrap();
152
153 let store = BufferedSingleSignatureRepository::new(Arc::new(connection));
154
155 let buffered_signatures_ctx = store
156 .get_by_discriminant::<BufferedSingleSignatureRecord>(CardanoTransactions)
157 .unwrap();
158 assert_eq!(
159 strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[
160 ("party2", CardanoTransactions),
161 ("party1", CardanoTransactions),
162 ])),
163 strip_buffered_sigs_date(&buffered_signatures_ctx)
164 );
165
166 let buffered_signatures_msd = store
167 .get_by_discriminant::<BufferedSingleSignatureRecord>(MithrilStakeDistribution)
168 .unwrap();
169 assert_eq!(
170 strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[(
171 "party3",
172 MithrilStakeDistribution
173 ),])),
174 strip_buffered_sigs_date(&buffered_signatures_msd)
175 );
176 }
177
178 #[tokio::test]
179 async fn store_signatures() {
180 let connection = main_db_connection().unwrap();
181 let store = BufferedSingleSignatureRepository::new(Arc::new(connection));
182
183 {
185 store
186 .buffer_signature(
187 CardanoTransactions,
188 &SingleSignature::new(
189 "party1",
190 fake_keys::single_signature()[0].try_into().unwrap(),
191 vec![1],
192 ),
193 )
194 .await
195 .unwrap();
196 store
197 .buffer_signature(
198 CardanoTransactions,
199 &SingleSignature::new(
200 "party2",
201 fake_keys::single_signature()[1].try_into().unwrap(),
202 vec![2],
203 ),
204 )
205 .await
206 .unwrap();
207
208 let buffered_signatures =
209 store.get_buffered_signatures(CardanoTransactions).await.unwrap();
210 assert_eq!(
211 vec![
212 SingleSignature::new(
213 "party2",
214 fake_keys::single_signature()[1].try_into().unwrap(),
215 vec![2],
216 ),
217 SingleSignature::new(
218 "party1",
219 fake_keys::single_signature()[0].try_into().unwrap(),
220 vec![1],
221 ),
222 ],
223 buffered_signatures
224 );
225 }
226 {
228 store
229 .buffer_signature(
230 MithrilStakeDistribution,
231 &SingleSignature::new(
232 "party3",
233 fake_keys::single_signature()[2].try_into().unwrap(),
234 vec![3],
235 ),
236 )
237 .await
238 .unwrap();
239
240 let buffered_signatures =
241 store.get_buffered_signatures(MithrilStakeDistribution).await.unwrap();
242 assert_eq!(
243 vec![SingleSignature::new(
244 "party3",
245 fake_keys::single_signature()[2].try_into().unwrap(),
246 vec![3],
247 )],
248 buffered_signatures
249 );
250 }
251 }
252
253 #[tokio::test]
254 async fn remove_buffered_signatures() {
255 let connection = main_db_connection().unwrap();
256 insert_buffered_single_signatures(
257 &connection,
258 BufferedSingleSignatureRecord::fakes(&[
259 ("party1", MithrilStakeDistribution),
260 ("party2", MithrilStakeDistribution),
261 ("party3", MithrilStakeDistribution),
262 ("party4", CardanoTransactions),
263 ]),
264 )
265 .unwrap();
266
267 let store = BufferedSingleSignatureRepository::new(Arc::new(connection));
268
269 store
270 .remove_buffered_signatures(
271 MithrilStakeDistribution,
272 vec![
273 BufferedSingleSignatureRecord::fake("party1", MithrilStakeDistribution)
274 .try_into()
275 .unwrap(),
276 BufferedSingleSignatureRecord::fake("party3", MithrilStakeDistribution)
277 .try_into()
278 .unwrap(),
279 ],
280 )
281 .await
282 .unwrap();
283
284 let remaining_msd_sigs = store.get_all().unwrap();
285 assert_eq!(
286 strip_buffered_sigs_date(&BufferedSingleSignatureRecord::fakes(&[
287 ("party4", CardanoTransactions),
288 ("party2", MithrilStakeDistribution),
289 ])),
290 strip_buffered_sigs_date(&remaining_msd_sigs)
291 );
292 }
293}