Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion python/natsrpy/_natsrpy_rs/js/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from typing import Any

from .managers import KVManager, StreamsManager
Expand Down Expand Up @@ -26,4 +27,57 @@ class JetStreamMessage:
def payload(self) -> bytes: ...
@property
def headers(self) -> dict[str, Any]: ...
async def ack(self) -> None: ...
async def ack(self, double: bool = False) -> None:
"""
Acknowledge that a message was handled.

:param double: whether to wait for server response, defaults to False
"""

async def nack(
self,
delay: float | timedelta | None = None,
double: bool = False,
) -> None:
"""
Negative acknowledgement.

Signals that the message will not be processed now
and processing can move onto the next message, NAK'd
message will be retried.

:param duration: time, defaults to None
:param double: whether to wait for server response, defaults to False
"""

async def progress(self, double: bool = False) -> None:
"""
Progress acknowledgement.

Singnals that the mesasge is being handled right now.
Sending this request before the AckWait will extend wait period
before redelivering a message.

:param double: whether to wait for server response, defaults to False
"""

async def next(self, double: bool = False) -> None:
"""
Next acknowledgement.

Only applies to pull consumers!
Acknowledges message processing and instructs server to send
delivery of the next message to the reply subject.

:param double: whether to wait for server response, defaults to False
"""

async def term(self, double: bool = False) -> None:
"""
Term acknowledgement.

Instructs server to stop redelivering message.
Useful to stop redelivering a message after multiple NACKs.

:param double: whether to wait for server response, defaults to False
"""
11 changes: 10 additions & 1 deletion python/natsrpy/_natsrpy_rs/js/consumers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,16 @@ class PushConsumerConfig:
pause_until: int | None = None,
) -> None: ...

class PushConsumer: ...
class MessagesIterator:
def __aiter__(self) -> MessagesIterator: ...
async def __anext__(self) -> JetStreamMessage: ...
async def next(
self,
timeout: float | timedelta | None = None,
) -> JetStreamMessage: ...

class PushConsumer:
async def messages(self) -> MessagesIterator: ...

