-
-
Save visionarylab/7d250ac1a6422102ab212825b7a76fab to your computer and use it in GitHub Desktop.
SQLite and CAS: An Experimental Edge in Edge Compute Platforms
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "rust-sqlite-test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.72"
byteorder = "1.4.3"
cid = "0.10.1"
fvm_ipld_amt = "0.6.1"
fvm_ipld_blockstore = "0.2.0"
fvm_ipld_encoding = "0.4.0"
rand = "0.8.5"
rusqlite = "0.29.0"
sqlite-vfs = "0.2.0"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cargo new --lib rust-sqlite-test
cargo add rusqlite
cargo add sqlite-vfs
cargo add anyhow
cargo add rand
cargo add byteorder
cargo add cid
cargo add fvm_ipld_blockstore
cargo add fvm_ipld_amt
cargo add fvm_ipld_encoding
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Large parts of this file are adapted from:
// https://github.com/rkusa/wasm-sqlite/blob/main/wasm/src/vfs.rs, and
// https://github.com/sanderpick/builtin-actors/blob/sander/tableland-actor2/actors/tableland/src/vfs.rs
use byteorder::{BigEndian, ByteOrder}; | |
use cid::Cid; | |
use fvm_ipld_amt::{Amt, Error as AmtError}; | |
use fvm_ipld_blockstore::Blockstore; | |
use fvm_ipld_encoding::BytesDe; | |
use sqlite_vfs::{LockKind, OpenKind, OpenOptions, Vfs}; | |
use std::fmt::Debug; | |
use std::io::{self, ErrorKind}; | |
use std::io::{Cursor, Read, Seek, SeekFrom, Write}; | |
use std::sync::{Arc, Mutex}; | |
fn tbytes(bz: &[u8]) -> BytesDe { | |
BytesDe(bz.to_vec()) | |
} | |
#[derive(Debug, Clone)] | |
pub struct PagesVfs<BS> | |
where | |
BS: Blockstore + Send + Clone, | |
{ | |
lock_state: Arc<Mutex<LockState>>, | |
state: Arc<Mutex<Amt<BytesDe, BS>>>, | |
// This _could_ be a generic value, but since we're reading from the initial_bytes | |
// we'll keep it dynamic like this. | |
page_size: usize, | |
} | |
/// Lock bookkeeping shared between every connection of one VFS instance.
#[derive(Default, Debug)]
struct LockState {
    /// Number of connections currently counted as shared (read) lock holders.
    read: usize,
    /// Writer status: `None` when no writer exists, `Some(false)` once a
    /// reserved lock has been granted, and `Some(true)` once a connection has
    /// requested the exclusive lock.
    write: Option<bool>,
}
pub struct Connection<BS> | |
where | |
BS: Blockstore + Send + Clone, | |
{ | |
lock_state: Arc<Mutex<LockState>>, | |
lock: LockKind, | |
state: Arc<Mutex<Amt<BytesDe, BS>>>, | |
kind: OpenKind, | |
// We can use an in-memory cursor for the journal (and wal files) | |
// Ideally, we store off the root CID on success in the blockstore, | |
// and then we can simply ignore the journal/wal files and rollback | |
// to past root CIDs on failure. | |
cache: Cursor<Vec<u8>>, | |
page_size: usize, | |
} | |
impl<BS> PagesVfs<BS> | |
where | |
BS: Blockstore + Send + Clone, | |
{ | |
pub fn new(db: BS, initial_bytes: &[u8]) -> Self { | |
let mut amt = Amt::new(db); | |
// Assumes the SQLite file header is well formed | |
let page_size = match BigEndian::read_u16(&initial_bytes[16..18]) { | |
1 => 65_536, | |
x => x as u32, | |
}; | |
// The largest power of 2 a u16 can hold _is_ 32_768 | |
if page_size < 512 || !page_size.is_power_of_two() { | |
panic!("invalid page size: {}", page_size); | |
} | |
// Split the initial_bytes into chunks and add them to the Amt | |
for chunk in initial_bytes.chunks(page_size as usize) { | |
amt.set(0, tbytes(chunk)).unwrap(); | |
} | |
PagesVfs { | |
lock_state: Arc::new(Mutex::new(Default::default())), | |
state: Arc::new(Mutex::new(amt)), | |
page_size: page_size as usize, | |
} | |
} | |
pub fn root(&self) -> Result<Cid, AmtError> { | |
self.state.lock().unwrap().flush() | |
} | |
// Just for testing purposes. | |
pub fn state(&self) -> Arc<Mutex<Amt<BytesDe, BS>>> { | |
self.state.clone() | |
} | |
} | |
impl<BS> Vfs for PagesVfs<BS> | |
where | |
BS: Blockstore + Send + Clone, | |
{ | |
type Handle = Connection<BS>; | |
fn open(&self, db: &str, opts: OpenOptions) -> Result<Self::Handle, io::Error> { | |
match opts.kind { | |
OpenKind::MainDb => {} | |
OpenKind::MainJournal => {} | |
_ => { | |
return Err(io::Error::new( | |
ErrorKind::PermissionDenied, | |
"only main database supported", | |
)); | |
} | |
} | |
// Always open the same database for now. | |
if !db.starts_with("main.db") { | |
return Err(io::Error::new( | |
ErrorKind::NotFound, | |
format!("unexpected database name `{db}`; expected `main.db`"), | |
)); | |
} | |
Ok(Connection { | |
lock_state: self.lock_state.clone(), | |
lock: LockKind::None, | |
state: self.state.clone(), | |
kind: opts.kind, | |
cache: Cursor::default(), | |
page_size: self.page_size, | |
}) | |
} | |
fn delete(&self, _db: &str) -> Result<(), io::Error> { | |
// Here we could update our root hash to be the root hash of "state", | |
// otherwise, on rollback, we would simply keep our old root hash. | |
Ok(()) | |
} | |
fn exists(&self, db: &str) -> Result<bool, io::Error> { | |
Ok(db == "main.db" && self.state.lock().unwrap().count() > 0) | |
} | |
fn temporary_name(&self) -> String { | |
String::from("main.db") | |
} | |
fn random(&self, buffer: &mut [i8]) { | |
rand::Rng::fill(&mut rand::thread_rng(), buffer); | |
} | |
fn sleep(&self, duration: std::time::Duration) -> std::time::Duration { | |
std::thread::sleep(duration); | |
// From: https://github.com/rkusa/sqlite-vfs/blob/main/test-vfs/src/vfs.rs#L158 | |
// Well, this function is only supposed to sleep at least `n_micro`μs, but there are | |
// tests that expect the return to match exactly `n_micro`. As those tests are flaky as | |
// a result, we are cheating here. | |
duration | |
} | |
} | |
impl<BS> sqlite_vfs::DatabaseHandle for Connection<BS> | |
where | |
BS: Blockstore + Send + Clone, | |
{ | |
// We aren't going to worry about WAL files... but we could! | |
type WalIndex = sqlite_vfs::WalDisabled; | |
fn size(&self) -> Result<u64, io::Error> { | |
if self.kind == OpenKind::MainJournal { | |
return Ok(self.cache.get_ref().len() as u64); | |
} | |
let size = self.page_count() * self.page_size; | |
Ok(size as u64) | |
} | |
fn read_exact_at(&mut self, buf: &mut [u8], offset: u64) -> Result<(), io::Error> { | |
if self.kind == OpenKind::MainJournal { | |
self.cache.seek(SeekFrom::Start(offset))?; | |
self.cache.read_exact(buf)?; | |
return Ok(()); | |
} | |
let index = offset as usize / self.page_size; | |
let offset = offset as usize % self.page_size; | |
let data = self.get_page(index as u32); | |
if data.len() < buf.len() + offset { | |
return Err(ErrorKind::UnexpectedEof.into()); | |
} | |
buf.copy_from_slice(&data[offset..offset + buf.len()]); | |
Ok(()) | |
} | |
fn write_all_at(&mut self, buf: &[u8], offset: u64) -> Result<(), io::Error> { | |
if self.kind == OpenKind::MainJournal { | |
self.cache.seek(SeekFrom::Start(offset))?; | |
self.cache.write_all(buf)?; | |
return Ok(()); | |
} | |
if offset as usize % self.page_size > 0 { | |
return Err(io::Error::new( | |
ErrorKind::Other, | |
"unexpected write across page boundaries", | |
)); | |
} | |
let index = offset as usize / self.page_size; | |
let page = buf.try_into().map_err(|_| { | |
io::Error::new( | |
ErrorKind::Other, | |
format!( | |
"unexpected write size {}; expected {}", | |
buf.len(), | |
self.page_size | |
), | |
) | |
})?; | |
self.put_page(index as u32, page); | |
Ok(()) | |
} | |
fn sync(&mut self, _data_only: bool) -> Result<(), io::Error> { | |
if self.kind == OpenKind::MainJournal { | |
self.cache.flush()?; | |
} | |
self.state.lock().unwrap().flush().unwrap(); | |
Ok(()) | |
} | |
fn set_len(&mut self, size: u64) -> Result<(), io::Error> { | |
if self.kind == OpenKind::MainJournal { | |
let mut buffer = self.cache.clone().into_inner(); | |
buffer.resize(size as usize, 0); | |
self.cache = Cursor::new(buffer); | |
} | |
let mut page_count = size / self.page_size as u64; | |
if size as usize % self.page_size > 0 { | |
page_count += 1; | |
} | |
let current_page_count = self.page_count() as u64; | |
if page_count > 0 && page_count < current_page_count { | |
let mut state = self.state.lock().unwrap(); | |
let current_count = state.count() as u64; | |
state | |
.batch_delete((page_count + 1)..(current_count - 1), true) | |
.unwrap(); | |
} | |
Ok(()) | |
} | |
fn lock(&mut self, lock: LockKind) -> Result<bool, io::Error> { | |
Ok(Self::lock(self, lock)) | |
} | |
fn reserved(&mut self) -> Result<bool, io::Error> { | |
Ok(Self::reserved(self)) | |
} | |
fn current_lock(&self) -> Result<LockKind, io::Error> { | |
Ok(self.lock) | |
} | |
fn set_chunk_size(&self, chunk_size: usize) -> Result<(), io::Error> { | |
if chunk_size != self.page_size { | |
Err(io::Error::new( | |
ErrorKind::Other, | |
"changing chunk size is not allowed", | |
)) | |
} else { | |
Ok(()) | |
} | |
} | |
fn wal_index(&self, _readonly: bool) -> Result<Self::WalIndex, io::Error> { | |
Ok(sqlite_vfs::WalDisabled::default()) | |
} | |
} | |
impl<BS> Connection<BS> | |
where | |
BS: Blockstore + Send + Clone, | |
{ | |
fn get_page(&self, ix: u32) -> Vec<u8> { | |
let state = self.state.lock().unwrap(); | |
state.get(ix.into()).unwrap().unwrap().clone().into_vec() | |
} | |
fn put_page(&self, ix: u32, data: &[u8]) { | |
let mut state = self.state.lock().unwrap(); | |
state.set(ix.into(), tbytes(data)).unwrap(); | |
} | |
fn page_count(&self) -> usize { | |
let state = self.state.lock().unwrap(); | |
state.count() as usize | |
} | |
fn lock(&mut self, to: LockKind) -> bool { | |
if self.lock == to { | |
return true; | |
} | |
let mut lock_state = self.lock_state.lock().unwrap(); | |
// The following locking implementation is probably not sound (wouldn't be surprised if it | |
// potentially dead-locks), but suffice for the experiment. | |
// See https://github.com/rkusa/wasm-sqlite/blob/main/wasm/src/vfs.rs#L206 | |
match to { | |
LockKind::None => { | |
if self.lock == LockKind::Shared { | |
lock_state.read -= 1; | |
} else if self.lock > LockKind::Shared { | |
lock_state.write = None; | |
} | |
self.lock = LockKind::None; | |
true | |
} | |
LockKind::Shared => { | |
if lock_state.write == Some(true) && self.lock <= LockKind::Shared { | |
return false; | |
} | |
lock_state.read += 1; | |
if self.lock > LockKind::Shared { | |
lock_state.write = None; | |
} | |
self.lock = LockKind::Shared; | |
true | |
} | |
LockKind::Reserved => { | |
if lock_state.write.is_some() || self.lock != LockKind::Shared { | |
return false; | |
} | |
if self.lock == LockKind::Shared { | |
lock_state.read -= 1; | |
} | |
lock_state.write = Some(false); | |
self.lock = LockKind::Reserved; | |
true | |
} | |
LockKind::Pending => { | |
// cannot be requested directly | |
false | |
} | |
LockKind::Exclusive => { | |
if lock_state.write.is_some() && self.lock <= LockKind::Shared { | |
return false; | |
} | |
if self.lock == LockKind::Shared { | |
lock_state.read -= 1; | |
} | |
lock_state.write = Some(true); | |
if lock_state.read == 0 { | |
self.lock = LockKind::Exclusive; | |
true | |
} else { | |
self.lock = LockKind::Pending; | |
false | |
} | |
} | |
} | |
} | |
fn reserved(&self) -> bool { | |
if self.lock > LockKind::Shared { | |
return true; | |
} | |
let lock_state = self.lock_state.lock().unwrap(); | |
lock_state.write.is_some() | |
} | |
} | |
impl<BS> Drop for Connection<BS> | |
where | |
BS: Blockstore + Send + Clone, | |
{ | |
fn drop(&mut self) { | |
if self.lock != LockKind::None { | |
self.lock(LockKind::None); | |
} | |
} | |
} | |
#[cfg(test)]
mod tests {
    use crate::PagesVfs;
    use fvm_ipld_amt::{diff, Amt};
    use fvm_ipld_blockstore::MemoryBlockstore;
    use rusqlite::{Connection, OpenFlags};
    use sqlite_vfs::register;

