1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::{anyhow, Context};
4use slog::{debug, info, trace, Logger};
5
6use mithril_common::logging::LoggerExtensions;
7use mithril_common::{entities::Certificate, StdResult};
8
9use crate::database::repository::{CertificateRepository, SignedEntityStorer};
10
11pub struct CertificatesHashMigrator {
13 certificate_repository: CertificateRepository,
14 signed_entity_storer: Arc<dyn SignedEntityStorer>,
15 logger: Logger,
16}
17
18impl CertificatesHashMigrator {
19 pub fn new(
21 certificate_repository: CertificateRepository,
22 signed_entity_storer: Arc<dyn SignedEntityStorer>,
23 logger: Logger,
24 ) -> Self {
25 Self {
26 certificate_repository,
27 signed_entity_storer,
28 logger: logger.new_with_component_name::<Self>(),
29 }
30 }
31
32 pub async fn migrate(&self) -> StdResult<()> {
34 info!(self.logger, "Starting migration");
35 let (old_certificates, old_and_new_hashes) =
36 self.create_certificates_with_updated_hash().await?;
37
38 self.update_signed_entities_certificate_hash(old_and_new_hashes)
39 .await?;
40
41 self.cleanup(old_certificates).await?;
42
43 info!(
44 self.logger,
45 "All certificates have been migrated successfully"
46 );
47 Ok(())
48 }
49
50 async fn create_certificates_with_updated_hash(
53 &self,
54 ) -> StdResult<(Vec<Certificate>, HashMap<String, String>)> {
55 info!(self.logger, "Recomputing all certificates hash");
56 let old_certificates = self
57 .certificate_repository
58 .get_latest_certificates::<Certificate>(usize::MAX)
60 .await?;
61 let mut certificates_to_remove = vec![];
62
63 let mut migrated_certificates = vec![];
64 let mut old_and_new_hashes: HashMap<String, String> = HashMap::new();
65
66 debug!(self.logger, "Computing new hash for all certificates");
71 for mut certificate in old_certificates.into_iter().rev() {
72 let old_previous_hash = if certificate.is_genesis() {
73 certificate.previous_hash.clone()
74 } else {
75 let old_previous_hash = certificate.previous_hash.clone();
76 old_and_new_hashes
77 .get(&certificate.previous_hash)
78 .ok_or(anyhow!(
79 "Could not migrate certificate previous_hash: The hash '{}' doesn't exist in the certificate table",
80 &certificate.previous_hash
81 ))?.clone_into(&mut certificate.previous_hash);
82
83 old_previous_hash
84 };
85
86 if let Some(new_hash) = {
87 let computed_hash = certificate.compute_hash();
88 (computed_hash != certificate.hash).then_some(computed_hash)
90 } {
91 old_and_new_hashes.insert(certificate.hash.clone(), new_hash.clone());
92
93 if certificate.is_genesis() {
94 trace!(
95 self.logger, "New hash computed for genesis certificate {:?}",
96 certificate.signed_entity_type();
97 "old_hash" => &certificate.hash,
98 "new_hash" => &new_hash,
99 );
100 } else {
101 trace!(
102 self.logger, "New hash computed for certificate {:?}",
103 certificate.signed_entity_type();
104 "old_hash" => &certificate.hash,
105 "new_hash" => &new_hash,
106 "old_previous_hash" => &old_previous_hash,
107 "new_previous_hash" => &certificate.previous_hash
108 );
109 }
110
111 certificates_to_remove.push(certificate.clone());
112 certificate.hash = new_hash;
113 migrated_certificates.push(certificate);
114 } else {
115 old_and_new_hashes.insert(certificate.hash.clone(), certificate.hash);
116 }
117 }
118
119 debug!(
122 self.logger,
123 "Inserting migrated certificates in the database"
124 );
125 let migrated_certificates_chunk_size = 250;
126 for migrated_certificates_chunk in
127 migrated_certificates.chunks(migrated_certificates_chunk_size)
128 {
129 self.certificate_repository
130 .create_many_certificates(migrated_certificates_chunk.to_owned())
131 .await
132 .with_context(|| {
133 "Certificates Hash Migrator can not insert migrated certificates in the database"
134 })?;
135 }
136
137 Ok((certificates_to_remove, old_and_new_hashes))
138 }
139
140 async fn update_signed_entities_certificate_hash(
141 &self,
142 old_and_new_certificate_hashes: HashMap<String, String>,
143 ) -> StdResult<()> {
144 info!(self.logger, "Updating signed entities certificate ids");
145 let old_hashes: Vec<&str> = old_and_new_certificate_hashes
146 .keys()
147 .map(|k| k.as_str())
148 .collect();
149
150 debug!(
151 self.logger,
152 "Updating signed entities certificate hash in the database"
153 );
154 let old_hashes_chunk_size = 250;
155 for old_hashes_chunk in old_hashes.chunks(old_hashes_chunk_size) {
156 let mut records_to_migrate = self
157 .signed_entity_storer
158 .get_signed_entities_by_certificates_ids(old_hashes_chunk)
159 .await
160 .with_context(||
161 format!(
162 "Certificates Hash Migrator can not get signed entities by certificates ids with hashes: '{:?}'", old_hashes
163 )
164 )?;
165
166 for signed_entity_record in records_to_migrate.iter_mut() {
167 let new_certificate_hash =
168 old_and_new_certificate_hashes
169 .get(&signed_entity_record.certificate_id)
170 .ok_or( anyhow!(
171 "Migration Error: no migrated hash found for signed entity with certificate id: {}",
172 signed_entity_record.certificate_id
173 ))?
174 .to_owned();
175
176 trace!(
177 self.logger, "Migrating signed entity {} certificate hash computed for certificate",
178 signed_entity_record.signed_entity_id;
179 "old_certificate_hash" => &signed_entity_record.certificate_id,
180 "new_certificate_hash" => &new_certificate_hash
181 );
182 signed_entity_record.certificate_id = new_certificate_hash;
183 }
184
185 self.signed_entity_storer
186 .update_signed_entities(records_to_migrate)
187 .await
188 .with_context(|| "Certificates Hash Migrator can not update signed entities")?;
189 }
190
191 Ok(())
192 }
193
194 async fn cleanup(&self, old_certificates: Vec<Certificate>) -> StdResult<()> {
195 info!(self.logger, "Deleting old certificates in the database");
196 let old_certificates_chunk_size = 250;
197 for old_certificates_chunk in old_certificates
198 .into_iter()
199 .rev()
200 .collect::<Vec<_>>()
201 .chunks(old_certificates_chunk_size)
202 {
203 self.certificate_repository
204 .delete_certificates(&old_certificates_chunk.iter().collect::<Vec<_>>())
205 .await
206 .with_context(|| {
207 "Certificates Hash Migrator can not delete old certificates in the database"
208 })?;
209 }
210
211 Ok(())
212 }
213}
214
#[cfg(test)]
mod test {
    use mithril_common::entities::{
        Epoch, ImmutableFileNumber, SignedEntityConfig, SignedEntityType,
        SignedEntityTypeDiscriminants as Type, TimePoint,
    };
    use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection};

    use crate::database::record::{CertificateRecord, SignedEntityRecord};
    use crate::database::repository::SignedEntityStore;
    use crate::test_tools::TestLogger;

    use super::*;

    /// Opens an in-memory database, with all migrations applied and foreign keys enabled.
    fn connection_with_foreign_key_support() -> SqliteConnection {
        let builder = ConnectionBuilder::open_memory()
            .with_migrations(crate::database::migration::get_migrations());

        builder
            .with_options(&[ConnectionOptions::EnableForeignKeys])
            .build()
            .unwrap()
    }

    /// Opens an in-memory database, with all migrations applied and foreign keys force-disabled.
    fn connection_without_foreign_key_support() -> SqliteConnection {
        let builder = ConnectionBuilder::open_memory()
            .with_migrations(crate::database::migration::get_migrations());

        builder
            .with_options(&[ConnectionOptions::ForceDisableForeignKeys])
            .build()
            .unwrap()
    }

    /// Builds a dummy [TimePoint] with the given epoch and immutable file number.
    fn time_at(epoch: u64, immutable_file_number: ImmutableFileNumber) -> TimePoint {
        let mut time_point = TimePoint::dummy();
        time_point.epoch = Epoch(epoch);
        time_point.immutable_file_number = immutable_file_number;

        time_point
    }

    /// Builds a dummy genesis certificate with the given hash, at the epoch of `time_point`.
    fn dummy_genesis(certificate_hash: &str, time_point: TimePoint) -> Certificate {
        CertificateRecord::dummy_genesis(certificate_hash, time_point.epoch).into()
    }

    /// Builds a dummy certificate chained to `previous_hash`, signing an entity of the given
    /// type at `time_point`.
    fn dummy_certificate(
        certificate_hash: &str,
        previous_hash: &str,
        time_point: TimePoint,
        signed_entity_type: Type,
    ) -> Certificate {
        let signed_entity = SignedEntityConfig::dummy()
            .time_point_to_signed_entity(signed_entity_type, &time_point)
            .unwrap();
        let record = CertificateRecord::dummy(
            certificate_hash,
            previous_hash,
            time_point.epoch,
            signed_entity,
        );

        record.into()
    }

    /// Builds the signed entity record matching a certificate, `None` for a genesis certificate.
    fn signed_entity_for_certificate(certificate: &Certificate) -> Option<SignedEntityRecord> {
        if certificate.is_genesis() {
            return None;
        }

        let signed_entity_type = certificate.signed_entity_type();
        let artifact = format!("{signed_entity_type:?}");
        let id = match &signed_entity_type {
            SignedEntityType::MithrilStakeDistribution(epoch) => {
                format!("mithril-stake-distribution-{epoch}")
            }
            SignedEntityType::CardanoStakeDistribution(epoch) => {
                format!("cardano-stake-distribution-{epoch}")
            }
            SignedEntityType::CardanoImmutableFilesFull(beacon) => {
                format!("snapshot-{}-{}", beacon.epoch, beacon.immutable_file_number)
            }
            SignedEntityType::CardanoTransactions(epoch, block_number) => {
                format!("cardano-transactions-{epoch}-{block_number}")
            }
            SignedEntityType::CardanoDatabase(beacon) => {
                format!(
                    "cardano-database-{}-{}",
                    beacon.epoch, beacon.immutable_file_number
                )
            }
        };

        Some(SignedEntityRecord {
            signed_entity_id: format!("signed-entity-{id}"),
            certificate_id: certificate.hash.clone(),
            signed_entity_type,
            artifact,
            created_at: Default::default(),
        })
    }

    /// Stores the given certificates, and a signed entity for each non-genesis one, in the
    /// database. Returns what was stored, in the order of `certificates`.
    async fn fill_certificates_and_signed_entities_in_db(
        connection: Arc<SqliteConnection>,
        certificates: &[Certificate],
    ) -> StdResult<Vec<(Certificate, Option<SignedEntityRecord>)>> {
        let certificate_repository = CertificateRepository::new(connection.clone());
        let signed_entity_store = SignedEntityStore::new(connection.clone());
        let mut stored = vec![];

        for certificate in certificates {
            certificate_repository
                .create_certificate(certificate.clone())
                .await
                .with_context(|| {
                    format!(
                        "Certificates Hash Migrator can not create certificate with hash: '{}'",
                        certificate.hash
                    )
                })?;

            let signed_entity = signed_entity_for_certificate(certificate);
            if let Some(record) = &signed_entity {
                signed_entity_store
                    .store_signed_entity(record)
                    .await
                    .with_context(|| "Certificates Hash Migrator can not store signed entity")?;
            }

            stored.push((certificate.clone(), signed_entity));
        }

        Ok(stored)
    }

    /// Reference implementation of the migration, working in memory only: recomputes the hash
    /// of each certificate (after chaining it to the recomputed hash of its parent when the
    /// parent was already seen) and re-points its signed entity to the recomputed hash.
    fn recompute_hashes(
        certificates_and_signed_entity: Vec<(Certificate, Option<SignedEntityRecord>)>,
    ) -> Vec<(Certificate, Option<SignedEntityRecord>)> {
        let mut recomputed_by_old_hash: HashMap<String, String> = HashMap::new();

        certificates_and_signed_entity
            .into_iter()
            .map(|(mut certificate, signed_entity)| {
                if let Some(parent_hash) = recomputed_by_old_hash.get(&certificate.previous_hash) {
                    certificate.previous_hash.clone_from(parent_hash);
                }

                let recomputed_hash = certificate.compute_hash();
                let old_hash = std::mem::replace(&mut certificate.hash, recomputed_hash.clone());
                recomputed_by_old_hash.insert(old_hash, recomputed_hash);

                let signed_entity = signed_entity.map(|mut record| {
                    record.certificate_id.clone_from(&certificate.hash);
                    record
                });

                (certificate, signed_entity)
            })
            .collect()
    }

    #[test]
    fn ensure_test_framework_recompute_correct_hashes() {
        /// Pairs a certificate with the signed entity generated for it.
        fn with_signed_entity(
            certificate: Certificate,
        ) -> (Certificate, Option<SignedEntityRecord>) {
            let signed_entity = signed_entity_for_certificate(&certificate);
            (certificate, signed_entity)
        }

        let old_certificates: Vec<(Certificate, Option<SignedEntityRecord>)> = vec![
            dummy_genesis("genesis", time_at(1, 1)),
            dummy_certificate(
                "cert1",
                "genesis",
                time_at(1, 2),
                Type::MithrilStakeDistribution,
            ),
            dummy_certificate(
                "cert2",
                "cert1",
                time_at(2, 3),
                Type::MithrilStakeDistribution,
            ),
        ]
        .into_iter()
        .map(with_signed_entity)
        .collect();

        let expected: Vec<(Certificate, Option<SignedEntityRecord>)> = vec![
            dummy_genesis(
                "328b1ac75ef18fe09ff542ea1997ee512cd62c886a260463034e551255ad39e0",
                time_at(1, 1),
            ),
            dummy_certificate(
                "007286af724bb132dab1f13f9cda8a86d0cd82173f0b4a91124cc7bff63b1562",
                "328b1ac75ef18fe09ff542ea1997ee512cd62c886a260463034e551255ad39e0",
                time_at(1, 2),
                Type::MithrilStakeDistribution,
            ),
            dummy_certificate(
                "98fb51c4588293acec548c4d35e499fe77e6eb2eb75c67d64a1026a6f88bad7b",
                "007286af724bb132dab1f13f9cda8a86d0cd82173f0b4a91124cc7bff63b1562",
                time_at(2, 3),
                Type::MithrilStakeDistribution,
            ),
        ]
        .into_iter()
        .map(with_signed_entity)
        .collect();

        let recomputed = recompute_hashes(old_certificates);

        assert_eq!(expected, recomputed);
    }

    /// Reads back all certificates from the database, in the order returned by the repository,
    /// each paired with the signed entity found for its hash (never looked up for a genesis).
    async fn get_certificates_and_signed_entities(
        connection: Arc<SqliteConnection>,
    ) -> StdResult<Vec<(Certificate, Option<SignedEntityRecord>)>> {
        let certificate_repository = CertificateRepository::new(connection.clone());
        let signed_entity_store = SignedEntityStore::new(connection.clone());

        let certificates = certificate_repository
            .get_latest_certificates::<Certificate>(usize::MAX)
            .await?;

        let mut entries = vec![];
        for certificate in certificates {
            let signed_entity = if certificate.is_genesis() {
                None
            } else {
                signed_entity_store
                    .get_signed_entity_by_certificate_id(&certificate.hash)
                    .await
                    .with_context(|| format!("Certificates Hash Migrator can not get signed entity type by certificate id with hash: '{}'", certificate.hash))?
            };
            entries.push((certificate, signed_entity));
        }

        Ok(entries)
    }

    /// Stores `certificates` (and their signed entities) in the database, runs the migrator,
    /// then checks that the database content matches the in-memory reference migration.
    async fn run_migration_test(
        sqlite_connection: Arc<SqliteConnection>,
        certificates: Vec<Certificate>,
    ) {
        // Arrange
        let stored_before_migration =
            fill_certificates_and_signed_entities_in_db(sqlite_connection.clone(), &certificates)
                .await
                .unwrap();

        // The expectation is reversed to be compared with what is read back from the database.
        let expected: Vec<(Certificate, Option<SignedEntityRecord>)> =
            recompute_hashes(stored_before_migration)
                .into_iter()
                .rev()
                .collect();

        // Act
        let migrator = CertificatesHashMigrator::new(
            CertificateRepository::new(sqlite_connection.clone()),
            Arc::new(SignedEntityStore::new(sqlite_connection.clone())),
            TestLogger::stdout(),
        );
        migrator
            .migrate()
            .await
            .expect("Certificates hash migration should not fail");

        // Assert
        let migrated = get_certificates_and_signed_entities(sqlite_connection)
            .await
            .unwrap();

        // Only the data relevant to the migration is compared, to keep failures readable.
        let extract_human_readable_data =
            |entries: Vec<(Certificate, Option<SignedEntityRecord>)>| {
                let mut readable = vec![];
                for (certificate, signed_entity) in entries {
                    readable.push((
                        certificate.hash,
                        certificate.previous_hash,
                        certificate.epoch,
                        signed_entity
                            .map(|record| (record.signed_entity_type, record.certificate_id)),
                    ));
                }
                readable
            };
        assert_eq!(
            extract_human_readable_data(expected),
            extract_human_readable_data(migrated)
        );
    }

    #[tokio::test]
    async fn migrate_genesis_certificate() {
        let connection = Arc::new(connection_without_foreign_key_support());
        let certificates = vec![dummy_genesis("old_hash", time_at(1, 1))];

        run_migration_test(connection, certificates).await;
    }

    #[tokio::test]
    async fn migrate_a_chain_of_one_genesis_and_one_mithril_stake_distribution() {
        let connection = Arc::new(connection_without_foreign_key_support());
        let certificates = vec![
            dummy_genesis("old_genesis", time_at(1, 1)),
            dummy_certificate(
                "old_hash_1",
                "old_genesis",
                time_at(1, 2),
                Type::MithrilStakeDistribution,
            ),
        ];

        run_migration_test(connection, certificates).await;
    }

    #[tokio::test]
    async fn migrate_a_chain_with_one_genesis_spanning_multiple_epochs_and_multiple_signed_entities(
    ) {
        let connection = Arc::new(connection_with_foreign_key_support());
        let certificates = vec![
            dummy_genesis("old_genesis", time_at(1, 1)),
            dummy_certificate(
                "old_hash_1",
                "old_genesis",
                time_at(1, 2),
                Type::MithrilStakeDistribution,
            ),
            dummy_certificate(
                "old_hash_2",
                "old_genesis",
                time_at(1, 3),
                Type::CardanoImmutableFilesFull,
            ),
            dummy_certificate(
                "old_hash_3",
                "old_hash_2",
                time_at(2, 3),
                Type::MithrilStakeDistribution,
            ),
            dummy_certificate(
                "old_hash_4",
                "old_hash_3",
                time_at(2, 4),
                Type::CardanoImmutableFilesFull,
            ),
            dummy_certificate(
                "old_hash_5",
                "old_hash_3",
                time_at(3, 5),
                Type::CardanoImmutableFilesFull,
            ),
            dummy_certificate(
                "old_hash_6",
                "old_hash_5",
                time_at(4, 6),
                Type::CardanoImmutableFilesFull,
            ),
        ];

        run_migration_test(connection, certificates).await;
    }

    #[tokio::test]
    async fn migrate_a_chain_with_multiple_genesis_spanning_multiple_epochs() {
        let connection = Arc::new(connection_with_foreign_key_support());
        let certificates = vec![
            dummy_genesis("old_genesis", time_at(1, 1)),
            dummy_certificate(
                "old_hash_1",
                "old_genesis",
                time_at(1, 2),
                Type::MithrilStakeDistribution,
            ),
            dummy_certificate(
                "old_hash_2",
                "old_genesis",
                time_at(1, 3),
                Type::CardanoImmutableFilesFull,
            ),
            dummy_genesis("old_genesis_2", time_at(3, 5)),
            dummy_certificate(
                "old_hash_3",
                "old_genesis_2",
                time_at(4, 6),
                Type::MithrilStakeDistribution,
            ),
            dummy_certificate(
                "old_hash_4",
                "old_hash_3",
                time_at(5, 7),
                Type::CardanoImmutableFilesFull,
            ),
            dummy_genesis("old_genesis_3", time_at(5, 7)),
            dummy_certificate(
                "old_hash_5",
                "old_genesis_3",
                time_at(5, 8),
                Type::MithrilStakeDistribution,
            ),
        ];

        run_migration_test(connection, certificates).await;
    }

    #[tokio::test]
    async fn should_not_fail_if_some_hash_dont_change() {
        let connection = Arc::new(connection_without_foreign_key_support());

        // A certificate whose stored hash is already the one the migrator would compute.
        let mut certificate = dummy_genesis("whatever", time_at(1, 2));
        certificate.hash = certificate.compute_hash();

        fill_certificates_and_signed_entities_in_db(connection.clone(), &[certificate])
            .await
            .unwrap();

        let migrator = CertificatesHashMigrator::new(
            CertificateRepository::new(connection.clone()),
            Arc::new(SignedEntityStore::new(connection.clone())),
            TestLogger::stdout(),
        );
        migrator
            .migrate()
            .await
            .expect("Migration should not fail if a hash doesn't change");
    }
}