// Patch commentary (leading text ahead of the `diff --git` header; the diff payload below is unchanged).
//
// Target: crates/fparkan-resource/src/lib.rs, index 07484fb..666245f.
//
// NOTE(review): this copy of the patch has its line breaks collapsed and some generic
// arguments look stripped (`-> Result {`, `state: Mutex,`, `Result, VfsError>`), so it
// presumably cannot be applied as-is; regenerate it from version control before use.
//
// Hunk 1 - CachedResourceRepository::open_archive:
//   * Before: read + sha256 + decode_archive ran first, then the state lock was taken and the
//     slot was reused, replaced, or appended in a single pass.
//   * After: the body is a retry loop. Each pass drops the state guard, reads the bytes through
//     the VFS, hashes and decodes them, asks `vfs.metadata` for the current fingerprint, and
//     only then re-locks the state.
//       - Path already registered, document loaded, stored fingerprint equals the metadata
//         fingerprint: touch the archive and return its id.
//       - Metadata fingerprint differs from the fingerprint of the bytes just read: `continue`
//         (read and decode again).
//       - Otherwise the decoded slot replaces the stored one; the generation is bumped with
//         saturating_add(1) and the payload cache entry removed when the stored fingerprint
//         differs from the observed one, else the generation is carried over.
//       - Path not registered: a new ArchiveId is derived from `state.archives.len()`, the path
//         is inserted and the slot pushed, then load_archive and evict_archives run.
//   NOTE(review): the guard acquired before `loop` is dropped by the first statement of the
//     first pass; its only visible effect is reporting ResourceError::Poisoned before any VFS
//     read. Confirm that is intended.
//   NOTE(review): the second `document.is_some() && fingerprint == observed_fingerprint` check
//     sits after the `continue` guard, where the metadata and observed fingerprints are equal,
//     so it repeats the first check that already failed; it looks unreachable. Confirm and drop.
//   NOTE(review): the loop has no iteration bound; a path whose metadata fingerprint never
//     matches the bytes returned by read would presumably retry forever. Consider a cap.
//   NOTE(review): `key.clone()` in the insert branch is followed directly by `return`, so the
//     clone looks avoidable.
//
// Hunks 2-3 - test support:
//   * Adds `use std::sync::Condvar` and `use std::thread` to the tests module.
//   * CoordinatedReadVfs keeps the current bytes plus two flags behind a Mutex and a Condvar.
//     The first `read` call snapshots the current bytes, flags that it started, and waits until
//     `release_first_read` is called; later reads return the current bytes immediately.
//     `metadata` reports the length and sha256 of the current bytes. `list` is unreachable!.
//
// Hunk 4 - test concurrent_same_archive_open_reuses_archive_id:
//   two threads open the same path on a MemoryVfs-backed repository; the ids must be equal.
//
// Hunk 5 - test concurrent_replacement_old_decode_cannot_overwrite_new:
//   a spawned thread starts reading the old bytes and blocks; the main thread swaps in new
//   bytes, opens the archive, then releases the blocked read. Both opens must yield the same
//   id and reading `same.bin` must return the new payload.
diff --git a/crates/fparkan-resource/src/lib.rs b/crates/fparkan-resource/src/lib.rs index 07484fb..666245f 100644 --- a/crates/fparkan-resource/src/lib.rs +++ b/crates/fparkan-resource/src/lib.rs @@ -449,42 +449,64 @@ impl CachedResourceRepository { impl ResourceRepository for CachedResourceRepository { fn open_archive(&self, path: &NormalizedPath) -> Result { - let bytes = self - .vfs - .read(path) - .map_err(|err| resource_error_from_vfs(path, err))?; - let fingerprint = sha256(&bytes); - let mut slot = decode_archive(path.clone(), bytes, fingerprint)?; let mut state = self.state.lock().map_err(|_| ResourceError::Poisoned)?; let key = path.identity_bytes().to_vec(); - if let Some(id) = state.paths.get(&key).copied() { - let current = state.archive(id)?; - if current.fingerprint == fingerprint && current.document.is_some() { - state.touch_archive(id)?; + loop { + // Decode outside the repository lock, then verify the VFS still points + // at the same bytes before committing the slot under the lock. + drop(state); + let bytes = self + .vfs + .read(path) + .map_err(|err| resource_error_from_vfs(path, err))?; + let observed_fingerprint = sha256(&bytes); + let mut slot = decode_archive(path.clone(), bytes, observed_fingerprint)?; + let current_vfs_fingerprint = self + .vfs + .metadata(path) + .map_err(|err| resource_error_from_vfs(path, err))? 
+ .fingerprint; + state = self.state.lock().map_err(|_| ResourceError::Poisoned)?; + if let Some(id) = state.paths.get(&key).copied() { + let current = state.archive(id)?; + if current.document.is_some() && current.fingerprint == current_vfs_fingerprint { + state.touch_archive(id)?; + return Ok(id); + } + if current_vfs_fingerprint != observed_fingerprint { + continue; + } + if current.document.is_some() && current.fingerprint == observed_fingerprint { + state.touch_archive(id)?; + return Ok(id); + } + let current_generation = current.generation; + let current_fingerprint = current.fingerprint; + if current_fingerprint != observed_fingerprint { + slot.generation = current_generation.saturating_add(1); + state.payload_cache.remove_archive(id); + } else { + slot.generation = current_generation; + } + state.unload_archive(id)?; + *state.archive_mut(id)? = slot; + state.load_archive(id)?; + state.evict_archives(id)?; return Ok(id); } - let current_generation = current.generation; - let current_fingerprint = current.fingerprint; - if current_fingerprint != fingerprint { - slot.generation = current_generation.saturating_add(1); - state.payload_cache.remove_archive(id); - } else { - slot.generation = current_generation; + if current_vfs_fingerprint != observed_fingerprint { + continue; } - state.unload_archive(id)?; - *state.archive_mut(id)? 
= slot; + let id = ArchiveId( + u64::try_from(state.archives.len()) + .map_err(|_| ResourceError::HandleSpaceExhausted)?, + ); + state.paths.insert(key.clone(), id); + state.archives.push(slot); state.load_archive(id)?; state.evict_archives(id)?; return Ok(id); } - let id = ArchiveId( - u64::try_from(state.archives.len()).map_err(|_| ResourceError::HandleSpaceExhausted)?, - ); - state.paths.insert(key, id); - state.archives.push(slot); - state.load_archive(id)?; - state.evict_archives(id)?; - Ok(id) } fn find( @@ -881,6 +903,8 @@ mod tests { use super::*; use fparkan_vfs::{DirectoryVfs, MemoryVfs, Vfs, VfsEntry, VfsError, VfsMetadata}; use std::path::PathBuf; + use std::sync::Condvar; + use std::thread; enum FailingReadMode { Ambiguous(&'static str), @@ -910,6 +934,74 @@ mod tests { } } + struct CoordinatedReadState { + current: Arc<[u8]>, + first_read_started: bool, + release_first_read: bool, + } + + struct CoordinatedReadVfs { + state: Mutex, + first_read_gate: Condvar, + } + + impl CoordinatedReadVfs { + fn new(initial: Arc<[u8]>) -> Self { + Self { + state: Mutex::new(CoordinatedReadState { + current: initial, + first_read_started: false, + release_first_read: false, + }), + first_read_gate: Condvar::new(), + } + } + + fn wait_for_first_read(&self) { + let mut state = self.state.lock().expect("state"); + while !state.first_read_started { + state = self.first_read_gate.wait(state).expect("wait"); + } + } + + fn replace_current(&self, bytes: Arc<[u8]>) { + self.state.lock().expect("state").current = bytes; + } + + fn release_first_read(&self) { + let mut state = self.state.lock().expect("state"); + state.release_first_read = true; + self.first_read_gate.notify_all(); + } + } + + impl Vfs for CoordinatedReadVfs { + fn metadata(&self, _path: &NormalizedPath) -> Result { + let state = self.state.lock().expect("state"); + Ok(VfsMetadata { + len: state.current.len() as u64, + fingerprint: sha256(&state.current), + }) + } + + fn read(&self, _path: &NormalizedPath) -> 
Result, VfsError> { + let mut state = self.state.lock().expect("state"); + let snapshot = Arc::clone(&state.current); + if !state.first_read_started { + state.first_read_started = true; + self.first_read_gate.notify_all(); + while !state.release_first_read { + state = self.first_read_gate.wait(state).expect("wait"); + } + } + Ok(snapshot) + } + + fn list(&self, _prefix: &NormalizedPath) -> Result, VfsError> { + unreachable!("list is not used in these tests"); + } + } + #[test] fn cached_repository_reads_synthetic_nres() { let path = archive_path(b"archives/test.lib").expect("path"); @@ -941,6 +1033,26 @@ mod tests { )); } + #[test] + fn concurrent_same_archive_open_reuses_archive_id() { + let path = archive_path(b"archives/test.lib").expect("path"); + let bytes = Arc::from(build_nres(&[("Alpha.TXT", b"alpha".as_slice())]).into_boxed_slice()); + let mut vfs = MemoryVfs::default(); + vfs.insert(path.clone(), bytes); + let repo = Arc::new(CachedResourceRepository::new(Arc::new(vfs))); + let first_repo = Arc::clone(&repo); + let first_path = path.clone(); + let first = thread::spawn(move || first_repo.open_archive(&first_path)); + let second_repo = Arc::clone(&repo); + let second_path = path.clone(); + let second = thread::spawn(move || second_repo.open_archive(&second_path)); + + let first = first.join().expect("first join").expect("first archive"); + let second = second.join().expect("second join").expect("second archive"); + + assert_eq!(first, second); + } + #[test] fn entry_handles_are_archive_qualified() { let first_path = archive_path(b"first.lib").expect("first path"); @@ -1125,6 +1237,36 @@ mod tests { let _ = std::fs::remove_dir_all(root); } + #[test] + fn concurrent_replacement_old_decode_cannot_overwrite_new() { + let path = archive_path(b"cache/concurrent.lib").expect("path"); + let old_bytes = + Arc::from(build_nres(&[("same.bin", b"old".as_slice())]).into_boxed_slice()); + let new_bytes = + Arc::from(build_nres(&[("same.bin", 
b"new".as_slice())]).into_boxed_slice()); + let vfs = Arc::new(CoordinatedReadVfs::new(old_bytes)); + let repo = Arc::new(CachedResourceRepository::new(vfs.clone())); + let stale_repo = Arc::clone(&repo); + let stale_path = path.clone(); + let stale_open = thread::spawn(move || stale_repo.open_archive(&stale_path)); + + vfs.wait_for_first_read(); + vfs.replace_current(Arc::clone(&new_bytes)); + let current_archive = repo.open_archive(&path).expect("open current archive"); + vfs.release_first_read(); + let raced_archive = stale_open + .join() + .expect("join stale thread") + .expect("stale open"); + + assert_eq!(raced_archive, current_archive); + let handle = repo + .find(current_archive, &resource_name(b"same.bin")) + .expect("find current") + .expect("current handle"); + assert_eq!(repo.read(handle).expect("read current").as_slice(), b"new"); + } + #[test] fn entry_read_error_carries_archive_path_and_entry_name() { let path = archive_path(b"bad/rsli.lib").expect("path");