matrix_sdk_ui/sync_service.rs
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Unified API for both the Room List API and the Encryption Sync API, that
//! takes care of all the underlying details.
//!
//! This is an opinionated way to run both APIs, with high-level callbacks that
//! should be called in reaction to user actions and/or system events.
//!
//! The sync service will signal errors via its [`state`](SyncService::state)
//! that the user MUST observe. Whenever an error/termination is observed, the
//! user should call [`SyncService::start()`] again to restart the room list
//! sync. If that is not desirable, offline support for the [`SyncService`]
//! may be enabled using the [`SyncServiceBuilder::with_offline_mode`] setting.
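//!
//! As a minimal sketch (assuming an already-built `sync_service`), the
//! observation loop described above could look like the following; see the
//! more complete example on [`SyncService`]:
//!
//! ```no_run
//! # async fn example(sync_service: matrix_sdk_ui::sync_service::SyncService) {
//! use matrix_sdk_ui::sync_service::State;
//!
//! sync_service.start().await;
//!
//! let mut state = sync_service.state();
//! while let Some(state) = state.next().await {
//!     match state {
//!         // A terminal state was reached: restart the syncs manually.
//!         State::Error | State::Terminated => sync_service.start().await,
//!         State::Idle | State::Running | State::Offline => {}
//!     }
//! }
//! # }
//! ```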

use std::{sync::Arc, time::Duration};

use eyeball::{SharedObservable, Subscriber};
use futures_util::{
    future::{select, Either},
    pin_mut, StreamExt as _,
};
use matrix_sdk::{
    config::RequestConfig,
    executor::{spawn, JoinHandle},
    sleep::sleep,
    Client,
};
use thiserror::Error;
use tokio::sync::{
    mpsc::{Receiver, Sender},
    Mutex as AsyncMutex, OwnedMutexGuard,
};
use tracing::{error, info, instrument, trace, warn, Instrument, Level, Span};

use crate::{
    encryption_sync_service::{self, EncryptionSyncPermit, EncryptionSyncService, WithLocking},
    room_list_service::{self, RoomListService},
};

/// Current state of the application.
///
/// This is a high-level state indicating the status of the underlying syncs.
/// The service starts in [`State::Idle`], moves to [`State::Running`] once
/// started, and may then hit a terminal state: [`State::Terminated`] (if it
/// gracefully exited) or [`State::Error`] (if any of the underlying syncs ran
/// into an error).
///
/// This can be observed with [`SyncService::state`].
#[derive(Clone, Debug, PartialEq)]
pub enum State {
    /// The service hasn't ever been started yet, or has been stopped.
    Idle,
    /// The underlying syncs are properly running in the background.
    Running,
    /// Any of the underlying syncs has terminated gracefully (i.e. it has been
    /// stopped).
    Terminated,
    /// Any of the underlying syncs has run into an error.
    Error,
    /// The service has entered offline mode. This state will only be entered
    /// if the [`SyncService`] has been built with the
    /// [`SyncServiceBuilder::with_offline_mode`] setting.
    ///
    /// The [`SyncService`] will enter the offline mode if syncing with the
    /// server fails. It will then periodically check whether the server is
    /// available using the `/_matrix/client/versions` endpoint.
    ///
    /// Once the [`SyncService`] receives a 200 response from the
    /// `/_matrix/client/versions` endpoint, it will go back into the
    /// [`State::Running`] mode and attempt to sync again.
    ///
    /// Calling [`SyncService::start()`] while in this state will abort the
    /// `/_matrix/client/versions` checks and attempt to sync immediately.
    ///
    /// Calling [`SyncService::stop()`] will abort the offline mode and the
    /// [`SyncService`] will go into the [`State::Idle`] mode.
    Offline,
}

enum MaybeAcquiredPermit {
    Acquired(OwnedMutexGuard<EncryptionSyncPermit>),
    Unacquired(Arc<AsyncMutex<EncryptionSyncPermit>>),
}

impl MaybeAcquiredPermit {
    async fn acquire(self) -> OwnedMutexGuard<EncryptionSyncPermit> {
        match self {
            MaybeAcquiredPermit::Acquired(owned_mutex_guard) => owned_mutex_guard,
            MaybeAcquiredPermit::Unacquired(lock) => lock.lock_owned().await,
        }
    }
}

/// A supervisor responsible for managing two sync tasks: one for handling the
/// room list and another for supporting end-to-end encryption.
///
/// The two sync tasks are spawned as child tasks and are contained within the
/// supervising task, which is stored in the [`SyncTaskSupervisor::task`] field.
///
/// The supervisor ensures the two child tasks are managed as a single unit,
/// allowing them to be shut down in unison.
struct SyncTaskSupervisor {
    /// The supervising task that manages and contains the two sync child tasks.
    task: JoinHandle<()>,
    /// [`TerminationReport`] sender for the [`SyncTaskSupervisor::shutdown()`]
    /// function.
    termination_sender: Sender<TerminationReport>,
}

impl SyncTaskSupervisor {
    async fn new(
        inner: &SyncServiceInner,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) -> Self {
        let (task, termination_sender) =
            Self::spawn_supervisor_task(inner, room_list_service, encryption_sync_permit).await;

        Self { task, termination_sender }
    }

    /// Check if a homeserver is reachable.
    ///
    /// This function handles the offline mode by waiting for either a
    /// termination report or a successful `/_matrix/client/versions` response.
    ///
    /// This function waits for two conditions:
    ///
    /// 1. Waiting for a termination report: this ensures that the user can
    ///    exit offline mode and attempt to restart the [`SyncService`]
    ///    manually.
    ///
    /// 2. Waiting to come back online: this continuously checks server
    ///    availability.
    ///
    /// If the `/_matrix/client/versions` request succeeds, the function exits
    /// without a termination report. If we receive a [`TerminationReport`]
    /// from the user, we exit immediately and return the termination report.
    async fn offline_check(
        client: &Client,
        receiver: &mut Receiver<TerminationReport>,
    ) -> Option<TerminationReport> {
        info!("Entering the offline mode");

        let wait_for_termination_report = async {
            loop {
                // We didn't empty the channel when entering the offline mode, for fear of
                // missing a report with the `TerminationOrigin::Supervisor` origin. The
                // channel might therefore still contain stale reports from one of the sync
                // services (in case both of them have sent one), so ignore any report coming
                // from the sync services.
                let report =
                    receiver.recv().await.unwrap_or_else(TerminationReport::supervisor_error);

                match report.origin {
                    TerminationOrigin::EncryptionSync | TerminationOrigin::RoomList => {}
                    // Since the sync services aren't running anymore, we can only receive a
                    // report from the supervisor. It would have probably made sense to have
                    // separate channels for the reports the sync services send and the ones
                    // the user can send using the `SyncService::stop()` method.
                    TerminationOrigin::Supervisor => break report,
                }
            }
        };

        let wait_to_be_online = async move {
            loop {
                // Network failures encountered while sending a request with no retry limit
                // set in the `RequestConfig` are treated as permanent failures, and our
                // exponential backoff doesn't kick in.
                //
                // Let's set a retry limit so network failures are retried as well.
                let request_config = RequestConfig::default().retry_limit(5);

                // We're in an infinite loop, but our request sending already has an exponential
                // backoff set up. This will kick in for any request errors that we consider to
                // be transient. Common network errors (timeouts, DNS failures) or any server
                // error in the 5xx range of HTTP errors are considered to be transient.
                //
                // Still, as a precaution, we're going to sleep here for a while in the error
                // case.
                match client.fetch_server_versions(Some(request_config)).await {
                    Ok(_) => break,
                    Err(_) => sleep(Duration::from_millis(100)).await,
                }
            }
        };

        pin_mut!(wait_for_termination_report);
        pin_mut!(wait_to_be_online);

        let maybe_termination_report = select(wait_for_termination_report, wait_to_be_online).await;

        let report = match maybe_termination_report {
            Either::Left((termination_report, _)) => Some(termination_report),
            Either::Right((_, _)) => None,
        };

        info!("Exiting offline mode: {report:?}");

        report
    }

    /// The role of the supervisor task is to wait for a termination message
    /// ([`TerminationReport`]), sent either because we wanted to stop both
    /// syncs, or because one of the syncs failed (in which case we'll stop the
    /// other one too).
    async fn spawn_supervisor_task(
        inner: &SyncServiceInner,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) -> (JoinHandle<()>, Sender<TerminationReport>) {
        let (sender, mut receiver) = tokio::sync::mpsc::channel(16);

        let encryption_sync = inner.encryption_sync_service.clone();
        let state = inner.state.clone();
        let termination_sender = sender.clone();

        // When we first start and don't use the offline mode, we want to acquire the
        // sync permit before we enter a future that might be polled at a later time.
        // This means that the permit will be acquired as soon as this future, the one
        // the `spawn_supervisor_task` function creates, is awaited.
        //
        // In other words, once `sync_service.start().await` is finished, the permit
        // will be in the acquired state.
        let mut sync_permit_guard =
            MaybeAcquiredPermit::Acquired(encryption_sync_permit.clone().lock_owned().await);

        let offline_mode = inner.with_offline_mode;
        let parent_span = inner.parent_span.clone();

        let future = async move {
            loop {
                let (room_list_task, encryption_sync_task) = Self::spawn_child_tasks(
                    room_list_service.clone(),
                    encryption_sync.clone(),
                    sync_permit_guard,
                    sender.clone(),
                    parent_span.clone(),
                )
                .await;

                sync_permit_guard = MaybeAcquiredPermit::Unacquired(encryption_sync_permit.clone());

                let report = if let Some(report) = receiver.recv().await {
                    report
                } else {
                    info!("internal channel has been closed?");
                    // We should still stop the child tasks in the unlikely scenario that our
                    // receiver died.
                    TerminationReport::supervisor_error()
                };

                // If one service failed, make sure to request stopping the other one.
                let (stop_room_list, stop_encryption) = match &report.origin {
                    TerminationOrigin::EncryptionSync => (true, false),
                    TerminationOrigin::RoomList => (false, true),
                    TerminationOrigin::Supervisor => (true, true),
                };

                // Stop both services, and wait for the streams to properly finish: at some
                // point they'll return `None` and will exit their infinite loops, and their
                // tasks will gracefully terminate.

                if stop_room_list {
                    if let Err(err) = room_list_service.stop_sync() {
                        warn!(?report, "unable to stop room list service: {err:#}");
                    }

                    if report.has_expired {
                        room_list_service.expire_sync_session().await;
                    }
                }

                if let Err(err) = room_list_task.await {
                    error!("when awaiting room list service: {err:#}");
                }

                if stop_encryption {
                    if let Err(err) = encryption_sync.stop_sync() {
                        warn!(?report, "unable to stop encryption sync: {err:#}");
                    }

                    if report.has_expired {
                        encryption_sync.expire_sync_session().await;
                    }
                }

                if let Err(err) = encryption_sync_task.await {
                    error!("when awaiting encryption sync: {err:#}");
                }

                if report.is_error {
                    if offline_mode {
                        state.set(State::Offline);

                        let client = room_list_service.client();

                        if let Some(report) = Self::offline_check(client, &mut receiver).await {
                            if report.is_error {
                                state.set(State::Error);
                            } else {
                                state.set(State::Idle);
                            }
                            break;
                        }

                        state.set(State::Running);
                    } else {
                        state.set(State::Error);
                        break;
                    }
                } else if matches!(report.origin, TerminationOrigin::Supervisor) {
                    state.set(State::Idle);
                    break;
                } else {
                    state.set(State::Terminated);
                    break;
                }
            }
        }
        .instrument(tracing::span!(Level::WARN, "supervisor task"));

        let task = spawn(future);

        (task, termination_sender)
    }

    async fn spawn_child_tasks(
        room_list_service: Arc<RoomListService>,
        encryption_sync_service: Arc<EncryptionSyncService>,
        sync_permit_guard: MaybeAcquiredPermit,
        sender: Sender<TerminationReport>,
        parent_span: Span,
    ) -> (JoinHandle<()>, JoinHandle<()>) {
        // First, take care of the room list.
        let room_list_task = spawn(
            Self::room_list_sync_task(room_list_service, sender.clone())
                .instrument(parent_span.clone()),
        );

        // Then, take care of the encryption sync.
        let encryption_sync_task = spawn(
            Self::encryption_sync_task(
                encryption_sync_service,
                sender.clone(),
                sync_permit_guard.acquire().await,
            )
            .instrument(parent_span),
        );

        (room_list_task, encryption_sync_task)
    }
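    /// Check whether the given sliding sync error is an `UnknownPos` error,
    /// i.e. whether the sync session has expired on the server.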
    fn check_if_expired(err: &matrix_sdk::Error) -> bool {
        err.client_api_error_kind() == Some(&ruma::api::client::error::ErrorKind::UnknownPos)
    }

    async fn encryption_sync_task(
        encryption_sync: Arc<EncryptionSyncService>,
        sender: Sender<TerminationReport>,
        sync_permit_guard: OwnedMutexGuard<EncryptionSyncPermit>,
    ) {
        use encryption_sync_service::Error;

        let encryption_sync_stream = encryption_sync.sync(sync_permit_guard);
        pin_mut!(encryption_sync_stream);

        let (is_error, has_expired) = loop {
            match encryption_sync_stream.next().await {
                Some(Ok(())) => {
                    // Carry on.
                }
                Some(Err(err)) => {
                    // If the encryption sync error was an expired session, also expire the
                    // room list sync.
                    let has_expired = if let Error::SlidingSync(err) = &err {
                        Self::check_if_expired(err)
                    } else {
                        false
                    };

                    if !has_expired {
                        error!("Error while processing encryption in sync service: {err:#}");
                    }

                    break (true, has_expired);
                }
                None => {
                    // The stream has ended.
                    break (false, false);
                }
            }
        };

        if let Err(err) = sender
            .send(TerminationReport {
                is_error,
                has_expired,
                origin: TerminationOrigin::EncryptionSync,
            })
            .await
        {
            error!("Error while sending termination report: {err:#}");
        }
    }

    async fn room_list_sync_task(
        room_list_service: Arc<RoomListService>,
        sender: Sender<TerminationReport>,
    ) {
        use room_list_service::Error;

        let room_list_stream = room_list_service.sync();
        pin_mut!(room_list_stream);

        let (is_error, has_expired) = loop {
            match room_list_stream.next().await {
                Some(Ok(())) => {
                    // Carry on.
                }
                Some(Err(err)) => {
                    // If the room list error was an expired session, also expire the
                    // encryption sync.
                    let has_expired = if let Error::SlidingSync(err) = &err {
                        Self::check_if_expired(err)
                    } else {
                        false
                    };

                    if !has_expired {
                        error!("Error while processing room list in sync service: {err:#}");
                    }

                    break (true, has_expired);
                }
                None => {
                    // The stream has ended.
                    break (false, false);
                }
            }
        };

        if let Err(err) = sender
            .send(TerminationReport { is_error, has_expired, origin: TerminationOrigin::RoomList })
            .await
        {
            error!("Error while sending termination report: {err:#}");
        }
    }

    async fn shutdown(self) {
        match self
            .termination_sender
            .send(TerminationReport {
                is_error: false,
                has_expired: false,
                origin: TerminationOrigin::Supervisor,
            })
            .await
        {
            Ok(_) => {
                let _ = self.task.await.inspect_err(|err| {
                    // A `JoinError` indicates that the task was already dead, either because it
                    // got cancelled or because it panicked. We only cancel the task in the `Err`
                    // branch below and the task shouldn't be able to panic.
                    //
                    // So let's log an error and return.
                    error!("The supervisor task has stopped unexpectedly: {err:?}");
                });
            }
            Err(err) => {
                error!("Couldn't send the termination report to the supervisor task: {err}");
                // Let's abort the task if it won't shut down properly, otherwise we would have
                // left it as a detached task.
                self.task.abort();
            }
        }
    }
}

struct SyncServiceInner {
    encryption_sync_service: Arc<EncryptionSyncService>,

    /// Is the offline mode for the [`SyncService`] enabled?
    ///
    /// The offline mode is described in the [`State::Offline`] enum variant.
    with_offline_mode: bool,

    state: SharedObservable<State>,

    /// The parent tracing span to use for the tasks within this service.
    ///
    /// Normally this will be [`Span::none`], but it may be useful to assign a
    /// defined span, for example if there is more than one active sync
    /// service.
    parent_span: Span,

    /// Supervisor task ensuring proper termination.
    ///
    /// This task is waiting for a [`TerminationReport`] from any of the other
    /// two tasks, or from a user request via [`SyncService::stop()`]. It
    /// makes sure that the two services are properly shut down and not just
    /// interrupted.
    ///
    /// This is set at the same time as the other two tasks.
    supervisor: Option<SyncTaskSupervisor>,
}

impl SyncServiceInner {
    async fn start(
        &mut self,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) {
        trace!("starting sync service");

        self.supervisor =
            Some(SyncTaskSupervisor::new(self, room_list_service, encryption_sync_permit).await);
        self.state.set(State::Running);
    }

    async fn stop(&mut self) {
        trace!("pausing sync service");

        // Remove the supervisor from our state and request the tasks to be shut down.
        if let Some(supervisor) = self.supervisor.take() {
            supervisor.shutdown().await;
        } else {
            error!("The sync service was not properly started, the supervisor task doesn't exist");
        }
    }

    async fn restart(
        &mut self,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) {
        self.stop().await;
        self.start(room_list_service, encryption_sync_permit).await;
    }
}

/// A high level manager for your Matrix syncing needs.
///
/// The [`SyncService`] is responsible for managing real-time synchronization
/// with a Matrix server. It can initiate and maintain the necessary
/// synchronization tasks for you.
///
/// **Note**: The [`SyncService`] requires a server with support for [MSC4186],
/// otherwise it will fail with a 404 `M_UNRECOGNIZED` request error.
///
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186/
///
/// # Example
///
/// ```no_run
/// use matrix_sdk::Client;
/// use matrix_sdk_ui::sync_service::{State, SyncService};
/// # use url::Url;
/// # async {
/// let homeserver = Url::parse("http://example.com")?;
/// let client = Client::new(homeserver).await?;
///
/// client
///     .matrix_auth()
///     .login_username("example", "wordpass")
///     .initial_device_display_name("My bot")
///     .await?;
///
/// let sync_service = SyncService::builder(client).build().await?;
/// let mut state = sync_service.state();
///
/// while let Some(state) = state.next().await {
///     match state {
///         State::Idle => eprintln!("The sync service is idle."),
///         State::Running => eprintln!("The sync has started to run."),
///         State::Offline => eprintln!(
///             "We have entered the offline mode, the server seems to be unavailable"
///         ),
///         State::Terminated => {
///             eprintln!("The sync service has been gracefully terminated");
///             break;
///         }
///         State::Error => {
///             eprintln!("The sync service has run into an error");
///             break;
///         }
///     }
/// }
/// # anyhow::Ok(()) };
/// ```
pub struct SyncService {
    inner: Arc<AsyncMutex<SyncServiceInner>>,

    /// Room list service used to synchronize the rooms state.
    room_list_service: Arc<RoomListService>,

    /// The state of this sync service.
    ///
    /// This field is replicated from the [`SyncServiceInner`] struct, but it
    /// should not be modified in this struct. It's re-exposed here so we can
    /// subscribe to the state without taking the lock on the `inner` field.
    state: SharedObservable<State>,

    /// Global lock to allow using at most one [`EncryptionSyncService`] at all
    /// times.
    ///
    /// This ensures that there's only one ever existing in the application's
    /// lifetime (under the assumption that there is at most one
    /// [`SyncService`] per application).
    encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
}

impl SyncService {
    /// Create a new builder for configuring a `SyncService`.
    pub fn builder(client: Client) -> SyncServiceBuilder {
        SyncServiceBuilder::new(client)
    }

    /// Get the underlying `RoomListService` instance for easier access to its
    /// methods.
    pub fn room_list_service(&self) -> Arc<RoomListService> {
        self.room_list_service.clone()
    }

    /// Returns the state of the sync service.
    pub fn state(&self) -> Subscriber<State> {
        self.state.subscribe()
    }

    /// Start (or restart) the underlying sliding syncs.
    ///
    /// This can be called multiple times safely:
    /// - if the stream is still properly running, it won't be restarted.
    /// - if the [`SyncService`] is in the offline mode, we will exit the
    ///   offline mode and immediately attempt to sync again.
    /// - if the stream has been aborted before, it will be properly cleaned up
    ///   and restarted.
    pub async fn start(&self) {
        let mut inner = self.inner.lock().await;

        // Only (re)start the tasks if the service is stopped or if we're in the
        // offline mode.
        match inner.state.get() {
            // If we're already running, there's nothing to do.
            State::Running => {}
            // If we're in the offline mode, first stop the service and then start it again.
            State::Offline => {
                inner
                    .restart(self.room_list_service.clone(), self.encryption_sync_permit.clone())
                    .await
            }
            // Otherwise just start.
            State::Idle | State::Terminated | State::Error => {
                inner
                    .start(self.room_list_service.clone(), self.encryption_sync_permit.clone())
                    .await
            }
        }
    }

    /// Stop the underlying sliding syncs.
    ///
    /// This must be called when the app goes into the background. It's better
    /// to call this API when the application exits, although not strictly
    /// necessary.
    #[instrument(skip_all)]
    pub async fn stop(&self) {
        let mut inner = self.inner.lock().await;

        match inner.state.get() {
            State::Idle | State::Terminated | State::Error => {
                // No need to stop if we were not running.
                return;
            }
            State::Running | State::Offline => {}
        }

        inner.stop().await;
    }

    /// Force expiring both sessions.
    ///
    /// This ensures that the sync service is stopped before expiring both
    /// sessions. It should be used sparingly, as it will cause a restart of
    /// the sessions on the server as well.
    #[instrument(skip_all)]
    pub async fn expire_sessions(&self) {
        // First, stop the sync service if it was running; it's a no-op if it was
        // already stopped.
        self.stop().await;

        // Expire the room list sync session.
        self.room_list_service.expire_sync_session().await;

        // Expire the encryption sync session.
        self.inner.lock().await.encryption_sync_service.expire_sync_session().await;
    }

    /// Attempt to get a permit to use an `EncryptionSyncService` at a given
    /// time.
    ///
    /// This ensures there is at most one [`EncryptionSyncService`] active at
    /// any time, per application.
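    ///
    /// # Example
    ///
    /// A minimal sketch of holding the permit manually (assuming an
    /// already-built `sync_service`); the permit is released when the
    /// returned guard is dropped:
    ///
    /// ```no_run
    /// # fn example(sync_service: &matrix_sdk_ui::sync_service::SyncService) {
    /// if let Some(permit) = sync_service.try_get_encryption_sync_permit() {
    ///     // While this guard is alive, no other `EncryptionSyncService` can
    ///     // run in this process.
    ///     drop(permit);
    /// }
    /// # }
    /// ```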
    pub fn try_get_encryption_sync_permit(&self) -> Option<OwnedMutexGuard<EncryptionSyncPermit>> {
        self.encryption_sync_permit.clone().try_lock_owned().ok()
    }
}

#[derive(Debug)]
enum TerminationOrigin {
    EncryptionSync,
    RoomList,
    Supervisor,
}

#[derive(Debug)]
struct TerminationReport {
    is_error: bool,
    has_expired: bool,
    origin: TerminationOrigin,
}

impl TerminationReport {
    fn supervisor_error() -> Self {
        TerminationReport {
            is_error: true,
            has_expired: false,
            origin: TerminationOrigin::Supervisor,
        }
    }
}

// Testing helpers, mostly.
#[doc(hidden)]
impl SyncService {
    /// Is the task supervisor running?
    pub async fn is_supervisor_running(&self) -> bool {
        self.inner.lock().await.supervisor.is_some()
    }
}

#[derive(Clone)]
pub struct SyncServiceBuilder {
    /// SDK client.
    client: Client,

    /// Is the cross-process lock for the crypto store enabled?
    with_cross_process_lock: bool,

    /// Is the offline mode for the [`SyncService`] enabled?
    ///
    /// The offline mode is described in the [`State::Offline`] enum variant.
    with_offline_mode: bool,

    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
    ///
    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
    with_share_pos: bool,

    /// The parent tracing span to use for the tasks within this service.
    ///
    /// Normally this will be [`Span::none`], but it may be useful to assign a
    /// defined span, for example if there is more than one active sync
    /// service.
    parent_span: Span,
}

impl SyncServiceBuilder {
    fn new(client: Client) -> Self {
        Self {
            client,
            with_cross_process_lock: false,
            with_offline_mode: false,
            with_share_pos: true,
            parent_span: Span::none(),
        }
    }

    /// Enables the cross-process lock, if the sync service is being built in a
    /// multi-process setup.
    ///
    /// It's a prerequisite if another process can *also* process encryption
    /// events. This is only applicable to very specific use cases, like an
    /// external process attempting to decrypt notifications. In general,
    /// `with_cross_process_lock` should not be called.
    ///
    /// Be sure to have configured
    /// [`Client::cross_process_store_locks_holder_name`] accordingly.
    pub fn with_cross_process_lock(mut self) -> Self {
        self.with_cross_process_lock = true;
        self
    }

    /// Enable the "offline" mode for the [`SyncService`].
    ///
    /// To learn more about the "offline" mode, read the documentation for the
    /// [`State::Offline`] enum variant.
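    ///
    /// # Example
    ///
    /// A minimal sketch of enabling the offline mode (assuming an
    /// already-created `client`):
    ///
    /// ```no_run
    /// # async fn example(client: matrix_sdk::Client) -> anyhow::Result<()> {
    /// use matrix_sdk_ui::sync_service::SyncService;
    ///
    /// let sync_service = SyncService::builder(client)
    ///     .with_offline_mode()
    ///     .build()
    ///     .await?;
    /// # Ok(()) }
    /// ```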
    pub fn with_offline_mode(mut self) -> Self {
        self.with_offline_mode = true;
        self
    }

    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
    ///
    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
    pub fn with_share_pos(mut self, enable: bool) -> Self {
        self.with_share_pos = enable;
        self
    }

    /// Set the parent tracing span to be used for the tasks within this
    /// service.
    pub fn with_parent_span(mut self, parent_span: Span) -> Self {
        self.parent_span = parent_span;
        self
    }

    /// Finish setting up the [`SyncService`].
    ///
    /// This creates the underlying sliding syncs, and will *not* start them in
    /// the background. The resulting [`SyncService`] must be kept alive as
    /// long as the sliding syncs are supposed to run.
    pub async fn build(self) -> Result<SyncService, Error> {
        let Self {
            client,
            with_cross_process_lock,
            with_offline_mode,
            with_share_pos,
            parent_span,
        } = self;

        let encryption_sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));

        let room_list = RoomListService::new_with_share_pos(client.clone(), with_share_pos).await?;

        let encryption_sync = Arc::new(
            EncryptionSyncService::new(client, None, WithLocking::from(with_cross_process_lock))
                .await?,
        );

        let room_list_service = Arc::new(room_list);
        let state = SharedObservable::new(State::Idle);

        Ok(SyncService {
            state: state.clone(),
            room_list_service,
            encryption_sync_permit,
            inner: Arc::new(AsyncMutex::new(SyncServiceInner {
                supervisor: None,
                encryption_sync_service: encryption_sync,
                state,
                with_offline_mode,
                parent_span,
            })),
        })
    }
}

/// Errors for the [`SyncService`] API.
#[derive(Debug, Error)]
pub enum Error {
    /// An error received from the `RoomListService` API.
    #[error(transparent)]
    RoomList(#[from] room_list_service::Error),

    /// An error received from the `EncryptionSyncService` API.
    #[error(transparent)]
    EncryptionSync(#[from] encryption_sync_service::Error),

    /// An error occurred in the sync task supervisor, likely due to a bug.
    #[error("the supervisor channel has run into an unexpected error")]
    InternalSupervisorError,
}