3 Commits

Author SHA1 Message Date
c7cb73b360 fixed up the frontend display for the local messages
Some checks failed
Release / release (ubuntu-22.04) (push) Has been cancelled
Release / release (windows-latest) (push) Has been cancelled
2026-04-19 02:04:54 -04:00
3c3118c74d basic local message caching 2026-04-19 01:34:59 -04:00
ba6e158ee2 moved the AppState to lib.rs and refactored accordingly 2026-04-19 01:06:40 -04:00
9 changed files with 194 additions and 57 deletions

View File

@@ -1,6 +1,6 @@
{ {
"name": "oxyde", "name": "oxyde",
"version": "0.1.1", "version": "0.1.3",
"description": "A simple Tauri chat app, built with rust, vite, and surrealdb", "description": "A simple Tauri chat app, built with rust, vite, and surrealdb",
"type": "module", "type": "module",
"scripts": { "scripts": {

2
src-tauri/Cargo.lock generated
View File

@@ -3824,7 +3824,7 @@ dependencies = [
[[package]] [[package]]
name = "oxyde" name = "oxyde"
version = "0.1.1" version = "0.1.3"
dependencies = [ dependencies = [
"futures-util", "futures-util",
"serde", "serde",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "oxyde" name = "oxyde"
version = "0.1.1" version = "0.1.3"
description = "A simple Tauri chat app, built with rust, vite, and surrealdb" description = "A simple Tauri chat app, built with rust, vite, and surrealdb"
authors = ["qdust41"] authors = ["qdust41"]
edition = "2021" edition = "2021"

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use futures_util::StreamExt; use futures_util::StreamExt;
use surrealdb::types::{RecordId, RecordIdKey}; use surrealdb::types::{RecordId, RecordIdKey};
@@ -6,13 +7,32 @@ use surrealdb::Notification;
use tauri::{AppHandle, Emitter, State}; use tauri::{AppHandle, Emitter, State};
use uuid::Uuid; use uuid::Uuid;
use crate::db::AppState;
use crate::error::{into_err, AppError}; use crate::error::{into_err, AppError};
use crate::models::{Message, MessageReaction, MessageReactionSummary, Room, User}; use crate::models::{Message, MessageReaction, MessageReactionSummary, Room, User};
use crate::AppState;
const DEFAULT_PAGE_SIZE: i64 = 50; const DEFAULT_PAGE_SIZE: i64 = 50;
const MAX_PAGE_SIZE: i64 = 100; const MAX_PAGE_SIZE: i64 = 100;
const MAX_MESSAGE_LEN: usize = 4000; const MAX_MESSAGE_LEN: usize = 4000;
const MAX_CACHED_ROOMS: usize = 5;
fn cache_put(
cache: &Arc<Mutex<HashMap<String, Vec<Message>>>>,
order: &Arc<Mutex<Vec<String>>>,
room_id: &str,
messages: Vec<Message>,
) {
let mut c = cache.lock().unwrap();
let mut o = order.lock().unwrap();
c.insert(room_id.to_string(), messages);
o.retain(|id| id != room_id);
o.insert(0, room_id.to_string());
while o.len() > MAX_CACHED_ROOMS {
if let Some(evicted) = o.pop() {
c.remove(&evicted);
}
}
}
const MAX_ROOM_NAME_LEN: usize = 80; const MAX_ROOM_NAME_LEN: usize = 80;
/// Wrapper emitted to the frontend for each LIVE query notification. /// Wrapper emitted to the frontend for each LIVE query notification.
@@ -427,7 +447,24 @@ pub async fn send_message(
.ok_or_else(|| into_err(AppError::NotFound("message after create".into()))) .ok_or_else(|| into_err(AppError::NotFound("message after create".into())))
} }
/// Return cached messages for a room without hitting the remote DB.
/// Returns an empty vec if the room has not been cached yet.
#[tauri::command]
pub async fn get_cached_messages(
state: State<'_, AppState>,
room_id: String,
) -> Result<Vec<Message>, String> {
Ok(state
.msg_cache
.lock()
.unwrap()
.get(&room_id)
.cloned()
.unwrap_or_default())
}
/// Fetch a bounded page of messages in a room, oldest first. /// Fetch a bounded page of messages in a room, oldest first.
/// Also updates the in-process message cache.
#[tauri::command] #[tauri::command]
pub async fn get_messages( pub async fn get_messages(
state: State<'_, AppState>, state: State<'_, AppState>,
@@ -451,11 +488,11 @@ pub async fn get_messages(
let mut response = state let mut response = state
.db .db
.query(query) .query(query)
.bind(("room_id", room_id)) .bind(("room_id", room_id.clone()))
.bind(("limit", limit)); .bind(("limit", limit));
if let Some(before) = before { if let Some(ref before) = before {
response = response.bind(("before", before)); response = response.bind(("before", before.clone()));
} }
let mut result: Vec<Message> = response let mut result: Vec<Message> = response
@@ -467,6 +504,18 @@ pub async fn get_messages(
result.reverse(); result.reverse();
let user = current_user(&state).await?; let user = current_user(&state).await?;
hydrate_reactions(&state, &user, &mut result).await?; hydrate_reactions(&state, &user, &mut result).await?;
if before.is_none() {
cache_put(&state.msg_cache, &state.cache_order, &room_id, result.clone());
} else {
let mut c = state.msg_cache.lock().unwrap();
if let Some(existing) = c.get_mut(&room_id) {
let mut merged = result.clone();
merged.extend_from_slice(existing);
*existing = merged;
}
}
Ok(result) Ok(result)
} }
@@ -563,7 +612,8 @@ pub async fn mark_room_read(state: State<'_, AppState>, room_id: String) -> Resu
} }
/// Start a LIVE query for new messages in a room. /// Start a LIVE query for new messages in a room.
/// Spawns a background tokio task that emits "chat:message" Tauri events. /// Spawns a background tokio task that emits "chat:message" Tauri events
/// and keeps the in-process message cache in sync.
/// ///
/// Returns a local subscription UUID — pass it to `unsubscribe_room` on cleanup. /// Returns a local subscription UUID — pass it to `unsubscribe_room` on cleanup.
/// Aborting the JoinHandle drops the stream, which closes the LIVE query automatically. /// Aborting the JoinHandle drops the stream, which closes the LIVE query automatically.
@@ -574,6 +624,9 @@ pub async fn subscribe_room(
room_id: String, room_id: String,
) -> Result<String, String> { ) -> Result<String, String> {
let db = state.db.clone(); let db = state.db.clone();
let msg_cache = Arc::clone(&state.msg_cache);
let cache_order = Arc::clone(&state.cache_order);
let room_id_cache = room_id.clone();
let mut stream = db let mut stream = db
.query("LIVE SELECT * FROM message WHERE room = type::record('room', $room_id)") .query("LIVE SELECT * FROM message WHERE room = type::record('room', $room_id)")
@@ -587,10 +640,39 @@ pub async fn subscribe_room(
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
while let Some(Ok(notification)) = stream.next().await { while let Some(Ok(notification)) = stream.next().await {
let action = format!("{:?}", notification.action);
let data = notification.data.clone();
{
let mut c = msg_cache.lock().unwrap();
let mut o = cache_order.lock().unwrap();
if let Some(msgs) = c.get_mut(&room_id_cache) {
match action.as_str() {
"Create" => msgs.push(data.clone()),
"Update" => {
if let Some(m) = msgs.iter_mut().find(|m| m.id == data.id) {
*m = data.clone();
}
}
"Delete" => msgs.retain(|m| m.id != data.id),
_ => {}
}
} else if action == "Create" {
c.insert(room_id_cache.clone(), vec![data.clone()]);
o.retain(|id| id != &room_id_cache);
o.insert(0, room_id_cache.clone());
while o.len() > MAX_CACHED_ROOMS {
if let Some(evicted) = o.pop() {
c.remove(&evicted);
}
}
}
}
let _ = app_handle.emit( let _ = app_handle.emit(
"chat:message", "chat:message",
&LiveMessageEvent { &LiveMessageEvent {
action: format!("{:?}", notification.action), action,
data: &notification.data, data: &notification.data,
}, },
); );

View File

@@ -1,9 +1,10 @@
use tauri::{AppHandle, State}; use tauri::{AppHandle, State};
use tauri_plugin_store::StoreExt; use tauri_plugin_store::StoreExt;
use crate::db::{AppState, SURREAL_ACCESS, SURREAL_DB, SURREAL_NS}; use crate::db::{SURREAL_ACCESS, SURREAL_DB, SURREAL_NS};
use crate::error::{into_err, AppError}; use crate::error::{into_err, AppError};
use crate::models::{Contact, User}; use crate::models::{Contact, User};
use crate::AppState;
const SESSION_STORE: &str = "session.json"; const SESSION_STORE: &str = "session.json";
const TOKEN_KEY: &str = "token"; const TOKEN_KEY: &str = "token";
@@ -86,7 +87,6 @@ pub async fn signup(
}; };
let token = state.db.signup(credentials).await.map_err(into_err)?; let token = state.db.signup(credentials).await.map_err(into_err)?;
let token_str = token.access.into_insecure_token(); let token_str = token.access.into_insecure_token();
*state.token.lock().unwrap() = Some(token_str.clone());
save_token(&app_handle, &token_str)?; save_token(&app_handle, &token_str)?;
let mut result: Vec<User> = state let mut result: Vec<User> = state
@@ -130,7 +130,6 @@ pub async fn signin(
.map_err(into_err)? .map_err(into_err)?
.access .access
.into_insecure_token(); .into_insecure_token();
*state.token.lock().unwrap() = Some(token_str.clone());
save_token(&app_handle, &token_str)?; save_token(&app_handle, &token_str)?;
Ok(token_str) Ok(token_str)
} }
@@ -139,7 +138,6 @@ pub async fn signin(
#[tauri::command] #[tauri::command]
pub async fn signout(state: State<'_, AppState>, app_handle: AppHandle) -> Result<(), String> { pub async fn signout(state: State<'_, AppState>, app_handle: AppHandle) -> Result<(), String> {
state.db.invalidate().await.map_err(into_err)?; state.db.invalidate().await.map_err(into_err)?;
*state.token.lock().unwrap() = None;
clear_token(&app_handle)?; clear_token(&app_handle)?;
Ok(()) Ok(())
} }
@@ -162,8 +160,6 @@ pub async fn restore_session(
.await .await
{ {
Ok(_) => { Ok(_) => {
*state.token.lock().unwrap() = Some(token_str);
let mut result: Vec<User> = state let mut result: Vec<User> = state
.db .db
.query("SELECT * FROM $auth") .query("SELECT * FROM $auth")
@@ -178,7 +174,6 @@ pub async fn restore_session(
} }
Err(_) => { Err(_) => {
let _ = clear_token(&app_handle); let _ = clear_token(&app_handle);
*state.token.lock().unwrap() = None;
Err(AppError::Auth("session expired, please sign in again".into()).to_string()) Err(AppError::Auth("session expired, please sign in again".into()).to_string())
} }
} }

View File

@@ -1,15 +1,11 @@
use std::collections::HashMap;
use std::env; use std::env;
use std::sync::{Arc, LazyLock, Mutex}; use std::sync::LazyLock;
use surrealdb::engine::remote::ws::{Client, Ws, Wss}; use surrealdb::engine::remote::ws::{Client, Ws, Wss};
use surrealdb::Surreal; use surrealdb::Surreal;
use tokio::task::JoinHandle;
use uuid::Uuid;
use crate::error::AppError; use crate::error::AppError;
// This should set the env variable correctly both during compile time and runtime (for development).
pub static SURREAL_URL: LazyLock<String> = LazyLock::new(|| { pub static SURREAL_URL: LazyLock<String> = LazyLock::new(|| {
option_env!("SURREAL_URL") option_env!("SURREAL_URL")
.map(str::to_string) .map(str::to_string)
@@ -33,19 +29,6 @@ pub static SURREAL_ACCESS: LazyLock<String> = LazyLock::new(|| {
.unwrap_or_else(|| env::var("SURREAL_ACCESS").unwrap_or_else(|_| "account".to_string())) .unwrap_or_else(|| env::var("SURREAL_ACCESS").unwrap_or_else(|_| "account".to_string()))
}); });
pub struct AppState {
/// Long-lived authenticated WebSocket connection to SurrealDB.
pub db: Arc<Surreal<Client>>,
/// JWT token from Record Auth signin. Used to re-authenticate on reconnect.
/// std::sync::Mutex is intentional: lock is acquired and released before any .await.
pub token: Mutex<Option<String>>,
/// Active LIVE query tasks keyed by their SurrealDB LIVE query UUID.
/// Abort a handle + KILL the query to clean up.
/// std::sync::Mutex is intentional: guards are never held across .await points.
/// If a future command needs to lock across .await, switch to tokio::sync::Mutex.
pub subscriptions: Mutex<HashMap<Uuid, JoinHandle<()>>>,
}
/// Connect to SurrealDB over WebSocket and select namespace/database. /// Connect to SurrealDB over WebSocket and select namespace/database.
/// URL may include protocol prefix: `ws://`, `wss://`, `http://`, or `https://`. /// URL may include protocol prefix: `ws://`, `wss://`, `http://`, or `https://`.
/// `wss://` and `https://` use TLS; others use plain WebSocket. /// `wss://` and `https://` use TLS; others use plain WebSocket.

View File

@@ -1,14 +1,30 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tauri::Manager; use tauri::Manager;
use tokio::task::JoinHandle;
use uuid::Uuid;
mod commands; mod commands;
mod db; mod db;
mod error; mod error;
mod models; mod models;
use db::{init_db, AppState, SURREAL_DB, SURREAL_NS, SURREAL_URL}; use db::{init_db, SURREAL_DB, SURREAL_NS, SURREAL_URL};
use models::Message;
pub struct AppState {
pub db: Arc<Surreal<Client>>,
/// In-process message cache keyed by room_id string. Arc so the live-event
/// task in subscribe_room can hold a reference without borrowing AppState.
pub msg_cache: Arc<Mutex<HashMap<String, Vec<Message>>>>,
/// LRU order of cached room IDs (front = most recent). Evicts beyond 5.
pub cache_order: Arc<Mutex<Vec<String>>>,
/// std::sync::Mutex is intentional: guards are never held across .await points.
pub subscriptions: Mutex<HashMap<Uuid, JoinHandle<()>>>,
}
#[cfg_attr(mobile, tauri::mobile_entry_point)] #[cfg_attr(mobile, tauri::mobile_entry_point)]
pub fn run() { pub fn run() {
@@ -28,7 +44,8 @@ pub fn run() {
let state = AppState { let state = AppState {
db: Arc::new(surreal), db: Arc::new(surreal),
token: Mutex::new(None), msg_cache: Arc::new(Mutex::new(HashMap::new())),
cache_order: Arc::new(Mutex::new(Vec::new())),
subscriptions: Mutex::new(HashMap::new()), subscriptions: Mutex::new(HashMap::new()),
}; };
@@ -52,6 +69,7 @@ pub fn run() {
commands::chat::get_or_create_direct_room, commands::chat::get_or_create_direct_room,
commands::chat::send_message, commands::chat::send_message,
commands::chat::get_messages, commands::chat::get_messages,
commands::chat::get_cached_messages,
commands::chat::delete_message, commands::chat::delete_message,
commands::chat::edit_message, commands::chat::edit_message,
commands::chat::toggle_reaction, commands::chat::toggle_reaction,

View File

@@ -1,7 +1,7 @@
{ {
"$schema": "https://schema.tauri.app/config/2", "$schema": "https://schema.tauri.app/config/2",
"productName": "oxyde", "productName": "oxyde",
"version": "0.1.1", "version": "0.1.3",
"identifier": "com.jimweaver.oxyde", "identifier": "com.jimweaver.oxyde",
"build": { "build": {
"beforeDevCommand": "pnpm dev", "beforeDevCommand": "pnpm dev",

View File

@@ -26,6 +26,7 @@
let hasOlderMessages = $state(false); let hasOlderMessages = $state(false);
let isLoadingOlder = $state(false); let isLoadingOlder = $state(false);
let unreadCounts = $state<Record<string, number>>({}); let unreadCounts = $state<Record<string, number>>({});
let roomSelectionToken = 0;
let view = $state<"loading" | "auth" | "app">("loading"); let view = $state<"loading" | "auth" | "app">("loading");
let authMode = $state<"signin" | "signup">("signin"); let authMode = $state<"signin" | "signup">("signin");
@@ -101,6 +102,7 @@
} }
async function signout() { async function signout() {
roomSelectionToken += 1;
await cmd("signout").catch(() => {}); await cmd("signout").catch(() => {});
if (subId) { if (subId) {
await cmd("unsubscribe_room", { subId }).catch(() => {}); await cmd("unsubscribe_room", { subId }).catch(() => {});
@@ -119,34 +121,72 @@
} }
// ─── Rooms ──────────────────────────────────────────────────────────────── // ─── Rooms ────────────────────────────────────────────────────────────────
function isCurrentRoomSelection(token: number, roomId: string) {
return (
token === roomSelectionToken &&
activeRoom !== null &&
sid(activeRoom.id) === roomId
);
}
function onlyRoomMessages(roomId: string, source: Message[]) {
return source.filter((message) => sid(message.room) === roomId);
}
async function loadRooms() { async function loadRooms() {
rooms = await cmd<Room[]>("get_rooms"); rooms = await cmd<Room[]>("get_rooms");
if (rooms.length && !activeRoom) await selectRoom(rooms[0]); if (rooms.length && !activeRoom) await selectRoom(rooms[0]);
} }
async function selectRoom(room: Room) { async function selectRoom(room: Room) {
if (subId) { const token = ++roomSelectionToken;
await cmd("unsubscribe_room", { subId }).catch(() => {}); const roomId = sid(room.id);
subId = null; const previousSubId = subId;
} const previousUnlisten = unlisten;
if (unlisten) {
unlisten(); subId = null;
unlisten = null; unlisten = null;
}
activeRoom = room; activeRoom = room;
messages = [];
hasOlderMessages = false;
isLoadingOlder = false;
replyTo = null; replyTo = null;
messages = await cmd<Message[]>("get_messages", {
roomId: sid(room.id), if (previousSubId) {
await cmd("unsubscribe_room", { subId: previousSubId }).catch(() => {});
}
if (previousUnlisten) {
previousUnlisten();
}
if (token !== roomSelectionToken) return;
const cached = await cmd<Message[]>("get_cached_messages", { roomId });
if (!isCurrentRoomSelection(token, roomId)) return;
if (cached.length > 0) {
messages = onlyRoomMessages(roomId, cached);
hasOlderMessages = false;
}
const fresh = await cmd<Message[]>("get_messages", {
roomId,
limit: 50, limit: 50,
}); });
hasOlderMessages = messages.length === 50; if (!isCurrentRoomSelection(token, roomId)) return;
unreadCounts = { ...unreadCounts, [sid(room.id)]: 0 }; messages = onlyRoomMessages(roomId, fresh);
await cmd("mark_room_read", { roomId: sid(room.id) }).catch(() => {}); hasOlderMessages = fresh.length === 50;
unreadCounts = { ...unreadCounts, [roomId]: 0 };
await cmd("mark_room_read", { roomId }).catch(() => {});
if (!isCurrentRoomSelection(token, roomId)) return;
subId = await cmd<string>("subscribe_room", { roomId: sid(room.id) }); const nextSubId = await cmd<string>("subscribe_room", { roomId });
if (!isCurrentRoomSelection(token, roomId)) {
await cmd("unsubscribe_room", { subId: nextSubId }).catch(() => {});
return;
}
subId = nextSubId;
const { listen } = await import("@tauri-apps/api/event"); const { listen } = await import("@tauri-apps/api/event");
unlisten = await listen<LiveEvent>("chat:message", ({ payload }) => { const nextUnlisten = await listen<LiveEvent>("chat:message", ({ payload }) => {
const { action, data } = payload; const { action, data } = payload;
const eventRoomId = sid(data.room); const eventRoomId = sid(data.room);
const currentRoomId = activeRoom ? sid(activeRoom.id) : ""; const currentRoomId = activeRoom ? sid(activeRoom.id) : "";
@@ -177,6 +217,15 @@
} }
cmd("mark_room_read", { roomId: currentRoomId }).catch(() => {}); cmd("mark_room_read", { roomId: currentRoomId }).catch(() => {});
}); });
if (!isCurrentRoomSelection(token, roomId)) {
nextUnlisten();
if (subId === nextSubId) {
await cmd("unsubscribe_room", { subId: nextSubId }).catch(() => {});
subId = null;
}
return;
}
unlisten = nextUnlisten;
} }
async function loadOlderMessages() { async function loadOlderMessages() {
@@ -187,19 +236,24 @@
messages.length === 0 messages.length === 0
) )
return; return;
const roomId = sid(activeRoom.id);
const token = roomSelectionToken;
isLoadingOlder = true; isLoadingOlder = true;
try { try {
const older = await cmd<Message[]>("get_messages", { const older = await cmd<Message[]>("get_messages", {
roomId: sid(activeRoom.id), roomId,
before: messages[0].created, before: messages[0].created,
limit: 50, limit: 50,
}); });
messages = [...older, ...messages]; if (!isCurrentRoomSelection(token, roomId)) return;
messages = [...onlyRoomMessages(roomId, older), ...messages];
hasOlderMessages = older.length === 50; hasOlderMessages = older.length === 50;
} catch (e) { } catch (e) {
err = String(e); err = String(e);
} finally { } finally {
isLoadingOlder = false; if (isCurrentRoomSelection(token, roomId)) {
isLoadingOlder = false;
}
} }
} }
@@ -267,10 +321,15 @@
try { try {
await cmd("toggle_reaction", { messageId: msgId, emoji }); await cmd("toggle_reaction", { messageId: msgId, emoji });
if (activeRoom) { if (activeRoom) {
messages = await cmd<Message[]>("get_messages", { const roomId = sid(activeRoom.id);
roomId: sid(activeRoom.id), const token = roomSelectionToken;
const refreshed = await cmd<Message[]>("get_messages", {
roomId,
limit: Math.max(50, Math.min(messages.length, 100)), limit: Math.max(50, Math.min(messages.length, 100)),
}); });
if (isCurrentRoomSelection(token, roomId)) {
messages = onlyRoomMessages(roomId, refreshed);
}
} }
} catch (e) { } catch (e) {
err = String(e); err = String(e);