class PullConsumer:
async def fetch(
Expand Down
9 changes: 8 additions & 1 deletion src/exceptions/rust_err.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pyo3::exceptions::{PyTimeoutError, PyTypeError};
use pyo3::exceptions::{PyStopAsyncIteration, PyTimeoutError, PyTypeError};

use crate::exceptions::py_err::{NatsrpyPublishError, NatsrpySessionError};

Expand All @@ -12,6 +12,8 @@ pub enum NatsrpyError {
InvalidArgument(String),
#[error("Session is not initialized. Call startup() first.")]
NotInitialized,
#[error("The end of stream")]
AsyncStopIteration,
#[error("Connection is closed or lost.")]
Disconnected,
#[error(transparent)]
Expand Down Expand Up @@ -66,12 +68,17 @@ pub enum NatsrpyError {
PullConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),
#[error(transparent)]
PullConsumerBatchError(#[from] async_nats::jetstream::consumer::pull::BatchError),
#[error(transparent)]
PushConsumerMessageError(#[from] async_nats::jetstream::consumer::push::MessagesError),
#[error(transparent)]
ConsumerStreamError(#[from] async_nats::jetstream::consumer::StreamError),
}

impl From<NatsrpyError> for pyo3::PyErr {
fn from(value: NatsrpyError) -> Self {
match value {
NatsrpyError::PublishError(_) => NatsrpyPublishError::new_err(value.to_string()),
NatsrpyError::AsyncStopIteration => PyStopAsyncIteration::new_err("End of the stream."),
NatsrpyError::Timeout(_) => PyTimeoutError::new_err(value.to_string()),
NatsrpyError::PyError(py_err) => py_err,
NatsrpyError::InvalidArgument(descr) => PyTypeError::new_err(descr),
Expand Down
4 changes: 3 additions & 1 deletion src/js/consumers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ pub mod pymod {
#[pymodule_export]
pub use super::pull::{config::PullConsumerConfig, consumer::PullConsumer};
#[pymodule_export]
pub use super::push::{config::PushConsumerConfig, consumer::PushConsumer};
pub use super::push::{
config::PushConsumerConfig, consumer::MessagesIterator, consumer::PushConsumer,
};
}
6 changes: 3 additions & 3 deletions src/js/consumers/pull/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::NatsrpyResult,
utils::{futures::natsrpy_future_with_timeout, py_types::TimeoutValue},
utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue},
};

type NatsPullConsumer =
Expand Down Expand Up @@ -51,7 +51,7 @@ impl PullConsumer {
expires: Option<Duration>,
min_pending: Option<usize>,
min_ack_pending: Option<usize>,
timeout: Option<TimeoutValue>,
timeout: Option<TimeValue>,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.consumer.clone();

Expand Down Expand Up @@ -89,7 +89,7 @@ impl PullConsumer {
let mut ret_messages = Vec::new();
while let Some(msg) = messages.next().await {
let raw_msg = msg?;
ret_messages.push(crate::js::message::JetStreamMessage::from(raw_msg));
ret_messages.push(crate::js::message::JetStreamMessage::try_from(raw_msg)?);
}
Ok(ret_messages)
})
Expand Down
73 changes: 71 additions & 2 deletions src/js/consumers/push/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
use std::sync::Arc;

use futures_util::StreamExt;
use pyo3::{Bound, PyAny, PyRef, Python};
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
js::pymod::JetStreamMessage,
utils::{futures::natsrpy_future_with_timeout, natsrpy_future, py_types::TimeValue},
};

type NatsPushConsumer =
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::push::Config>;

#[pyo3::pyclass(from_py_object)]
#[derive(Debug, Clone)]
#[allow(dead_code)] // TODO! remove later.
pub struct PushConsumer {
consumer: Arc<RwLock<NatsPushConsumer>>,
}
Expand All @@ -21,5 +28,67 @@ impl PushConsumer {
}
}

#[pyo3::pyclass]
pub struct MessagesIterator {
messages: Option<Arc<RwLock<async_nats::jetstream::consumer::push::Messages>>>,
}

impl From<async_nats::jetstream::consumer::push::Messages> for MessagesIterator {
fn from(value: async_nats::jetstream::consumer::push::Messages) -> Self {
Self {
messages: Some(Arc::new(RwLock::new(value))),
}
}
}

#[pyo3::pymethods]
impl PushConsumer {
pub fn messages<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
let consumer_guard = self.consumer.clone();
natsrpy_future(py, async move {
Ok(MessagesIterator::from(
consumer_guard.read().await.messages().await?,
))
})
}
}

#[pyo3::pymethods]
impl PushConsumer {}
impl MessagesIterator {
#[must_use]
pub const fn __aiter__(slf: PyRef<Self>) -> PyRef<Self> {
slf
}

pub fn next<'py>(
&self,
py: Python<'py>,
timeout: Option<TimeValue>,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let Some(messages_guard) = self.messages.clone() else {
unreachable!("Message is always Some in runtime.")
};
#[allow(clippy::significant_drop_tightening)]
natsrpy_future_with_timeout(py, timeout, async move {
let mut messages = messages_guard.write().await;
let Some(message) = messages.next().await else {
return Err(NatsrpyError::AsyncStopIteration);
};
let message = message?;

JetStreamMessage::try_from(message)
})
}

pub fn __anext__<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
self.next(py, None)
}
}

impl Drop for MessagesIterator {
fn drop(&mut self) {
pyo3_async_runtimes::tokio::get_runtime().block_on(async move {
self.messages = None;
});
}
}
117 changes: 85 additions & 32 deletions src/js/message.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,114 @@
use pyo3::{Bound, Py, PyAny, Python, types::PyDict};
use pyo3::{
Bound, Py, PyAny, Python,
types::{PyBytes, PyDict},
};
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::{
exceptions::rust_err::NatsrpyResult,
utils::{headers::NatsrpyHeadermapExt, natsrpy_future},
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
utils::{natsrpy_future, py_types::TimeValue},
};

