Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/natsrpy/_natsrpy_rs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class Nats:
read_buffer_capacity: int = 65535,
sender_capacity: int = 128,
max_reconnects: int | None = None,
connection_timeout: timedelta = ...,
request_timeout: timedelta = ...,
connection_timeout: float | timedelta = ...,
request_timeout: float | timedelta = ...,
) -> None: ...
async def startup(self) -> None: ...
async def shutdown(self) -> None: ...
Expand Down
1 change: 1 addition & 0 deletions python/natsrpy/_natsrpy_rs/js/consumers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,5 @@ class PullConsumer:
expires: timedelta | None = None,
min_pending: int | None = None,
min_ack_pending: int | None = None,
timeout: float | timedelta | None = None,
) -> list[JetStreamMessage]: ...
3 changes: 2 additions & 1 deletion src/exceptions/rust_err.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use pyo3::exceptions::PyTypeError;
use pyo3::exceptions::{PyTimeoutError, PyTypeError};

use crate::exceptions::py_err::{NatsrpyPublishError, NatsrpySessionError};

Expand Down Expand Up @@ -72,6 +72,7 @@ impl From<NatsrpyError> for pyo3::PyErr {
fn from(value: NatsrpyError) -> Self {
match value {
NatsrpyError::PublishError(_) => NatsrpyPublishError::new_err(value.to_string()),
NatsrpyError::Timeout(_) => PyTimeoutError::new_err(value.to_string()),
NatsrpyError::PyError(py_err) => py_err,
NatsrpyError::InvalidArgument(descr) => PyTypeError::new_err(descr),
_ => NatsrpySessionError::new_err(value.to_string()),
Expand Down
14 changes: 10 additions & 4 deletions src/js/consumers/pull/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use futures_util::StreamExt;
use pyo3::{Bound, PyAny, Python};
use tokio::sync::RwLock;

use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future};
use crate::{
exceptions::rust_err::NatsrpyResult,
utils::{futures::natsrpy_future_with_timeout, py_types::TimeoutValue},
};

type NatsPullConsumer =
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>;
Expand Down Expand Up @@ -35,6 +38,7 @@ impl PullConsumer {
expires=None,
min_pending=None,
min_ack_pending=None,
timeout=None,
))]
pub fn fetch<'py>(
&self,
Expand All @@ -47,12 +51,14 @@ impl PullConsumer {
expires: Option<Duration>,
min_pending: Option<usize>,
min_ack_pending: Option<usize>,
timeout: Option<TimeoutValue>,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.consumer.clone();

// Because we borrow cosnumer lock
// later for modifications of fetchbuilder.
#[allow(clippy::significant_drop_tightening)]
natsrpy_future(py, async move {
// Because we borrow created value
// later for modifications.
natsrpy_future_with_timeout(py, timeout, async move {
let consumer = ctx.read().await;
let mut fetch_builder = consumer.fetch();
if let Some(max_messages) = max_messages {
Expand Down
56 changes: 31 additions & 25 deletions src/nats_cls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ use tokio::sync::RwLock;
use crate::{
exceptions::rust_err::NatsrpyError,
subscription::Subscription,
utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::SendableValue},
utils::{
futures::natsrpy_future_with_timeout,
headers::NatsrpyHeadermapExt,
natsrpy_future,
py_types::{SendableValue, TimeoutValue},
},
};

#[pyo3::pyclass(name = "Nats")]
Expand All @@ -23,8 +28,8 @@ pub struct NatsCls {
read_buffer_capacity: u16,
sender_capacity: usize,
max_reconnects: Option<usize>,
connection_timeout: Duration,
request_timeout: Option<Duration>,
connection_timeout: TimeoutValue,
request_timeout: Option<TimeoutValue>,
}

#[pyo3::pymethods]
Expand All @@ -40,8 +45,8 @@ impl NatsCls {
read_buffer_capacity=65535,
sender_capacity=128,
max_reconnects=None,
connection_timeout=Duration::from_secs(5),
request_timeout=Duration::from_secs(10),
connection_timeout=TimeoutValue::FloatSecs(5.0),
request_timeout=TimeoutValue::FloatSecs(10.0),
))]
fn __new__(
addrs: Vec<String>,
Expand All @@ -52,8 +57,8 @@ impl NatsCls {
read_buffer_capacity: u16,
sender_capacity: usize,
max_reconnects: Option<usize>,
connection_timeout: Duration,
request_timeout: Option<Duration>,
connection_timeout: TimeoutValue,
request_timeout: Option<TimeoutValue>,
) -> Self {
Self {
nats_session: Arc::new(RwLock::new(None)),
Expand All @@ -80,8 +85,8 @@ impl NatsCls {
}
conn_opts = conn_opts
.max_reconnects(self.max_reconnects)
.connection_timeout(self.connection_timeout)
.request_timeout(self.request_timeout)
.connection_timeout(self.connection_timeout.into())
.request_timeout(self.request_timeout.map(Into::into))
.read_buffer_capacity(self.read_buffer_capacity)
.client_capacity(self.sender_capacity);

Expand All @@ -94,23 +99,24 @@ impl NatsCls {

let session = self.nats_session.clone();
let address = self.addr.clone();
let startup_future = async move {
if session.read().await.is_some() {
return Err(NatsrpyError::SessionError(
"NATS session already exists".to_string(),
));
}
// Scoping for early-dropping of a guard.
{
let mut sesion_guard = session.write().await;
*sesion_guard = Some(conn_opts.connect(address).await?);
}
Ok(())
};
let timeout = self.connection_timeout;
return Ok(natsrpy_future(py, async move {
tokio::time::timeout(timeout, startup_future).await?
})?);
return Ok(natsrpy_future_with_timeout(
py,
Some(timeout),
async move {
if session.read().await.is_some() {
return Err(NatsrpyError::SessionError(
"NATS session already exists".to_string(),
));
}
// Scoping for early-dropping of a guard.
{
let mut sesion_guard = session.write().await;
*sesion_guard = Some(conn_opts.connect(address).await?);
}
Ok(())
},
)?);
}

#[pyo3(signature = (subject, payload, *, headers=None, reply=None, err_on_disconnect = false))]
Expand Down
13 changes: 2 additions & 11 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::Mutex;

use crate::{
exceptions::rust_err::{NatsrpyError, NatsrpyResult},
utils::natsrpy_future,
utils::futures::natsrpy_future_with_timeout,
};

#[pyo3::pyclass]
Expand Down Expand Up @@ -39,21 +39,12 @@ impl Subscription {
let Some(inner) = self.inner.clone() else {
return Err(NatsrpyError::NotInitialized);
};

let future = async move {
natsrpy_future_with_timeout(py, timeout, async move {
let Some(message) = inner.lock().await.next().await else {
return Err(PyStopAsyncIteration::new_err("End of the stream.").into());
};

crate::message::Message::try_from(message)
};

natsrpy_future(py, async move {
if let Some(timeout) = timeout {
tokio::time::timeout(timeout, future).await?
} else {
future.await
}
})
}

Expand Down
32 changes: 31 additions & 1 deletion src/utils/futures.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::time::Duration;

use pyo3::{Bound, IntoPyObject, PyAny, Python};

use crate::exceptions::rust_err::NatsrpyResult;
use crate::exceptions::rust_err::{NatsrpyError, NatsrpyResult};

pub fn natsrpy_future<F, T>(py: Python, fut: F) -> NatsrpyResult<Bound<PyAny>>
where
Expand All @@ -11,3 +13,31 @@ where
pyo3_async_runtimes::tokio::future_into_py(py, async { fut.await.map_err(Into::into) })?;
Ok(res)
}

pub fn natsrpy_future_with_timeout<F, T, D>(
py: Python,
timeout: Option<D>,
fut: F,
) -> NatsrpyResult<Bound<PyAny>>
where
F: Future<Output = NatsrpyResult<T>> + Send + 'static,
T: for<'py> IntoPyObject<'py> + Send + 'static,
D: Into<Duration>,
{
let timeout = timeout.map(Into::into);
let res = pyo3_async_runtimes::tokio::future_into_py(py, async move {
if let Some(timeout) = timeout {
tokio::time::timeout(timeout, fut)
.await
// First map_err is for timeout
.map_err(NatsrpyError::from)?
// This one is for result returned from
// a future.
.map_err(Into::into)
} else {
// Simple return with error mapping.
fut.await.map_err(Into::into)
}
})?;
Ok(res)
}
34 changes: 34 additions & 0 deletions src/utils/py_types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use pyo3::{
FromPyObject,
types::{PyBytes, PyBytesMethods},
Expand Down Expand Up @@ -40,3 +42,35 @@ impl From<SendableValue> for bytes::Bytes {
}
}
}

#[derive(Clone, Debug, Copy, PartialEq, PartialOrd)]
pub enum TimeoutValue {
Duration(Duration),
FloatSecs(f32),
}

impl From<TimeoutValue> for Duration {
fn from(value: TimeoutValue) -> Self {
match value {
TimeoutValue::Duration(duration) => duration,
TimeoutValue::FloatSecs(fsecs) => Self::from_secs_f32(fsecs),
}
}
}

impl<'py> FromPyObject<'_, 'py> for TimeoutValue {
type Error = NatsrpyError;

fn extract(obj: pyo3::Borrowed<'_, 'py, pyo3::PyAny>) -> Result<Self, Self::Error> {
#[allow(clippy::option_if_let_else)]
if let Ok(fsec) = obj.extract::<f32>() {
Ok(Self::FloatSecs(fsec))
} else if let Ok(duration) = obj.extract::<Duration>() {
Ok(Self::Duration(duration))
} else {
Err(NatsrpyError::InvalidArgument(String::from(
"As timeouts only float or timedelta are accepted.",
)))
}
}
}
Loading