Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ classifiers = [
"Topic :: System :: Networking",
]
dynamic = ["version"]
dependencies = [
"typing-extensions>=4.14.0",
]

[[project.authors]]
name = "Pavel Kirilin"
Expand Down
4 changes: 3 additions & 1 deletion python/natsrpy/_natsrpy_rs/js/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import timedelta
from typing import Any

from .managers import KVManager, StreamsManager
from .managers import KVManager, ObjectStoreManager, StreamsManager

class JetStream:
async def publish(
Expand All @@ -17,6 +17,8 @@ class JetStream:
def kv(self) -> KVManager: ...
@property
def streams(self) -> StreamsManager: ...
@property
def object_store(self) -> ObjectStoreManager: ...

class JetStreamMessage:
@property
Expand Down
6 changes: 6 additions & 0 deletions python/natsrpy/_natsrpy_rs/js/managers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ from .consumers import (
PushConsumerConfig,
)
from .kv import KeyValue, KVConfig
from .object_store import ObjectStore, ObjectStoreConfig
from .stream import Stream, StreamConfig

class StreamsManager:
Expand All @@ -28,3 +29,8 @@ class ConsumersManager:
async def create(self, config: PullConsumerConfig) -> PullConsumer: ...
@overload
async def create(self, config: PushConsumerConfig) -> PushConsumer: ...

class ObjectStoreManager:
async def create(self, config: ObjectStoreConfig) -> ObjectStore: ...
async def get(self, bucket: str) -> ObjectStore: ...
async def delete(self, bucket: str) -> None: ...
45 changes: 45 additions & 0 deletions python/natsrpy/_natsrpy_rs/js/object_store.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from datetime import timedelta

from typing_extensions import Writer

from .stream import Placement, StorageType

class ObjectStoreConfig:
bucket: str
description: str | None
max_age: timedelta
max_bytes: int
storage: StorageType
num_replicas: int
compression: bool
placement: Placement | None

def __init__(
self,
bucket: str,
description: str | None = None,
max_age: float | timedelta | None = None,
max_bytes: int | None = None,
storage: StorageType | None = None,
num_replicas: int | None = None,
compression: bool | None = None,
placement: Placement | None = None,
) -> None: ...

class ObjectStore:
async def get(
self,
name: str,
writer: Writer[bytes],
chunk_size: int | None = 24576, # 24MB
) -> None: ...
async def put(
self,
name: str,
value: bytes | str,
chunk_size: int = 24576, # 24MB
description: str | None = None,
headers: dict[str, str | list[str]] | None = None,
metadata: dict[str, str] | None = None,
) -> None: ...
async def delete(self, name: str) -> None: ...
3 changes: 3 additions & 0 deletions python/natsrpy/js/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
ReplayPolicy,
)
from natsrpy.js.kv import KeyValue, KVConfig
from natsrpy.js.object_store import ObjectStore, ObjectStoreConfig
from natsrpy.js.stream import (
Compression,
ConsumerLimits,
Expand Down Expand Up @@ -37,6 +38,8 @@
"JetStream",
"KVConfig",
"KeyValue",
"ObjectStore",
"ObjectStoreConfig",
"PersistenceMode",
"Placement",
"PriorityPolicy",
Expand Down
6 changes: 6 additions & 0 deletions python/natsrpy/js/object_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from natsrpy._natsrpy_rs.js.object_store import ObjectStore, ObjectStoreConfig

__all__ = [
"ObjectStore",
"ObjectStoreConfig",
]
10 changes: 10 additions & 0 deletions src/exceptions/rust_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub type NatsrpyResult<T> = Result<T, NatsrpyError>;

#[derive(thiserror::Error, Debug)]
pub enum NatsrpyError {
#[error(transparent)]
StdIOError(#[from] std::io::Error),
#[error("NATS session error: {0}")]
SessionError(String),
#[error("Invalid arguemnt: {0}")]
Expand Down Expand Up @@ -72,6 +74,14 @@ pub enum NatsrpyError {
PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError),
#[error(transparent)]
ConsumerStreamError(#[from] async_nats::jetstream::consumer::StreamError),
#[error(transparent)]
ObjectStoreError(#[from] async_nats::jetstream::context::ObjectStoreError),
#[error(transparent)]
ObjectStoreGetError(#[from] async_nats::jetstream::object_store::GetError),
#[error(transparent)]
ObjectStorePutError(#[from] async_nats::jetstream::object_store::PutError),
#[error(transparent)]
ObjectStoreDeleteError(#[from] async_nats::jetstream::object_store::DeleteError),
}

impl From<NatsrpyError> for pyo3::PyErr {
Expand Down
Empty file added src/js/counters.rs
Empty file.
32 changes: 19 additions & 13 deletions src/js/jetstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
js::managers::{kv::KVManager, streams::StreamsManager},
js::managers::{kv::KVManager, object_store::ObjectStoreManager, streams::StreamsManager},
utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::SendableValue},
};

Expand All @@ -26,6 +26,24 @@ impl JetStream {

#[pyo3::pymethods]
impl JetStream {
#[getter]
#[must_use]
pub fn kv(&self) -> KVManager {
KVManager::new(self.ctx.clone())
}

#[getter]
#[must_use]
pub fn streams(&self) -> StreamsManager {
StreamsManager::new(self.ctx.clone())
}

#[getter]
#[must_use]
pub fn object_store(&self) -> ObjectStoreManager {
ObjectStoreManager::new(self.ctx.clone())
}

#[pyo3(signature = (
subject,
payload,
Expand Down Expand Up @@ -66,16 +84,4 @@ impl JetStream {
Ok(())
})
}

#[getter]
#[must_use]
pub fn kv(&self) -> KVManager {
KVManager::new(self.ctx.clone())
}

#[getter]
#[must_use]
pub fn streams(&self) -> StreamsManager {
StreamsManager::new(self.ctx.clone())
}
}
3 changes: 3 additions & 0 deletions src/js/managers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod consumers;
pub mod kv;
pub mod object_store;
pub mod streams;

#[pyo3::pymodule(submodule, name = "managers")]
Expand All @@ -9,5 +10,7 @@ pub mod pymod {
#[pymodule_export]
use super::kv::KVManager;
#[pymodule_export]
use super::object_store::ObjectStoreManager;
#[pymodule_export]
use super::streams::StreamsManager;
}
58 changes: 58 additions & 0 deletions src/js/managers/object_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use pyo3::{Bound, PyAny, Python};
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::NatsrpyResult,
js::object_store::{ObjectStore, ObjectStoreConfig},
utils::natsrpy_future,
};

#[pyo3::pyclass]
pub struct ObjectStoreManager {
ctx: Arc<RwLock<async_nats::jetstream::Context>>,
}

impl ObjectStoreManager {
pub const fn new(ctx: Arc<RwLock<async_nats::jetstream::Context>>) -> Self {
Self { ctx }
}
}

#[pyo3::pymethods]
impl ObjectStoreManager {
pub fn get<'py>(&self, py: Python<'py>, bucket: String) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx_guard = self.ctx.clone();
natsrpy_future(py, async move {
Ok(ObjectStore::new(
ctx_guard.read().await.get_object_store(bucket).await?,
))
})
}

pub fn create<'py>(
&self,
py: Python<'py>,
config: ObjectStoreConfig,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx_guard = self.ctx.clone();
natsrpy_future(py, async move {
Ok(ObjectStore::new(
ctx_guard
.read()
.await
.create_object_store(config.into())
.await?,
))
})
}

pub fn delete<'py>(&self, py: Python<'py>, bucket: String) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx_guard = self.ctx.clone();
natsrpy_future(py, async move {
ctx_guard.read().await.delete_object_store(bucket).await?;
Ok(())
})
}
}
3 changes: 3 additions & 0 deletions src/js/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod jetstream;
pub mod kv;
pub mod managers;
pub mod message;
pub mod object_store;
pub mod stream;

#[pyo3::pymodule(submodule, name = "js")]
Expand All @@ -21,5 +22,7 @@ pub mod pymod {
#[pymodule_export]
pub use super::managers::pymod as managers;
#[pymodule_export]
pub use super::object_store::pymod as object_store;
#[pymodule_export]
pub use super::stream::pymod as stream;
}
Loading
Loading