Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/natsrpy/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from natsrpy._natsrpy_rs import Message, Nats, Subscription
from ._natsrpy_rs import Message, Nats, Subscription

__all__ = [
"Message",
Expand Down
8 changes: 4 additions & 4 deletions python/natsrpy/_natsrpy_rs/js/consumers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class PriorityPolicy:
PRIORITIZED: PriorityPolicy

class PullConsumerConfig:
durable_name: str | None
name: str | None
durable_name: str | None
description: str | None
deliver_policy: DeliverPolicy
delivery_start_sequence: int | None
Expand Down Expand Up @@ -57,8 +57,8 @@ class PullConsumerConfig:

def __init__(
self,
durable_name: str | None = None,
name: str | None = None,
durable_name: str | None = None,
description: str | None = None,
deliver_policy: DeliverPolicy | None = None,
delivery_start_sequence: int | None = None,
Expand Down Expand Up @@ -89,8 +89,8 @@ class PullConsumerConfig:

class PushConsumerConfig:
deliver_subject: str
durable_name: str | None
name: str | None
durable_name: str | None
description: str | None
deliver_group: str | None
deliver_policy: DeliverPolicy
Expand Down Expand Up @@ -119,8 +119,8 @@ class PushConsumerConfig:
def __init__(
self,
deliver_subject: str,
durable_name: str | None = None,
name: str | None = None,
durable_name: str | None = None,
description: str | None = None,
deliver_group: str | None = None,
deliver_policy: DeliverPolicy | None = None,
Expand Down
10 changes: 10 additions & 0 deletions python/natsrpy/_natsrpy_rs/js/managers.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from typing import overload

from .consumers import (
Expand Down Expand Up @@ -29,6 +30,15 @@ class ConsumersManager:
async def create(self, config: PullConsumerConfig) -> PullConsumer: ...
@overload
async def create(self, config: PushConsumerConfig) -> PushConsumer: ...
@overload
async def update(self, config: PullConsumerConfig) -> PullConsumer: ...
@overload
async def update(self, config: PushConsumerConfig) -> PushConsumer: ...
async def get_pull(self, name: str) -> PullConsumer: ...
async def get_push(self, name: str) -> PushConsumer: ...
async def delete(self, name: str) -> bool: ...
async def pause(self, name: str, delay: float | timedelta) -> bool: ...
async def resume(self, name: str) -> bool: ...

class ObjectStoreManager:
async def create(self, config: ObjectStoreConfig) -> ObjectStore: ...
Expand Down
10 changes: 5 additions & 5 deletions python/natsrpy/js/__init__.py → python/natsrpy/js.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from natsrpy._natsrpy_rs.js import JetStream
from natsrpy.js.consumers import (
from ._natsrpy_rs.js import JetStream
from ._natsrpy_rs.js.consumers import (
AckPolicy,
DeliverPolicy,
PriorityPolicy,
Expand All @@ -9,9 +9,9 @@
PushConsumerConfig,
ReplayPolicy,
)
from natsrpy.js.kv import KeyValue, KVConfig
from natsrpy.js.object_store import ObjectStore, ObjectStoreConfig
from natsrpy.js.stream import (
from ._natsrpy_rs.js.kv import KeyValue, KVConfig
from ._natsrpy_rs.js.object_store import ObjectStore, ObjectStoreConfig
from ._natsrpy_rs.js.stream import (
Compression,
ConsumerLimits,
DiscardPolicy,
Expand Down
21 changes: 0 additions & 21 deletions python/natsrpy/js/consumers.py

This file was deleted.

6 changes: 0 additions & 6 deletions python/natsrpy/js/kv.py

This file was deleted.

6 changes: 0 additions & 6 deletions python/natsrpy/js/object_store.py

This file was deleted.

50 changes: 0 additions & 50 deletions python/natsrpy/js/stream.py

This file was deleted.

4 changes: 3 additions & 1 deletion src/exceptions/rust_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ pub enum NatsrpyError {
#[error(transparent)]
PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError),
#[error(transparent)]
PullConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),
ConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),
#[error(transparent)]
PullConsumerBatchError(#[from] async_nats::jetstream::consumer::pull::BatchError),
#[error(transparent)]
PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError),
#[error(transparent)]
ConsumerStreamError(#[from] async_nats::jetstream::consumer::StreamError),
#[error(transparent)]
ConsumerUpdateError(#[from] async_nats::jetstream::stream::ConsumerUpdateError),
#[error(transparent)]
ObjectStoreError(#[from] async_nats::jetstream::context::ObjectStoreError),
#[error(transparent)]
ObjectStoreGetError(#[from] async_nats::jetstream::object_store::GetError),
Expand Down
8 changes: 4 additions & 4 deletions src/js/consumers/pull/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
#[pyo3::pyclass(from_py_object, get_all, set_all)]
#[derive(Clone, Debug, Default)]
pub struct PullConsumerConfig {
pub durable_name: Option<String>,
pub name: Option<String>,
pub durable_name: Option<String>,
pub description: Option<String>,
pub deliver_policy: DeliverPolicy,
pub delivery_start_sequence: Option<u64>,
Expand Down Expand Up @@ -42,8 +42,8 @@ pub struct PullConsumerConfig {
impl PullConsumerConfig {
#[new]
#[pyo3(signature=(
durable_name=None,
name=None,
durable_name=None,
description=None,
deliver_policy=None,
delivery_start_sequence=None,
Expand Down Expand Up @@ -73,8 +73,8 @@ impl PullConsumerConfig {
))]
#[must_use]
pub fn __new__(
durable_name: Option<String>,
name: Option<String>,
durable_name: Option<String>,
description: Option<String>,
deliver_policy: Option<DeliverPolicy>,
delivery_start_sequence: Option<u64>,
Expand Down Expand Up @@ -103,8 +103,8 @@ impl PullConsumerConfig {
pause_until: Option<i64>,
) -> Self {
let mut conf = Self {
durable_name,
name,
durable_name,
description,
delivery_start_sequence,
delivery_start_time,
Expand Down
6 changes: 3 additions & 3 deletions src/js/consumers/push/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
#[derive(Clone, Debug)]
pub struct PushConsumerConfig {
pub deliver_subject: String,
pub durable_name: Option<String>,
pub name: Option<String>,
pub durable_name: Option<String>,
pub description: Option<String>,
pub deliver_group: Option<String>,
pub deliver_policy: DeliverPolicy,
Expand Down Expand Up @@ -42,8 +42,8 @@ impl PushConsumerConfig {
#[new]
#[pyo3(signature=(
deliver_subject,
durable_name=None,
name=None,
durable_name=None,
description=None,
deliver_group=None,
deliver_policy=None,
Expand Down Expand Up @@ -73,8 +73,8 @@ impl PushConsumerConfig {
#[must_use]
pub fn __new__(
deliver_subject: String,
durable_name: Option<String>,
name: Option<String>,
durable_name: Option<String>,
description: Option<String>,
deliver_group: Option<String>,
deliver_policy: Option<DeliverPolicy>,
Expand Down
59 changes: 56 additions & 3 deletions src/js/managers/consumers.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use pyo3::{Bound, FromPyObject, IntoPyObjectExt, PyAny, Python};
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
js::consumers::{self, pull::PullConsumer, push::PushConsumer},
utils::natsrpy_future,
utils::{natsrpy_future, py_types::TimeValue},
};

#[pyo3::pyclass]
Expand Down Expand Up @@ -78,12 +78,65 @@ impl ConsumersManager {
})
}

pub fn get<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult<Bound<'py, PyAny>> {
pub fn update<'py>(
&self,
py: Python<'py>,
config: ConsumerConfigs,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.stream.clone();
natsrpy_future(py, async move {
match config {
ConsumerConfigs::Pull(config) => {
let consumer = PullConsumer::new(
ctx.read().await.update_consumer(config.try_into()?).await?,
);
Ok(Python::attach(|gil| consumer.into_py_any(gil))?)
}
ConsumerConfigs::Push(config) => {
let consumer = PushConsumer::new(
ctx.read().await.update_consumer(config.try_into()?).await?,
);
Ok(Python::attach(|gil| consumer.into_py_any(gil))?)
}
}
})
}

pub fn get_pull<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.stream.clone();
natsrpy_future(py, async move {
Ok(consumers::pull::consumer::PullConsumer::new(
ctx.read().await.get_consumer(&name).await?,
))
})
}

pub fn get_push<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.stream.clone();
natsrpy_future(py, async move {
Ok(consumers::push::consumer::PushConsumer::new(
ctx.read().await.get_consumer(&name).await?,
))
})
}

pub fn pause<'py>(
&self,
py: Python<'py>,
name: String,
delay: TimeValue,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.stream.clone();
let untill = time::OffsetDateTime::now_utc() + Duration::from(delay);
natsrpy_future(py, async move {
Ok(ctx.read().await.pause_consumer(&name, untill).await?.paused)
})
}

pub fn resume<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.stream.clone();
natsrpy_future(py, async move {
Ok(ctx.read().await.resume_consumer(&name).await?.paused)
})
}
}
Loading