fix(resource): prevent stale archive refresh overwrite
This commit is contained in:
@@ -449,42 +449,64 @@ impl CachedResourceRepository {
|
|||||||
|
|
||||||
impl ResourceRepository for CachedResourceRepository {
|
impl ResourceRepository for CachedResourceRepository {
|
||||||
fn open_archive(&self, path: &NormalizedPath) -> Result<ArchiveId, ResourceError> {
|
fn open_archive(&self, path: &NormalizedPath) -> Result<ArchiveId, ResourceError> {
|
||||||
let bytes = self
|
|
||||||
.vfs
|
|
||||||
.read(path)
|
|
||||||
.map_err(|err| resource_error_from_vfs(path, err))?;
|
|
||||||
let fingerprint = sha256(&bytes);
|
|
||||||
let mut slot = decode_archive(path.clone(), bytes, fingerprint)?;
|
|
||||||
let mut state = self.state.lock().map_err(|_| ResourceError::Poisoned)?;
|
let mut state = self.state.lock().map_err(|_| ResourceError::Poisoned)?;
|
||||||
let key = path.identity_bytes().to_vec();
|
let key = path.identity_bytes().to_vec();
|
||||||
if let Some(id) = state.paths.get(&key).copied() {
|
loop {
|
||||||
let current = state.archive(id)?;
|
// Decode outside the repository lock, then verify the VFS still points
|
||||||
if current.fingerprint == fingerprint && current.document.is_some() {
|
// at the same bytes before committing the slot under the lock.
|
||||||
state.touch_archive(id)?;
|
drop(state);
|
||||||
|
let bytes = self
|
||||||
|
.vfs
|
||||||
|
.read(path)
|
||||||
|
.map_err(|err| resource_error_from_vfs(path, err))?;
|
||||||
|
let observed_fingerprint = sha256(&bytes);
|
||||||
|
let mut slot = decode_archive(path.clone(), bytes, observed_fingerprint)?;
|
||||||
|
let current_vfs_fingerprint = self
|
||||||
|
.vfs
|
||||||
|
.metadata(path)
|
||||||
|
.map_err(|err| resource_error_from_vfs(path, err))?
|
||||||
|
.fingerprint;
|
||||||
|
state = self.state.lock().map_err(|_| ResourceError::Poisoned)?;
|
||||||
|
if let Some(id) = state.paths.get(&key).copied() {
|
||||||
|
let current = state.archive(id)?;
|
||||||
|
if current.document.is_some() && current.fingerprint == current_vfs_fingerprint {
|
||||||
|
state.touch_archive(id)?;
|
||||||
|
return Ok(id);
|
||||||
|
}
|
||||||
|
if current_vfs_fingerprint != observed_fingerprint {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if current.document.is_some() && current.fingerprint == observed_fingerprint {
|
||||||
|
state.touch_archive(id)?;
|
||||||
|
return Ok(id);
|
||||||
|
}
|
||||||
|
let current_generation = current.generation;
|
||||||
|
let current_fingerprint = current.fingerprint;
|
||||||
|
if current_fingerprint != observed_fingerprint {
|
||||||
|
slot.generation = current_generation.saturating_add(1);
|
||||||
|
state.payload_cache.remove_archive(id);
|
||||||
|
} else {
|
||||||
|
slot.generation = current_generation;
|
||||||
|
}
|
||||||
|
state.unload_archive(id)?;
|
||||||
|
*state.archive_mut(id)? = slot;
|
||||||
|
state.load_archive(id)?;
|
||||||
|
state.evict_archives(id)?;
|
||||||
return Ok(id);
|
return Ok(id);
|
||||||
}
|
}
|
||||||
let current_generation = current.generation;
|
if current_vfs_fingerprint != observed_fingerprint {
|
||||||
let current_fingerprint = current.fingerprint;
|
continue;
|
||||||
if current_fingerprint != fingerprint {
|
|
||||||
slot.generation = current_generation.saturating_add(1);
|
|
||||||
state.payload_cache.remove_archive(id);
|
|
||||||
} else {
|
|
||||||
slot.generation = current_generation;
|
|
||||||
}
|
}
|
||||||
state.unload_archive(id)?;
|
let id = ArchiveId(
|
||||||
*state.archive_mut(id)? = slot;
|
u64::try_from(state.archives.len())
|
||||||
|
.map_err(|_| ResourceError::HandleSpaceExhausted)?,
|
||||||
|
);
|
||||||
|
state.paths.insert(key.clone(), id);
|
||||||
|
state.archives.push(slot);
|
||||||
state.load_archive(id)?;
|
state.load_archive(id)?;
|
||||||
state.evict_archives(id)?;
|
state.evict_archives(id)?;
|
||||||
return Ok(id);
|
return Ok(id);
|
||||||
}
|
}
|
||||||
let id = ArchiveId(
|
|
||||||
u64::try_from(state.archives.len()).map_err(|_| ResourceError::HandleSpaceExhausted)?,
|
|
||||||
);
|
|
||||||
state.paths.insert(key, id);
|
|
||||||
state.archives.push(slot);
|
|
||||||
state.load_archive(id)?;
|
|
||||||
state.evict_archives(id)?;
|
|
||||||
Ok(id)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find(
|
fn find(
|
||||||
@@ -881,6 +903,8 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use fparkan_vfs::{DirectoryVfs, MemoryVfs, Vfs, VfsEntry, VfsError, VfsMetadata};
|
use fparkan_vfs::{DirectoryVfs, MemoryVfs, Vfs, VfsEntry, VfsError, VfsMetadata};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Condvar;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
enum FailingReadMode {
|
enum FailingReadMode {
|
||||||
Ambiguous(&'static str),
|
Ambiguous(&'static str),
|
||||||
@@ -910,6 +934,74 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct CoordinatedReadState {
|
||||||
|
current: Arc<[u8]>,
|
||||||
|
first_read_started: bool,
|
||||||
|
release_first_read: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct CoordinatedReadVfs {
|
||||||
|
state: Mutex<CoordinatedReadState>,
|
||||||
|
first_read_gate: Condvar,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CoordinatedReadVfs {
|
||||||
|
fn new(initial: Arc<[u8]>) -> Self {
|
||||||
|
Self {
|
||||||
|
state: Mutex::new(CoordinatedReadState {
|
||||||
|
current: initial,
|
||||||
|
first_read_started: false,
|
||||||
|
release_first_read: false,
|
||||||
|
}),
|
||||||
|
first_read_gate: Condvar::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn wait_for_first_read(&self) {
|
||||||
|
let mut state = self.state.lock().expect("state");
|
||||||
|
while !state.first_read_started {
|
||||||
|
state = self.first_read_gate.wait(state).expect("wait");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn replace_current(&self, bytes: Arc<[u8]>) {
|
||||||
|
self.state.lock().expect("state").current = bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn release_first_read(&self) {
|
||||||
|
let mut state = self.state.lock().expect("state");
|
||||||
|
state.release_first_read = true;
|
||||||
|
self.first_read_gate.notify_all();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Vfs for CoordinatedReadVfs {
|
||||||
|
fn metadata(&self, _path: &NormalizedPath) -> Result<VfsMetadata, VfsError> {
|
||||||
|
let state = self.state.lock().expect("state");
|
||||||
|
Ok(VfsMetadata {
|
||||||
|
len: state.current.len() as u64,
|
||||||
|
fingerprint: sha256(&state.current),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read(&self, _path: &NormalizedPath) -> Result<Arc<[u8]>, VfsError> {
|
||||||
|
let mut state = self.state.lock().expect("state");
|
||||||
|
let snapshot = Arc::clone(&state.current);
|
||||||
|
if !state.first_read_started {
|
||||||
|
state.first_read_started = true;
|
||||||
|
self.first_read_gate.notify_all();
|
||||||
|
while !state.release_first_read {
|
||||||
|
state = self.first_read_gate.wait(state).expect("wait");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(snapshot)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn list(&self, _prefix: &NormalizedPath) -> Result<Vec<VfsEntry>, VfsError> {
|
||||||
|
unreachable!("list is not used in these tests");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn cached_repository_reads_synthetic_nres() {
|
fn cached_repository_reads_synthetic_nres() {
|
||||||
let path = archive_path(b"archives/test.lib").expect("path");
|
let path = archive_path(b"archives/test.lib").expect("path");
|
||||||
@@ -941,6 +1033,26 @@ mod tests {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn concurrent_same_archive_open_reuses_archive_id() {
|
||||||
|
let path = archive_path(b"archives/test.lib").expect("path");
|
||||||
|
let bytes = Arc::from(build_nres(&[("Alpha.TXT", b"alpha".as_slice())]).into_boxed_slice());
|
||||||
|
let mut vfs = MemoryVfs::default();
|
||||||
|
vfs.insert(path.clone(), bytes);
|
||||||
|
let repo = Arc::new(CachedResourceRepository::new(Arc::new(vfs)));
|
||||||
|
let first_repo = Arc::clone(&repo);
|
||||||
|
let first_path = path.clone();
|
||||||
|
let first = thread::spawn(move || first_repo.open_archive(&first_path));
|
||||||
|
let second_repo = Arc::clone(&repo);
|
||||||
|
let second_path = path.clone();
|
||||||
|
let second = thread::spawn(move || second_repo.open_archive(&second_path));
|
||||||
|
|
||||||
|
let first = first.join().expect("first join").expect("first archive");
|
||||||
|
let second = second.join().expect("second join").expect("second archive");
|
||||||
|
|
||||||
|
assert_eq!(first, second);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn entry_handles_are_archive_qualified() {
|
fn entry_handles_are_archive_qualified() {
|
||||||
let first_path = archive_path(b"first.lib").expect("first path");
|
let first_path = archive_path(b"first.lib").expect("first path");
|
||||||
@@ -1125,6 +1237,36 @@ mod tests {
|
|||||||
let _ = std::fs::remove_dir_all(root);
|
let _ = std::fs::remove_dir_all(root);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn concurrent_replacement_old_decode_cannot_overwrite_new() {
|
||||||
|
let path = archive_path(b"cache/concurrent.lib").expect("path");
|
||||||
|
let old_bytes =
|
||||||
|
Arc::from(build_nres(&[("same.bin", b"old".as_slice())]).into_boxed_slice());
|
||||||
|
let new_bytes =
|
||||||
|
Arc::from(build_nres(&[("same.bin", b"new".as_slice())]).into_boxed_slice());
|
||||||
|
let vfs = Arc::new(CoordinatedReadVfs::new(old_bytes));
|
||||||
|
let repo = Arc::new(CachedResourceRepository::new(vfs.clone()));
|
||||||
|
let stale_repo = Arc::clone(&repo);
|
||||||
|
let stale_path = path.clone();
|
||||||
|
let stale_open = thread::spawn(move || stale_repo.open_archive(&stale_path));
|
||||||
|
|
||||||
|
vfs.wait_for_first_read();
|
||||||
|
vfs.replace_current(Arc::clone(&new_bytes));
|
||||||
|
let current_archive = repo.open_archive(&path).expect("open current archive");
|
||||||
|
vfs.release_first_read();
|
||||||
|
let raced_archive = stale_open
|
||||||
|
.join()
|
||||||
|
.expect("join stale thread")
|
||||||
|
.expect("stale open");
|
||||||
|
|
||||||
|
assert_eq!(raced_archive, current_archive);
|
||||||
|
let handle = repo
|
||||||
|
.find(current_archive, &resource_name(b"same.bin"))
|
||||||
|
.expect("find current")
|
||||||
|
.expect("current handle");
|
||||||
|
assert_eq!(repo.read(handle).expect("read current").as_slice(), b"new");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn entry_read_error_carries_archive_path_and_entry_name() {
|
fn entry_read_error_carries_archive_path_and_entry_name() {
|
||||||
let path = archive_path(b"bad/rsli.lib").expect("path");
|
let path = archive_path(b"bad/rsli.lib").expect("path");
|
||||||
|
|||||||
Reference in New Issue
Block a user