    pub const SQLITE_PAGE_SIZE: usize = 4096;

    /// End-to-end check: seed the VFS from `file.db`, create a table, insert
    /// two rows through SQLite, then verify the page count and the root CID.
    #[test]
    fn basic_get_set() {
        let blockstore = MemoryBlockstore::default();
        let initial_bytes: &'static [u8] = include_bytes!("file.db");
        let vfs = PagesVfs::<_>::new(blockstore, initial_bytes);
        // Keep a clone so the shared state stays reachable after registration.
        let other = vfs.clone();
        register("vfs", vfs, true).unwrap();

        let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
            | OpenFlags::SQLITE_OPEN_CREATE
            | OpenFlags::SQLITE_OPEN_NO_MUTEX;
        let mut conn = Connection::open_with_flags_and_vfs("main.db", flags, "vfs").unwrap();

        // With the journal kept in memory the "cache" cursor would not be
        // needed, but then the "delete" hook would never be invoked either.
        let journal_mode: String = conn
            .query_row("pragma journal_mode = delete", [], |row| row.get(0))
            .unwrap();
        assert_eq!(journal_mode, "delete");

        let page_size: usize = conn
            .query_row("PRAGMA page_size", [], |row| row.get(0))
            .unwrap();
        assert_eq!(page_size, SQLITE_PAGE_SIZE);

        let root_before = other.root().unwrap();
        println!("{:?}", root_before.to_string().as_str());

        let tx = conn.transaction().unwrap();
        tx.execute(
            "create table my_table(id integer primary key, msg text);",
            [],
        )
        .unwrap();
        tx.execute("insert into my_table(msg) values('hello');", [])
            .unwrap();
        tx.execute("insert into my_table(msg) values('world');", [])
            .unwrap();
        tx.commit().unwrap();

        let root_after = other.root().unwrap();

        let mut stmt = conn.prepare("select * from my_table;").unwrap();
        let mut rows = stmt.query([]).unwrap();
        while let Some(row) = rows.next().unwrap() {
            let id: i32 = row.get(0).unwrap();
            let msg: String = row.get(1).unwrap();
            println!("found in db: {} = {}", id, msg);
        }

        // Diffing against an empty AMT yields one change per stored page.
        let empty_store = MemoryBlockstore::new();
        let empty_amt: Amt<Vec<u8>, _> = Amt::new(&empty_store);
        let changes = diff(&other.state.lock().unwrap(), &empty_amt).unwrap();
        assert_eq!(changes.len(), 2);

        assert_eq!(
            root_after.to_string().as_str(),
            "bafy2bzaced3vtba4kcg4w5q4dnfdstxfom6t4cv72uuq4jgvkeav24ujdcgtg"
        );
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment