From 0e6b01b7f021c4dd9c4feab68b1c897eb8a5acf3 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Sun, 22 Mar 2026 11:48:22 +0100 Subject: [PATCH 1/2] Updated JetstreamMessage controls. --- python/natsrpy/_natsrpy_rs/js/__init__.pyi | 56 ++++++++++++++++++- src/js/consumers/pull/consumer.rs | 4 +- src/js/message.rs | 63 +++++++++++++++++++--- src/nats_cls.rs | 14 ++--- src/utils/py_types.rs | 12 ++--- 5 files changed, 126 insertions(+), 23 deletions(-) diff --git a/python/natsrpy/_natsrpy_rs/js/__init__.pyi b/python/natsrpy/_natsrpy_rs/js/__init__.pyi index cc53a76..3f223f6 100644 --- a/python/natsrpy/_natsrpy_rs/js/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/js/__init__.pyi @@ -1,3 +1,4 @@ +from datetime import timedelta from typing import Any from .managers import KVManager, StreamsManager @@ -26,4 +27,57 @@ class JetStreamMessage: def payload(self) -> bytes: ... @property def headers(self) -> dict[str, Any]: ... - async def ack(self) -> None: ... + async def ack(self, double: bool = False) -> None: + """ + Acknowledge that a message was handled. + + :param double: whether to wait for server response, defaults to False + """ + + async def nack( + self, + delay: float | timedelta | None = None, + double: bool = False, + ) -> None: + """ + Negative acknowledgement. + + Signals that the message will not be processed now + and processing can move onto the next message, NAK'd + message will be retried. + + :param duration: time, defaults to None + :param double: whether to wait for server response, defaults to False + """ + + async def progress(self, double: bool = False) -> None: + """ + Progress acknowledgement. + + Singnals that the mesasge is being handled right now. + Sending this request before the AckWait will extend wait period + before redelivering a message. + + :param double: whether to wait for server response, defaults to False + """ + + async def next(self, double: bool = False) -> None: + """ + Next acknowledgement. + + Only applies to pull consumers! + Acknowledges message processing and instructs server to send + delivery of the next message to the reply subject. + + :param double: whether to wait for server response, defaults to False + """ + + async def term(self, double: bool = False) -> None: + """ + Term acknowledgement. + + Instructs server to stop redelivering message. + Useful to stop redelivering a message after multiple NACKs. + + :param double: whether to wait for server response, defaults to False + """ diff --git a/src/js/consumers/pull/consumer.rs b/src/js/consumers/pull/consumer.rs index fd14f61..7ababae 100644 --- a/src/js/consumers/pull/consumer.rs +++ b/src/js/consumers/pull/consumer.rs @@ -6,7 +6,7 @@ use tokio::sync::RwLock; use crate::{ exceptions::rust_err::NatsrpyResult, - utils::{futures::natsrpy_future_with_timeout, py_types::TimeoutValue}, + utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue}, }; type NatsPullConsumer = @@ -51,7 +51,7 @@ impl PullConsumer { expires: Option, min_pending: Option, min_ack_pending: Option, - timeout: Option, + timeout: Option, ) -> NatsrpyResult> { let ctx = self.consumer.clone(); diff --git a/src/js/message.rs b/src/js/message.rs index 64f59a0..c7f7274 100644 --- a/src/js/message.rs +++ b/src/js/message.rs @@ -4,7 +4,7 @@ use tokio::sync::RwLock; use crate::{ exceptions::rust_err::NatsrpyResult, - utils::{headers::NatsrpyHeadermapExt, natsrpy_future}, + utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::TimeValue}, }; #[pyo3::pyclass] @@ -25,6 +25,25 @@ impl From for JetStreamMessage { } } +impl JetStreamMessage { + pub fn inner_ack<'py>( + &self, + py: Python<'py>, + kind: async_nats::jetstream::message::AckKind, + double: bool, + ) -> NatsrpyResult> { + let acker_guard = self.acker.clone(); + natsrpy_future(py, async move { + if double { + acker_guard.read().await.double_ack_with(kind).await?; + } else { + acker_guard.read().await.ack_with(kind).await?; + } + Ok(()) + }) + } +} + #[pyo3::pymethods] impl JetStreamMessage { #[getter] @@ -51,11 +70,41 @@ impl JetStreamMessage { } } - pub fn ack<'py>(&self, py: Python<'py>) -> NatsrpyResult> { - let acker_guard = self.acker.clone(); - natsrpy_future(py, async move { - acker_guard.read().await.ack().await?; - Ok(()) - }) + #[pyo3(signature=(double=false))] + pub fn ack<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult> { + self.inner_ack(py, async_nats::jetstream::message::AckKind::Ack, double) + } + + #[pyo3(signature=(delay=None, double=false))] + pub fn nack<'py>( + &self, + py: Python<'py>, + delay: Option, + double: bool, + ) -> NatsrpyResult> { + self.inner_ack( + py, + async_nats::jetstream::message::AckKind::Nak(delay.map(Into::into)), + double, + ) + } + + #[pyo3(signature=(double=false))] + pub fn progress<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult> { + self.inner_ack( + py, + async_nats::jetstream::message::AckKind::Progress, + double, + ) + } + + #[pyo3(signature=(double=false))] + pub fn next<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult> { + self.inner_ack(py, async_nats::jetstream::message::AckKind::Next, double) + } + + #[pyo3(signature=(double=false))] + pub fn term<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult> { + self.inner_ack(py, async_nats::jetstream::message::AckKind::Term, double) } } diff --git a/src/nats_cls.rs b/src/nats_cls.rs index c4c4e6e..a42ea7b 100644 --- a/src/nats_cls.rs +++ b/src/nats_cls.rs @@ -13,7 +13,7 @@ use crate::{ futures::natsrpy_future_with_timeout, headers::NatsrpyHeadermapExt, natsrpy_future, - py_types::{SendableValue, TimeoutValue}, + py_types::{SendableValue, TimeValue}, }, }; @@ -28,8 +28,8 @@ pub struct NatsCls { read_buffer_capacity: u16, sender_capacity: usize, max_reconnects: Option, - connection_timeout: TimeoutValue, - request_timeout: Option, + connection_timeout: TimeValue, + request_timeout: Option, } #[pyo3::pymethods] @@ -45,8 +45,8 @@ impl NatsCls { read_buffer_capacity=65535, sender_capacity=128, max_reconnects=None, - connection_timeout=TimeoutValue::FloatSecs(5.0), - request_timeout=TimeoutValue::FloatSecs(10.0), + connection_timeout=TimeValue::FloatSecs(5.0), + request_timeout=TimeValue::FloatSecs(10.0), ))] fn __new__( addrs: Vec, @@ -57,8 +57,8 @@ impl NatsCls { read_buffer_capacity: u16, sender_capacity: usize, max_reconnects: Option, - connection_timeout: TimeoutValue, - request_timeout: Option, + connection_timeout: TimeValue, + request_timeout: Option, ) -> Self { Self { nats_session: Arc::new(RwLock::new(None)), diff --git a/src/utils/py_types.rs b/src/utils/py_types.rs index b0b546a..4aeadad 100644 --- a/src/utils/py_types.rs +++ b/src/utils/py_types.rs @@ -44,21 +44,21 @@ impl From for bytes::Bytes { } #[derive(Clone, Debug, Copy, PartialEq, PartialOrd)] -pub enum TimeoutValue { +pub enum TimeValue { Duration(Duration), FloatSecs(f32), } -impl From for Duration { - fn from(value: TimeoutValue) -> Self { +impl From for Duration { + fn from(value: TimeValue) -> Self { match value { - TimeoutValue::Duration(duration) => duration, - TimeoutValue::FloatSecs(fsecs) => Self::from_secs_f32(fsecs), + TimeValue::Duration(duration) => duration, + TimeValue::FloatSecs(fsecs) => Self::from_secs_f32(fsecs), } } } -impl<'py> FromPyObject<'_, 'py> for TimeoutValue { +impl<'py> FromPyObject<'_, 'py> for TimeValue { type Error = NatsrpyError; fn extract(obj: pyo3::Borrowed<'_, 'py, pyo3::PyAny>) -> Result { From b6075d25dbb7d4ae15c72c3aede972bd53d25319 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Sun, 22 Mar 2026 13:37:09 +0100 Subject: [PATCH 2/2] Added push and pull consumers. --- python/natsrpy/_natsrpy_rs/js/consumers.pyi | 11 +++- src/exceptions/rust_err.rs | 9 ++- src/js/consumers/mod.rs | 4 +- src/js/consumers/pull/consumer.rs | 2 +- src/js/consumers/push/consumer.rs | 73 ++++++++++++++++++++- src/js/message.rs | 52 ++++++++------- src/subscription.rs | 9 ++- 7 files changed, 125 insertions(+), 35 deletions(-) diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index 7cba0d8..8bdbdcc 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -147,7 +147,16 @@ class PushConsumerConfig: pause_until: int | None = None, ) -> None: ... -class PushConsumer: ... +class MessagesIterator: + def __aiter__(self) -> MessagesIterator: ... + async def __anext__(self) -> JetStreamMessage: ... + async def next( + self, + timeout: float | timedelta | None = None, + ) -> JetStreamMessage: ... + +class PushConsumer: + async def messages(self) -> MessagesIterator: ... class PullConsumer: async def fetch( diff --git a/src/exceptions/rust_err.rs b/src/exceptions/rust_err.rs index 86ceb87..32a1c42 100644 --- a/src/exceptions/rust_err.rs +++ b/src/exceptions/rust_err.rs @@ -1,4 +1,4 @@ -use pyo3::exceptions::{PyTimeoutError, PyTypeError}; +use pyo3::exceptions::{PyStopAsyncIteration, PyTimeoutError, PyTypeError}; use crate::exceptions::py_err::{NatsrpyPublishError, NatsrpySessionError}; @@ -12,6 +12,8 @@ pub enum NatsrpyError { InvalidArgument(String), #[error("Session is not initialized. Call startup() first.")] NotInitialized, + #[error("The end of stream")] + AsyncStopIteration, #[error("Connection is closed or lost.")] Disconnected, #[error(transparent)] @@ -66,12 +68,17 @@ pub enum NatsrpyError { PullConsumerError(#[from] async_nats::jetstream::stream::ConsumerError), #[error(transparent)] PullConsumerBatchError(#[from] async_nats::jetstream::consumer::pull::BatchError), + #[error(transparent)] + PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError), + #[error(transparent)] + ConsumerStreamError(#[from] async_nats::jetstream::consumer::StreamError), } impl From for pyo3::PyErr { fn from(value: NatsrpyError) -> Self { match value { NatsrpyError::PublishError(_) => NatsrpyPublishError::new_err(value.to_string()), + NatsrpyError::AsyncStopIteration => PyStopAsyncIteration::new_err("End of the stream."), NatsrpyError::Timeout(_) => PyTimeoutError::new_err(value.to_string()), NatsrpyError::PyError(py_err) => py_err, NatsrpyError::InvalidArgument(descr) => PyTypeError::new_err(descr), diff --git a/src/js/consumers/mod.rs b/src/js/consumers/mod.rs index 41680d5..721dfbe 100644 --- a/src/js/consumers/mod.rs +++ b/src/js/consumers/mod.rs @@ -9,5 +9,7 @@ pub mod pymod { #[pymodule_export] pub use super::pull::{config::PullConsumerConfig, consumer::PullConsumer}; #[pymodule_export] - pub use super::push::{config::PushConsumerConfig, consumer::PushConsumer}; + pub use super::push::{ + config::PushConsumerConfig, consumer::MessagesIterator, consumer::PushConsumer, + }; } diff --git a/src/js/consumers/pull/consumer.rs b/src/js/consumers/pull/consumer.rs index 7ababae..30264b6 100644 --- a/src/js/consumers/pull/consumer.rs +++ b/src/js/consumers/pull/consumer.rs @@ -89,7 +89,7 @@ impl PullConsumer { let mut ret_messages = Vec::new(); while let Some(msg) = messages.next().await { let raw_msg = msg?; - ret_messages.push(crate::js::message::JetStreamMessage::from(raw_msg)); + ret_messages.push(crate::js::message::JetStreamMessage::try_from(raw_msg)?); } Ok(ret_messages) }) diff --git a/src/js/consumers/push/consumer.rs b/src/js/consumers/push/consumer.rs index 9825712..9cdf968 100644 --- a/src/js/consumers/push/consumer.rs +++ b/src/js/consumers/push/consumer.rs @@ -1,13 +1,20 @@ use std::sync::Arc; +use futures_util::StreamExt; +use pyo3::{Bound, PyAny, PyRef, Python}; use tokio::sync::RwLock; +use crate::{ + exceptions::rust_err::{NatsrpyError, NatsrpyResult}, + js::pymod::JetStreamMessage, + utils::{futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue}, +}; + type NatsPushConsumer = async_nats::jetstream::consumer::Consumer; #[pyo3::pyclass(from_py_object)] #[derive(Debug, Clone)] -#[allow(dead_code)] // TODO! remove later. pub struct PushConsumer { consumer: Arc>, } @@ -21,5 +28,67 @@ impl PushConsumer { } } +#[pyo3::pyclass] +pub struct MessagesIterator { + messages: Option>>, +} + +impl From for MessagesIterator { + fn from(value: async_nats::jetstream::consumer::push::Messages) -> Self { + Self { + messages: Some(Arc::new(RwLock::new(value))), + } + } +} + +#[pyo3::pymethods] +impl PushConsumer { + pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + let consumer_guard = self.consumer.clone(); + natsrpy_future(py, async move { + Ok(MessagesIterator::from( + consumer_guard.read().await.messages().await?, + )) + }) + } +} + #[pyo3::pymethods] -impl PushConsumer {} +impl MessagesIterator { + #[must_use] + pub const fn __aiter__(slf: PyRef) -> PyRef { + slf + } + + pub fn next<'py>( + &self, + py: Python<'py>, + timeout: Option, + ) -> NatsrpyResult> { + let Some(messages_guard) = self.messages.clone() else { + unreachable!("Message is always Some in runtime.") + }; + #[allow(clippy::significant_drop_tightening)] + natsrpy_future_with_timeout(py, timeout, async move { + let mut messages = messages_guard.write().await; + let Some(message) = messages.next().await else { + return Err(NatsrpyError::AsyncStopIteration); + }; + let message = message?; + + JetStreamMessage::try_from(message) + }) + } + + pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult> { + self.next(py, None) + } +} + +impl Drop for MessagesIterator { + fn drop(&mut self) { + pyo3_async_runtimes::tokio::get_runtime().block_on(async move { + self.messages = None; + }); + } +} diff --git a/src/js/message.rs b/src/js/message.rs index c7f7274..4c6a356 100644 --- a/src/js/message.rs +++ b/src/js/message.rs @@ -1,27 +1,30 @@ -use pyo3::{Bound, Py, PyAny, Python, types::PyDict}; +use pyo3::{ + Bound, Py, PyAny, Python, + types::{PyBytes, PyDict}, +}; use std::sync::Arc; use tokio::sync::RwLock; use crate::{ - exceptions::rust_err::NatsrpyResult, - utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::TimeValue}, + exceptions::rust_err::{NatsrpyError, NatsrpyResult}, + utils::{natsrpy_future, py_types::TimeValue}, }; #[pyo3::pyclass] pub struct JetStreamMessage { - message: async_nats::Message, - headers: Option>, + message: crate::message::Message, acker: Arc>, } -impl From for JetStreamMessage { - fn from(value: async_nats::jetstream::Message) -> Self { +impl TryFrom for JetStreamMessage { + type Error = NatsrpyError; + + fn try_from(value: async_nats::jetstream::Message) -> Result { let (message, acker) = value.split(); - Self { - message, - headers: None, + Ok(Self { + message: message.try_into()?, acker: Arc::new(RwLock::new(acker)), - } + }) } } @@ -47,27 +50,23 @@ impl JetStreamMessage { #[pyo3::pymethods] impl JetStreamMessage { #[getter] - pub fn subject(&self) -> &str { + #[must_use] + pub const fn subject(&self) -> &str { self.message.subject.as_str() } #[getter] - pub fn reply(&self) -> Option<&str> { - self.message.reply.as_ref().map(async_nats::Subject::as_str) + #[must_use] + pub const fn reply(&self) -> &Option { + &self.message.reply } #[getter] - pub fn payload(&self) -> &[u8] { + #[must_use] + pub const fn payload(&self) -> &Py { &self.message.payload } #[getter] - pub fn headers(&mut self, py: Python<'_>) -> NatsrpyResult> { - if let Some(headers) = &self.headers { - Ok(headers.clone_ref(py)) - } else { - let headermap = self.message.headers.clone().unwrap_or_default(); - let headers = headermap.to_pydict(py)?.unbind(); - self.headers = Some(headers.clone_ref(py)); - Ok(headers) - } + pub const fn headers(&mut self) -> &Py { + &self.message.headers } #[pyo3(signature=(double=false))] @@ -107,4 +106,9 @@ impl JetStreamMessage { pub fn term<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult> { self.inner_ack(py, async_nats::jetstream::message::AckKind::Term, double) } + + #[must_use] + pub fn __repr__(&self) -> String { + self.message.__repr__() + } } diff --git a/src/subscription.rs b/src/subscription.rs index b9b1aff..47e8aae 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -1,13 +1,12 @@ use futures_util::StreamExt; -use pyo3::exceptions::PyStopAsyncIteration; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use pyo3::{Bound, PyAny, PyRef, Python}; use tokio::sync::Mutex; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, - utils::futures::natsrpy_future_with_timeout, + utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue}, }; #[pyo3::pyclass] @@ -34,14 +33,14 @@ impl Subscription { pub fn next<'py>( &self, py: Python<'py>, - timeout: Option, + timeout: Option, ) -> NatsrpyResult> { let Some(inner) = self.inner.clone() else { return Err(NatsrpyError::NotInitialized); }; natsrpy_future_with_timeout(py, timeout, async move { let Some(message) = inner.lock().await.next().await else { - return Err(PyStopAsyncIteration::new_err("End of the stream.").into()); + return Err(NatsrpyError::AsyncStopIteration); }; crate::message::Message::try_from(message)