Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,43 @@ where
}
}

/// Get a value as a different type T
pub fn get_as<T>(&self, key: &K) -> anyhow::Result<T>
where
T: DeserializeOwned,
{
let key = serde_json::to_vec(key)?;
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Get { key },
})?)
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;

match response {
KvResponse::Get { .. } => {
let bytes = match get_blob() {
Some(bytes) => bytes.bytes,
None => return Err(anyhow::anyhow!("kv: no blob")),
};
let value = serde_json::from_slice::<T>(&bytes)
.map_err(|e| anyhow::anyhow!("Failed to deserialize value: {}", e))?;
Ok(value)
}
KvResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}

/// Set a value, optionally in a transaction.
pub fn set(&self, key: &K, value: &V, tx_id: Option<u64>) -> anyhow::Result<()> {
let key = serde_json::to_vec(key)?;
Expand Down Expand Up @@ -130,6 +167,38 @@ where
}
}

/// Set a value as a different type T
pub fn set_as<T>(&self, key: &K, value: &T, tx_id: Option<u64>) -> anyhow::Result<()>
where
T: Serialize,
{
let key = serde_json::to_vec(key)?;
let value = serde_json::to_vec(value)?;

let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Set { key, tx_id },
})?)
.blob_bytes(value)
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;

match response {
KvResponse::Ok => Ok(()),
KvResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}

/// Delete a value, optionally in a transaction.
pub fn delete(&self, key: &K, tx_id: Option<u64>) -> anyhow::Result<()> {
let key = serde_json::to_vec(key)?;
Expand All @@ -156,6 +225,36 @@ where
}
}

/// Delete a value with a different key type
pub fn delete_as<T>(&self, key: &T, tx_id: Option<u64>) -> anyhow::Result<()>
where
T: Serialize,
{
let key = serde_json::to_vec(key)?;

let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Delete { key, tx_id },
})?)
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;

match response {
KvResponse::Ok => Ok(()),
KvResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}

/// Begin a transaction.
pub fn begin_tx(&self) -> anyhow::Result<u64> {
let res = Request::new()
Expand Down
Loading