mithril_aggregator/database/query/open_message/
insert_open_message.rs1use chrono::Utc;
2
3use mithril_common::StdResult;
4use mithril_common::entities::{Epoch, ProtocolMessage, SignedEntityType};
5use mithril_persistence::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};
6
7use crate::database::query::open_message::conditions;
8use crate::database::record::OpenMessageRecord;
9
10pub struct InsertOpenMessageQuery {
12 condition: WhereCondition,
13}
14
15impl InsertOpenMessageQuery {
16 pub fn one(
17 epoch: Epoch,
18 signed_entity_type: &SignedEntityType,
19 protocol_message: &ProtocolMessage,
20 ) -> StdResult<Self> {
21 let now = Utc::now();
22 let record = OpenMessageRecord {
23 open_message_id: OpenMessageRecord::new_id(),
24 epoch,
25 signed_entity_type: signed_entity_type.clone(),
26 protocol_message: protocol_message.clone(),
27 is_certified: false,
28 is_expired: false,
29 created_at: now,
30 expires_at: signed_entity_type.get_open_message_timeout().map(|t| now + t),
31 };
32
33 Ok(Self {
34 condition: conditions::insert_one(record)?,
35 })
36 }
37}
38
39impl Query for InsertOpenMessageQuery {
40 type Entity = OpenMessageRecord;
41
42 fn filters(&self) -> WhereCondition {
43 self.condition.clone()
44 }
45
46 fn get_definition(&self, condition: &str) -> String {
47 let aliases = SourceAlias::new(&[("{:open_message:}", "open_message")]);
48 let projection = Self::Entity::get_projection().expand(aliases);
49
50 format!("insert into open_message {condition} returning {projection}")
51 }
52}
53
54#[cfg(test)]
55mod tests {
56 use mithril_common::entities::ProtocolMessagePartKey;
57 use mithril_persistence::sqlite::ConnectionExtensions;
58
59 use crate::database::query::GetOpenMessageQuery;
60 use crate::database::test_helper::main_db_connection;
61
62 use super::*;
63
64 #[test]
65 fn test_insert_one() {
66 let connection = main_db_connection().unwrap();
67 let epoch = Epoch(5);
68 let signed_entity_type = SignedEntityType::CardanoStakeDistribution(Epoch(10));
69 let mut protocol_message = ProtocolMessage::new();
70 protocol_message.set_message_part(
71 ProtocolMessagePartKey::CardanoStakeDistributionEpoch,
72 "value".to_string(),
73 );
74
75 connection
76 .fetch_first(
77 InsertOpenMessageQuery::one(epoch, &signed_entity_type, &protocol_message).unwrap(),
78 )
79 .unwrap();
80 let records: Vec<OpenMessageRecord> =
81 connection.fetch_collect(GetOpenMessageQuery::all()).unwrap();
82
83 assert_eq!(1, records.len());
84 assert_eq!(
85 OpenMessageRecord {
86 open_message_id: records[0].open_message_id,
87 epoch,
88 signed_entity_type,
89 protocol_message,
90 is_certified: false,
91 is_expired: false,
92 created_at: records[0].created_at,
93 expires_at: records[0].expires_at,
94 },
95 records[0]
96 );
97 }
98
99 #[should_panic]
100 #[test]
101 fn test_insert_two_entries_with_same_signed_entity_violate_unique_constraint() {
102 let connection = main_db_connection().unwrap();
103 let epoch = Epoch(5);
104 let signed_entity_type = SignedEntityType::MithrilStakeDistribution(Epoch(10));
105
106 connection
107 .fetch_first(
108 InsertOpenMessageQuery::one(epoch, &signed_entity_type, &ProtocolMessage::new())
109 .unwrap(),
110 )
111 .unwrap();
112
113 let _ = connection.fetch_first(
114 InsertOpenMessageQuery::one(epoch + 10, &signed_entity_type, &ProtocolMessage::new())
115 .unwrap(),
116 );
117 }
118}