1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15 DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16 RoomLoadSettings, SentRequestKey,
17 },
18 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19 StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 canonical_json::{redact, RedactedBecause},
24 events::{
25 presence::PresenceEvent,
26 receipt::{Receipt, ReceiptThread, ReceiptType},
27 room::{
28 create::RoomCreateEventContent,
29 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30 },
31 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33 },
34 serde::Raw,
35 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44 error::{Error, Result},
45 utils::{
46 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47 SqliteKeyValueStoreConnExt,
48 },
49 OpenStoreError, SqliteStoreConfig,
50};
51
52mod keys {
53 pub const KV_BLOB: &str = "kv_blob";
55 pub const ROOM_INFO: &str = "room_info";
56 pub const STATE_EVENT: &str = "state_event";
57 pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59 pub const MEMBER: &str = "member";
60 pub const PROFILE: &str = "profile";
61 pub const RECEIPT: &str = "receipt";
62 pub const DISPLAY_NAME: &str = "display_name";
63 pub const SEND_QUEUE: &str = "send_queue_events";
64 pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65}
66
67pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
69
70const DATABASE_VERSION: u8 = 12;
76
77#[derive(Clone)]
79pub struct SqliteStateStore {
80 store_cipher: Option<Arc<StoreCipher>>,
81 pool: SqlitePool,
82}
83
84#[cfg(not(tarpaulin_include))]
85impl fmt::Debug for SqliteStateStore {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
88 }
89}
90
91impl SqliteStateStore {
92 pub async fn open(
95 path: impl AsRef<Path>,
96 passphrase: Option<&str>,
97 ) -> Result<Self, OpenStoreError> {
98 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
99 }
100
101 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
103 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
104
105 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
106
107 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
108 config.pool = Some(pool_config);
109
110 let pool = config.create_pool(Runtime::Tokio1)?;
111
112 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
113 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
114
115 Ok(this)
116 }
117
118 async fn open_with_pool(
121 pool: SqlitePool,
122 passphrase: Option<&str>,
123 ) -> Result<Self, OpenStoreError> {
124 let conn = pool.get().await?;
125
126 let mut version = conn.db_version().await?;
127
128 if version == 0 {
129 init(&conn).await?;
130 version = 1;
131 }
132
133 let store_cipher = match passphrase {
134 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
135 None => None,
136 };
137 let this = Self { store_cipher, pool };
138 this.run_migrations(&conn, version, None).await?;
139
140 Ok(this)
141 }
142
143 async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
148 let to = to.unwrap_or(DATABASE_VERSION);
149
150 if from < to {
151 debug!(version = from, new_version = to, "Upgrading database");
152 } else {
153 return Ok(());
154 }
155
156 if from < 2 && to >= 2 {
157 let this = self.clone();
158 conn.with_transaction(move |txn| {
159 txn.execute_batch(include_str!(
161 "../migrations/state_store/002_a_create_new_room_info.sql"
162 ))?;
163
164 for data in txn
166 .prepare("SELECT data FROM room_info")?
167 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
168 {
169 let data = data?;
170 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
171
172 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
173 let state = this
174 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
175 txn.prepare_cached(
176 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
177 VALUES (?, ?, ?)",
178 )?
179 .execute((room_id, state, data))?;
180 }
181
182 txn.execute_batch(include_str!(
184 "../migrations/state_store/002_b_replace_room_info.sql"
185 ))?;
186
187 txn.set_db_version(2)?;
188 Result::<_, Error>::Ok(())
189 })
190 .await?;
191 }
192
193 if from < 3 && to >= 3 {
195 let this = self.clone();
196 conn.with_transaction(move |txn| {
197 for data in txn
199 .prepare("SELECT data FROM room_info")?
200 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
201 {
202 let data = data?;
203 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
204
205 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
207 let event_type =
208 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
209 let create_res = txn
210 .prepare(
211 "SELECT stripped, data FROM state_event
212 WHERE room_id = ? AND event_type = ?",
213 )?
214 .query_row([room_id, event_type], |row| {
215 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
216 })
217 .optional()?;
218
219 let create = create_res.and_then(|(stripped, data)| {
220 let create = if stripped {
221 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
222 this.deserialize_json(&data).ok()?,
223 )
224 } else {
225 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
226 };
227 Some(create)
228 });
229
230 let migrated_room_info = room_info_v1.migrate(create.as_ref());
231
232 let data = this.serialize_json(&migrated_room_info)?;
233 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
234 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
235 .execute((data, room_id))?;
236 }
237
238 txn.set_db_version(3)?;
239 Result::<_, Error>::Ok(())
240 })
241 .await?;
242 }
243
244 if from < 4 && to >= 4 {
245 conn.with_transaction(move |txn| {
246 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
248 txn.set_db_version(4)
249 })
250 .await?;
251 }
252
253 if from < 5 && to >= 5 {
254 conn.with_transaction(move |txn| {
255 txn.execute_batch(include_str!(
257 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
258 ))?;
259 txn.set_db_version(4)
260 })
261 .await?;
262 }
263
264 if from < 6 && to >= 6 {
265 conn.with_transaction(move |txn| {
266 txn.execute_batch(include_str!(
268 "../migrations/state_store/005_send_queue_dependent_events.sql"
269 ))?;
270 txn.set_db_version(6)
271 })
272 .await?;
273 }
274
275 if from < 7 && to >= 7 {
276 conn.with_transaction(move |txn| {
277 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
279 txn.set_db_version(7)
280 })
281 .await?;
282 }
283
284 if from < 8 && to >= 8 {
285 let error = QueueWedgeError::GenericApiError {
287 msg: "local echo failed to send in a previous session".into(),
288 };
289 let default_err = self.serialize_value(&error)?;
290
291 conn.with_transaction(move |txn| {
292 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
294
295 for wedged_entries in txn
298 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
299 .query_map((), |row| {
300 Ok(
301 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
302 )
303 })? {
304
305 let (room_id, transaction_id) = wedged_entries?;
306
307 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
308 .execute((default_err.clone(), room_id, transaction_id))?;
309 }
310
311
312 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
314
315 txn.set_db_version(8)
316 })
317 .await?;
318 }
319
320 if from < 9 && to >= 9 {
321 conn.with_transaction(move |txn| {
322 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
324 txn.set_db_version(9)
325 })
326 .await?;
327 }
328
329 if from < 10 && to >= 10 {
330 conn.with_transaction(move |txn| {
331 txn.execute_batch(include_str!(
333 "../migrations/state_store/009_send_queue_priority.sql"
334 ))?;
335 txn.set_db_version(10)
336 })
337 .await?;
338 }
339
340 if from < 11 && to >= 11 {
341 conn.with_transaction(move |txn| {
342 txn.execute_batch(include_str!(
344 "../migrations/state_store/010_send_queue_enqueue_time.sql"
345 ))?;
346 txn.set_db_version(11)
347 })
348 .await?;
349 }
350
351 if from < 12 && to >= 12 {
352 conn.vacuum().await?;
356 conn.set_kv("version", vec![12]).await?;
357 }
358
359 Ok(())
360 }
361
362 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
363 let key_s = match key {
364 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
365 StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
366 StateStoreDataKey::Filter(f) => {
367 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
368 }
369 StateStoreDataKey::UserAvatarUrl(u) => {
370 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
371 }
372 StateStoreDataKey::RecentlyVisitedRooms(b) => {
373 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
374 }
375 StateStoreDataKey::UtdHookManagerData => {
376 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
377 }
378 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
379 if let Some(thread_root) = thread_root {
380 Cow::Owned(format!(
381 "{}:{room_id}:{thread_root}",
382 StateStoreDataKey::COMPOSER_DRAFT
383 ))
384 } else {
385 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
386 }
387 }
388 StateStoreDataKey::SeenKnockRequests(room_id) => {
389 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
390 }
391 };
392
393 self.encode_key(keys::KV_BLOB, &*key_s)
394 }
395
396 fn encode_presence_key(&self, user_id: &UserId) -> Key {
397 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
398 }
399
400 fn encode_custom_key(&self, key: &[u8]) -> Key {
401 let mut full_key = b"custom:".to_vec();
402 full_key.extend(key);
403 self.encode_key(keys::KV_BLOB, full_key)
404 }
405
406 async fn acquire(&self) -> Result<SqliteAsyncConn> {
407 Ok(self.pool.get().await?)
408 }
409
410 fn remove_maybe_stripped_room_data(
411 &self,
412 txn: &Transaction<'_>,
413 room_id: &RoomId,
414 stripped: bool,
415 ) -> rusqlite::Result<()> {
416 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
417 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
418
419 let member_room_id = self.encode_key(keys::MEMBER, room_id);
420 txn.remove_room_members(&member_room_id, Some(stripped))
421 }
422}
423
424impl EncryptableStore for SqliteStateStore {
425 fn get_cypher(&self) -> Option<&StoreCipher> {
426 self.store_cipher.as_deref()
427 }
428}
429
430async fn init(conn: &SqliteAsyncConn) -> Result<()> {
432 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
435 conn.with_transaction(|txn| {
436 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
437 txn.set_db_version(1)?;
438
439 Ok(())
440 })
441 .await
442}
443
444trait SqliteConnectionStateStoreExt {
445 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
446
447 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
448
449 fn set_room_account_data(
450 &self,
451 room_id: &[u8],
452 event_type: &[u8],
453 data: &[u8],
454 ) -> rusqlite::Result<()>;
455 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
456
457 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
458 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
459 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
460
461 fn set_state_event(
462 &self,
463 room_id: &[u8],
464 event_type: &[u8],
465 state_key: &[u8],
466 stripped: bool,
467 event_id: Option<&[u8]>,
468 data: &[u8],
469 ) -> rusqlite::Result<()>;
470 fn get_state_event_by_id(
471 &self,
472 room_id: &[u8],
473 event_id: &[u8],
474 ) -> rusqlite::Result<Option<Vec<u8>>>;
475 fn remove_room_state_events(
476 &self,
477 room_id: &[u8],
478 stripped: Option<bool>,
479 ) -> rusqlite::Result<()>;
480
481 fn set_member(
482 &self,
483 room_id: &[u8],
484 user_id: &[u8],
485 membership: &[u8],
486 stripped: bool,
487 data: &[u8],
488 ) -> rusqlite::Result<()>;
489 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
490
491 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
492 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
493 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
494
495 fn set_receipt(
496 &self,
497 room_id: &[u8],
498 user_id: &[u8],
499 receipt_type: &[u8],
500 thread_id: &[u8],
501 event_id: &[u8],
502 data: &[u8],
503 ) -> rusqlite::Result<()>;
504 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
505
506 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
507 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
508 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
509 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
510 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
511}
512
513impl SqliteConnectionStateStoreExt for rusqlite::Connection {
514 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
515 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
516 Ok(())
517 }
518
519 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
520 self.prepare_cached(
521 "INSERT OR REPLACE INTO global_account_data (event_type, data)
522 VALUES (?, ?)",
523 )?
524 .execute((event_type, data))?;
525 Ok(())
526 }
527
528 fn set_room_account_data(
529 &self,
530 room_id: &[u8],
531 event_type: &[u8],
532 data: &[u8],
533 ) -> rusqlite::Result<()> {
534 self.prepare_cached(
535 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
536 VALUES (?, ?, ?)",
537 )?
538 .execute((room_id, event_type, data))?;
539 Ok(())
540 }
541
542 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
543 self.prepare(
544 "DELETE FROM room_account_data
545 WHERE room_id = ?",
546 )?
547 .execute((room_id,))?;
548 Ok(())
549 }
550
551 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
552 self.prepare_cached(
553 "INSERT OR REPLACE INTO room_info (room_id, state, data)
554 VALUES (?, ?, ?)",
555 )?
556 .execute((room_id, state, data))?;
557 Ok(())
558 }
559
560 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
561 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
562 .optional()
563 }
564
565 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
567 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
568 Ok(())
569 }
570
571 fn set_state_event(
572 &self,
573 room_id: &[u8],
574 event_type: &[u8],
575 state_key: &[u8],
576 stripped: bool,
577 event_id: Option<&[u8]>,
578 data: &[u8],
579 ) -> rusqlite::Result<()> {
580 self.prepare_cached(
581 "INSERT OR REPLACE
582 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
583 VALUES (?, ?, ?, ?, ?, ?)",
584 )?
585 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
586 Ok(())
587 }
588
589 fn get_state_event_by_id(
590 &self,
591 room_id: &[u8],
592 event_id: &[u8],
593 ) -> rusqlite::Result<Option<Vec<u8>>> {
594 self.query_row(
595 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
596 (room_id, event_id),
597 |row| row.get(0),
598 )
599 .optional()
600 }
601
602 fn remove_room_state_events(
608 &self,
609 room_id: &[u8],
610 stripped: Option<bool>,
611 ) -> rusqlite::Result<()> {
612 if let Some(stripped) = stripped {
613 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
614 .execute((room_id, stripped))?;
615 } else {
616 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
617 .execute((room_id,))?;
618 }
619 Ok(())
620 }
621
622 fn set_member(
623 &self,
624 room_id: &[u8],
625 user_id: &[u8],
626 membership: &[u8],
627 stripped: bool,
628 data: &[u8],
629 ) -> rusqlite::Result<()> {
630 self.prepare_cached(
631 "INSERT OR REPLACE
632 INTO member (room_id, user_id, membership, stripped, data)
633 VALUES (?, ?, ?, ?, ?)",
634 )?
635 .execute((room_id, user_id, membership, stripped, data))?;
636 Ok(())
637 }
638
639 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
644 if let Some(stripped) = stripped {
645 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
646 .execute((room_id, stripped))?;
647 } else {
648 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
649 }
650 Ok(())
651 }
652
653 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
654 self.prepare_cached(
655 "INSERT OR REPLACE
656 INTO profile (room_id, user_id, data)
657 VALUES (?, ?, ?)",
658 )?
659 .execute((room_id, user_id, data))?;
660 Ok(())
661 }
662
663 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
664 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
665 Ok(())
666 }
667
668 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
669 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
670 .execute((room_id, user_id))?;
671 Ok(())
672 }
673
674 fn set_receipt(
675 &self,
676 room_id: &[u8],
677 user_id: &[u8],
678 receipt_type: &[u8],
679 thread: &[u8],
680 event_id: &[u8],
681 data: &[u8],
682 ) -> rusqlite::Result<()> {
683 self.prepare_cached(
684 "INSERT OR REPLACE
685 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
686 VALUES (?, ?, ?, ?, ?, ?)",
687 )?
688 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
689 Ok(())
690 }
691
692 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
693 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
694 Ok(())
695 }
696
697 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
698 self.prepare_cached(
699 "INSERT OR REPLACE
700 INTO display_name (room_id, name, data)
701 VALUES (?, ?, ?)",
702 )?
703 .execute((room_id, name, data))?;
704 Ok(())
705 }
706
707 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
708 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
709 .execute((room_id, name))?;
710 Ok(())
711 }
712
713 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
714 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
715 Ok(())
716 }
717
718 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
719 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
720 Ok(())
721 }
722
723 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
724 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
725 .execute((room_id,))?;
726 Ok(())
727 }
728}
729
730#[async_trait]
731trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
732 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
733 Ok(self
734 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
735 .await
736 .optional()?)
737 }
738
739 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
740 let keys_length = keys.len();
741
742 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
743 let sql_params = repeat_vars(keys.len());
744 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
745
746 let params = rusqlite::params_from_iter(keys);
747
748 Ok(txn
749 .prepare(&sql)?
750 .query(params)?
751 .mapped(|row| row.get(0))
752 .collect::<Result<_, _>>()?)
753 })
754 .await
755 }
756
757 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
758
759 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
760 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
761 Ok(())
762 }
763
764 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
765 Ok(match room_id {
766 None => {
767 self.prepare("SELECT data FROM room_info", move |mut stmt| {
768 stmt.query_map((), |row| row.get(0))?.collect()
769 })
770 .await?
771 }
772
773 Some(room_id) => {
774 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
775 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
776 })
777 .await?
778 }
779 })
780 }
781
782 async fn get_maybe_stripped_state_events_for_keys(
783 &self,
784 room_id: Key,
785 event_type: Key,
786 state_keys: Vec<Key>,
787 ) -> Result<Vec<(bool, Vec<u8>)>> {
788 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
789 let sql_params = repeat_vars(state_keys.len());
790 let sql = format!(
791 "SELECT stripped, data FROM state_event
792 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
793 );
794
795 let params = rusqlite::params_from_iter(
796 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
797 );
798
799 Ok(txn
800 .prepare(&sql)?
801 .query(params)?
802 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
803 .collect::<Result<_, _>>()?)
804 })
805 .await
806 }
807
808 async fn get_maybe_stripped_state_events(
809 &self,
810 room_id: Key,
811 event_type: Key,
812 ) -> Result<Vec<(bool, Vec<u8>)>> {
813 Ok(self
814 .prepare(
815 "SELECT stripped, data FROM state_event
816 WHERE room_id = ? AND event_type = ?",
817 |mut stmt| {
818 stmt.query((room_id, event_type))?
819 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
820 .collect()
821 },
822 )
823 .await?)
824 }
825
826 async fn get_profiles(
827 &self,
828 room_id: Key,
829 user_ids: Vec<Key>,
830 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
831 let user_ids_length = user_ids.len();
832
833 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
834 let sql_params = repeat_vars(user_ids.len());
835 let sql = format!(
836 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
837 );
838
839 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
840
841 Ok(txn
842 .prepare(&sql)?
843 .query(params)?
844 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
845 .collect::<Result<_, _>>()?)
846 })
847 .await
848 }
849
850 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
851 let res = if memberships.is_empty() {
852 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
853 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
854 })
855 .await?
856 } else {
857 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
858 let sql_params = repeat_vars(memberships.len());
859 let sql = format!(
860 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
861 );
862
863 let params =
864 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
865
866 Ok(txn
867 .prepare(&sql)?
868 .query(params)?
869 .mapped(|row| row.get(0))
870 .collect::<Result<_, _>>()?)
871 })
872 .await?
873 };
874
875 Ok(res)
876 }
877
878 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
879 Ok(self
880 .query_row(
881 "SELECT data FROM global_account_data WHERE event_type = ?",
882 (event_type,),
883 |row| row.get(0),
884 )
885 .await
886 .optional()?)
887 }
888
889 async fn get_room_account_data(
890 &self,
891 room_id: Key,
892 event_type: Key,
893 ) -> Result<Option<Vec<u8>>> {
894 Ok(self
895 .query_row(
896 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
897 (room_id, event_type),
898 |row| row.get(0),
899 )
900 .await
901 .optional()?)
902 }
903
904 async fn get_display_names(
905 &self,
906 room_id: Key,
907 names: Vec<Key>,
908 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
909 let names_length = names.len();
910
911 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
912 let sql_params = repeat_vars(names.len());
913 let sql = format!(
914 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
915 );
916
917 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
918
919 Ok(txn
920 .prepare(&sql)?
921 .query(params)?
922 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
923 .collect::<Result<_, _>>()?)
924 })
925 .await
926 }
927
928 async fn get_user_receipt(
929 &self,
930 room_id: Key,
931 receipt_type: Key,
932 thread: Key,
933 user_id: Key,
934 ) -> Result<Option<Vec<u8>>> {
935 Ok(self
936 .query_row(
937 "SELECT data FROM receipt
938 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
939 (room_id, receipt_type, thread, user_id),
940 |row| row.get(0),
941 )
942 .await
943 .optional()?)
944 }
945
946 async fn get_event_receipts(
947 &self,
948 room_id: Key,
949 receipt_type: Key,
950 thread: Key,
951 event_id: Key,
952 ) -> Result<Vec<Vec<u8>>> {
953 Ok(self
954 .prepare(
955 "SELECT data FROM receipt
956 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
957 |mut stmt| {
958 stmt.query((room_id, receipt_type, thread, event_id))?
959 .mapped(|row| row.get(0))
960 .collect()
961 },
962 )
963 .await?)
964 }
965}
966
967#[async_trait]
968impl SqliteObjectStateStoreExt for SqliteAsyncConn {
969 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
970 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
971 }
972}
973
974#[async_trait]
975impl StateStore for SqliteStateStore {
976 type Error = Error;
977
978 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
979 self.acquire()
980 .await?
981 .get_kv_blob(self.encode_state_store_data_key(key))
982 .await?
983 .map(|data| {
984 Ok(match key {
985 StateStoreDataKey::SyncToken => {
986 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
987 }
988 StateStoreDataKey::ServerInfo => {
989 StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
990 }
991 StateStoreDataKey::Filter(_) => {
992 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
993 }
994 StateStoreDataKey::UserAvatarUrl(_) => {
995 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
996 }
997 StateStoreDataKey::RecentlyVisitedRooms(_) => {
998 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
999 }
1000 StateStoreDataKey::UtdHookManagerData => {
1001 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1002 }
1003 StateStoreDataKey::ComposerDraft(_, _) => {
1004 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1005 }
1006 StateStoreDataKey::SeenKnockRequests(_) => {
1007 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1008 }
1009 })
1010 })
1011 .transpose()
1012 }
1013
1014 async fn set_kv_data(
1015 &self,
1016 key: StateStoreDataKey<'_>,
1017 value: StateStoreDataValue,
1018 ) -> Result<()> {
1019 let serialized_value = match key {
1020 StateStoreDataKey::SyncToken => self.serialize_value(
1021 &value.into_sync_token().expect("Session data not a sync token"),
1022 )?,
1023 StateStoreDataKey::ServerInfo => self.serialize_value(
1024 &value.into_server_info().expect("Session data not containing server info"),
1025 )?,
1026 StateStoreDataKey::Filter(_) => {
1027 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1028 }
1029 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1030 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1031 )?,
1032 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1033 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1034 )?,
1035 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1036 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1037 )?,
1038 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1039 &value.into_composer_draft().expect("Session data not a composer draft"),
1040 )?,
1041 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1042 &value
1043 .into_seen_knock_requests()
1044 .expect("Session data is not a set of seen knock request ids"),
1045 )?,
1046 };
1047
1048 self.acquire()
1049 .await?
1050 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1051 .await
1052 }
1053
1054 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1055 self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1056 }
1057
1058 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1059 let changes = changes.to_owned();
1060 let this = self.clone();
1061 self.acquire()
1062 .await?
1063 .with_transaction(move |txn| {
1064 let StateChanges {
1065 sync_token,
1066 account_data,
1067 presence,
1068 profiles,
1069 profiles_to_delete,
1070 state,
1071 room_account_data,
1072 room_infos,
1073 receipts,
1074 redactions,
1075 stripped_state,
1076 ambiguity_maps,
1077 } = changes;
1078
1079 if let Some(sync_token) = sync_token {
1080 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1081 let value = this.serialize_value(&sync_token)?;
1082 txn.set_kv_blob(&key, &value)?;
1083 }
1084
1085 for (event_type, event) in account_data {
1086 let event_type =
1087 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1088 let data = this.serialize_json(&event)?;
1089 txn.set_global_account_data(&event_type, &data)?;
1090 }
1091
1092 for (room_id, events) in room_account_data {
1093 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1094 for (event_type, event) in events {
1095 let event_type =
1096 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1097 let data = this.serialize_json(&event)?;
1098 txn.set_room_account_data(&room_id, &event_type, &data)?;
1099 }
1100 }
1101
1102 for (user_id, event) in presence {
1103 let key = this.encode_presence_key(&user_id);
1104 let value = this.serialize_json(&event)?;
1105 txn.set_kv_blob(&key, &value)?;
1106 }
1107
1108 for (room_id, room_info) in room_infos {
1109 let stripped = room_info.state() == RoomState::Invited;
1110 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1112
1113 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1114 let state = this
1115 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1116 let data = this.serialize_json(&room_info)?;
1117 txn.set_room_info(&room_id, &state, &data)?;
1118 }
1119
1120 for (room_id, user_ids) in profiles_to_delete {
1121 let room_id = this.encode_key(keys::PROFILE, room_id);
1122 for user_id in user_ids {
1123 let user_id = this.encode_key(keys::PROFILE, user_id);
1124 txn.remove_room_profile(&room_id, &user_id)?;
1125 }
1126 }
1127
1128 for (room_id, state_event_types) in state {
1129 let profiles = profiles.get(&room_id);
1130 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1131
1132 for (event_type, state_events) in state_event_types {
1133 let encoded_event_type =
1134 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1135
1136 for (state_key, raw_state_event) in state_events {
1137 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1138 let data = this.serialize_json(&raw_state_event)?;
1139
1140 let event_id: Option<String> =
1141 raw_state_event.get_field("event_id").ok().flatten();
1142 let encoded_event_id =
1143 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1144
1145 txn.set_state_event(
1146 &encoded_room_id,
1147 &encoded_event_type,
1148 &encoded_state_key,
1149 false,
1150 encoded_event_id.as_deref(),
1151 &data,
1152 )?;
1153
1154 if event_type == StateEventType::RoomMember {
1155 let member_event = match raw_state_event
1156 .deserialize_as::<SyncRoomMemberEvent>()
1157 {
1158 Ok(ev) => ev,
1159 Err(e) => {
1160 debug!(event_id, "Failed to deserialize member event: {e}");
1161 continue;
1162 }
1163 };
1164
1165 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1166 let user_id = this.encode_key(keys::MEMBER, &state_key);
1167 let membership = this
1168 .encode_key(keys::MEMBER, member_event.membership().as_str());
1169 let data = this.serialize_value(&state_key)?;
1170
1171 txn.set_member(
1172 &encoded_room_id,
1173 &user_id,
1174 &membership,
1175 false,
1176 &data,
1177 )?;
1178
1179 if let Some(profile) =
1180 profiles.and_then(|p| p.get(member_event.state_key()))
1181 {
1182 let room_id = this.encode_key(keys::PROFILE, &room_id);
1183 let user_id = this.encode_key(keys::PROFILE, &state_key);
1184 let data = this.serialize_json(&profile)?;
1185 txn.set_profile(&room_id, &user_id, &data)?;
1186 }
1187 }
1188 }
1189 }
1190 }
1191
1192 for (room_id, stripped_state_event_types) in stripped_state {
1193 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1194
1195 for (event_type, stripped_state_events) in stripped_state_event_types {
1196 let encoded_event_type =
1197 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1198
1199 for (state_key, raw_stripped_state_event) in stripped_state_events {
1200 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1201 let data = this.serialize_json(&raw_stripped_state_event)?;
1202 txn.set_state_event(
1203 &encoded_room_id,
1204 &encoded_event_type,
1205 &encoded_state_key,
1206 true,
1207 None,
1208 &data,
1209 )?;
1210
1211 if event_type == StateEventType::RoomMember {
1212 let member_event = match raw_stripped_state_event
1213 .deserialize_as::<StrippedRoomMemberEvent>(
1214 ) {
1215 Ok(ev) => ev,
1216 Err(e) => {
1217 debug!("Failed to deserialize stripped member event: {e}");
1218 continue;
1219 }
1220 };
1221
1222 let room_id = this.encode_key(keys::MEMBER, &room_id);
1223 let user_id = this.encode_key(keys::MEMBER, &state_key);
1224 let membership = this.encode_key(
1225 keys::MEMBER,
1226 member_event.content.membership.as_str(),
1227 );
1228 let data = this.serialize_value(&state_key)?;
1229
1230 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1231 }
1232 }
1233 }
1234 }
1235
1236 for (room_id, receipt_event) in receipts {
1237 let room_id = this.encode_key(keys::RECEIPT, room_id);
1238
1239 for (event_id, receipt_types) in receipt_event {
1240 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1241
1242 for (receipt_type, receipt_users) in receipt_types {
1243 let receipt_type =
1244 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1245
1246 for (user_id, receipt) in receipt_users {
1247 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1248 let thread = this.encode_key(
1251 keys::RECEIPT,
1252 rmp_serde::to_vec_named(&receipt.thread)?,
1253 );
1254 let data = this.serialize_json(&ReceiptData {
1255 receipt,
1256 event_id: event_id.clone(),
1257 user_id,
1258 })?;
1259
1260 txn.set_receipt(
1261 &room_id,
1262 &encoded_user_id,
1263 &receipt_type,
1264 &thread,
1265 &encoded_event_id,
1266 &data,
1267 )?;
1268 }
1269 }
1270 }
1271 }
1272
1273 for (room_id, redactions) in redactions {
1274 let make_room_version = || {
1275 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1276 txn.get_room_info(&encoded_room_id)
1277 .ok()
1278 .flatten()
1279 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1280 .map(|info| info.room_version_or_default())
1281 .unwrap_or_else(|| {
1282 warn!(
1283 ?room_id,
1284 "Unable to find the room version, assuming {ROOM_VERSION_FALLBACK}"
1285 );
1286 ROOM_VERSION_FALLBACK
1287 })
1288 };
1289
1290 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1291 let mut room_version = None;
1292
1293 for (event_id, redaction) in redactions {
1294 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1295
1296 if let Some(Ok(raw_event)) = txn
1297 .get_state_event_by_id(&encoded_room_id, &event_id)?
1298 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1299 {
1300 let event = raw_event.deserialize()?;
1301 let redacted = redact(
1302 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1303 room_version.get_or_insert_with(make_room_version),
1304 Some(RedactedBecause::from_raw_event(&redaction)?),
1305 )
1306 .map_err(Error::Redaction)?;
1307 let data = this.serialize_json(&redacted)?;
1308
1309 let event_type =
1310 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1311 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1312
1313 txn.set_state_event(
1314 &encoded_room_id,
1315 &event_type,
1316 &state_key,
1317 false,
1318 Some(&event_id),
1319 &data,
1320 )?;
1321 }
1322 }
1323 }
1324
1325 for (room_id, display_names) in ambiguity_maps {
1326 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1327
1328 for (name, user_ids) in display_names {
1329 let encoded_name = this.encode_key(
1330 keys::DISPLAY_NAME,
1331 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1332 );
1333 let data = this.serialize_json(&user_ids)?;
1334
1335 if user_ids.is_empty() {
1336 txn.remove_display_name(&room_id, &encoded_name)?;
1337
1338 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1353 txn.remove_display_name(&room_id, &raw_name)?;
1354 } else {
1355 txn.set_display_name(&room_id, &encoded_name, &data)?;
1357 }
1358 }
1359 }
1360
1361 Ok::<_, Error>(())
1362 })
1363 .await?;
1364
1365 Ok(())
1366 }
1367
1368 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1369 self.acquire()
1370 .await?
1371 .get_kv_blob(self.encode_presence_key(user_id))
1372 .await?
1373 .map(|data| self.deserialize_json(&data))
1374 .transpose()
1375 }
1376
1377 async fn get_presence_events(
1378 &self,
1379 user_ids: &[OwnedUserId],
1380 ) -> Result<Vec<Raw<PresenceEvent>>> {
1381 if user_ids.is_empty() {
1382 return Ok(Vec::new());
1383 }
1384
1385 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1386 self.acquire()
1387 .await?
1388 .get_kv_blobs(user_ids)
1389 .await?
1390 .into_iter()
1391 .map(|data| self.deserialize_json(&data))
1392 .collect()
1393 }
1394
1395 async fn get_state_event(
1396 &self,
1397 room_id: &RoomId,
1398 event_type: StateEventType,
1399 state_key: &str,
1400 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1401 Ok(self
1402 .get_state_events_for_keys(room_id, event_type, &[state_key])
1403 .await?
1404 .into_iter()
1405 .next())
1406 }
1407
1408 async fn get_state_events(
1409 &self,
1410 room_id: &RoomId,
1411 event_type: StateEventType,
1412 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1413 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1414 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1415 self.acquire()
1416 .await?
1417 .get_maybe_stripped_state_events(room_id, event_type)
1418 .await?
1419 .into_iter()
1420 .map(|(stripped, data)| {
1421 let ev = if stripped {
1422 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1423 } else {
1424 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1425 };
1426
1427 Ok(ev)
1428 })
1429 .collect()
1430 }
1431
1432 async fn get_state_events_for_keys(
1433 &self,
1434 room_id: &RoomId,
1435 event_type: StateEventType,
1436 state_keys: &[&str],
1437 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1438 if state_keys.is_empty() {
1439 return Ok(Vec::new());
1440 }
1441
1442 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1443 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1444 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1445 self.acquire()
1446 .await?
1447 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1448 .await?
1449 .into_iter()
1450 .map(|(stripped, data)| {
1451 let ev = if stripped {
1452 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1453 } else {
1454 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1455 };
1456
1457 Ok(ev)
1458 })
1459 .collect()
1460 }
1461
1462 async fn get_profile(
1463 &self,
1464 room_id: &RoomId,
1465 user_id: &UserId,
1466 ) -> Result<Option<MinimalRoomMemberEvent>> {
1467 let room_id = self.encode_key(keys::PROFILE, room_id);
1468 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1469
1470 self.acquire()
1471 .await?
1472 .get_profiles(room_id, user_ids)
1473 .await?
1474 .into_iter()
1475 .next()
1476 .map(|(_, data)| self.deserialize_json(&data))
1477 .transpose()
1478 }
1479
1480 async fn get_profiles<'a>(
1481 &self,
1482 room_id: &RoomId,
1483 user_ids: &'a [OwnedUserId],
1484 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1485 if user_ids.is_empty() {
1486 return Ok(BTreeMap::new());
1487 }
1488
1489 let room_id = self.encode_key(keys::PROFILE, room_id);
1490 let mut user_ids_map = user_ids
1491 .iter()
1492 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1493 .collect::<BTreeMap<_, _>>();
1494 let user_ids = user_ids_map.keys().cloned().collect();
1495
1496 self.acquire()
1497 .await?
1498 .get_profiles(room_id, user_ids)
1499 .await?
1500 .into_iter()
1501 .map(|(user_id, data)| {
1502 Ok((
1503 user_ids_map
1504 .remove(user_id.as_slice())
1505 .expect("returned user IDs were requested"),
1506 self.deserialize_json(&data)?,
1507 ))
1508 })
1509 .collect()
1510 }
1511
1512 async fn get_user_ids(
1513 &self,
1514 room_id: &RoomId,
1515 membership: RoomMemberships,
1516 ) -> Result<Vec<OwnedUserId>> {
1517 let room_id = self.encode_key(keys::MEMBER, room_id);
1518 let memberships = membership
1519 .as_vec()
1520 .into_iter()
1521 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1522 .collect();
1523 self.acquire()
1524 .await?
1525 .get_user_ids(room_id, memberships)
1526 .await?
1527 .iter()
1528 .map(|data| self.deserialize_value(data))
1529 .collect()
1530 }
1531
1532 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1533 self.acquire()
1534 .await?
1535 .get_room_infos(match room_load_settings {
1536 RoomLoadSettings::All => None,
1537 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1538 })
1539 .await?
1540 .into_iter()
1541 .map(|data| self.deserialize_json(&data))
1542 .collect()
1543 }
1544
1545 async fn get_users_with_display_name(
1546 &self,
1547 room_id: &RoomId,
1548 display_name: &DisplayName,
1549 ) -> Result<BTreeSet<OwnedUserId>> {
1550 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1551 let names = vec![self.encode_key(
1552 keys::DISPLAY_NAME,
1553 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1554 )];
1555
1556 Ok(self
1557 .acquire()
1558 .await?
1559 .get_display_names(room_id, names)
1560 .await?
1561 .into_iter()
1562 .next()
1563 .map(|(_, data)| self.deserialize_json(&data))
1564 .transpose()?
1565 .unwrap_or_default())
1566 }
1567
1568 async fn get_users_with_display_names<'a>(
1569 &self,
1570 room_id: &RoomId,
1571 display_names: &'a [DisplayName],
1572 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1573 let mut result = HashMap::new();
1574
1575 if display_names.is_empty() {
1576 return Ok(result);
1577 }
1578
1579 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1580 let mut names_map = display_names
1581 .iter()
1582 .flat_map(|display_name| {
1583 let raw =
1593 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1594 let normalized = display_name.as_normalized_str().map(|normalized| {
1595 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1596 });
1597
1598 iter::once(raw).chain(normalized.into_iter())
1599 })
1600 .collect::<BTreeMap<_, _>>();
1601 let names = names_map.keys().cloned().collect();
1602
1603 for (name, data) in
1604 self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1605 {
1606 let display_name =
1607 names_map.remove(name.as_slice()).expect("returned display names were requested");
1608 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1609
1610 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1611 }
1612
1613 Ok(result)
1614 }
1615
1616 async fn get_account_data_event(
1617 &self,
1618 event_type: GlobalAccountDataEventType,
1619 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1620 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1621 self.acquire()
1622 .await?
1623 .get_global_account_data(event_type)
1624 .await?
1625 .map(|value| self.deserialize_json(&value))
1626 .transpose()
1627 }
1628
1629 async fn get_room_account_data_event(
1630 &self,
1631 room_id: &RoomId,
1632 event_type: RoomAccountDataEventType,
1633 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1634 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1635 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1636 self.acquire()
1637 .await?
1638 .get_room_account_data(room_id, event_type)
1639 .await?
1640 .map(|value| self.deserialize_json(&value))
1641 .transpose()
1642 }
1643
1644 async fn get_user_room_receipt_event(
1645 &self,
1646 room_id: &RoomId,
1647 receipt_type: ReceiptType,
1648 thread: ReceiptThread,
1649 user_id: &UserId,
1650 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1651 let room_id = self.encode_key(keys::RECEIPT, room_id);
1652 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1653 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1656 let user_id = self.encode_key(keys::RECEIPT, user_id);
1657
1658 self.acquire()
1659 .await?
1660 .get_user_receipt(room_id, receipt_type, thread, user_id)
1661 .await?
1662 .map(|value| {
1663 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1664 })
1665 .transpose()
1666 }
1667
1668 async fn get_event_room_receipt_events(
1669 &self,
1670 room_id: &RoomId,
1671 receipt_type: ReceiptType,
1672 thread: ReceiptThread,
1673 event_id: &EventId,
1674 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1675 let room_id = self.encode_key(keys::RECEIPT, room_id);
1676 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1677 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1680 let event_id = self.encode_key(keys::RECEIPT, event_id);
1681
1682 self.acquire()
1683 .await?
1684 .get_event_receipts(room_id, receipt_type, thread, event_id)
1685 .await?
1686 .iter()
1687 .map(|value| {
1688 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1689 })
1690 .collect()
1691 }
1692
1693 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1694 self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1695 }
1696
1697 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1698 let conn = self.acquire().await?;
1699 let key = self.encode_custom_key(key);
1700 conn.set_kv_blob(key, value).await?;
1701 Ok(())
1702 }
1703
1704 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1705 let conn = self.acquire().await?;
1706 let key = self.encode_custom_key(key);
1707 let previous = conn.get_kv_blob(key.clone()).await?;
1708 conn.set_kv_blob(key, value).await?;
1709 Ok(previous)
1710 }
1711
1712 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1713 let conn = self.acquire().await?;
1714 let key = self.encode_custom_key(key);
1715 let previous = conn.get_kv_blob(key.clone()).await?;
1716 if previous.is_some() {
1717 conn.delete_kv_blob(key).await?;
1718 }
1719 Ok(previous)
1720 }
1721
1722 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1723 let this = self.clone();
1724 let room_id = room_id.to_owned();
1725
1726 let conn = self.acquire().await?;
1727
1728 conn.with_transaction(move |txn| -> Result<()> {
1729 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1730 txn.remove_room_info(&room_info_room_id)?;
1731
1732 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1733 txn.remove_room_state_events(&state_event_room_id, None)?;
1734
1735 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1736 txn.remove_room_members(&member_room_id, None)?;
1737
1738 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1739 txn.remove_room_profiles(&profile_room_id)?;
1740
1741 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1742 txn.remove_room_account_data(&room_account_data_room_id)?;
1743
1744 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1745 txn.remove_room_receipts(&receipt_room_id)?;
1746
1747 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1748 txn.remove_room_display_names(&display_name_room_id)?;
1749
1750 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1751 txn.remove_room_send_queue(&send_queue_room_id)?;
1752
1753 let dependent_send_queue_room_id =
1754 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1755 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1756
1757 Ok(())
1758 })
1759 .await?;
1760
1761 conn.vacuum().await
1762 }
1763
1764 async fn save_send_queue_request(
1765 &self,
1766 room_id: &RoomId,
1767 transaction_id: OwnedTransactionId,
1768 created_at: MilliSecondsSinceUnixEpoch,
1769 content: QueuedRequestKind,
1770 priority: usize,
1771 ) -> Result<(), Self::Error> {
1772 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1773 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1774
1775 let content = self.serialize_json(&content)?;
1776 let created_at_ts: u64 = created_at.0.into();
1782 self.acquire()
1783 .await?
1784 .with_transaction(move |txn| {
1785 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1786 Ok(())
1787 })
1788 .await
1789 }
1790
1791 async fn update_send_queue_request(
1792 &self,
1793 room_id: &RoomId,
1794 transaction_id: &TransactionId,
1795 content: QueuedRequestKind,
1796 ) -> Result<bool, Self::Error> {
1797 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1798
1799 let content = self.serialize_json(&content)?;
1800 let transaction_id = transaction_id.to_string();
1803
1804 let num_updated = self.acquire()
1805 .await?
1806 .with_transaction(move |txn| {
1807 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1808 })
1809 .await?;
1810
1811 Ok(num_updated > 0)
1812 }
1813
1814 async fn remove_send_queue_request(
1815 &self,
1816 room_id: &RoomId,
1817 transaction_id: &TransactionId,
1818 ) -> Result<bool, Self::Error> {
1819 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1820
1821 let transaction_id = transaction_id.to_string();
1823
1824 let num_deleted = self
1825 .acquire()
1826 .await?
1827 .with_transaction(move |txn| {
1828 txn.prepare_cached(
1829 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1830 )?
1831 .execute((room_id, &transaction_id))
1832 })
1833 .await?;
1834
1835 Ok(num_deleted > 0)
1836 }
1837
1838 async fn load_send_queue_requests(
1839 &self,
1840 room_id: &RoomId,
1841 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1842 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1843
1844 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1848 .acquire()
1849 .await?
1850 .prepare(
1851 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1852 |mut stmt| {
1853 stmt.query((room_id,))?
1854 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1855 .collect()
1856 },
1857 )
1858 .await?;
1859
1860 let mut requests = Vec::with_capacity(res.len());
1861 for entry in res {
1862 let created_at = entry
1863 .4
1864 .and_then(UInt::new)
1865 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1866 requests.push(QueuedRequest {
1867 transaction_id: entry.0.into(),
1868 kind: self.deserialize_json(&entry.1)?,
1869 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1870 priority: entry.3,
1871 created_at,
1872 });
1873 }
1874
1875 Ok(requests)
1876 }
1877
1878 async fn update_send_queue_request_status(
1879 &self,
1880 room_id: &RoomId,
1881 transaction_id: &TransactionId,
1882 error: Option<QueueWedgeError>,
1883 ) -> Result<(), Self::Error> {
1884 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1885
1886 let transaction_id = transaction_id.to_string();
1888
1889 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1891
1892 self.acquire()
1893 .await?
1894 .with_transaction(move |txn| {
1895 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1896 Ok(())
1897 })
1898 .await
1899 }
1900
1901 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1902 let res: Vec<Vec<u8>> = self
1907 .acquire()
1908 .await?
1909 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1910 stmt.query(())?.mapped(|row| row.get(0)).collect()
1911 })
1912 .await?;
1913
1914 Ok(res
1917 .into_iter()
1918 .map(|entry| self.deserialize_value(&entry))
1919 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1920 .into_iter()
1921 .collect())
1922 }
1923
1924 async fn save_dependent_queued_request(
1925 &self,
1926 room_id: &RoomId,
1927 parent_txn_id: &TransactionId,
1928 own_txn_id: ChildTransactionId,
1929 created_at: MilliSecondsSinceUnixEpoch,
1930 content: DependentQueuedRequestKind,
1931 ) -> Result<()> {
1932 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1933 let content = self.serialize_json(&content)?;
1934
1935 let parent_txn_id = parent_txn_id.to_string();
1937 let own_txn_id = own_txn_id.to_string();
1938
1939 let created_at_ts: u64 = created_at.0.into();
1940 self.acquire()
1941 .await?
1942 .with_transaction(move |txn| {
1943 txn.prepare_cached(
1944 r#"INSERT INTO dependent_send_queue_events
1945 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1946 VALUES (?, ?, ?, ?, ?)"#,
1947 )?
1948 .execute((
1949 room_id,
1950 parent_txn_id,
1951 own_txn_id,
1952 content,
1953 created_at_ts,
1954 ))?;
1955 Ok(())
1956 })
1957 .await
1958 }
1959
1960 async fn update_dependent_queued_request(
1961 &self,
1962 room_id: &RoomId,
1963 own_transaction_id: &ChildTransactionId,
1964 new_content: DependentQueuedRequestKind,
1965 ) -> Result<bool> {
1966 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1967 let content = self.serialize_json(&new_content)?;
1968
1969 let own_txn_id = own_transaction_id.to_string();
1971
1972 let num_updated = self
1973 .acquire()
1974 .await?
1975 .with_transaction(move |txn| {
1976 txn.prepare_cached(
1977 r#"UPDATE dependent_send_queue_events
1978 SET content = ?
1979 WHERE own_transaction_id = ?
1980 AND room_id = ?"#,
1981 )?
1982 .execute((content, own_txn_id, room_id))
1983 })
1984 .await?;
1985
1986 if num_updated > 1 {
1987 return Err(Error::InconsistentUpdate);
1988 }
1989
1990 Ok(num_updated == 1)
1991 }
1992
1993 async fn mark_dependent_queued_requests_as_ready(
1994 &self,
1995 room_id: &RoomId,
1996 parent_txn_id: &TransactionId,
1997 parent_key: SentRequestKey,
1998 ) -> Result<usize> {
1999 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2000 let parent_key = self.serialize_value(&parent_key)?;
2001
2002 let parent_txn_id = parent_txn_id.to_string();
2004
2005 self.acquire()
2006 .await?
2007 .with_transaction(move |txn| {
2008 Ok(txn.prepare_cached(
2009 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2010 )?
2011 .execute((parent_key, parent_txn_id, room_id))?)
2012 })
2013 .await
2014 }
2015
2016 async fn remove_dependent_queued_request(
2017 &self,
2018 room_id: &RoomId,
2019 txn_id: &ChildTransactionId,
2020 ) -> Result<bool> {
2021 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2022
2023 let txn_id = txn_id.to_string();
2025
2026 let num_deleted = self
2027 .acquire()
2028 .await?
2029 .with_transaction(move |txn| {
2030 txn.prepare_cached(
2031 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2032 )?
2033 .execute((txn_id, room_id))
2034 })
2035 .await?;
2036
2037 Ok(num_deleted > 0)
2038 }
2039
2040 async fn load_dependent_queued_requests(
2041 &self,
2042 room_id: &RoomId,
2043 ) -> Result<Vec<DependentQueuedRequest>> {
2044 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2045
2046 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2048 .acquire()
2049 .await?
2050 .prepare(
2051 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2052 |mut stmt| {
2053 stmt.query((room_id,))?
2054 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2055 .collect()
2056 },
2057 )
2058 .await?;
2059
2060 let mut dependent_events = Vec::with_capacity(res.len());
2061 for entry in res {
2062 let created_at = entry
2063 .4
2064 .and_then(UInt::new)
2065 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2066 dependent_events.push(DependentQueuedRequest {
2067 own_transaction_id: entry.0.into(),
2068 parent_transaction_id: entry.1.into(),
2069 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2070 kind: self.deserialize_json(&entry.3)?,
2071 created_at,
2072 });
2073 }
2074
2075 Ok(dependent_events)
2076 }
2077}
2078
2079#[derive(Debug, Clone, Serialize, Deserialize)]
2080struct ReceiptData {
2081 receipt: Receipt,
2082 event_id: OwnedEventId,
2083 user_id: OwnedUserId,
2084}
2085
2086#[cfg(test)]
2087mod tests {
2088 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2089
2090 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2091 use once_cell::sync::Lazy;
2092 use tempfile::{tempdir, TempDir};
2093
2094 use super::SqliteStateStore;
2095
2096 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2097 static NUM: AtomicU32 = AtomicU32::new(0);
2098
2099 async fn get_store() -> Result<impl StateStore, StoreError> {
2100 let name = NUM.fetch_add(1, SeqCst).to_string();
2101 let tmpdir_path = TMP_DIR.path().join(name);
2102
2103 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2104
2105 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2106 }
2107
2108 statestore_integration_tests!();
2109}
2110
2111#[cfg(test)]
2112mod encrypted_tests {
2113 use std::{
2114 path::PathBuf,
2115 sync::atomic::{AtomicU32, Ordering::SeqCst},
2116 };
2117
2118 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2119 use matrix_sdk_test::async_test;
2120 use once_cell::sync::Lazy;
2121 use tempfile::{tempdir, TempDir};
2122
2123 use super::SqliteStateStore;
2124 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2125
2126 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2127 static NUM: AtomicU32 = AtomicU32::new(0);
2128
2129 fn new_state_store_workspace() -> PathBuf {
2130 let name = NUM.fetch_add(1, SeqCst).to_string();
2131 TMP_DIR.path().join(name)
2132 }
2133
2134 async fn get_store() -> Result<impl StateStore, StoreError> {
2135 let tmpdir_path = new_state_store_workspace();
2136
2137 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2138
2139 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2140 .await
2141 .unwrap())
2142 }
2143
2144 #[async_test]
2145 async fn test_pool_size() {
2146 let tmpdir_path = new_state_store_workspace();
2147 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2148
2149 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2150
2151 assert_eq!(store.pool.status().max_size, 42);
2152 }
2153
2154 #[async_test]
2155 async fn test_cache_size() {
2156 let tmpdir_path = new_state_store_workspace();
2157 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2158
2159 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2160
2161 let conn = store.pool.get().await.unwrap();
2162 let cache_size =
2163 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2164
2165 assert_eq!(cache_size, -(1500 / 1024));
2169 }
2170
2171 #[async_test]
2172 async fn test_journal_size_limit() {
2173 let tmpdir_path = new_state_store_workspace();
2174 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2175
2176 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2177
2178 let conn = store.pool.get().await.unwrap();
2179 let journal_size_limit = conn
2180 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2181 .await
2182 .unwrap();
2183
2184 assert_eq!(journal_size_limit, 1500);
2187 }
2188
2189 statestore_integration_tests!();
2190}
2191
2192#[cfg(test)]
2193mod migration_tests {
2194 use std::{
2195 path::{Path, PathBuf},
2196 sync::{
2197 atomic::{AtomicU32, Ordering::SeqCst},
2198 Arc,
2199 },
2200 };
2201
2202 use as_variant::as_variant;
2203 use deadpool_sqlite::Runtime;
2204 use matrix_sdk_base::{
2205 media::{MediaFormat, MediaRequestParameters},
2206 store::{
2207 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2208 SerializableEventContent,
2209 },
2210 sync::UnreadNotificationsCount,
2211 RoomState, StateStore,
2212 };
2213 use matrix_sdk_test::async_test;
2214 use once_cell::sync::Lazy;
2215 use ruma::{
2216 events::{
2217 room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2218 StateEventType,
2219 },
2220 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2221 RoomId, TransactionId, UserId,
2222 };
2223 use rusqlite::Transaction;
2224 use serde::{Deserialize, Serialize};
2225 use serde_json::json;
2226 use tempfile::{tempdir, TempDir};
2227 use tokio::fs;
2228
2229 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2230 use crate::{
2231 error::{Error, Result},
2232 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2233 OpenStoreError,
2234 };
2235
2236 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2237 static NUM: AtomicU32 = AtomicU32::new(0);
2238 const SECRET: &str = "secret";
2239
2240 fn new_path() -> PathBuf {
2241 let name = NUM.fetch_add(1, SeqCst).to_string();
2242 TMP_DIR.path().join(name)
2243 }
2244
2245 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2246 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2247
2248 let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2249 let pool = config.create_pool(Runtime::Tokio1).unwrap();
2252 let conn = pool.get().await?;
2253
2254 init(&conn).await?;
2255
2256 let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2257 let this = SqliteStateStore { store_cipher, pool };
2258 this.run_migrations(&conn, 1, Some(version)).await?;
2259
2260 Ok(this)
2261 }
2262
2263 fn room_info_v1_json(
2264 room_id: &RoomId,
2265 state: RoomState,
2266 name: Option<&str>,
2267 creator: Option<&UserId>,
2268 ) -> serde_json::Value {
2269 let name_content = match name {
2271 Some(name) => json!({ "name": name }),
2272 None => json!({ "name": null }),
2273 };
2274 let create_content = match creator {
2276 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2277 None => RoomCreateEventContent::new_v11(),
2278 };
2279
2280 json!({
2281 "room_id": room_id,
2282 "room_type": state,
2283 "notification_counts": UnreadNotificationsCount::default(),
2284 "summary": {
2285 "heroes": [],
2286 "joined_member_count": 0,
2287 "invited_member_count": 0,
2288 },
2289 "members_synced": false,
2290 "base_info": {
2291 "dm_targets": [],
2292 "max_power_level": 100,
2293 "name": {
2294 "Original": {
2295 "content": name_content,
2296 },
2297 },
2298 "create": {
2299 "Original": {
2300 "content": create_content,
2301 }
2302 }
2303 },
2304 })
2305 }
2306
2307 #[async_test]
2308 pub async fn test_migrating_v1_to_v2() {
2309 let path = new_path();
2310 {
2312 let db = create_fake_db(&path, 1).await.unwrap();
2313 let conn = db.pool.get().await.unwrap();
2314
2315 let this = db.clone();
2316 conn.with_transaction(move |txn| {
2317 for i in 0..5 {
2318 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2319 let (state, stripped) =
2320 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2321 let info = room_info_v1_json(&room_id, state, None, None);
2322
2323 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2324 let data = this.serialize_json(&info)?;
2325
2326 txn.prepare_cached(
2327 "INSERT INTO room_info (room_id, stripped, data)
2328 VALUES (?, ?, ?)",
2329 )?
2330 .execute((room_id, stripped, data))?;
2331 }
2332
2333 Result::<_, Error>::Ok(())
2334 })
2335 .await
2336 .unwrap();
2337 }
2338
2339 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2341
2342 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2344 }
2345
2346 fn add_room_v2(
2348 this: &SqliteStateStore,
2349 txn: &Transaction<'_>,
2350 room_id: &RoomId,
2351 name: Option<&str>,
2352 create_creator: Option<&UserId>,
2353 create_sender: Option<&UserId>,
2354 ) -> Result<(), Error> {
2355 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2356
2357 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2358 let encoded_state =
2359 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2360 let data = this.serialize_json(&room_info_json)?;
2361
2362 txn.prepare_cached(
2363 "INSERT INTO room_info (room_id, state, data)
2364 VALUES (?, ?, ?)",
2365 )?
2366 .execute((encoded_room_id, encoded_state, data))?;
2367
2368 let Some(create_sender) = create_sender else {
2370 return Ok(());
2371 };
2372
2373 let create_content = match create_creator {
2374 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2375 None => RoomCreateEventContent::new_v11(),
2376 };
2377
2378 let event_id = EventId::new(server_name!("dummy.local"));
2379 let create_event = json!({
2380 "content": create_content,
2381 "event_id": event_id,
2382 "sender": create_sender.to_owned(),
2383 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2384 "state_key": "",
2385 "type": "m.room.create",
2386 "unsigned": {},
2387 });
2388
2389 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2390 let encoded_event_type =
2391 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2392 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2393 let stripped = false;
2394 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2395 let data = this.serialize_json(&create_event)?;
2396
2397 txn.prepare_cached(
2398 "INSERT
2399 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2400 VALUES (?, ?, ?, ?, ?, ?)",
2401 )?
2402 .execute((
2403 encoded_room_id,
2404 encoded_event_type,
2405 encoded_state_key,
2406 stripped,
2407 encoded_event_id,
2408 data,
2409 ))?;
2410
2411 Ok(())
2412 }
2413
2414 #[async_test]
2415 pub async fn test_migrating_v2_to_v3() {
2416 let path = new_path();
2417
2418 let room_a_id = room_id!("!room_a:dummy.local");
2420 let room_a_name = "Room A";
2421 let room_a_creator = user_id!("@creator:dummy.local");
2422 let room_a_create_sender = user_id!("@sender:dummy.local");
2425
2426 let room_b_id = room_id!("!room_b:dummy.local");
2428
2429 let room_c_id = room_id!("!room_c:dummy.local");
2431 let room_c_create_sender = user_id!("@creator:dummy.local");
2432
2433 {
2435 let db = create_fake_db(&path, 2).await.unwrap();
2436 let conn = db.pool.get().await.unwrap();
2437
2438 let this = db.clone();
2439 conn.with_transaction(move |txn| {
2440 add_room_v2(
2441 &this,
2442 txn,
2443 room_a_id,
2444 Some(room_a_name),
2445 Some(room_a_creator),
2446 Some(room_a_create_sender),
2447 )?;
2448 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2449 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2450
2451 Result::<_, Error>::Ok(())
2452 })
2453 .await
2454 .unwrap();
2455 }
2456
2457 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2459
2460 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2462 assert_eq!(room_infos.len(), 3);
2463
2464 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2465 assert_eq!(room_a.name(), Some(room_a_name));
2466 assert_eq!(room_a.creator(), Some(room_a_create_sender));
2467
2468 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2469 assert_eq!(room_b.name(), None);
2470 assert_eq!(room_b.creator(), None);
2471
2472 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2473 assert_eq!(room_c.name(), None);
2474 assert_eq!(room_c.creator(), Some(room_c_create_sender));
2475 }
2476
2477 #[async_test]
2478 pub async fn test_migrating_v7_to_v9() {
2479 let path = new_path();
2480
2481 let room_id = room_id!("!room_a:dummy.local");
2482 let wedged_event_transaction_id = TransactionId::new();
2483 let local_event_transaction_id = TransactionId::new();
2484
2485 {
2487 let db = create_fake_db(&path, 7).await.unwrap();
2488 let conn = db.pool.get().await.unwrap();
2489
2490 let wedge_tx = wedged_event_transaction_id.clone();
2491 let local_tx = local_event_transaction_id.clone();
2492
2493 conn.with_transaction(move |txn| {
2494 add_dependent_send_queue_event_v7(
2495 &db,
2496 txn,
2497 room_id,
2498 &local_tx,
2499 ChildTransactionId::new(),
2500 DependentQueuedRequestKind::RedactEvent,
2501 )?;
2502 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2503 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2504 Result::<_, Error>::Ok(())
2505 })
2506 .await
2507 .unwrap();
2508 }
2509
2510 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2513
2514 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2515 assert!(requests.is_empty());
2516
2517 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2518 assert!(dependent_requests.is_empty());
2519 }
2520
2521 fn add_send_queue_event_v7(
2522 this: &SqliteStateStore,
2523 txn: &Transaction<'_>,
2524 transaction_id: &TransactionId,
2525 room_id: &RoomId,
2526 is_wedged: bool,
2527 ) -> Result<(), Error> {
2528 let content =
2529 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2530
2531 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2532 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2533
2534 let content = this.serialize_json(&content)?;
2535
2536 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2537 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2538
2539 Ok(())
2540 }
2541
2542 fn add_dependent_send_queue_event_v7(
2543 this: &SqliteStateStore,
2544 txn: &Transaction<'_>,
2545 room_id: &RoomId,
2546 parent_txn_id: &TransactionId,
2547 own_txn_id: ChildTransactionId,
2548 content: DependentQueuedRequestKind,
2549 ) -> Result<(), Error> {
2550 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2551
2552 let parent_txn_id = parent_txn_id.to_string();
2553 let own_txn_id = own_txn_id.to_string();
2554 let content = this.serialize_json(&content)?;
2555
2556 txn.prepare_cached(
2557 "INSERT INTO dependent_send_queue_events
2558 (room_id, parent_transaction_id, own_transaction_id, content)
2559 VALUES (?, ?, ?, ?)",
2560 )?
2561 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2562
2563 Ok(())
2564 }
2565
2566 #[derive(Clone, Debug, Serialize, Deserialize)]
2567 pub enum LegacyDependentQueuedRequestKind {
2568 UploadFileWithThumbnail {
2569 content_type: String,
2570 cache_key: MediaRequestParameters,
2571 related_to: OwnedTransactionId,
2572 },
2573 }
2574
2575 #[async_test]
2576 pub async fn test_dependent_queued_request_variant_renaming() {
2577 let path = new_path();
2578 let db = create_fake_db(&path, 7).await.unwrap();
2579
2580 let cache_key = MediaRequestParameters {
2581 format: MediaFormat::File,
2582 source: MediaSource::Plain("https://server.local/foobar".into()),
2583 };
2584 let related_to = TransactionId::new();
2585 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2586 content_type: "image/png".to_owned(),
2587 cache_key,
2588 related_to: related_to.clone(),
2589 };
2590
2591 let data = db
2592 .serialize_json(&request)
2593 .expect("should be able to serialize legacy dependent request");
2594 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2595 "should be able to deserialize dependent request from legacy dependent request",
2596 );
2597
2598 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2599 assert_eq!(de_related_to, related_to);
2600 });
2601 }
2602}