#[pyo3::pyclass]
pub struct JetStreamMessage {
message: async_nats::Message,
headers: Option<Py<PyDict>>,
message: crate::message::Message,
acker: Arc<RwLock<async_nats::jetstream::message::Acker>>,
}

impl From<async_nats::jetstream::Message> for JetStreamMessage {
fn from(value: async_nats::jetstream::Message) -> Self {
impl TryFrom<async_nats::jetstream::Message> for JetStreamMessage {
type Error = NatsrpyError;

fn try_from(value: async_nats::jetstream::Message) -> Result<Self, Self::Error> {
let (message, acker) = value.split();
Self {
message,
headers: None,
Ok(Self {
message: message.try_into()?,
acker: Arc::new(RwLock::new(acker)),
}
})
}
}

impl JetStreamMessage {
pub fn inner_ack<'py>(
&self,
py: Python<'py>,
kind: async_nats::jetstream::message::AckKind,
double: bool,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let acker_guard = self.acker.clone();
natsrpy_future(py, async move {
if double {
acker_guard.read().await.double_ack_with(kind).await?;
} else {
acker_guard.read().await.ack_with(kind).await?;
}
Ok(())
})
}
}

#[pyo3::pymethods]
impl JetStreamMessage {
#[getter]
pub fn subject(&self) -> &str {
#[must_use]
pub const fn subject(&self) -> &str {
self.message.subject.as_str()
}
#[getter]
pub fn reply(&self) -> Option<&str> {
self.message.reply.as_ref().map(async_nats::Subject::as_str)
#[must_use]
pub const fn reply(&self) -> &Option<String> {
&self.message.reply
}
#[getter]
pub fn payload(&self) -> &[u8] {
#[must_use]
pub const fn payload(&self) -> &Py<PyBytes> {
&self.message.payload
}
#[getter]
pub fn headers(&mut self, py: Python<'_>) -> NatsrpyResult<Py<PyDict>> {
if let Some(headers) = &self.headers {
Ok(headers.clone_ref(py))
} else {
let headermap = self.message.headers.clone().unwrap_or_default();
let headers = headermap.to_pydict(py)?.unbind();
self.headers = Some(headers.clone_ref(py));
Ok(headers)
}
}

pub fn ack<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyAny>> {
let acker_guard = self.acker.clone();
natsrpy_future(py, async move {
acker_guard.read().await.ack().await?;
Ok(())
})
pub const fn headers(&mut self) -> &Py<PyDict> {
&self.message.headers
}

#[pyo3(signature=(double=false))]
pub fn ack<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
self.inner_ack(py, async_nats::jetstream::message::AckKind::Ack, double)
}

#[pyo3(signature=(delay=None, double=false))]
pub fn nack<'py>(
&self,
py: Python<'py>,
delay: Option<TimeValue>,
double: bool,
) -> NatsrpyResult<Bound<'py, PyAny>> {
self.inner_ack(
py,
async_nats::jetstream::message::AckKind::Nak(delay.map(Into::into)),
double,
)
}

#[pyo3(signature=(double=false))]
pub fn progress<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
self.inner_ack(
py,
async_nats::jetstream::message::AckKind::Progress,
double,
)
}

#[pyo3(signature=(double=false))]
pub fn next<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
self.inner_ack(py, async_nats::jetstream::message::AckKind::Next, double)
}

#[pyo3(signature=(double=false))]
pub fn term<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> {
self.inner_ack(py, async_nats::jetstream::message::AckKind::Term, double)
}

#[must_use]
pub fn __repr__(&self) -> String {
self.message.__repr__()
}
}
Loading
Loading