diff --git a/Cargo.lock b/Cargo.lock index 759251c..e3f18c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -237,6 +237,17 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "filetime" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db" +dependencies = [ + "cfg-if", + "libc", + "libredox", +] + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -252,6 +263,7 @@ dependencies = [ "base64", "chrono", "eventsource-stream", + "filetime", "futures", "hex", "hmac", @@ -814,6 +826,7 @@ checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" dependencies = [ "bitflags", "libc", + "redox_syscall 0.7.0", ] [[package]] @@ -1034,7 +1047,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -1221,6 +1234,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "redox_syscall" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f3fe0889e69e2ae9e41f4d6c4c0181701d00e4697b356fb1f74173a5e0ee27" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_users" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index d9a75fc..72ca1c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,3 +68,4 @@ html2text = "0.14" [dev-dependencies] tempfile = "3" +filetime = "0.2" diff --git a/README.md b/README.md index e3df0dc..7fe163f 100644 --- a/README.md +++ b/README.md @@ -290,6 +290,9 @@ allowed_jids = ["admin@localhost"] [memory] backend = "markdown" path = "./data/memory" + +[session] +idle_timeout_mins = 240 # Auto-archive after 4 hours of inactivity (0 = disabled) ``` Memory is stored as human-readable markdown files, workspace files for global agent configuration and per-JID directories for isolated user data. This makes agent memory inspectable, editable, and git-friendly. Admins can customize agent behavior by creating `instructions.md`, `identity.md`, and `personality.md` in the memory root directory. @@ -313,6 +316,7 @@ Each user has a current conversation session (`history.jsonl`) and optionally ar - **`/new`** archives the current session to `sessions/{YYYYMMDD-HHMMSS}.jsonl` and clears the LLM context. - **`/forget`** erases the current history, user profile (`user.md`), and memory (`memory.md`) but preserves archived sessions. - **`/status`** shows the number of messages in the current session and how many sessions have been archived. +- **Session timeout** — idle sessions are automatically archived when the next message arrives after a configurable inactivity period. This is lazy (no background timer) and works per-user and per-room. Memory layout: diff --git a/ROADMAP.md b/ROADMAP.md index cb9982a..ba44679 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -34,10 +34,23 @@ Implemented: the agent supports discrete sessions per user. - **Memory layout** — `{jid}/history.jsonl` (current session, JSONL format), `{jid}/sessions/*.jsonl` (archived), `{jid}/user.md` (user profile), `{jid}/memory.md` (long-term notes). - **`/status`** — Reports message count in current session and number of archived sessions. +#### Session timeout ✓ + +Implemented: idle sessions are automatically archived when the next message arrives. + +- **Lazy evaluation** — No background timer. When a message arrives, the runtime checks the session file's modification time. If idle longer than `idle_timeout_mins`, the session is archived (same as `/new`) before processing the new message. +- **Configurable** — `[session]` TOML section with `idle_timeout_mins` (default: 0 = disabled). Per-user and per-room (MUC sessions have their own timeout check). +- **All handlers** — Freshness check runs in `handle_message`, `handle_muc_message`, `handle_reaction`, and `handle_message_with_attachments`. +- **`/status`** — Shows session timeout configuration. + +```toml +[session] +idle_timeout_mins = 240 # Archive after 4 hours of inactivity (0 = disabled) +``` + #### Future enhancements (not yet implemented) - **XMPP thread ID mapping** — When the XMPP client sends a `` element (XEP-0201), the agent maps it to a session. Different thread IDs = different sessions. Messages without a thread ID use the "default" session. -- **Session timeout** — If no message is received for a configurable duration (e.g. 4 hours), the next message implicitly starts a new session. The timeout is per-user. - **Session context carry-over** — When a new session starts, the agent can optionally summarize the previous session into `context.md`, giving continuity without sending the full old history to the LLM. ### Presence subscription for allowed JIDs ✓ diff --git a/docs/XEPS.md b/docs/XEPS.md index d63cabb..af0ec95 100644 --- a/docs/XEPS.md +++ b/docs/XEPS.md @@ -8,12 +8,16 @@ Fluux Agent implements the following XMPP standards: Core XMPP protocol — XML streams, stanza routing, error handling. -**Implementation:** Stream establishment, stanza parsing (quick-xml event-based `StanzaParser`), error condition handling (§4.9.3). +**Implementation:** Stream establishment, stanza parsing (quick-xml event-based `StanzaParser`), error condition handling (§4.9.3), whitespace keepalive (§4.6.1). + +**Whitespace keepalive (§4.6.1):** The agent sends periodic whitespace pings (single space byte) to detect dead TCP connections — e.g., after machine sleep/wake. A configurable read timeout triggers a connection probe; if the write fails, the agent declares the connection lost and the reconnection loop takes over. This is non-destructive: a timeout alone does not disconnect — only a failed write does. **References:** - `src/xmpp/stanzas.rs:724` — stream error conditions (25 RFC 6120 conditions) - `src/xmpp/stanzas.rs:446` — `StanzaParser` (event-based XML stream parser) -- `src/xmpp/component.rs` — stream management +- `src/xmpp/component.rs` — stream management, keepalive ping handling, read timeout +- `src/agent/runtime.rs:70` — ping interval timer and read timeout probe logic +- `src/config.rs` — `KeepaliveConfig` (enabled, ping_interval_secs, read_timeout_secs) --- @@ -215,7 +219,7 @@ See `docs/DEVELOPING.md` for rationale. ## Version History - **v0.1** — XEP-0114 (component mode), XEP-0085 (chat states), XEP-0045 (MUC), XEP-0066 (OOB file attachments) -- **v0.2** — XEP-0444 inbound reactions, message ID embedding, C2S client mode (RFC 4616 PLAIN, RFC 5802 SCRAM-SHA-1, STARTTLS), JSONL session format +- **v0.2** — XEP-0444 inbound reactions, message ID embedding, C2S client mode (RFC 4616 PLAIN, RFC 5802 SCRAM-SHA-1, STARTTLS), JSONL session format, RFC 6120 §4.6.1 whitespace keepalive --- diff --git a/src/agent/memory.rs b/src/agent/memory.rs index 11f300c..6cd7b4e 100644 --- a/src/agent/memory.rs +++ b/src/agent/memory.rs @@ -429,6 +429,44 @@ impl Memory { )) } + /// Checks whether the current session is still fresh, based on the file's + /// modification time and the configured idle timeout. + /// + /// If the session has been idle for longer than `idle_timeout_mins`, it is + /// automatically archived (same as `/new`) and `Ok(true)` is returned to + /// indicate that a stale session was rotated. + /// + /// Returns `Ok(false)` if the session is still fresh or if there is no + /// active session. A timeout of 0 disables the check entirely. + pub fn check_session_freshness(&self, jid: &str, idle_timeout_mins: u64) -> Result { + if idle_timeout_mins == 0 { + return Ok(false); + } + + let user_dir = self.base_path.join(jid); + let history_path = user_dir.join("history.jsonl"); + + if !history_path.exists() { + return Ok(false); + } + + let metadata = fs::metadata(&history_path)?; + let modified = metadata.modified()?; + let elapsed = modified.elapsed().unwrap_or_default(); + let timeout = std::time::Duration::from_secs(idle_timeout_mins * 60); + + if elapsed > timeout { + info!( + "Session for {jid} idle for {}m (timeout: {idle_timeout_mins}m) — auto-archiving", + elapsed.as_secs() / 60 + ); + self.new_session(jid)?; + Ok(true) + } else { + Ok(false) + } + } + /// Erases all active memory for a user (history + user profile + memory). /// Archived sessions are preserved. pub fn forget(&self, jid: &str) -> Result { @@ -2330,4 +2368,143 @@ not valid json let result = memory.knowledge_search(jid, "anything").unwrap(); assert!(result.contains("No knowledge entries stored yet")); } + + // ── Session freshness tests ───────────────────────────── + + #[test] + fn test_check_freshness_disabled_when_zero() { + let dir = tempfile::tempdir().unwrap(); + let memory = Memory::open(dir.path()).unwrap(); + + memory.store_message("user@test", "user", "Hello").unwrap(); + + // Timeout of 0 means disabled — never archives + let rotated = memory.check_session_freshness("user@test", 0).unwrap(); + assert!(!rotated); + + // Session still intact + assert_eq!(memory.message_count("user@test").unwrap(), 1); + } + + #[test] + fn test_check_freshness_no_session() { + let dir = tempfile::tempdir().unwrap(); + let memory = Memory::open(dir.path()).unwrap(); + + // No history file at all — should return false (no rotation) + let rotated = memory.check_session_freshness("user@test", 60).unwrap(); + assert!(!rotated); + } + + #[test] + fn test_check_freshness_fresh_session() { + let dir = tempfile::tempdir().unwrap(); + let memory = Memory::open(dir.path()).unwrap(); + + memory.store_message("user@test", "user", "Hello").unwrap(); + + // Just created — should be fresh with a 60-minute timeout + let rotated = memory.check_session_freshness("user@test", 60).unwrap(); + assert!(!rotated); + + // Session still intact + assert_eq!(memory.message_count("user@test").unwrap(), 1); + } + + #[test] + fn test_check_freshness_stale_session_archives() { + use filetime::FileTime; + + let dir = tempfile::tempdir().unwrap(); + let memory = Memory::open(dir.path()).unwrap(); + + memory.store_message("user@test", "user", "Hello").unwrap(); + memory.store_message("user@test", "assistant", "Hi!").unwrap(); + + // Backdate the file's mtime by 2 hours + let history_path = dir.path().join("user@test/history.jsonl"); + let two_hours_ago = FileTime::from_unix_time( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - 7200, + 0, + ); + filetime::set_file_mtime(&history_path, two_hours_ago).unwrap(); + + // With a 60-minute timeout, the session should be stale + let rotated = memory.check_session_freshness("user@test", 60).unwrap(); + assert!(rotated); + + // History should be empty (archived) + assert_eq!(memory.message_count("user@test").unwrap(), 0); + + // Archived session should exist + assert_eq!(memory.session_count("user@test").unwrap(), 1); + } + + #[test] + fn test_check_freshness_not_stale_within_timeout() { + use filetime::FileTime; + + let dir = tempfile::tempdir().unwrap(); + let memory = Memory::open(dir.path()).unwrap(); + + memory.store_message("user@test", "user", "Hello").unwrap(); + + // Backdate by 30 minutes + let history_path = dir.path().join("user@test/history.jsonl"); + let thirty_mins_ago = FileTime::from_unix_time( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - 1800, + 0, + ); + filetime::set_file_mtime(&history_path, thirty_mins_ago).unwrap(); + + // With a 60-minute timeout, 30 minutes of idle is still fresh + let rotated = memory.check_session_freshness("user@test", 60).unwrap(); + assert!(!rotated); + + // Session still intact + assert_eq!(memory.message_count("user@test").unwrap(), 1); + } + + #[test] + fn test_check_freshness_new_session_works_after_archive() { + use filetime::FileTime; + + let dir = tempfile::tempdir().unwrap(); + let memory = Memory::open(dir.path()).unwrap(); + + memory.store_message("user@test", "user", "Old message").unwrap(); + + // Backdate the file + let history_path = dir.path().join("user@test/history.jsonl"); + let two_hours_ago = FileTime::from_unix_time( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 + - 7200, + 0, + ); + filetime::set_file_mtime(&history_path, two_hours_ago).unwrap(); + + // Auto-archive + memory.check_session_freshness("user@test", 60).unwrap(); + + // Now store a new message — should start a fresh session + memory.store_message("user@test", "user", "New message").unwrap(); + + let history = memory.get_history("user@test", 10).unwrap(); + assert_eq!(history.len(), 1); + assert_eq!(text(&history[0].content), "New message"); + + // One archived session + assert_eq!(memory.session_count("user@test").unwrap(), 1); + } } diff --git a/src/agent/runtime.rs b/src/agent/runtime.rs index 2537ea3..633286c 100644 --- a/src/agent/runtime.rs +++ b/src/agent/runtime.rs @@ -648,6 +648,15 @@ impl AgentRuntime { "Keepalive: disabled".to_string() }; + let session_timeout_info = if self.config.session.idle_timeout_mins > 0 { + format!( + "Session timeout: {}m idle", + self.config.session.idle_timeout_mins, + ) + } else { + "Session timeout: disabled".to_string() + }; + Ok(format!( "{} — status\n\ Uptime: {hours}h {minutes}m\n\ @@ -655,6 +664,7 @@ impl AgentRuntime { LLM: {}\n\ {skills_info}\n\ {keepalive_info}\n\ + {session_timeout_info}\n\ {context_info}\n\ Workspace: instructions={}, identity={}, personality={}\n\ {domain_info}", @@ -704,6 +714,9 @@ Commands:\n\ // Bare JID for memory (without resource) let bare_jid = stanzas::bare_jid(from); + // Auto-archive stale sessions before loading history + self.memory.check_session_freshness(bare_jid, self.config.session.idle_timeout_mins)?; + // Retrieve conversation history and workspace context let history = self.memory.get_history(bare_jid, MAX_HISTORY)?; let workspace = self.memory.get_workspace_context(bare_jid)?; @@ -745,6 +758,9 @@ Commands:\n\ /// The LLM decides whether a response is warranted based on the full context. /// Returns the LLM response text (caller stores and sends it). async fn handle_reaction(&self, jid: &str) -> Result { + // Auto-archive stale sessions before loading history + self.memory.check_session_freshness(jid, self.config.session.idle_timeout_mins)?; + let history = self.memory.get_history(jid, MAX_HISTORY)?; let workspace = self.memory.get_workspace_context(jid)?; let system_prompt = self.build_system_prompt(&workspace); @@ -768,6 +784,9 @@ Commands:\n\ /// The user message is already stored in history by the caller. /// Returns the LLM response text (caller stores the assistant message). async fn handle_muc_message(&self, room_jid: &str, _body: &str) -> Result { + // Auto-archive stale sessions before loading history + self.memory.check_session_freshness(room_jid, self.config.session.idle_timeout_mins)?; + // Retrieve conversation history and workspace context let history = self.memory.get_history(room_jid, MAX_HISTORY)?; let workspace = self.memory.get_workspace_context(room_jid)?; @@ -1020,6 +1039,9 @@ async fn handle_message_with_attachments( }); } + // Auto-archive stale sessions before loading history + memory.check_session_freshness(bare_jid, config.session.idle_timeout_mins)?; + // Build the multi-modal message with structured JSON metadata block let history = memory.get_history(bare_jid, MAX_HISTORY)?; let workspace = memory.get_workspace_context(bare_jid)?; @@ -1201,6 +1223,7 @@ mod tests { rooms: vec![], skills: SkillsConfig::default(), keepalive: crate::config::KeepaliveConfig::default(), + session: crate::config::SessionConfig::default(), }; let llm: Arc = Arc::new(AnthropicClient::new(config.llm.clone())); @@ -1545,6 +1568,16 @@ mod tests { assert!(result.contains("User memory: none")); assert!(result.contains("instructions=none")); assert!(result.contains("Allowed domains: localhost (default)")); + assert!(result.contains("Session timeout: disabled")); + } + + #[test] + fn test_command_status_session_timeout_enabled() { + let (mut rt, _tmp) = test_runtime(); + rt.config.session.idle_timeout_mins = 120; + + let result = rt.handle_command("admin@localhost", "/status").unwrap(); + assert!(result.contains("Session timeout: 120m idle")); } #[test] diff --git a/src/config.rs b/src/config.rs index de2f8ae..a8afdb4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,6 +17,10 @@ pub struct Config { /// Enabled by default with sensible defaults. #[serde(default)] pub keepalive: KeepaliveConfig, + /// Session timeout configuration. + /// When enabled, idle sessions are automatically archived on next message. + #[serde(default)] + pub session: SessionConfig, } /// Configuration for a MUC room (XEP-0045) @@ -185,6 +189,28 @@ impl Default for KeepaliveConfig { } } +/// Session timeout configuration. +/// +/// When enabled, sessions that have been idle for longer than +/// `idle_timeout_mins` are automatically archived when the next +/// message arrives (lazy evaluation — no background timer). +#[derive(Debug, Deserialize, Clone)] +pub struct SessionConfig { + /// Idle timeout in minutes. If the session has been idle for longer + /// than this, it is archived on the next inbound message. + /// Default: 0 (disabled). + #[serde(default)] + pub idle_timeout_mins: u64, +} + +impl Default for SessionConfig { + fn default() -> Self { + Self { + idle_timeout_mins: 0, + } + } +} + /// Configuration for the `web_search` builtin skill. #[derive(Debug, Deserialize, Clone)] pub struct WebSearchConfig { @@ -338,6 +364,7 @@ mod tests { rooms: vec![], skills: SkillsConfig::default(), keepalive: KeepaliveConfig::default(), + session: SessionConfig::default(), } } @@ -570,4 +597,27 @@ mod tests { assert_eq!(ka.ping_interval_secs, 30); assert_eq!(ka.read_timeout_secs, 120); } + + // ── SessionConfig tests ───────────────────────────── + + #[test] + fn test_session_defaults() { + let sc = SessionConfig::default(); + assert_eq!(sc.idle_timeout_mins, 0); + } + + #[test] + fn test_session_default_when_absent() { + let config = config_with_jids(vec![]); + assert_eq!(config.session.idle_timeout_mins, 0); + } + + #[test] + fn test_session_custom_timeout_toml() { + let toml = r#" + idle_timeout_mins = 120 + "#; + let sc: SessionConfig = toml::from_str(toml).unwrap(); + assert_eq!(sc.idle_timeout_mins, 120); + } }