matrix_sdk_base/event_cache/store/
mod.rs1use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
23
24#[cfg(any(test, feature = "testing"))]
25#[macro_use]
26pub mod integration_tests;
27pub mod media;
28mod memory_store;
29mod traits;
30
31use matrix_sdk_common::store_locks::{
32 BackingStore, CrossProcessStoreLock, CrossProcessStoreLockGuard, LockStoreError,
33};
34pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
35use ruma::{
36 events::{relation::RelationType, AnySyncTimelineEvent},
37 serde::Raw,
38 OwnedEventId,
39};
40use tracing::trace;
41
42#[cfg(any(test, feature = "testing"))]
43pub use self::integration_tests::EventCacheStoreIntegrationTests;
44pub use self::{
45 memory_store::MemoryStore,
46 traits::{DynEventCacheStore, EventCacheStore, IntoEventCacheStore, DEFAULT_CHUNK_CAPACITY},
47};
48
49#[derive(Clone)]
51pub struct EventCacheStoreLock {
52 cross_process_lock: Arc<CrossProcessStoreLock<LockableEventCacheStore>>,
54
55 store: Arc<DynEventCacheStore>,
59}
60
61#[cfg(not(tarpaulin_include))]
62impl fmt::Debug for EventCacheStoreLock {
63 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
64 formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
65 }
66}
67
68impl EventCacheStoreLock {
69 pub fn new<S>(store: S, holder: String) -> Self
74 where
75 S: IntoEventCacheStore,
76 {
77 let store = store.into_event_cache_store();
78
79 Self {
80 cross_process_lock: Arc::new(CrossProcessStoreLock::new(
81 LockableEventCacheStore(store.clone()),
82 "default".to_owned(),
83 holder,
84 )),
85 store,
86 }
87 }
88
89 pub async fn lock(&self) -> Result<EventCacheStoreLockGuard<'_>, LockStoreError> {
91 let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?;
92
93 Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
94 }
95}
96
97pub struct EventCacheStoreLockGuard<'a> {
101 #[allow(unused)]
103 cross_process_lock_guard: CrossProcessStoreLockGuard,
104
105 store: &'a DynEventCacheStore,
107}
108
109#[cfg(not(tarpaulin_include))]
110impl fmt::Debug for EventCacheStoreLockGuard<'_> {
111 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
112 formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
113 }
114}
115
116impl Deref for EventCacheStoreLockGuard<'_> {
117 type Target = DynEventCacheStore;
118
119 fn deref(&self) -> &Self::Target {
120 self.store
121 }
122}
123
124#[derive(Debug, thiserror::Error)]
126pub enum EventCacheStoreError {
127 #[error(transparent)]
129 Backend(Box<dyn std::error::Error + Send + Sync>),
130
131 #[error("The event cache store failed to be unlocked")]
134 Locked,
135
136 #[error("The event cache store is not encrypted but tried to be opened with a passphrase")]
138 Unencrypted,
139
140 #[error("Error encrypting or decrypting data from the event cache store: {0}")]
142 Encryption(#[from] StoreEncryptionError),
143
144 #[error("Error encoding or decoding data from the event cache store: {0}")]
146 Codec(#[from] Utf8Error),
147
148 #[error("Error serializing or deserializing data from the event cache store: {0}")]
150 Serialization(#[from] serde_json::Error),
151
152 #[error(
154 "The database format of the event cache store changed in an incompatible way, \
155 current version: {0}, latest version: {1}"
156 )]
157 UnsupportedDatabaseVersion(usize, usize),
158
159 #[error("The store contains invalid data: {details}")]
161 InvalidData {
162 details: String,
164 },
165}
166
167impl EventCacheStoreError {
168 #[inline]
172 pub fn backend<E>(error: E) -> Self
173 where
174 E: std::error::Error + Send + Sync + 'static,
175 {
176 Self::Backend(Box::new(error))
177 }
178}
179
180pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
182
183#[derive(Clone, Debug)]
186struct LockableEventCacheStore(Arc<DynEventCacheStore>);
187
188impl BackingStore for LockableEventCacheStore {
189 type LockError = EventCacheStoreError;
190
191 async fn try_lock(
192 &self,
193 lease_duration_ms: u32,
194 key: &str,
195 holder: &str,
196 ) -> std::result::Result<bool, Self::LockError> {
197 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
198 }
199}
200
201pub fn extract_event_relation(event: &Raw<AnySyncTimelineEvent>) -> Option<(OwnedEventId, String)> {
207 #[derive(serde::Deserialize)]
208 struct RelatesTo {
209 event_id: OwnedEventId,
210 rel_type: String,
211 }
212
213 #[derive(serde::Deserialize)]
214 struct EventContent {
215 #[serde(rename = "m.relates_to")]
216 rel: Option<RelatesTo>,
217 }
218
219 match event.get_field::<EventContent>("content") {
220 Ok(event_content) => {
221 event_content.and_then(|c| c.rel).map(|rel| (rel.event_id, rel.rel_type))
222 }
223 Err(err) => {
224 trace!("when extracting relation data from an event: {err}");
225 None
226 }
227 }
228}
229
230pub fn compute_filters_string(filters: Option<&[RelationType]>) -> Option<Vec<String>> {
235 filters.map(|filter| {
236 filter
237 .iter()
238 .map(|f| {
239 if *f == RelationType::Replacement {
240 "m.replace".to_owned()
241 } else {
242 f.to_string()
243 }
244 })
245 .collect()
246 })
247}