diff --git a/pyproject.toml b/pyproject.toml index f7f4336..72cc040 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,9 @@ classifiers = [ "Topic :: System :: Networking", ] dynamic = ["version"] +dependencies = [ + "typing-extensions>=4.14.0", +] [[project.authors]] name = "Pavel Kirilin" diff --git a/python/natsrpy/_natsrpy_rs/js/__init__.pyi b/python/natsrpy/_natsrpy_rs/js/__init__.pyi index 3f223f6..70b9c28 100644 --- a/python/natsrpy/_natsrpy_rs/js/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/js/__init__.pyi @@ -1,7 +1,7 @@ from datetime import timedelta from typing import Any -from .managers import KVManager, StreamsManager +from .managers import KVManager, ObjectStoreManager, StreamsManager class JetStream: async def publish( @@ -17,6 +17,8 @@ class JetStream: def kv(self) -> KVManager: ... @property def streams(self) -> StreamsManager: ... + @property + def object_store(self) -> ObjectStoreManager: ... class JetStreamMessage: @property diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index 64a4a72..5da191a 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -7,6 +7,7 @@ from .consumers import ( PushConsumerConfig, ) from .kv import KeyValue, KVConfig +from .object_store import ObjectStore, ObjectStoreConfig from .stream import Stream, StreamConfig class StreamsManager: @@ -28,3 +29,8 @@ class ConsumersManager: async def create(self, config: PullConsumerConfig) -> PullConsumer: ... @overload async def create(self, config: PushConsumerConfig) -> PushConsumer: ... + +class ObjectStoreManager: + async def create(self, config: ObjectStoreConfig) -> ObjectStore: ... + async def get(self, bucket: str) -> ObjectStore: ... + async def delete(self, bucket: str) -> None: ... diff --git a/python/natsrpy/_natsrpy_rs/js/object_store.pyi b/python/natsrpy/_natsrpy_rs/js/object_store.pyi new file mode 100644 index 0000000..ac596de --- /dev/null +++ b/python/natsrpy/_natsrpy_rs/js/object_store.pyi @@ -0,0 +1,45 @@ +from datetime import timedelta + +from typing_extensions import Writer + +from .stream import Placement, StorageType + +class ObjectStoreConfig: + bucket: str + description: str | None + max_age: timedelta + max_bytes: int + storage: StorageType + num_replicas: int + compression: bool + placement: Placement | None + + def __init__( + self, + bucket: str, + description: str | None = None, + max_age: float | timedelta | None = None, + max_bytes: int | None = None, + storage: StorageType | None = None, + num_replicas: int | None = None, + compression: bool | None = None, + placement: Placement | None = None, + ) -> None: ... + +class ObjectStore: + async def get( + self, + name: str, + writer: Writer[bytes], + chunk_size: int | None = 24576, # 24MB + ) -> None: ... + async def put( + self, + name: str, + value: bytes | str, + chunk_size: int = 24576, # 24MB + description: str | None = None, + headers: dict[str, str | list[str]] | None = None, + metadata: dict[str, str] | None = None, + ) -> None: ... + async def delete(self, name: str) -> None: ... diff --git a/python/natsrpy/js/__init__.py b/python/natsrpy/js/__init__.py index 24b1788..271ea54 100644 --- a/python/natsrpy/js/__init__.py +++ b/python/natsrpy/js/__init__.py @@ -10,6 +10,7 @@ ReplayPolicy, ) from natsrpy.js.kv import KeyValue, KVConfig +from natsrpy.js.object_store import ObjectStore, ObjectStoreConfig from natsrpy.js.stream import ( Compression, ConsumerLimits, @@ -37,6 +38,8 @@ "JetStream", "KVConfig", "KeyValue", + "ObjectStore", + "ObjectStoreConfig", "PersistenceMode", "Placement", "PriorityPolicy", diff --git a/python/natsrpy/js/object_store.py b/python/natsrpy/js/object_store.py new file mode 100644 index 0000000..0c568ad --- /dev/null +++ b/python/natsrpy/js/object_store.py @@ -0,0 +1,6 @@ +from natsrpy._natsrpy_rs.js.object_store import ObjectStore, ObjectStoreConfig + +__all__ = [ + "ObjectStore", + "ObjectStoreConfig", +] diff --git a/src/exceptions/rust_err.rs b/src/exceptions/rust_err.rs index 32a1c42..02ca844 100644 --- a/src/exceptions/rust_err.rs +++ b/src/exceptions/rust_err.rs @@ -6,6 +6,8 @@ pub type NatsrpyResult = Result; #[derive(thiserror::Error, Debug)] pub enum NatsrpyError { + #[error(transparent)] + StdIOError(#[from] std::io::Error), #[error("NATS session error: {0}")] SessionError(String), #[error("Invalid arguemnt: {0}")] @@ -72,6 +74,14 @@ pub enum NatsrpyError { PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError), #[error(transparent)] ConsumerStreamError(#[from] async_nats::jetstream::consumer::StreamError), + #[error(transparent)] + ObjectStoreError(#[from] async_nats::jetstream::context::ObjectStoreError), + #[error(transparent)] + ObjectStoreGetError(#[from] async_nats::jetstream::object_store::GetError), + #[error(transparent)] + ObjectStorePutError(#[from] async_nats::jetstream::object_store::PutError), + #[error(transparent)] + ObjectStoreDeleteError(#[from] async_nats::jetstream::object_store::DeleteError), } impl From for pyo3::PyErr { diff --git a/src/js/counters.rs b/src/js/counters.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/js/jetstream.rs b/src/js/jetstream.rs index b2f0dac..00dbae0 100644 --- a/src/js/jetstream.rs +++ b/src/js/jetstream.rs @@ -6,7 +6,7 @@ use tokio::sync::RwLock; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, - js::managers::{kv::KVManager, streams::StreamsManager}, + js::managers::{kv::KVManager, object_store::ObjectStoreManager, streams::StreamsManager}, utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::SendableValue}, }; @@ -26,6 +26,24 @@ impl JetStream { #[pyo3::pymethods] impl JetStream { + #[getter] + #[must_use] + pub fn kv(&self) -> KVManager { + KVManager::new(self.ctx.clone()) + } + + #[getter] + #[must_use] + pub fn streams(&self) -> StreamsManager { + StreamsManager::new(self.ctx.clone()) + } + + #[getter] + #[must_use] + pub fn object_store(&self) -> ObjectStoreManager { + ObjectStoreManager::new(self.ctx.clone()) + } + #[pyo3(signature = ( subject, payload, @@ -66,16 +84,4 @@ impl JetStream { Ok(()) }) } - - #[getter] - #[must_use] - pub fn kv(&self) -> KVManager { - KVManager::new(self.ctx.clone()) - } - - #[getter] - #[must_use] - pub fn streams(&self) -> StreamsManager { - StreamsManager::new(self.ctx.clone()) - } } diff --git a/src/js/managers/mod.rs b/src/js/managers/mod.rs index 99d5e38..399b866 100644 --- a/src/js/managers/mod.rs +++ b/src/js/managers/mod.rs @@ -1,5 +1,6 @@ pub mod consumers; pub mod kv; +pub mod object_store; pub mod streams; #[pyo3::pymodule(submodule, name = "managers")] @@ -9,5 +10,7 @@ pub mod pymod { #[pymodule_export] use super::kv::KVManager; #[pymodule_export] + use super::object_store::ObjectStoreManager; + #[pymodule_export] use super::streams::StreamsManager; } diff --git a/src/js/managers/object_store.rs b/src/js/managers/object_store.rs new file mode 100644 index 0000000..c4c9c51 --- /dev/null +++ b/src/js/managers/object_store.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; + +use pyo3::{Bound, PyAny, Python}; +use tokio::sync::RwLock; + +use crate::{ + exceptions::rust_err::NatsrpyResult, + js::object_store::{ObjectStore, ObjectStoreConfig}, + utils::natsrpy_future, +}; + +#[pyo3::pyclass] +pub struct ObjectStoreManager { + ctx: Arc>, +} + +impl ObjectStoreManager { + pub const fn new(ctx: Arc>) -> Self { + Self { ctx } + } +} + +#[pyo3::pymethods] +impl ObjectStoreManager { + pub fn get<'py>(&self, py: Python<'py>, bucket: String) -> NatsrpyResult> { + let ctx_guard = self.ctx.clone(); + natsrpy_future(py, async move { + Ok(ObjectStore::new( + ctx_guard.read().await.get_object_store(bucket).await?, + )) + }) + } + + pub fn create<'py>( + &self, + py: Python<'py>, + config: ObjectStoreConfig, + ) -> NatsrpyResult> { + let ctx_guard = self.ctx.clone(); + natsrpy_future(py, async move { + Ok(ObjectStore::new( + ctx_guard + .read() + .await + .create_object_store(config.into()) + .await?, + )) + }) + } + + pub fn delete<'py>(&self, py: Python<'py>, bucket: String) -> NatsrpyResult> { + let ctx_guard = self.ctx.clone(); + natsrpy_future(py, async move { + ctx_guard.read().await.delete_object_store(bucket).await?; + Ok(()) + }) + } +} diff --git a/src/js/mod.rs b/src/js/mod.rs index 1ebefa8..f6f16f9 100644 --- a/src/js/mod.rs +++ b/src/js/mod.rs @@ -3,6 +3,7 @@ pub mod jetstream; pub mod kv; pub mod managers; pub mod message; +pub mod object_store; pub mod stream; #[pyo3::pymodule(submodule, name = "js")] @@ -21,5 +22,7 @@ pub mod pymod { #[pymodule_export] pub use super::managers::pymod as managers; #[pymodule_export] + pub use super::object_store::pymod as object_store; + #[pymodule_export] pub use super::stream::pymod as stream; } diff --git a/src/js/object_store.rs b/src/js/object_store.rs new file mode 100644 index 0000000..4058f7f --- /dev/null +++ b/src/js/object_store.rs @@ -0,0 +1,197 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use async_nats::HeaderMap; +use pyo3::{Bound, Py, PyAny, Python, types::PyDict}; +use tokio::{io::AsyncReadExt, sync::RwLock}; + +use crate::{ + exceptions::rust_err::NatsrpyResult, + js::stream::{Placement, StorageType}, + utils::{ + headers::NatsrpyHeadermapExt, + natsrpy_future, + py_types::{SendableValue, TimeValue}, + }, +}; + +#[pyo3::pyclass(from_py_object, get_all, set_all)] +#[derive(Debug, Clone)] +pub struct ObjectStoreConfig { + pub bucket: String, + pub description: Option, + pub max_age: Duration, + pub max_bytes: i64, + pub storage: StorageType, + pub num_replicas: usize, + pub compression: bool, + pub placement: Option, +} + +impl From for async_nats::jetstream::object_store::Config { + fn from(value: ObjectStoreConfig) -> Self { + Self { + bucket: value.bucket, + description: value.description, + max_age: value.max_age, + max_bytes: value.max_bytes, + storage: value.storage.into(), + num_replicas: value.num_replicas, + compression: value.compression, + placement: value.placement.map(Into::into), + } + } +} + +#[pyo3::pymethods] +impl ObjectStoreConfig { + #[new] + #[pyo3(signature=( + bucket, + description=None, + max_age=None, + max_bytes=None, + storage=None, + num_replicas=None, + compression=None, + placement=None, + + ))] + pub fn __new__( + bucket: String, + description: Option, + max_age: Option, + max_bytes: Option, + storage: Option, + num_replicas: Option, + compression: Option, + placement: Option, + ) -> Self { + Self { + bucket, + description, + placement, + max_age: max_age.map(Into::into).unwrap_or_default(), + max_bytes: max_bytes.unwrap_or_default(), + storage: storage.unwrap_or_default(), + num_replicas: num_replicas.unwrap_or_default(), + compression: compression.unwrap_or_default(), + } + } +} + +#[pyo3::pyclass] +pub struct ObjectStore { + object_store: Arc>, +} + +impl ObjectStore { + #[must_use] + pub fn new(object_store: async_nats::jetstream::object_store::ObjectStore) -> Self { + Self { + object_store: Arc::new(RwLock::new(object_store)), + } + } +} + +#[pyo3::pymethods] +impl ObjectStore { + #[pyo3(signature=( + name, + writer, + chunk_size=24 * 1024 + ))] + pub fn get<'py>( + &self, + py: Python<'py>, + name: String, + writer: Py, + chunk_size: Option, + ) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + let arc_writer = Arc::new(writer); + natsrpy_future(py, async move { + let mut object = ctx_guard.read().await.get(name).await?; + let mut buf = + chunk_size.map_or_else(bytes::BytesMut::new, bytes::BytesMut::with_capacity); + loop { + let read = object.read_buf(&mut buf).await?; + if read == 0 { + break; + } + // Buffer is cheap to clone. Since + // it copies only pointer to memory. + let to_write = buf.clone(); + // Writer is wrapped into Arc, so it's also + // cheap to clone. So its fine. + let writer_ref = arc_writer.clone(); + tokio::task::spawn_blocking(move || { + Python::attach(|gil| { + writer_ref.call_method1(gil, "write", (&to_write[..read],)) + }) + }) + .await??; + buf.clear(); + } + Ok(()) + }) + } + + pub fn delete<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + natsrpy_future(py, async move { + ctx_guard.read().await.delete(name).await?; + Ok(()) + }) + } + + #[pyo3(signature=( + name, + value, + chunk_size=24 * 1024, + description=None, + headers=None, + metadata=None, + ))] + pub fn put<'py>( + &self, + py: Python<'py>, + name: String, + value: SendableValue, + chunk_size: Option, + description: Option, + headers: Option>, + metadata: Option>, + ) -> NatsrpyResult> { + let ctx_guard = self.object_store.clone(); + let headers = headers.map(|val| HeaderMap::from_pydict(val)).transpose()?; + let meta = async_nats::jetstream::object_store::ObjectMetadata { + name, + chunk_size, + description, + metadata: metadata.unwrap_or_default(), + headers, + }; + natsrpy_future(py, async move { + match value { + SendableValue::Bytes(data) => { + let mut reader = tokio::io::BufReader::new(&*data); + ctx_guard.read().await.put(meta, &mut reader).await?; + } + SendableValue::String(filename) => { + let mut reader = tokio::io::BufReader::with_capacity( + chunk_size.unwrap_or(200 * 1024), + tokio::fs::File::open(filename).await?, + ); + ctx_guard.read().await.put(meta, &mut reader).await?; + } + } + Ok(()) + }) + } +} + +#[pyo3::pymodule(submodule, name = "object_store")] +pub mod pymod { + #[pymodule_export] + pub use super::{ObjectStore, ObjectStoreConfig}; +} diff --git a/uv.lock b/uv.lock index 265b25d..70fd9ac 100644 --- a/uv.lock +++ b/uv.lock @@ -67,6 +67,9 @@ wheels = [ [[package]] name = "natsrpy" source = { editable = "." } +dependencies = [ + { name = "typing-extensions" }, +] [package.dev-dependencies] dev = [ @@ -76,6 +79,7 @@ dev = [ ] [package.metadata] +requires-dist = [{ name = "typing-extensions", specifier = ">=4.14.0" }] [package.metadata.requires-dev] dev = [