matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15        DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16        RoomLoadSettings, SentRequestKey,
17    },
18    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19    StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    canonical_json::{redact, RedactedBecause},
24    events::{
25        presence::PresenceEvent,
26        receipt::{Receipt, ReceiptThread, ReceiptType},
27        room::{
28            create::RoomCreateEventContent,
29            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30        },
31        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33    },
34    serde::Raw,
35    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44    error::{Error, Result},
45    utils::{
46        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47        SqliteKeyValueStoreConnExt,
48    },
49    OpenStoreError, SqliteStoreConfig,
50};
51
52mod keys {
53    // Tables
54    pub const KV_BLOB: &str = "kv_blob";
55    pub const ROOM_INFO: &str = "room_info";
56    pub const STATE_EVENT: &str = "state_event";
57    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59    pub const MEMBER: &str = "member";
60    pub const PROFILE: &str = "profile";
61    pub const RECEIPT: &str = "receipt";
62    pub const DISPLAY_NAME: &str = "display_name";
63    pub const SEND_QUEUE: &str = "send_queue_events";
64    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65}
66
67/// The filename used for the SQLITE database file used by the state store.
68pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
69
70/// Identifier of the latest database version.
71///
72/// This is used to figure whether the SQLite database requires a migration.
73/// Every new SQL migration should imply a bump of this number, and changes in
74/// the [`SqliteStateStore::run_migrations`] function.
75const DATABASE_VERSION: u8 = 12;
76
77/// An SQLite-based state store.
78#[derive(Clone)]
79pub struct SqliteStateStore {
80    store_cipher: Option<Arc<StoreCipher>>,
81    pool: SqlitePool,
82}
83
84#[cfg(not(tarpaulin_include))]
85impl fmt::Debug for SqliteStateStore {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
88    }
89}
90
91impl SqliteStateStore {
92    /// Open the SQLite-based state store at the given path using the given
93    /// passphrase to encrypt private data.
94    pub async fn open(
95        path: impl AsRef<Path>,
96        passphrase: Option<&str>,
97    ) -> Result<Self, OpenStoreError> {
98        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
99    }
100
101    /// Open the SQLite-based state store with the config open config.
102    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
103        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
104
105        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
106
107        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
108        config.pool = Some(pool_config);
109
110        let pool = config.create_pool(Runtime::Tokio1)?;
111
112        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
113        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
114
115        Ok(this)
116    }
117
118    /// Create an SQLite-based state store using the given SQLite database pool.
119    /// The given passphrase will be used to encrypt private data.
120    async fn open_with_pool(
121        pool: SqlitePool,
122        passphrase: Option<&str>,
123    ) -> Result<Self, OpenStoreError> {
124        let conn = pool.get().await?;
125
126        let mut version = conn.db_version().await?;
127
128        if version == 0 {
129            init(&conn).await?;
130            version = 1;
131        }
132
133        let store_cipher = match passphrase {
134            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
135            None => None,
136        };
137        let this = Self { store_cipher, pool };
138        this.run_migrations(&conn, version, None).await?;
139
140        Ok(this)
141    }
142
143    /// Run database migrations from the given `from` version to the given `to`
144    /// version
145    ///
146    /// If `to` is `None`, the current database version will be used.
147    async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
148        let to = to.unwrap_or(DATABASE_VERSION);
149
150        if from < to {
151            debug!(version = from, new_version = to, "Upgrading database");
152        } else {
153            return Ok(());
154        }
155
156        if from < 2 && to >= 2 {
157            let this = self.clone();
158            conn.with_transaction(move |txn| {
159                // Create new table.
160                txn.execute_batch(include_str!(
161                    "../migrations/state_store/002_a_create_new_room_info.sql"
162                ))?;
163
164                // Migrate data to new table.
165                for data in txn
166                    .prepare("SELECT data FROM room_info")?
167                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
168                {
169                    let data = data?;
170                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
171
172                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
173                    let state = this
174                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
175                    txn.prepare_cached(
176                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
177                         VALUES (?, ?, ?)",
178                    )?
179                    .execute((room_id, state, data))?;
180                }
181
182                // Replace old table.
183                txn.execute_batch(include_str!(
184                    "../migrations/state_store/002_b_replace_room_info.sql"
185                ))?;
186
187                txn.set_db_version(2)?;
188                Result::<_, Error>::Ok(())
189            })
190            .await?;
191        }
192
193        // Migration to v3: RoomInfo format has changed.
194        if from < 3 && to >= 3 {
195            let this = self.clone();
196            conn.with_transaction(move |txn| {
197                // Migrate data .
198                for data in txn
199                    .prepare("SELECT data FROM room_info")?
200                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
201                {
202                    let data = data?;
203                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
204
205                    // Get the `m.room.create` event from the room state.
206                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
207                    let event_type =
208                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
209                    let create_res = txn
210                        .prepare(
211                            "SELECT stripped, data FROM state_event
212                             WHERE room_id = ? AND event_type = ?",
213                        )?
214                        .query_row([room_id, event_type], |row| {
215                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
216                        })
217                        .optional()?;
218
219                    let create = create_res.and_then(|(stripped, data)| {
220                        let create = if stripped {
221                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
222                                this.deserialize_json(&data).ok()?,
223                            )
224                        } else {
225                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
226                        };
227                        Some(create)
228                    });
229
230                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
231
232                    let data = this.serialize_json(&migrated_room_info)?;
233                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
234                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
235                        .execute((data, room_id))?;
236                }
237
238                txn.set_db_version(3)?;
239                Result::<_, Error>::Ok(())
240            })
241            .await?;
242        }
243
244        if from < 4 && to >= 4 {
245            conn.with_transaction(move |txn| {
246                // Create new table.
247                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
248                txn.set_db_version(4)
249            })
250            .await?;
251        }
252
253        if from < 5 && to >= 5 {
254            conn.with_transaction(move |txn| {
255                // Create new table.
256                txn.execute_batch(include_str!(
257                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
258                ))?;
259                txn.set_db_version(4)
260            })
261            .await?;
262        }
263
264        if from < 6 && to >= 6 {
265            conn.with_transaction(move |txn| {
266                // Create new table.
267                txn.execute_batch(include_str!(
268                    "../migrations/state_store/005_send_queue_dependent_events.sql"
269                ))?;
270                txn.set_db_version(6)
271            })
272            .await?;
273        }
274
275        if from < 7 && to >= 7 {
276            conn.with_transaction(move |txn| {
277                // Drop media table.
278                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
279                txn.set_db_version(7)
280            })
281            .await?;
282        }
283
284        if from < 8 && to >= 8 {
285            // Replace all existing wedged events with a generic error.
286            let error = QueueWedgeError::GenericApiError {
287                msg: "local echo failed to send in a previous session".into(),
288            };
289            let default_err = self.serialize_value(&error)?;
290
291            conn.with_transaction(move |txn| {
292                // Update send queue table to persist the wedge reason if any.
293                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
294
295                // Migrate the data, add a generic error for currently wedged events
296
297                for wedged_entries in txn
298                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
299                    .query_map((), |row| {
300                        Ok(
301                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
302                        )
303                    })? {
304
305                    let (room_id, transaction_id) = wedged_entries?;
306
307                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
308                        .execute((default_err.clone(), room_id, transaction_id))?;
309                }
310
311
312                // Clean up the table now that data is migrated
313                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
314
315                txn.set_db_version(8)
316            })
317                .await?;
318        }
319
320        if from < 9 && to >= 9 {
321            conn.with_transaction(move |txn| {
322                // Run the migration.
323                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
324                txn.set_db_version(9)
325            })
326            .await?;
327        }
328
329        if from < 10 && to >= 10 {
330            conn.with_transaction(move |txn| {
331                // Run the migration.
332                txn.execute_batch(include_str!(
333                    "../migrations/state_store/009_send_queue_priority.sql"
334                ))?;
335                txn.set_db_version(10)
336            })
337            .await?;
338        }
339
340        if from < 11 && to >= 11 {
341            conn.with_transaction(move |txn| {
342                // Run the migration.
343                txn.execute_batch(include_str!(
344                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
345                ))?;
346                txn.set_db_version(11)
347            })
348            .await?;
349        }
350
351        if from < 12 && to >= 12 {
352            // Defragment the DB and optimize its size on the filesystem.
353            // This should have been run in the migration for version 7, to reduce the size
354            // of the DB as we removed the media cache.
355            conn.vacuum().await?;
356            conn.set_kv("version", vec![12]).await?;
357        }
358
359        Ok(())
360    }
361
362    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
363        let key_s = match key {
364            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
365            StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
366            StateStoreDataKey::Filter(f) => {
367                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
368            }
369            StateStoreDataKey::UserAvatarUrl(u) => {
370                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
371            }
372            StateStoreDataKey::RecentlyVisitedRooms(b) => {
373                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
374            }
375            StateStoreDataKey::UtdHookManagerData => {
376                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
377            }
378            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
379                if let Some(thread_root) = thread_root {
380                    Cow::Owned(format!(
381                        "{}:{room_id}:{thread_root}",
382                        StateStoreDataKey::COMPOSER_DRAFT
383                    ))
384                } else {
385                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
386                }
387            }
388            StateStoreDataKey::SeenKnockRequests(room_id) => {
389                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
390            }
391        };
392
393        self.encode_key(keys::KV_BLOB, &*key_s)
394    }
395
396    fn encode_presence_key(&self, user_id: &UserId) -> Key {
397        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
398    }
399
400    fn encode_custom_key(&self, key: &[u8]) -> Key {
401        let mut full_key = b"custom:".to_vec();
402        full_key.extend(key);
403        self.encode_key(keys::KV_BLOB, full_key)
404    }
405
406    async fn acquire(&self) -> Result<SqliteAsyncConn> {
407        Ok(self.pool.get().await?)
408    }
409
410    fn remove_maybe_stripped_room_data(
411        &self,
412        txn: &Transaction<'_>,
413        room_id: &RoomId,
414        stripped: bool,
415    ) -> rusqlite::Result<()> {
416        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
417        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
418
419        let member_room_id = self.encode_key(keys::MEMBER, room_id);
420        txn.remove_room_members(&member_room_id, Some(stripped))
421    }
422}
423
424impl EncryptableStore for SqliteStateStore {
425    fn get_cypher(&self) -> Option<&StoreCipher> {
426        self.store_cipher.as_deref()
427    }
428}
429
430/// Initialize the database.
431async fn init(conn: &SqliteAsyncConn) -> Result<()> {
432    // First turn on WAL mode, this can't be done in the transaction, it fails with
433    // the error message: "cannot change into wal mode from within a transaction".
434    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
435    conn.with_transaction(|txn| {
436        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
437        txn.set_db_version(1)?;
438
439        Ok(())
440    })
441    .await
442}
443
444trait SqliteConnectionStateStoreExt {
445    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
446
447    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
448
449    fn set_room_account_data(
450        &self,
451        room_id: &[u8],
452        event_type: &[u8],
453        data: &[u8],
454    ) -> rusqlite::Result<()>;
455    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
456
457    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
458    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
459    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
460
461    fn set_state_event(
462        &self,
463        room_id: &[u8],
464        event_type: &[u8],
465        state_key: &[u8],
466        stripped: bool,
467        event_id: Option<&[u8]>,
468        data: &[u8],
469    ) -> rusqlite::Result<()>;
470    fn get_state_event_by_id(
471        &self,
472        room_id: &[u8],
473        event_id: &[u8],
474    ) -> rusqlite::Result<Option<Vec<u8>>>;
475    fn remove_room_state_events(
476        &self,
477        room_id: &[u8],
478        stripped: Option<bool>,
479    ) -> rusqlite::Result<()>;
480
481    fn set_member(
482        &self,
483        room_id: &[u8],
484        user_id: &[u8],
485        membership: &[u8],
486        stripped: bool,
487        data: &[u8],
488    ) -> rusqlite::Result<()>;
489    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
490
491    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
492    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
493    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
494
495    fn set_receipt(
496        &self,
497        room_id: &[u8],
498        user_id: &[u8],
499        receipt_type: &[u8],
500        thread_id: &[u8],
501        event_id: &[u8],
502        data: &[u8],
503    ) -> rusqlite::Result<()>;
504    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
505
506    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
507    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
508    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
509    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
510    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
511}
512
513impl SqliteConnectionStateStoreExt for rusqlite::Connection {
514    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
515        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
516        Ok(())
517    }
518
519    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
520        self.prepare_cached(
521            "INSERT OR REPLACE INTO global_account_data (event_type, data)
522             VALUES (?, ?)",
523        )?
524        .execute((event_type, data))?;
525        Ok(())
526    }
527
528    fn set_room_account_data(
529        &self,
530        room_id: &[u8],
531        event_type: &[u8],
532        data: &[u8],
533    ) -> rusqlite::Result<()> {
534        self.prepare_cached(
535            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
536             VALUES (?, ?, ?)",
537        )?
538        .execute((room_id, event_type, data))?;
539        Ok(())
540    }
541
542    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
543        self.prepare(
544            "DELETE FROM room_account_data
545             WHERE room_id = ?",
546        )?
547        .execute((room_id,))?;
548        Ok(())
549    }
550
551    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
552        self.prepare_cached(
553            "INSERT OR REPLACE INTO room_info (room_id, state, data)
554             VALUES (?, ?, ?)",
555        )?
556        .execute((room_id, state, data))?;
557        Ok(())
558    }
559
560    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
561        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
562            .optional()
563    }
564
565    /// Remove the room info for the given room.
566    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
567        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
568        Ok(())
569    }
570
571    fn set_state_event(
572        &self,
573        room_id: &[u8],
574        event_type: &[u8],
575        state_key: &[u8],
576        stripped: bool,
577        event_id: Option<&[u8]>,
578        data: &[u8],
579    ) -> rusqlite::Result<()> {
580        self.prepare_cached(
581            "INSERT OR REPLACE
582             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
583             VALUES (?, ?, ?, ?, ?, ?)",
584        )?
585        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
586        Ok(())
587    }
588
589    fn get_state_event_by_id(
590        &self,
591        room_id: &[u8],
592        event_id: &[u8],
593    ) -> rusqlite::Result<Option<Vec<u8>>> {
594        self.query_row(
595            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
596            (room_id, event_id),
597            |row| row.get(0),
598        )
599        .optional()
600    }
601
602    /// Remove state events for the given room.
603    ///
604    /// If `stripped` is `Some()`, only removes state events for the given
605    /// stripped state. Otherwise, state events are removed regardless of the
606    /// stripped state.
607    fn remove_room_state_events(
608        &self,
609        room_id: &[u8],
610        stripped: Option<bool>,
611    ) -> rusqlite::Result<()> {
612        if let Some(stripped) = stripped {
613            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
614                .execute((room_id, stripped))?;
615        } else {
616            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
617                .execute((room_id,))?;
618        }
619        Ok(())
620    }
621
622    fn set_member(
623        &self,
624        room_id: &[u8],
625        user_id: &[u8],
626        membership: &[u8],
627        stripped: bool,
628        data: &[u8],
629    ) -> rusqlite::Result<()> {
630        self.prepare_cached(
631            "INSERT OR REPLACE
632             INTO member (room_id, user_id, membership, stripped, data)
633             VALUES (?, ?, ?, ?, ?)",
634        )?
635        .execute((room_id, user_id, membership, stripped, data))?;
636        Ok(())
637    }
638
639    /// Remove members for the given room.
640    ///
641    /// If `stripped` is `Some()`, only removes members for the given stripped
642    /// state. Otherwise, members are removed regardless of the stripped state.
643    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
644        if let Some(stripped) = stripped {
645            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
646                .execute((room_id, stripped))?;
647        } else {
648            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
649        }
650        Ok(())
651    }
652
653    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
654        self.prepare_cached(
655            "INSERT OR REPLACE
656             INTO profile (room_id, user_id, data)
657             VALUES (?, ?, ?)",
658        )?
659        .execute((room_id, user_id, data))?;
660        Ok(())
661    }
662
663    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
664        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
665        Ok(())
666    }
667
668    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
669        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
670            .execute((room_id, user_id))?;
671        Ok(())
672    }
673
674    fn set_receipt(
675        &self,
676        room_id: &[u8],
677        user_id: &[u8],
678        receipt_type: &[u8],
679        thread: &[u8],
680        event_id: &[u8],
681        data: &[u8],
682    ) -> rusqlite::Result<()> {
683        self.prepare_cached(
684            "INSERT OR REPLACE
685             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
686             VALUES (?, ?, ?, ?, ?, ?)",
687        )?
688        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
689        Ok(())
690    }
691
692    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
693        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
694        Ok(())
695    }
696
697    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
698        self.prepare_cached(
699            "INSERT OR REPLACE
700             INTO display_name (room_id, name, data)
701             VALUES (?, ?, ?)",
702        )?
703        .execute((room_id, name, data))?;
704        Ok(())
705    }
706
707    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
708        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
709            .execute((room_id, name))?;
710        Ok(())
711    }
712
713    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
714        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
715        Ok(())
716    }
717
718    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
719        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
720        Ok(())
721    }
722
723    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
724        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
725            .execute((room_id,))?;
726        Ok(())
727    }
728}
729
730#[async_trait]
731trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
732    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
733        Ok(self
734            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
735            .await
736            .optional()?)
737    }
738
739    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
740        let keys_length = keys.len();
741
742        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
743            let sql_params = repeat_vars(keys.len());
744            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
745
746            let params = rusqlite::params_from_iter(keys);
747
748            Ok(txn
749                .prepare(&sql)?
750                .query(params)?
751                .mapped(|row| row.get(0))
752                .collect::<Result<_, _>>()?)
753        })
754        .await
755    }
756
757    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
758
759    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
760        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
761        Ok(())
762    }
763
764    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
765        Ok(match room_id {
766            None => {
767                self.prepare("SELECT data FROM room_info", move |mut stmt| {
768                    stmt.query_map((), |row| row.get(0))?.collect()
769                })
770                .await?
771            }
772
773            Some(room_id) => {
774                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
775                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
776                })
777                .await?
778            }
779        })
780    }
781
782    async fn get_maybe_stripped_state_events_for_keys(
783        &self,
784        room_id: Key,
785        event_type: Key,
786        state_keys: Vec<Key>,
787    ) -> Result<Vec<(bool, Vec<u8>)>> {
788        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
789            let sql_params = repeat_vars(state_keys.len());
790            let sql = format!(
791                "SELECT stripped, data FROM state_event
792                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
793            );
794
795            let params = rusqlite::params_from_iter(
796                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
797            );
798
799            Ok(txn
800                .prepare(&sql)?
801                .query(params)?
802                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
803                .collect::<Result<_, _>>()?)
804        })
805        .await
806    }
807
808    async fn get_maybe_stripped_state_events(
809        &self,
810        room_id: Key,
811        event_type: Key,
812    ) -> Result<Vec<(bool, Vec<u8>)>> {
813        Ok(self
814            .prepare(
815                "SELECT stripped, data FROM state_event
816                 WHERE room_id = ? AND event_type = ?",
817                |mut stmt| {
818                    stmt.query((room_id, event_type))?
819                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
820                        .collect()
821                },
822            )
823            .await?)
824    }
825
826    async fn get_profiles(
827        &self,
828        room_id: Key,
829        user_ids: Vec<Key>,
830    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
831        let user_ids_length = user_ids.len();
832
833        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
834            let sql_params = repeat_vars(user_ids.len());
835            let sql = format!(
836                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
837            );
838
839            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
840
841            Ok(txn
842                .prepare(&sql)?
843                .query(params)?
844                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
845                .collect::<Result<_, _>>()?)
846        })
847        .await
848    }
849
850    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
851        let res = if memberships.is_empty() {
852            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
853                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
854            })
855            .await?
856        } else {
857            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
858                let sql_params = repeat_vars(memberships.len());
859                let sql = format!(
860                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
861                );
862
863                let params =
864                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
865
866                Ok(txn
867                    .prepare(&sql)?
868                    .query(params)?
869                    .mapped(|row| row.get(0))
870                    .collect::<Result<_, _>>()?)
871            })
872            .await?
873        };
874
875        Ok(res)
876    }
877
878    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
879        Ok(self
880            .query_row(
881                "SELECT data FROM global_account_data WHERE event_type = ?",
882                (event_type,),
883                |row| row.get(0),
884            )
885            .await
886            .optional()?)
887    }
888
889    async fn get_room_account_data(
890        &self,
891        room_id: Key,
892        event_type: Key,
893    ) -> Result<Option<Vec<u8>>> {
894        Ok(self
895            .query_row(
896                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
897                (room_id, event_type),
898                |row| row.get(0),
899            )
900            .await
901            .optional()?)
902    }
903
904    async fn get_display_names(
905        &self,
906        room_id: Key,
907        names: Vec<Key>,
908    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
909        let names_length = names.len();
910
911        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
912            let sql_params = repeat_vars(names.len());
913            let sql = format!(
914                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
915            );
916
917            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
918
919            Ok(txn
920                .prepare(&sql)?
921                .query(params)?
922                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
923                .collect::<Result<_, _>>()?)
924        })
925        .await
926    }
927
928    async fn get_user_receipt(
929        &self,
930        room_id: Key,
931        receipt_type: Key,
932        thread: Key,
933        user_id: Key,
934    ) -> Result<Option<Vec<u8>>> {
935        Ok(self
936            .query_row(
937                "SELECT data FROM receipt
938                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
939                (room_id, receipt_type, thread, user_id),
940                |row| row.get(0),
941            )
942            .await
943            .optional()?)
944    }
945
946    async fn get_event_receipts(
947        &self,
948        room_id: Key,
949        receipt_type: Key,
950        thread: Key,
951        event_id: Key,
952    ) -> Result<Vec<Vec<u8>>> {
953        Ok(self
954            .prepare(
955                "SELECT data FROM receipt
956                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
957                |mut stmt| {
958                    stmt.query((room_id, receipt_type, thread, event_id))?
959                        .mapped(|row| row.get(0))
960                        .collect()
961                },
962            )
963            .await?)
964    }
965}
966
967#[async_trait]
968impl SqliteObjectStateStoreExt for SqliteAsyncConn {
969    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
970        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
971    }
972}
973
974#[async_trait]
975impl StateStore for SqliteStateStore {
976    type Error = Error;
977
978    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
979        self.acquire()
980            .await?
981            .get_kv_blob(self.encode_state_store_data_key(key))
982            .await?
983            .map(|data| {
984                Ok(match key {
985                    StateStoreDataKey::SyncToken => {
986                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
987                    }
988                    StateStoreDataKey::ServerInfo => {
989                        StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
990                    }
991                    StateStoreDataKey::Filter(_) => {
992                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
993                    }
994                    StateStoreDataKey::UserAvatarUrl(_) => {
995                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
996                    }
997                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
998                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
999                    }
1000                    StateStoreDataKey::UtdHookManagerData => {
1001                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1002                    }
1003                    StateStoreDataKey::ComposerDraft(_, _) => {
1004                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1005                    }
1006                    StateStoreDataKey::SeenKnockRequests(_) => {
1007                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1008                    }
1009                })
1010            })
1011            .transpose()
1012    }
1013
1014    async fn set_kv_data(
1015        &self,
1016        key: StateStoreDataKey<'_>,
1017        value: StateStoreDataValue,
1018    ) -> Result<()> {
1019        let serialized_value = match key {
1020            StateStoreDataKey::SyncToken => self.serialize_value(
1021                &value.into_sync_token().expect("Session data not a sync token"),
1022            )?,
1023            StateStoreDataKey::ServerInfo => self.serialize_value(
1024                &value.into_server_info().expect("Session data not containing server info"),
1025            )?,
1026            StateStoreDataKey::Filter(_) => {
1027                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1028            }
1029            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1030                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1031            )?,
1032            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1033                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1034            )?,
1035            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1036                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1037            )?,
1038            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1039                &value.into_composer_draft().expect("Session data not a composer draft"),
1040            )?,
1041            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1042                &value
1043                    .into_seen_knock_requests()
1044                    .expect("Session data is not a set of seen knock request ids"),
1045            )?,
1046        };
1047
1048        self.acquire()
1049            .await?
1050            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1051            .await
1052    }
1053
1054    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1055        self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1056    }
1057
1058    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1059        let changes = changes.to_owned();
1060        let this = self.clone();
1061        self.acquire()
1062            .await?
1063            .with_transaction(move |txn| {
1064                let StateChanges {
1065                    sync_token,
1066                    account_data,
1067                    presence,
1068                    profiles,
1069                    profiles_to_delete,
1070                    state,
1071                    room_account_data,
1072                    room_infos,
1073                    receipts,
1074                    redactions,
1075                    stripped_state,
1076                    ambiguity_maps,
1077                } = changes;
1078
1079                if let Some(sync_token) = sync_token {
1080                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1081                    let value = this.serialize_value(&sync_token)?;
1082                    txn.set_kv_blob(&key, &value)?;
1083                }
1084
1085                for (event_type, event) in account_data {
1086                    let event_type =
1087                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1088                    let data = this.serialize_json(&event)?;
1089                    txn.set_global_account_data(&event_type, &data)?;
1090                }
1091
1092                for (room_id, events) in room_account_data {
1093                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1094                    for (event_type, event) in events {
1095                        let event_type =
1096                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1097                        let data = this.serialize_json(&event)?;
1098                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1099                    }
1100                }
1101
1102                for (user_id, event) in presence {
1103                    let key = this.encode_presence_key(&user_id);
1104                    let value = this.serialize_json(&event)?;
1105                    txn.set_kv_blob(&key, &value)?;
1106                }
1107
1108                for (room_id, room_info) in room_infos {
1109                    let stripped = room_info.state() == RoomState::Invited;
1110                    // Remove non-stripped data for stripped rooms and vice-versa.
1111                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1112
1113                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1114                    let state = this
1115                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1116                    let data = this.serialize_json(&room_info)?;
1117                    txn.set_room_info(&room_id, &state, &data)?;
1118                }
1119
1120                for (room_id, user_ids) in profiles_to_delete {
1121                    let room_id = this.encode_key(keys::PROFILE, room_id);
1122                    for user_id in user_ids {
1123                        let user_id = this.encode_key(keys::PROFILE, user_id);
1124                        txn.remove_room_profile(&room_id, &user_id)?;
1125                    }
1126                }
1127
1128                for (room_id, state_event_types) in state {
1129                    let profiles = profiles.get(&room_id);
1130                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1131
1132                    for (event_type, state_events) in state_event_types {
1133                        let encoded_event_type =
1134                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1135
1136                        for (state_key, raw_state_event) in state_events {
1137                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1138                            let data = this.serialize_json(&raw_state_event)?;
1139
1140                            let event_id: Option<String> =
1141                                raw_state_event.get_field("event_id").ok().flatten();
1142                            let encoded_event_id =
1143                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1144
1145                            txn.set_state_event(
1146                                &encoded_room_id,
1147                                &encoded_event_type,
1148                                &encoded_state_key,
1149                                false,
1150                                encoded_event_id.as_deref(),
1151                                &data,
1152                            )?;
1153
1154                            if event_type == StateEventType::RoomMember {
1155                                let member_event = match raw_state_event
1156                                    .deserialize_as::<SyncRoomMemberEvent>()
1157                                {
1158                                    Ok(ev) => ev,
1159                                    Err(e) => {
1160                                        debug!(event_id, "Failed to deserialize member event: {e}");
1161                                        continue;
1162                                    }
1163                                };
1164
1165                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1166                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1167                                let membership = this
1168                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1169                                let data = this.serialize_value(&state_key)?;
1170
1171                                txn.set_member(
1172                                    &encoded_room_id,
1173                                    &user_id,
1174                                    &membership,
1175                                    false,
1176                                    &data,
1177                                )?;
1178
1179                                if let Some(profile) =
1180                                    profiles.and_then(|p| p.get(member_event.state_key()))
1181                                {
1182                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1183                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1184                                    let data = this.serialize_json(&profile)?;
1185                                    txn.set_profile(&room_id, &user_id, &data)?;
1186                                }
1187                            }
1188                        }
1189                    }
1190                }
1191
1192                for (room_id, stripped_state_event_types) in stripped_state {
1193                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1194
1195                    for (event_type, stripped_state_events) in stripped_state_event_types {
1196                        let encoded_event_type =
1197                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1198
1199                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1200                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1201                            let data = this.serialize_json(&raw_stripped_state_event)?;
1202                            txn.set_state_event(
1203                                &encoded_room_id,
1204                                &encoded_event_type,
1205                                &encoded_state_key,
1206                                true,
1207                                None,
1208                                &data,
1209                            )?;
1210
1211                            if event_type == StateEventType::RoomMember {
1212                                let member_event = match raw_stripped_state_event
1213                                    .deserialize_as::<StrippedRoomMemberEvent>(
1214                                ) {
1215                                    Ok(ev) => ev,
1216                                    Err(e) => {
1217                                        debug!("Failed to deserialize stripped member event: {e}");
1218                                        continue;
1219                                    }
1220                                };
1221
1222                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1223                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1224                                let membership = this.encode_key(
1225                                    keys::MEMBER,
1226                                    member_event.content.membership.as_str(),
1227                                );
1228                                let data = this.serialize_value(&state_key)?;
1229
1230                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1231                            }
1232                        }
1233                    }
1234                }
1235
1236                for (room_id, receipt_event) in receipts {
1237                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1238
1239                    for (event_id, receipt_types) in receipt_event {
1240                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1241
1242                        for (receipt_type, receipt_users) in receipt_types {
1243                            let receipt_type =
1244                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1245
1246                            for (user_id, receipt) in receipt_users {
1247                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1248                                // We cannot have a NULL primary key so we rely on serialization
1249                                // instead of the string representation.
1250                                let thread = this.encode_key(
1251                                    keys::RECEIPT,
1252                                    rmp_serde::to_vec_named(&receipt.thread)?,
1253                                );
1254                                let data = this.serialize_json(&ReceiptData {
1255                                    receipt,
1256                                    event_id: event_id.clone(),
1257                                    user_id,
1258                                })?;
1259
1260                                txn.set_receipt(
1261                                    &room_id,
1262                                    &encoded_user_id,
1263                                    &receipt_type,
1264                                    &thread,
1265                                    &encoded_event_id,
1266                                    &data,
1267                                )?;
1268                            }
1269                        }
1270                    }
1271                }
1272
1273                for (room_id, redactions) in redactions {
1274                    let make_room_version = || {
1275                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1276                        txn.get_room_info(&encoded_room_id)
1277                            .ok()
1278                            .flatten()
1279                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1280                            .map(|info| info.room_version_or_default())
1281                            .unwrap_or_else(|| {
1282                                warn!(
1283                                    ?room_id,
1284                                    "Unable to find the room version, assuming {ROOM_VERSION_FALLBACK}"
1285                                );
1286                                ROOM_VERSION_FALLBACK
1287                            })
1288                    };
1289
1290                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1291                    let mut room_version = None;
1292
1293                    for (event_id, redaction) in redactions {
1294                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1295
1296                        if let Some(Ok(raw_event)) = txn
1297                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1298                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1299                        {
1300                            let event = raw_event.deserialize()?;
1301                            let redacted = redact(
1302                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1303                                room_version.get_or_insert_with(make_room_version),
1304                                Some(RedactedBecause::from_raw_event(&redaction)?),
1305                            )
1306                            .map_err(Error::Redaction)?;
1307                            let data = this.serialize_json(&redacted)?;
1308
1309                            let event_type =
1310                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1311                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1312
1313                            txn.set_state_event(
1314                                &encoded_room_id,
1315                                &event_type,
1316                                &state_key,
1317                                false,
1318                                Some(&event_id),
1319                                &data,
1320                            )?;
1321                        }
1322                    }
1323                }
1324
1325                for (room_id, display_names) in ambiguity_maps {
1326                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1327
1328                    for (name, user_ids) in display_names {
1329                        let encoded_name = this.encode_key(
1330                            keys::DISPLAY_NAME,
1331                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1332                        );
1333                        let data = this.serialize_json(&user_ids)?;
1334
1335                        if user_ids.is_empty() {
1336                            txn.remove_display_name(&room_id, &encoded_name)?;
1337
1338                            // We can't do a migration to merge the previously distinct buckets of
1339                            // user IDs since the display names themselves are hashed before they
1340                            // are persisted in the store. So the store will always retain two
1341                            // buckets: one for raw display names and one for normalised ones.
1342                            //
1343                            // We therefore do the next best thing, which is a sort of a soft
1344                            // migration: we fetch both the raw and normalised buckets, then merge
1345                            // the user IDs contained in them into a separate, temporary merged
1346                            // bucket. The SDK then operates on the merged buckets exclusively. See
1347                            // the comment in `get_users_with_display_names` for details.
1348                            //
1349                            // If the merged bucket is empty, that must mean that both the raw and
1350                            // normalised buckets were also empty, so we can remove both from the
1351                            // store.
1352                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1353                            txn.remove_display_name(&room_id, &raw_name)?;
1354                        } else {
1355                            // We only create new buckets with the normalized display name.
1356                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1357                        }
1358                    }
1359                }
1360
1361                Ok::<_, Error>(())
1362            })
1363            .await?;
1364
1365        Ok(())
1366    }
1367
1368    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1369        self.acquire()
1370            .await?
1371            .get_kv_blob(self.encode_presence_key(user_id))
1372            .await?
1373            .map(|data| self.deserialize_json(&data))
1374            .transpose()
1375    }
1376
1377    async fn get_presence_events(
1378        &self,
1379        user_ids: &[OwnedUserId],
1380    ) -> Result<Vec<Raw<PresenceEvent>>> {
1381        if user_ids.is_empty() {
1382            return Ok(Vec::new());
1383        }
1384
1385        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1386        self.acquire()
1387            .await?
1388            .get_kv_blobs(user_ids)
1389            .await?
1390            .into_iter()
1391            .map(|data| self.deserialize_json(&data))
1392            .collect()
1393    }
1394
1395    async fn get_state_event(
1396        &self,
1397        room_id: &RoomId,
1398        event_type: StateEventType,
1399        state_key: &str,
1400    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1401        Ok(self
1402            .get_state_events_for_keys(room_id, event_type, &[state_key])
1403            .await?
1404            .into_iter()
1405            .next())
1406    }
1407
1408    async fn get_state_events(
1409        &self,
1410        room_id: &RoomId,
1411        event_type: StateEventType,
1412    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1413        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1414        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1415        self.acquire()
1416            .await?
1417            .get_maybe_stripped_state_events(room_id, event_type)
1418            .await?
1419            .into_iter()
1420            .map(|(stripped, data)| {
1421                let ev = if stripped {
1422                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1423                } else {
1424                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1425                };
1426
1427                Ok(ev)
1428            })
1429            .collect()
1430    }
1431
1432    async fn get_state_events_for_keys(
1433        &self,
1434        room_id: &RoomId,
1435        event_type: StateEventType,
1436        state_keys: &[&str],
1437    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1438        if state_keys.is_empty() {
1439            return Ok(Vec::new());
1440        }
1441
1442        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1443        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1444        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1445        self.acquire()
1446            .await?
1447            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1448            .await?
1449            .into_iter()
1450            .map(|(stripped, data)| {
1451                let ev = if stripped {
1452                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1453                } else {
1454                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1455                };
1456
1457                Ok(ev)
1458            })
1459            .collect()
1460    }
1461
1462    async fn get_profile(
1463        &self,
1464        room_id: &RoomId,
1465        user_id: &UserId,
1466    ) -> Result<Option<MinimalRoomMemberEvent>> {
1467        let room_id = self.encode_key(keys::PROFILE, room_id);
1468        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1469
1470        self.acquire()
1471            .await?
1472            .get_profiles(room_id, user_ids)
1473            .await?
1474            .into_iter()
1475            .next()
1476            .map(|(_, data)| self.deserialize_json(&data))
1477            .transpose()
1478    }
1479
1480    async fn get_profiles<'a>(
1481        &self,
1482        room_id: &RoomId,
1483        user_ids: &'a [OwnedUserId],
1484    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1485        if user_ids.is_empty() {
1486            return Ok(BTreeMap::new());
1487        }
1488
1489        let room_id = self.encode_key(keys::PROFILE, room_id);
1490        let mut user_ids_map = user_ids
1491            .iter()
1492            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1493            .collect::<BTreeMap<_, _>>();
1494        let user_ids = user_ids_map.keys().cloned().collect();
1495
1496        self.acquire()
1497            .await?
1498            .get_profiles(room_id, user_ids)
1499            .await?
1500            .into_iter()
1501            .map(|(user_id, data)| {
1502                Ok((
1503                    user_ids_map
1504                        .remove(user_id.as_slice())
1505                        .expect("returned user IDs were requested"),
1506                    self.deserialize_json(&data)?,
1507                ))
1508            })
1509            .collect()
1510    }
1511
1512    async fn get_user_ids(
1513        &self,
1514        room_id: &RoomId,
1515        membership: RoomMemberships,
1516    ) -> Result<Vec<OwnedUserId>> {
1517        let room_id = self.encode_key(keys::MEMBER, room_id);
1518        let memberships = membership
1519            .as_vec()
1520            .into_iter()
1521            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1522            .collect();
1523        self.acquire()
1524            .await?
1525            .get_user_ids(room_id, memberships)
1526            .await?
1527            .iter()
1528            .map(|data| self.deserialize_value(data))
1529            .collect()
1530    }
1531
1532    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1533        self.acquire()
1534            .await?
1535            .get_room_infos(match room_load_settings {
1536                RoomLoadSettings::All => None,
1537                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1538            })
1539            .await?
1540            .into_iter()
1541            .map(|data| self.deserialize_json(&data))
1542            .collect()
1543    }
1544
1545    async fn get_users_with_display_name(
1546        &self,
1547        room_id: &RoomId,
1548        display_name: &DisplayName,
1549    ) -> Result<BTreeSet<OwnedUserId>> {
1550        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1551        let names = vec![self.encode_key(
1552            keys::DISPLAY_NAME,
1553            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1554        )];
1555
1556        Ok(self
1557            .acquire()
1558            .await?
1559            .get_display_names(room_id, names)
1560            .await?
1561            .into_iter()
1562            .next()
1563            .map(|(_, data)| self.deserialize_json(&data))
1564            .transpose()?
1565            .unwrap_or_default())
1566    }
1567
1568    async fn get_users_with_display_names<'a>(
1569        &self,
1570        room_id: &RoomId,
1571        display_names: &'a [DisplayName],
1572    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1573        let mut result = HashMap::new();
1574
1575        if display_names.is_empty() {
1576            return Ok(result);
1577        }
1578
1579        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1580        let mut names_map = display_names
1581            .iter()
1582            .flat_map(|display_name| {
1583                // We encode the display name as the `raw_str()` and the normalized string.
1584                //
1585                // This is for compatibility reasons since:
1586                //  1. Previously "Alice" and "alice" were considered to be distinct display
1587                //     names, while we now consider them to be the same so we need to merge the
1588                //     previously distinct buckets of user IDs.
1589                //  2. We can't do a migration to merge the previously distinct buckets of user
1590                //     IDs since the display names itself are hashed before they are persisted
1591                //     in the store.
1592                let raw =
1593                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1594                let normalized = display_name.as_normalized_str().map(|normalized| {
1595                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1596                });
1597
1598                iter::once(raw).chain(normalized.into_iter())
1599            })
1600            .collect::<BTreeMap<_, _>>();
1601        let names = names_map.keys().cloned().collect();
1602
1603        for (name, data) in
1604            self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1605        {
1606            let display_name =
1607                names_map.remove(name.as_slice()).expect("returned display names were requested");
1608            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1609
1610            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1611        }
1612
1613        Ok(result)
1614    }
1615
1616    async fn get_account_data_event(
1617        &self,
1618        event_type: GlobalAccountDataEventType,
1619    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1620        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1621        self.acquire()
1622            .await?
1623            .get_global_account_data(event_type)
1624            .await?
1625            .map(|value| self.deserialize_json(&value))
1626            .transpose()
1627    }
1628
1629    async fn get_room_account_data_event(
1630        &self,
1631        room_id: &RoomId,
1632        event_type: RoomAccountDataEventType,
1633    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1634        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1635        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1636        self.acquire()
1637            .await?
1638            .get_room_account_data(room_id, event_type)
1639            .await?
1640            .map(|value| self.deserialize_json(&value))
1641            .transpose()
1642    }
1643
1644    async fn get_user_room_receipt_event(
1645        &self,
1646        room_id: &RoomId,
1647        receipt_type: ReceiptType,
1648        thread: ReceiptThread,
1649        user_id: &UserId,
1650    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1651        let room_id = self.encode_key(keys::RECEIPT, room_id);
1652        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1653        // We cannot have a NULL primary key so we rely on serialization instead of the
1654        // string representation.
1655        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1656        let user_id = self.encode_key(keys::RECEIPT, user_id);
1657
1658        self.acquire()
1659            .await?
1660            .get_user_receipt(room_id, receipt_type, thread, user_id)
1661            .await?
1662            .map(|value| {
1663                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1664            })
1665            .transpose()
1666    }
1667
1668    async fn get_event_room_receipt_events(
1669        &self,
1670        room_id: &RoomId,
1671        receipt_type: ReceiptType,
1672        thread: ReceiptThread,
1673        event_id: &EventId,
1674    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1675        let room_id = self.encode_key(keys::RECEIPT, room_id);
1676        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1677        // We cannot have a NULL primary key so we rely on serialization instead of the
1678        // string representation.
1679        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1680        let event_id = self.encode_key(keys::RECEIPT, event_id);
1681
1682        self.acquire()
1683            .await?
1684            .get_event_receipts(room_id, receipt_type, thread, event_id)
1685            .await?
1686            .iter()
1687            .map(|value| {
1688                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1689            })
1690            .collect()
1691    }
1692
1693    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1694        self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1695    }
1696
1697    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1698        let conn = self.acquire().await?;
1699        let key = self.encode_custom_key(key);
1700        conn.set_kv_blob(key, value).await?;
1701        Ok(())
1702    }
1703
1704    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1705        let conn = self.acquire().await?;
1706        let key = self.encode_custom_key(key);
1707        let previous = conn.get_kv_blob(key.clone()).await?;
1708        conn.set_kv_blob(key, value).await?;
1709        Ok(previous)
1710    }
1711
1712    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1713        let conn = self.acquire().await?;
1714        let key = self.encode_custom_key(key);
1715        let previous = conn.get_kv_blob(key.clone()).await?;
1716        if previous.is_some() {
1717            conn.delete_kv_blob(key).await?;
1718        }
1719        Ok(previous)
1720    }
1721
1722    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1723        let this = self.clone();
1724        let room_id = room_id.to_owned();
1725
1726        let conn = self.acquire().await?;
1727
1728        conn.with_transaction(move |txn| -> Result<()> {
1729            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1730            txn.remove_room_info(&room_info_room_id)?;
1731
1732            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1733            txn.remove_room_state_events(&state_event_room_id, None)?;
1734
1735            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1736            txn.remove_room_members(&member_room_id, None)?;
1737
1738            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1739            txn.remove_room_profiles(&profile_room_id)?;
1740
1741            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1742            txn.remove_room_account_data(&room_account_data_room_id)?;
1743
1744            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1745            txn.remove_room_receipts(&receipt_room_id)?;
1746
1747            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1748            txn.remove_room_display_names(&display_name_room_id)?;
1749
1750            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1751            txn.remove_room_send_queue(&send_queue_room_id)?;
1752
1753            let dependent_send_queue_room_id =
1754                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1755            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1756
1757            Ok(())
1758        })
1759        .await?;
1760
1761        conn.vacuum().await
1762    }
1763
1764    async fn save_send_queue_request(
1765        &self,
1766        room_id: &RoomId,
1767        transaction_id: OwnedTransactionId,
1768        created_at: MilliSecondsSinceUnixEpoch,
1769        content: QueuedRequestKind,
1770        priority: usize,
1771    ) -> Result<(), Self::Error> {
1772        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1773        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1774
1775        let content = self.serialize_json(&content)?;
1776        // The transaction id is used both as a key (in remove/update) and a value (as
1777        // it's useful for the callers), so we keep it as is, and neither hash
1778        // it (with encode_key) or encrypt it (through serialize_value). After
1779        // all, it carries no personal information, so this is considered fine.
1780
1781        let created_at_ts: u64 = created_at.0.into();
1782        self.acquire()
1783            .await?
1784            .with_transaction(move |txn| {
1785                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1786                Ok(())
1787            })
1788            .await
1789    }
1790
1791    async fn update_send_queue_request(
1792        &self,
1793        room_id: &RoomId,
1794        transaction_id: &TransactionId,
1795        content: QueuedRequestKind,
1796    ) -> Result<bool, Self::Error> {
1797        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1798
1799        let content = self.serialize_json(&content)?;
1800        // See comment in [`Self::save_send_queue_event`] to understand why the
1801        // transaction id is neither encrypted or hashed.
1802        let transaction_id = transaction_id.to_string();
1803
1804        let num_updated = self.acquire()
1805            .await?
1806            .with_transaction(move |txn| {
1807                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1808            })
1809            .await?;
1810
1811        Ok(num_updated > 0)
1812    }
1813
1814    async fn remove_send_queue_request(
1815        &self,
1816        room_id: &RoomId,
1817        transaction_id: &TransactionId,
1818    ) -> Result<bool, Self::Error> {
1819        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1820
1821        // See comment in `save_send_queue_event`.
1822        let transaction_id = transaction_id.to_string();
1823
1824        let num_deleted = self
1825            .acquire()
1826            .await?
1827            .with_transaction(move |txn| {
1828                txn.prepare_cached(
1829                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1830                )?
1831                .execute((room_id, &transaction_id))
1832            })
1833            .await?;
1834
1835        Ok(num_deleted > 0)
1836    }
1837
1838    async fn load_send_queue_requests(
1839        &self,
1840        room_id: &RoomId,
1841    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1842        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1843
1844        // Note: ROWID is always present and is an auto-incremented integer counter. We
1845        // want to maintain the insertion order, so we can sort using it.
1846        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1847        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1848            .acquire()
1849            .await?
1850            .prepare(
1851                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1852                |mut stmt| {
1853                    stmt.query((room_id,))?
1854                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1855                        .collect()
1856                },
1857            )
1858            .await?;
1859
1860        let mut requests = Vec::with_capacity(res.len());
1861        for entry in res {
1862            let created_at = entry
1863                .4
1864                .and_then(UInt::new)
1865                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1866            requests.push(QueuedRequest {
1867                transaction_id: entry.0.into(),
1868                kind: self.deserialize_json(&entry.1)?,
1869                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1870                priority: entry.3,
1871                created_at,
1872            });
1873        }
1874
1875        Ok(requests)
1876    }
1877
1878    async fn update_send_queue_request_status(
1879        &self,
1880        room_id: &RoomId,
1881        transaction_id: &TransactionId,
1882        error: Option<QueueWedgeError>,
1883    ) -> Result<(), Self::Error> {
1884        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1885
1886        // See comment in `save_send_queue_event`.
1887        let transaction_id = transaction_id.to_string();
1888
1889        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1890        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1891
1892        self.acquire()
1893            .await?
1894            .with_transaction(move |txn| {
1895                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1896                Ok(())
1897            })
1898            .await
1899    }
1900
1901    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1902        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1903        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1904        // != encrypted(X), since we use a nonce in the encryption process.
1905
1906        let res: Vec<Vec<u8>> = self
1907            .acquire()
1908            .await?
1909            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1910                stmt.query(())?.mapped(|row| row.get(0)).collect()
1911            })
1912            .await?;
1913
1914        // So we collect the results into a `BTreeSet` to perform the deduplication, and
1915        // then rejigger that into a vector.
1916        Ok(res
1917            .into_iter()
1918            .map(|entry| self.deserialize_value(&entry))
1919            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1920            .into_iter()
1921            .collect())
1922    }
1923
1924    async fn save_dependent_queued_request(
1925        &self,
1926        room_id: &RoomId,
1927        parent_txn_id: &TransactionId,
1928        own_txn_id: ChildTransactionId,
1929        created_at: MilliSecondsSinceUnixEpoch,
1930        content: DependentQueuedRequestKind,
1931    ) -> Result<()> {
1932        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1933        let content = self.serialize_json(&content)?;
1934
1935        // See comment in `save_send_queue_event`.
1936        let parent_txn_id = parent_txn_id.to_string();
1937        let own_txn_id = own_txn_id.to_string();
1938
1939        let created_at_ts: u64 = created_at.0.into();
1940        self.acquire()
1941            .await?
1942            .with_transaction(move |txn| {
1943                txn.prepare_cached(
1944                    r#"INSERT INTO dependent_send_queue_events
1945                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1946                       VALUES (?, ?, ?, ?, ?)"#,
1947                )?
1948                .execute((
1949                    room_id,
1950                    parent_txn_id,
1951                    own_txn_id,
1952                    content,
1953                    created_at_ts,
1954                ))?;
1955                Ok(())
1956            })
1957            .await
1958    }
1959
1960    async fn update_dependent_queued_request(
1961        &self,
1962        room_id: &RoomId,
1963        own_transaction_id: &ChildTransactionId,
1964        new_content: DependentQueuedRequestKind,
1965    ) -> Result<bool> {
1966        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1967        let content = self.serialize_json(&new_content)?;
1968
1969        // See comment in `save_send_queue_event`.
1970        let own_txn_id = own_transaction_id.to_string();
1971
1972        let num_updated = self
1973            .acquire()
1974            .await?
1975            .with_transaction(move |txn| {
1976                txn.prepare_cached(
1977                    r#"UPDATE dependent_send_queue_events
1978                       SET content = ?
1979                       WHERE own_transaction_id = ?
1980                       AND room_id = ?"#,
1981                )?
1982                .execute((content, own_txn_id, room_id))
1983            })
1984            .await?;
1985
1986        if num_updated > 1 {
1987            return Err(Error::InconsistentUpdate);
1988        }
1989
1990        Ok(num_updated == 1)
1991    }
1992
1993    async fn mark_dependent_queued_requests_as_ready(
1994        &self,
1995        room_id: &RoomId,
1996        parent_txn_id: &TransactionId,
1997        parent_key: SentRequestKey,
1998    ) -> Result<usize> {
1999        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2000        let parent_key = self.serialize_value(&parent_key)?;
2001
2002        // See comment in `save_send_queue_event`.
2003        let parent_txn_id = parent_txn_id.to_string();
2004
2005        self.acquire()
2006            .await?
2007            .with_transaction(move |txn| {
2008                Ok(txn.prepare_cached(
2009                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2010                )?
2011                .execute((parent_key, parent_txn_id, room_id))?)
2012            })
2013            .await
2014    }
2015
2016    async fn remove_dependent_queued_request(
2017        &self,
2018        room_id: &RoomId,
2019        txn_id: &ChildTransactionId,
2020    ) -> Result<bool> {
2021        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2022
2023        // See comment in `save_send_queue_event`.
2024        let txn_id = txn_id.to_string();
2025
2026        let num_deleted = self
2027            .acquire()
2028            .await?
2029            .with_transaction(move |txn| {
2030                txn.prepare_cached(
2031                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2032                )?
2033                .execute((txn_id, room_id))
2034            })
2035            .await?;
2036
2037        Ok(num_deleted > 0)
2038    }
2039
2040    async fn load_dependent_queued_requests(
2041        &self,
2042        room_id: &RoomId,
2043    ) -> Result<Vec<DependentQueuedRequest>> {
2044        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2045
2046        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2047        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2048            .acquire()
2049            .await?
2050            .prepare(
2051                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2052                |mut stmt| {
2053                    stmt.query((room_id,))?
2054                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2055                        .collect()
2056                },
2057            )
2058            .await?;
2059
2060        let mut dependent_events = Vec::with_capacity(res.len());
2061        for entry in res {
2062            let created_at = entry
2063                .4
2064                .and_then(UInt::new)
2065                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2066            dependent_events.push(DependentQueuedRequest {
2067                own_transaction_id: entry.0.into(),
2068                parent_transaction_id: entry.1.into(),
2069                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2070                kind: self.deserialize_json(&entry.3)?,
2071                created_at,
2072            });
2073        }
2074
2075        Ok(dependent_events)
2076    }
2077}
2078
2079#[derive(Debug, Clone, Serialize, Deserialize)]
2080struct ReceiptData {
2081    receipt: Receipt,
2082    event_id: OwnedEventId,
2083    user_id: OwnedUserId,
2084}
2085
2086#[cfg(test)]
2087mod tests {
2088    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2089
2090    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2091    use once_cell::sync::Lazy;
2092    use tempfile::{tempdir, TempDir};
2093
2094    use super::SqliteStateStore;
2095
2096    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2097    static NUM: AtomicU32 = AtomicU32::new(0);
2098
2099    async fn get_store() -> Result<impl StateStore, StoreError> {
2100        let name = NUM.fetch_add(1, SeqCst).to_string();
2101        let tmpdir_path = TMP_DIR.path().join(name);
2102
2103        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2104
2105        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2106    }
2107
2108    statestore_integration_tests!();
2109}
2110
2111#[cfg(test)]
2112mod encrypted_tests {
2113    use std::{
2114        path::PathBuf,
2115        sync::atomic::{AtomicU32, Ordering::SeqCst},
2116    };
2117
2118    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2119    use matrix_sdk_test::async_test;
2120    use once_cell::sync::Lazy;
2121    use tempfile::{tempdir, TempDir};
2122
2123    use super::SqliteStateStore;
2124    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2125
2126    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2127    static NUM: AtomicU32 = AtomicU32::new(0);
2128
2129    fn new_state_store_workspace() -> PathBuf {
2130        let name = NUM.fetch_add(1, SeqCst).to_string();
2131        TMP_DIR.path().join(name)
2132    }
2133
2134    async fn get_store() -> Result<impl StateStore, StoreError> {
2135        let tmpdir_path = new_state_store_workspace();
2136
2137        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2138
2139        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2140            .await
2141            .unwrap())
2142    }
2143
2144    #[async_test]
2145    async fn test_pool_size() {
2146        let tmpdir_path = new_state_store_workspace();
2147        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2148
2149        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2150
2151        assert_eq!(store.pool.status().max_size, 42);
2152    }
2153
2154    #[async_test]
2155    async fn test_cache_size() {
2156        let tmpdir_path = new_state_store_workspace();
2157        let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2158
2159        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2160
2161        let conn = store.pool.get().await.unwrap();
2162        let cache_size =
2163            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2164
2165        // The value passed to `SqliteStoreConfig` is in bytes. Check it is
2166        // converted to kibibytes. Also, it must be a negative value because it
2167        // _is_ the size in kibibytes, not in page size.
2168        assert_eq!(cache_size, -(1500 / 1024));
2169    }
2170
2171    #[async_test]
2172    async fn test_journal_size_limit() {
2173        let tmpdir_path = new_state_store_workspace();
2174        let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2175
2176        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2177
2178        let conn = store.pool.get().await.unwrap();
2179        let journal_size_limit = conn
2180            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2181            .await
2182            .unwrap();
2183
2184        // The value passed to `SqliteStoreConfig` is in bytes. It stays in
2185        // bytes in SQLite.
2186        assert_eq!(journal_size_limit, 1500);
2187    }
2188
2189    statestore_integration_tests!();
2190}
2191
2192#[cfg(test)]
2193mod migration_tests {
2194    use std::{
2195        path::{Path, PathBuf},
2196        sync::{
2197            atomic::{AtomicU32, Ordering::SeqCst},
2198            Arc,
2199        },
2200    };
2201
2202    use as_variant::as_variant;
2203    use deadpool_sqlite::Runtime;
2204    use matrix_sdk_base::{
2205        media::{MediaFormat, MediaRequestParameters},
2206        store::{
2207            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2208            SerializableEventContent,
2209        },
2210        sync::UnreadNotificationsCount,
2211        RoomState, StateStore,
2212    };
2213    use matrix_sdk_test::async_test;
2214    use once_cell::sync::Lazy;
2215    use ruma::{
2216        events::{
2217            room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2218            StateEventType,
2219        },
2220        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2221        RoomId, TransactionId, UserId,
2222    };
2223    use rusqlite::Transaction;
2224    use serde::{Deserialize, Serialize};
2225    use serde_json::json;
2226    use tempfile::{tempdir, TempDir};
2227    use tokio::fs;
2228
2229    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2230    use crate::{
2231        error::{Error, Result},
2232        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2233        OpenStoreError,
2234    };
2235
2236    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2237    static NUM: AtomicU32 = AtomicU32::new(0);
2238    const SECRET: &str = "secret";
2239
2240    fn new_path() -> PathBuf {
2241        let name = NUM.fetch_add(1, SeqCst).to_string();
2242        TMP_DIR.path().join(name)
2243    }
2244
2245    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2246        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2247
2248        let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2249        // use default pool config
2250
2251        let pool = config.create_pool(Runtime::Tokio1).unwrap();
2252        let conn = pool.get().await?;
2253
2254        init(&conn).await?;
2255
2256        let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2257        let this = SqliteStateStore { store_cipher, pool };
2258        this.run_migrations(&conn, 1, Some(version)).await?;
2259
2260        Ok(this)
2261    }
2262
2263    fn room_info_v1_json(
2264        room_id: &RoomId,
2265        state: RoomState,
2266        name: Option<&str>,
2267        creator: Option<&UserId>,
2268    ) -> serde_json::Value {
2269        // Test with name set or not.
2270        let name_content = match name {
2271            Some(name) => json!({ "name": name }),
2272            None => json!({ "name": null }),
2273        };
2274        // Test with creator set or not.
2275        let create_content = match creator {
2276            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2277            None => RoomCreateEventContent::new_v11(),
2278        };
2279
2280        json!({
2281            "room_id": room_id,
2282            "room_type": state,
2283            "notification_counts": UnreadNotificationsCount::default(),
2284            "summary": {
2285                "heroes": [],
2286                "joined_member_count": 0,
2287                "invited_member_count": 0,
2288            },
2289            "members_synced": false,
2290            "base_info": {
2291                "dm_targets": [],
2292                "max_power_level": 100,
2293                "name": {
2294                    "Original": {
2295                        "content": name_content,
2296                    },
2297                },
2298                "create": {
2299                    "Original": {
2300                        "content": create_content,
2301                    }
2302                }
2303            },
2304        })
2305    }
2306
2307    #[async_test]
2308    pub async fn test_migrating_v1_to_v2() {
2309        let path = new_path();
2310        // Create and populate db.
2311        {
2312            let db = create_fake_db(&path, 1).await.unwrap();
2313            let conn = db.pool.get().await.unwrap();
2314
2315            let this = db.clone();
2316            conn.with_transaction(move |txn| {
2317                for i in 0..5 {
2318                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2319                    let (state, stripped) =
2320                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2321                    let info = room_info_v1_json(&room_id, state, None, None);
2322
2323                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2324                    let data = this.serialize_json(&info)?;
2325
2326                    txn.prepare_cached(
2327                        "INSERT INTO room_info (room_id, stripped, data)
2328                         VALUES (?, ?, ?)",
2329                    )?
2330                    .execute((room_id, stripped, data))?;
2331                }
2332
2333                Result::<_, Error>::Ok(())
2334            })
2335            .await
2336            .unwrap();
2337        }
2338
2339        // This transparently migrates to the latest version.
2340        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2341
2342        // Check all room infos are there.
2343        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2344    }
2345
2346    // Add a room in version 2 format of the state store.
2347    fn add_room_v2(
2348        this: &SqliteStateStore,
2349        txn: &Transaction<'_>,
2350        room_id: &RoomId,
2351        name: Option<&str>,
2352        create_creator: Option<&UserId>,
2353        create_sender: Option<&UserId>,
2354    ) -> Result<(), Error> {
2355        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2356
2357        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2358        let encoded_state =
2359            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2360        let data = this.serialize_json(&room_info_json)?;
2361
2362        txn.prepare_cached(
2363            "INSERT INTO room_info (room_id, state, data)
2364             VALUES (?, ?, ?)",
2365        )?
2366        .execute((encoded_room_id, encoded_state, data))?;
2367
2368        // Test with or without `m.room.create` event in the room state.
2369        let Some(create_sender) = create_sender else {
2370            return Ok(());
2371        };
2372
2373        let create_content = match create_creator {
2374            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2375            None => RoomCreateEventContent::new_v11(),
2376        };
2377
2378        let event_id = EventId::new(server_name!("dummy.local"));
2379        let create_event = json!({
2380            "content": create_content,
2381            "event_id": event_id,
2382            "sender": create_sender.to_owned(),
2383            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2384            "state_key": "",
2385            "type": "m.room.create",
2386            "unsigned": {},
2387        });
2388
2389        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2390        let encoded_event_type =
2391            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2392        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2393        let stripped = false;
2394        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2395        let data = this.serialize_json(&create_event)?;
2396
2397        txn.prepare_cached(
2398            "INSERT
2399             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2400             VALUES (?, ?, ?, ?, ?, ?)",
2401        )?
2402        .execute((
2403            encoded_room_id,
2404            encoded_event_type,
2405            encoded_state_key,
2406            stripped,
2407            encoded_event_id,
2408            data,
2409        ))?;
2410
2411        Ok(())
2412    }
2413
2414    #[async_test]
2415    pub async fn test_migrating_v2_to_v3() {
2416        let path = new_path();
2417
2418        // Room A: with name, creator and sender.
2419        let room_a_id = room_id!("!room_a:dummy.local");
2420        let room_a_name = "Room A";
2421        let room_a_creator = user_id!("@creator:dummy.local");
2422        // Use a different sender to check that sender is used over creator in
2423        // migration.
2424        let room_a_create_sender = user_id!("@sender:dummy.local");
2425
2426        // Room B: without name, creator and sender.
2427        let room_b_id = room_id!("!room_b:dummy.local");
2428
2429        // Room C: only with sender.
2430        let room_c_id = room_id!("!room_c:dummy.local");
2431        let room_c_create_sender = user_id!("@creator:dummy.local");
2432
2433        // Create and populate db.
2434        {
2435            let db = create_fake_db(&path, 2).await.unwrap();
2436            let conn = db.pool.get().await.unwrap();
2437
2438            let this = db.clone();
2439            conn.with_transaction(move |txn| {
2440                add_room_v2(
2441                    &this,
2442                    txn,
2443                    room_a_id,
2444                    Some(room_a_name),
2445                    Some(room_a_creator),
2446                    Some(room_a_create_sender),
2447                )?;
2448                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2449                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2450
2451                Result::<_, Error>::Ok(())
2452            })
2453            .await
2454            .unwrap();
2455        }
2456
2457        // This transparently migrates to the latest version.
2458        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2459
2460        // Check all room infos are there.
2461        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2462        assert_eq!(room_infos.len(), 3);
2463
2464        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2465        assert_eq!(room_a.name(), Some(room_a_name));
2466        assert_eq!(room_a.creator(), Some(room_a_create_sender));
2467
2468        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2469        assert_eq!(room_b.name(), None);
2470        assert_eq!(room_b.creator(), None);
2471
2472        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2473        assert_eq!(room_c.name(), None);
2474        assert_eq!(room_c.creator(), Some(room_c_create_sender));
2475    }
2476
2477    #[async_test]
2478    pub async fn test_migrating_v7_to_v9() {
2479        let path = new_path();
2480
2481        let room_id = room_id!("!room_a:dummy.local");
2482        let wedged_event_transaction_id = TransactionId::new();
2483        let local_event_transaction_id = TransactionId::new();
2484
2485        // Create and populate db.
2486        {
2487            let db = create_fake_db(&path, 7).await.unwrap();
2488            let conn = db.pool.get().await.unwrap();
2489
2490            let wedge_tx = wedged_event_transaction_id.clone();
2491            let local_tx = local_event_transaction_id.clone();
2492
2493            conn.with_transaction(move |txn| {
2494                add_dependent_send_queue_event_v7(
2495                    &db,
2496                    txn,
2497                    room_id,
2498                    &local_tx,
2499                    ChildTransactionId::new(),
2500                    DependentQueuedRequestKind::RedactEvent,
2501                )?;
2502                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2503                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2504                Result::<_, Error>::Ok(())
2505            })
2506            .await
2507            .unwrap();
2508        }
2509
2510        // This transparently migrates to the latest version, which clears up all
2511        // requests and dependent requests.
2512        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2513
2514        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2515        assert!(requests.is_empty());
2516
2517        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2518        assert!(dependent_requests.is_empty());
2519    }
2520
2521    fn add_send_queue_event_v7(
2522        this: &SqliteStateStore,
2523        txn: &Transaction<'_>,
2524        transaction_id: &TransactionId,
2525        room_id: &RoomId,
2526        is_wedged: bool,
2527    ) -> Result<(), Error> {
2528        let content =
2529            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2530
2531        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2532        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2533
2534        let content = this.serialize_json(&content)?;
2535
2536        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2537            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2538
2539        Ok(())
2540    }
2541
2542    fn add_dependent_send_queue_event_v7(
2543        this: &SqliteStateStore,
2544        txn: &Transaction<'_>,
2545        room_id: &RoomId,
2546        parent_txn_id: &TransactionId,
2547        own_txn_id: ChildTransactionId,
2548        content: DependentQueuedRequestKind,
2549    ) -> Result<(), Error> {
2550        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2551
2552        let parent_txn_id = parent_txn_id.to_string();
2553        let own_txn_id = own_txn_id.to_string();
2554        let content = this.serialize_json(&content)?;
2555
2556        txn.prepare_cached(
2557            "INSERT INTO dependent_send_queue_events
2558                         (room_id, parent_transaction_id, own_transaction_id, content)
2559                       VALUES (?, ?, ?, ?)",
2560        )?
2561        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2562
2563        Ok(())
2564    }
2565
2566    #[derive(Clone, Debug, Serialize, Deserialize)]
2567    pub enum LegacyDependentQueuedRequestKind {
2568        UploadFileWithThumbnail {
2569            content_type: String,
2570            cache_key: MediaRequestParameters,
2571            related_to: OwnedTransactionId,
2572        },
2573    }
2574
2575    #[async_test]
2576    pub async fn test_dependent_queued_request_variant_renaming() {
2577        let path = new_path();
2578        let db = create_fake_db(&path, 7).await.unwrap();
2579
2580        let cache_key = MediaRequestParameters {
2581            format: MediaFormat::File,
2582            source: MediaSource::Plain("https://server.local/foobar".into()),
2583        };
2584        let related_to = TransactionId::new();
2585        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2586            content_type: "image/png".to_owned(),
2587            cache_key,
2588            related_to: related_to.clone(),
2589        };
2590
2591        let data = db
2592            .serialize_json(&request)
2593            .expect("should be able to serialize legacy dependent request");
2594        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2595            "should be able to deserialize dependent request from legacy dependent request",
2596        );
2597
2598        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2599            assert_eq!(de_related_to, related_to);
2600        });
2601    }
2602}