mithril_aggregator/tools/
certificates_hash_migrator.rs

1use std::{collections::HashMap, sync::Arc};
2
3use anyhow::{anyhow, Context};
4use slog::{debug, info, trace, Logger};
5
6use mithril_common::logging::LoggerExtensions;
7use mithril_common::{entities::Certificate, StdResult};
8
9use crate::database::repository::{CertificateRepository, SignedEntityStorer};
10
11/// Tools to recompute all the certificates hashes in a aggregator database.
12pub struct CertificatesHashMigrator {
13    certificate_repository: CertificateRepository,
14    signed_entity_storer: Arc<dyn SignedEntityStorer>,
15    logger: Logger,
16}
17
18impl CertificatesHashMigrator {
19    /// [CertificatesHashMigrator] factory
20    pub fn new(
21        certificate_repository: CertificateRepository,
22        signed_entity_storer: Arc<dyn SignedEntityStorer>,
23        logger: Logger,
24    ) -> Self {
25        Self {
26            certificate_repository,
27            signed_entity_storer,
28            logger: logger.new_with_component_name::<Self>(),
29        }
30    }
31
32    /// Recompute all the certificates hashes the database.
33    pub async fn migrate(&self) -> StdResult<()> {
34        info!(self.logger, "Starting migration");
35        let (old_certificates, old_and_new_hashes) =
36            self.create_certificates_with_updated_hash().await?;
37
38        self.update_signed_entities_certificate_hash(old_and_new_hashes)
39            .await?;
40
41        self.cleanup(old_certificates).await?;
42
43        info!(
44            self.logger,
45            "All certificates have been migrated successfully"
46        );
47        Ok(())
48    }
49
50    /// Load all certificates from the database, compute their new hash, returns a list with
51    /// all old certificates joined with their new hash string.
52    async fn create_certificates_with_updated_hash(
53        &self,
54    ) -> StdResult<(Vec<Certificate>, HashMap<String, String>)> {
55        info!(self.logger, "Recomputing all certificates hash");
56        let old_certificates = self
57            .certificate_repository
58            // arbitrary high value to get all existing certificates
59            .get_latest_certificates::<Certificate>(usize::MAX)
60            .await?;
61        let mut certificates_to_remove = vec![];
62
63        let mut migrated_certificates = vec![];
64        let mut old_and_new_hashes: HashMap<String, String> = HashMap::new();
65
66        // 1 - Recompute all certificates hashes
67        // Note: get_latest_certificates retrieve certificates from the earliest to the older,
68        // in order to have a strong guarantee that when inserting a certificate in the db its
69        // previous_hash exist we have to work in the reverse order.
70        debug!(self.logger, "Computing new hash for all certificates");
71        for mut certificate in old_certificates.into_iter().rev() {
72            let old_previous_hash = if certificate.is_genesis() {
73                certificate.previous_hash.clone()
74            } else {
75                let old_previous_hash = certificate.previous_hash.clone();
76                old_and_new_hashes
77                    .get(&certificate.previous_hash)
78                    .ok_or(anyhow!(
79                        "Could not migrate certificate previous_hash: The hash '{}' doesn't exist in the certificate table",
80                        &certificate.previous_hash
81                    ))?.clone_into(&mut certificate.previous_hash);
82
83                old_previous_hash
84            };
85
86            if let Some(new_hash) = {
87                let computed_hash = certificate.compute_hash();
88                // return none if the hash did not change
89                (computed_hash != certificate.hash).then_some(computed_hash)
90            } {
91                old_and_new_hashes.insert(certificate.hash.clone(), new_hash.clone());
92
93                if certificate.is_genesis() {
94                    trace!(
95                        self.logger, "New hash computed for genesis certificate {:?}",
96                        certificate.signed_entity_type();
97                        "old_hash" => &certificate.hash,
98                        "new_hash" => &new_hash,
99                    );
100                } else {
101                    trace!(
102                        self.logger, "New hash computed for certificate {:?}",
103                        certificate.signed_entity_type();
104                        "old_hash" => &certificate.hash,
105                        "new_hash" => &new_hash,
106                        "old_previous_hash" => &old_previous_hash,
107                        "new_previous_hash" => &certificate.previous_hash
108                    );
109                }
110
111                certificates_to_remove.push(certificate.clone());
112                certificate.hash = new_hash;
113                migrated_certificates.push(certificate);
114            } else {
115                old_and_new_hashes.insert(certificate.hash.clone(), certificate.hash);
116            }
117        }
118
119        // 2 - Certificates migrated, we can insert them in the db
120        // (we do this by chunks in order to avoid reaching the limit of 32766 variables in a single query)
121        debug!(
122            self.logger,
123            "Inserting migrated certificates in the database"
124        );
125        let migrated_certificates_chunk_size = 250;
126        for migrated_certificates_chunk in
127            migrated_certificates.chunks(migrated_certificates_chunk_size)
128        {
129            self.certificate_repository
130            .create_many_certificates(migrated_certificates_chunk.to_owned())
131            .await
132            .with_context(|| {
133                "Certificates Hash Migrator can not insert migrated certificates in the database"
134            })?;
135        }
136
137        Ok((certificates_to_remove, old_and_new_hashes))
138    }
139
140    async fn update_signed_entities_certificate_hash(
141        &self,
142        old_and_new_certificate_hashes: HashMap<String, String>,
143    ) -> StdResult<()> {
144        info!(self.logger, "Updating signed entities certificate ids");
145        let old_hashes: Vec<&str> = old_and_new_certificate_hashes
146            .keys()
147            .map(|k| k.as_str())
148            .collect();
149
150        debug!(
151            self.logger,
152            "Updating signed entities certificate hash in the database"
153        );
154        let old_hashes_chunk_size = 250;
155        for old_hashes_chunk in old_hashes.chunks(old_hashes_chunk_size) {
156            let mut records_to_migrate = self
157                .signed_entity_storer
158                .get_signed_entities_by_certificates_ids(old_hashes_chunk)
159                .await
160                .with_context(||
161                    format!(
162                        "Certificates Hash Migrator can not get signed entities by certificates ids with hashes: '{:?}'", old_hashes
163                    )
164                )?;
165
166            for signed_entity_record in records_to_migrate.iter_mut() {
167                let new_certificate_hash =
168                    old_and_new_certificate_hashes
169                        .get(&signed_entity_record.certificate_id)
170                        .ok_or( anyhow!(
171                            "Migration Error: no migrated hash found for signed entity with certificate id: {}",
172                            signed_entity_record.certificate_id
173                        ))?
174                        .to_owned();
175
176                trace!(
177                    self.logger, "Migrating signed entity {} certificate hash computed for certificate",
178                    signed_entity_record.signed_entity_id;
179                    "old_certificate_hash" => &signed_entity_record.certificate_id,
180                    "new_certificate_hash" => &new_certificate_hash
181                );
182                signed_entity_record.certificate_id = new_certificate_hash;
183            }
184
185            self.signed_entity_storer
186                .update_signed_entities(records_to_migrate)
187                .await
188                .with_context(|| "Certificates Hash Migrator can not update signed entities")?;
189        }
190
191        Ok(())
192    }
193
194    async fn cleanup(&self, old_certificates: Vec<Certificate>) -> StdResult<()> {
195        info!(self.logger, "Deleting old certificates in the database");
196        let old_certificates_chunk_size = 250;
197        for old_certificates_chunk in old_certificates
198            .into_iter()
199            .rev()
200            .collect::<Vec<_>>()
201            .chunks(old_certificates_chunk_size)
202        {
203            self.certificate_repository
204                .delete_certificates(&old_certificates_chunk.iter().collect::<Vec<_>>())
205                .await
206                .with_context(|| {
207                    "Certificates Hash Migrator can not delete old certificates in the database"
208                })?;
209        }
210
211        Ok(())
212    }
213}
214
#[cfg(test)]
mod test {
    use mithril_common::entities::{
        Epoch, ImmutableFileNumber, SignedEntityConfig, SignedEntityType,
        SignedEntityTypeDiscriminants as Type, TimePoint,
    };
    use mithril_persistence::sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection};

    use crate::database::record::{CertificateRecord, SignedEntityRecord};
    use crate::database::repository::SignedEntityStore;
    use crate::test_tools::TestLogger;

    use super::*;

    /// In-memory database with all aggregator migrations applied and foreign keys enabled.
    fn connection_with_foreign_key_support() -> SqliteConnection {
        ConnectionBuilder::open_memory()
            .with_migrations(crate::database::migration::get_migrations())
            .with_options(&[ConnectionOptions::EnableForeignKeys])
            .build()
            .unwrap()
    }

    /// In-memory database with all aggregator migrations applied and foreign keys disabled.
    fn connection_without_foreign_key_support() -> SqliteConnection {
        ConnectionBuilder::open_memory()
            .with_migrations(crate::database::migration::get_migrations())
            .with_options(&[ConnectionOptions::ForceDisableForeignKeys])
            .build()
            .unwrap()
    }

    /// Build a dummy [TimePoint] at the given epoch and immutable file number.
    ///
    /// Note: If we want to create CardanoTransaction test certificate then another method
    /// that take a ChainPoint as parameter should be created.
    fn time_at(epoch: u64, immutable_file_number: ImmutableFileNumber) -> TimePoint {
        TimePoint {
            epoch: Epoch(epoch),
            immutable_file_number,
            ..TimePoint::dummy()
        }
    }

    /// Dummy genesis certificate stored under the given hash.
    fn dummy_genesis(certificate_hash: &str, time_point: TimePoint) -> Certificate {
        let certificate = CertificateRecord::dummy_genesis(certificate_hash, time_point.epoch);

        certificate.into()
    }

    /// Dummy certificate stored under `certificate_hash`, chained to `previous_hash`, that
    /// certifies a signed entity of the given type at the given time point.
    fn dummy_certificate(
        certificate_hash: &str,
        previous_hash: &str,
        time_point: TimePoint,
        signed_entity_type: Type,
    ) -> Certificate {
        let certificate = CertificateRecord::dummy(
            certificate_hash,
            previous_hash,
            time_point.epoch,
            SignedEntityConfig::dummy()
                .time_point_to_signed_entity(signed_entity_type, &time_point)
                .unwrap(),
        );

        certificate.into()
    }

    /// Fake signed entity pointing to the given certificate, or `None` for a genesis
    /// certificate (genesis certificates get no signed entity in these tests).
    fn signed_entity_for_certificate(certificate: &Certificate) -> Option<SignedEntityRecord> {
        if certificate.is_genesis() {
            return None;
        }

        let signed_entity_type = certificate.signed_entity_type();
        // Note: we don't need to have real artifacts for those tests
        let artifact = format!("{signed_entity_type:?}");
        let id = match &signed_entity_type {
            SignedEntityType::MithrilStakeDistribution(epoch) => {
                format!("mithril-stake-distribution-{epoch}")
            }
            SignedEntityType::CardanoStakeDistribution(epoch) => {
                format!("cardano-stake-distribution-{epoch}")
            }
            SignedEntityType::CardanoImmutableFilesFull(beacon) => {
                format!("snapshot-{}-{}", beacon.epoch, beacon.immutable_file_number)
            }
            SignedEntityType::CardanoTransactions(epoch, block_number) => {
                format!("cardano-transactions-{epoch}-{block_number}")
            }
            SignedEntityType::CardanoDatabase(beacon) => {
                format!(
                    "cardano-database-{}-{}",
                    beacon.epoch, beacon.immutable_file_number
                )
            }
        };

        Some(SignedEntityRecord {
            signed_entity_id: format!("signed-entity-{id}"),
            certificate_id: certificate.hash.clone(),
            signed_entity_type,
            artifact,
            created_at: Default::default(),
        })
    }

    /// Insert the given certificates (in order) and their fake signed entities in the database,
    /// returning what was inserted.
    async fn fill_certificates_and_signed_entities_in_db(
        connection: Arc<SqliteConnection>,
        certificates: &[Certificate],
    ) -> StdResult<Vec<(Certificate, Option<SignedEntityRecord>)>> {
        let certificate_repository = CertificateRepository::new(connection.clone());
        let signed_entity_store = SignedEntityStore::new(connection);
        let mut result = Vec::with_capacity(certificates.len());

        for certificate in certificates {
            certificate_repository
                .create_certificate(certificate.clone())
                .await
                .with_context(|| {
                    format!(
                        "Certificates Hash Migrator can not create certificate with hash: '{}'",
                        certificate.hash
                    )
                })?;

            let signed_entity_maybe = signed_entity_for_certificate(certificate);
            if let Some(record) = &signed_entity_maybe {
                signed_entity_store
                    .store_signed_entity(record)
                    .await
                    .with_context(|| "Certificates Hash Migrator can not store signed entity")?;
            }

            result.push((certificate.clone(), signed_entity_maybe));
        }

        Ok(result)
    }

    /// Reference implementation of the migration: recompute every hash in order (oldest
    /// first), propagating new hashes to children and to signed entities.
    fn recompute_hashes(
        certificates_and_signed_entity: Vec<(Certificate, Option<SignedEntityRecord>)>,
    ) -> Vec<(Certificate, Option<SignedEntityRecord>)> {
        let mut old_and_new_hashes: HashMap<String, String> = HashMap::new();
        let mut result = Vec::with_capacity(certificates_and_signed_entity.len());

        for (mut certificate, signed_entity_maybe) in certificates_and_signed_entity {
            if let Some(hash) = old_and_new_hashes.get(&certificate.previous_hash) {
                certificate.previous_hash.clone_from(hash);
            }

            let new_hash = certificate.compute_hash();
            old_and_new_hashes.insert(certificate.hash.clone(), new_hash.clone());
            certificate.hash = new_hash;

            let signed_entity_maybe = signed_entity_maybe.map(|mut signed_entity| {
                signed_entity.certificate_id.clone_from(&certificate.hash);
                signed_entity
            });

            result.push((certificate, signed_entity_maybe));
        }

        result
    }

    #[test]
    fn ensure_test_framework_recompute_correct_hashes() {
        let old_certificates: Vec<(Certificate, Option<SignedEntityRecord>)> = vec![
            dummy_genesis("genesis", time_at(1, 1)),
            dummy_certificate(
                "cert1",
                "genesis",
                time_at(1, 2),
                Type::MithrilStakeDistribution,
            ),
            dummy_certificate(
                "cert2",
                "cert1",
                time_at(2, 3),
                Type::MithrilStakeDistribution,
            ),
        ]
        .into_iter()
        .map(|cert| (cert.clone(), signed_entity_for_certificate(&cert)))
        .collect();

        let expected: Vec<(Certificate, Option<SignedEntityRecord>)> = vec![
            dummy_genesis(
                "328b1ac75ef18fe09ff542ea1997ee512cd62c886a260463034e551255ad39e0",
                time_at(1, 1),
            ),
            dummy_certificate(
                "007286af724bb132dab1f13f9cda8a86d0cd82173f0b4a91124cc7bff63b1562",
                "328b1ac75ef18fe09ff542ea1997ee512cd62c886a260463034e551255ad39e0",
                time_at(1, 2),
                Type::MithrilStakeDistribution,
            ),
            dummy_certificate(
                "98fb51c4588293acec548c4d35e499fe77e6eb2eb75c67d64a1026a6f88bad7b",
                "007286af724bb132dab1f13f9cda8a86d0cd82173f0b4a91124cc7bff63b1562",
                time_at(2, 3),
                Type::MithrilStakeDistribution,
            ),
        ]
        .into_iter()
        .map(|cert| (cert.clone(), signed_entity_for_certificate(&cert)))
        .collect();
        let recomputed = recompute_hashes(old_certificates);

        assert_eq!(expected, recomputed);
    }

    /// Read back every certificate (latest first) with the signed entity that points to it.
    async fn get_certificates_and_signed_entities(
        connection: Arc<SqliteConnection>,
    ) -> StdResult<Vec<(Certificate, Option<SignedEntityRecord>)>> {
        let mut result = vec![];
        let certificate_repository = CertificateRepository::new(connection.clone());
        let signed_entity_store = SignedEntityStore::new(connection);

        let certificates = certificate_repository
            .get_latest_certificates::<Certificate>(usize::MAX)
            .await?;

        for certificate in certificates {
            if certificate.is_genesis() {
                result.push((certificate, None));
            } else {
                let record = signed_entity_store
                    .get_signed_entity_by_certificate_id(&certificate.hash)
                    .await
                    .with_context(|| format!("Certificates Hash Migrator can not get signed entity type by certificate id with hash: '{}'", certificate.hash))?;
                result.push((certificate, record));
            }
        }

        Ok(result)
    }

    /// Insert `certificates` (oldest first), run the migrator, and check the database matches
    /// what [recompute_hashes] predicts.
    async fn run_migration_test(
        sqlite_connection: Arc<SqliteConnection>,
        certificates: Vec<Certificate>,
    ) {
        // Arrange
        let old_certificates =
            fill_certificates_and_signed_entities_in_db(sqlite_connection.clone(), &certificates)
                .await
                .unwrap();

        // Note: data retrieved from the database will be in the latest to the oldest order, the
        // reverse of our insert order.
        let expected_certificates_and_signed_entities = recompute_hashes(old_certificates)
            .into_iter()
            .rev()
            .collect();

        // Act
        let migrator = CertificatesHashMigrator::new(
            CertificateRepository::new(sqlite_connection.clone()),
            Arc::new(SignedEntityStore::new(sqlite_connection.clone())),
            TestLogger::stdout(),
        );
        migrator
            .migrate()
            .await
            .expect("Certificates hash migration should not fail");

        // Assert
        let migrated_certificates_and_signed_entities =
            get_certificates_and_signed_entities(sqlite_connection)
                .await
                .unwrap();

        let extract_human_readable_data =
            |entries: Vec<(Certificate, Option<SignedEntityRecord>)>| {
                entries
                    .into_iter()
                    .map(|(cert, signed_entity)| {
                        (
                            cert.hash,
                            cert.previous_hash,
                            cert.epoch,
                            signed_entity.map(|s| (s.signed_entity_type, s.certificate_id)),
                        )
                    })
                    .collect::<Vec<_>>()
            };
        assert_eq!(
            extract_human_readable_data(expected_certificates_and_signed_entities),
            extract_human_readable_data(migrated_certificates_and_signed_entities)
        );
    }

    #[tokio::test]
    async fn migrate_genesis_certificate() {
        let connection = Arc::new(connection_without_foreign_key_support());
        run_migration_test(connection, vec![dummy_genesis("old_hash", time_at(1, 1))]).await;
    }

    #[tokio::test]
    async fn migrate_a_chain_of_one_genesis_and_one_mithril_stake_distribution() {
        let connection = Arc::new(connection_without_foreign_key_support());
        run_migration_test(
            connection,
            vec![
                dummy_genesis("old_genesis", time_at(1, 1)),
                dummy_certificate(
                    "old_hash_1",
                    "old_genesis",
                    time_at(1, 2),
                    Type::MithrilStakeDistribution,
                ),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn migrate_a_chain_with_one_genesis_spanning_multiple_epochs_and_multiple_signed_entities(
    ) {
        let connection = Arc::new(connection_with_foreign_key_support());
        run_migration_test(
            connection,
            vec![
                dummy_genesis("old_genesis", time_at(1, 1)),
                dummy_certificate(
                    "old_hash_1",
                    "old_genesis",
                    time_at(1, 2),
                    Type::MithrilStakeDistribution,
                ),
                dummy_certificate(
                    "old_hash_2",
                    "old_genesis",
                    time_at(1, 3),
                    Type::CardanoImmutableFilesFull,
                ),
                dummy_certificate(
                    "old_hash_3",
                    "old_hash_2",
                    time_at(2, 3),
                    Type::MithrilStakeDistribution,
                ),
                dummy_certificate(
                    "old_hash_4",
                    "old_hash_3",
                    time_at(2, 4),
                    Type::CardanoImmutableFilesFull,
                ),
                dummy_certificate(
                    "old_hash_5",
                    "old_hash_3",
                    time_at(3, 5),
                    Type::CardanoImmutableFilesFull,
                ),
                dummy_certificate(
                    "old_hash_6",
                    "old_hash_5",
                    time_at(4, 6),
                    Type::CardanoImmutableFilesFull,
                ),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn migrate_a_chain_with_multiple_genesis_spanning_multiple_epochs() {
        let connection = Arc::new(connection_with_foreign_key_support());
        run_migration_test(
            connection,
            vec![
                dummy_genesis("old_genesis", time_at(1, 1)),
                dummy_certificate(
                    "old_hash_1",
                    "old_genesis",
                    time_at(1, 2),
                    Type::MithrilStakeDistribution,
                ),
                dummy_certificate(
                    "old_hash_2",
                    "old_genesis",
                    time_at(1, 3),
                    Type::CardanoImmutableFilesFull,
                ),
                dummy_genesis("old_genesis_2", time_at(3, 5)),
                dummy_certificate(
                    "old_hash_3",
                    "old_genesis_2",
                    time_at(4, 6),
                    Type::MithrilStakeDistribution,
                ),
                dummy_certificate(
                    "old_hash_4",
                    "old_hash_3",
                    time_at(5, 7),
                    Type::CardanoImmutableFilesFull,
                ),
                dummy_genesis("old_genesis_3", time_at(5, 7)),
                dummy_certificate(
                    "old_hash_5",
                    "old_genesis_3",
                    time_at(5, 8),
                    Type::MithrilStakeDistribution,
                ),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn migrate_a_chain_where_the_genesis_hash_is_already_up_to_date() {
        let connection = Arc::new(connection_with_foreign_key_support());
        let genesis = {
            let mut cert = dummy_genesis("whatever", time_at(1, 1));
            cert.hash = cert.compute_hash();
            cert
        };
        let genesis_hash = genesis.hash.clone();

        run_migration_test(
            connection,
            vec![
                genesis,
                dummy_certificate(
                    "old_hash_1",
                    &genesis_hash,
                    time_at(1, 2),
                    Type::MithrilStakeDistribution,
                ),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn should_not_fail_if_some_hash_dont_change() {
        let connection = Arc::new(connection_without_foreign_key_support());
        let certificate = {
            let mut cert = dummy_genesis("whatever", time_at(1, 2));
            cert.hash = cert.compute_hash();
            cert
        };
        fill_certificates_and_signed_entities_in_db(
            connection.clone(),
            std::slice::from_ref(&certificate),
        )
        .await
        .unwrap();

        let migrator = CertificatesHashMigrator::new(
            CertificateRepository::new(connection.clone()),
            Arc::new(SignedEntityStore::new(connection.clone())),
            TestLogger::stdout(),
        );
        migrator
            .migrate()
            .await
            .expect("Migration should not fail if a hash doesn't change");

        // The already up to date certificate must be left untouched: neither duplicated nor
        // deleted.
        let hashes_in_db: Vec<String> = get_certificates_and_signed_entities(connection)
            .await
            .unwrap()
            .into_iter()
            .map(|(cert, _)| cert.hash)
            .collect();
        assert_eq!(vec![certificate.hash], hashes_in_db);
    }
}