Subscription: support consumer timeout and idle disconnect #17293

Open
VGalaxies wants to merge 3 commits into master from codex/subscription-consumer-timeout

@VGalaxies
Contributor

Summary

  • Expose connectionTimeoutInMs for all subscription consumers and pass it through builder, session open, and handshake.
  • Add a lightweight server-side idle disconnect check based on subscription heartbeat inactivity without changing the receiver threading model.
  • Reuse the existing server-side closeConsumer(...) flow so timeout-triggered disconnect also unsubscribes subscribed topics before dropping the consumer.
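
The pass-through described in the first bullet can be pictured with a minimal, self-contained sketch. It is not the IoTDB implementation: the class name `ConsumerBuilderSketch`, the attribute key `connection-timeout-ms`, and the default value are all hypothetical and chosen only for illustration. Only the fluent method name `connectionTimeoutInMs(...)` comes from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a consumer builder; this is not the IoTDB class.
final class ConsumerBuilderSketch {
  // Assumed attribute key and default, chosen for illustration only.
  static final String CONNECTION_TIMEOUT_MS_KEY = "connection-timeout-ms";
  static final long DEFAULT_CONNECTION_TIMEOUT_MS = 0L;

  private long connectionTimeoutInMs = DEFAULT_CONNECTION_TIMEOUT_MS;

  // Fluent setter mirroring the builder method named in the summary.
  ConsumerBuilderSketch connectionTimeoutInMs(long timeoutMs) {
    if (timeoutMs < 0) {
      throw new IllegalArgumentException("timeout must be non-negative");
    }
    this.connectionTimeoutInMs = timeoutMs;
    return this;
  }

  // Attributes a client could send during the handshake so the server learns the timeout.
  Map<String, String> buildHandshakeAttributes() {
    Map<String, String> attributes = new HashMap<>();
    attributes.put(CONNECTION_TIMEOUT_MS_KEY, Long.toString(connectionTimeoutInMs));
    return attributes;
  }
}
```

In this sketch the value set on the builder reaches the server only as a string attribute, so the server side would need to parse and validate it before using it as an idle threshold.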

Verification

  • mvn -T 8 spotless:apply -P with-integration-tests
  • MAVEN_OPTS='-XX:ReservedCodeCacheSize=512m' mvn -T 8 clean package -P with-integration-tests -DskipUTs -pl integration-test,distribution -DfailIfNoTests=false -am -U

This PR was primarily authored with Codex using gpt-5.4 xhigh and then hand-reviewed by me. I am responsible for every change made in this PR. I aimed to keep it aligned with our goals, though I may have missed minor issues. Please flag anything that feels off and I'll fix it quickly.

Expose connectionTimeoutInMs across subscription consumer builders and handshake/session setup.

Close idle server-side consumers based on heartbeat inactivity and reuse closeConsumer to unsubscribe topics before dropping the consumer.
Contributor

Copilot AI left a comment

Pull request overview

This PR extends IoTDB’s subscription system to (1) propagate a consumer-side connectionTimeoutInMs through the subscription consumer/provider/session handshake flow, and (2) add a server-side idle-disconnect mechanism driven by subscription inactivity, reusing the existing closeConsumer(...) unsubscribe+drop flow.

Changes:

  • Add server-side timeout checking for idle subscription consumers and trigger server-side close when inactive.
  • Propagate connectionTimeoutInMs from subscription consumer builders into subscription sessions and handshake attributes.
  • Introduce new consumer config keys and getters for the connection timeout and heartbeat interval.

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java | Tracks last activity/in-flight requests and implements server-side timeout close for idle consumers. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiver.java | Adds handleTimeout() to the receiver interface. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java | Tracks active receivers and periodically invokes timeout checks. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumerBuilder.java | Exposes fluent connectionTimeoutInMs(...) on tree push consumer builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePushConsumer.java | Passes connectionTimeoutInMs (and heartbeat interval) into provider construction. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumerBuilder.java | Exposes fluent connectionTimeoutInMs(...) on tree pull consumer builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreePullConsumer.java | Passes connectionTimeoutInMs (and heartbeat interval) into provider construction. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/tree/SubscriptionTreeProvider.java | Propagates connection timeout into the subscription session builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumerBuilder.java | Exposes fluent connectionTimeoutInMs(...) on table push consumer builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePushConsumer.java | Passes connectionTimeoutInMs (and heartbeat interval) into provider construction. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumerBuilder.java | Exposes fluent connectionTimeoutInMs(...) on table pull consumer builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTablePullConsumer.java | Passes connectionTimeoutInMs (and heartbeat interval) into provider construction. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/table/SubscriptionTableProvider.java | Propagates connection timeout into the subscription session builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPushConsumerBuilder.java | Adds covariant fluent connectionTimeoutInMs(...) override for push builders. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumerBuilder.java | Adds covariant fluent connectionTimeoutInMs(...) override for pull builders. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java | Stores heartbeat/connection timeout and includes them in handshake attributes; passes timeout into session builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumerBuilder.java | Adds builder field + setter for connectionTimeoutInMs. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java | Stores connectionTimeoutInMs, loads it from properties, passes it into provider, and reports it. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTreeSessionBuilder.java | Adds session builder setter for connectionTimeoutInMs. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionTableSessionBuilder.java | Adds session builder setter for connectionTimeoutInMs. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java | Opens subscription session using connection timeout. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java | Initializes connectionTimeoutInMs from builder. |
| iotdb-client/session/src/main/java/org/apache/iotdb/session/AbstractSessionBuilder.java | Adds connectionTimeoutInMs field to the base session builder. |
| iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java | Defines new consumer attribute key/default for connection timeout. |
| iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java | Adds getters for heartbeat interval and connection timeout. |

return getReceiver(reqVersion).handle(req);
final SubscriptionReceiver receiver = getReceiver(reqVersion);
activeReceivers.add(receiver);
receiver.handleTimeout();

Copilot AI Mar 12, 2026

receiver.handleTimeout() is invoked before receiver.handle(req). Because SubscriptionReceiverV1 only updates lastActivityTimeMs inside handle() (via beforeHandle), a request that arrives right after a long idle period (including a HEARTBEAT) can be treated as timed-out and closed before the receiver records that new activity. This can cause spurious server-side closes and subsequent requests failing with missing-consumer state.

Consider removing the per-request handleTimeout() call (rely on the scheduled checker), or move timeout evaluation to after the request has updated activity (e.g., inside SubscriptionReceiverV1.handle() after beforeHandle).
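
The ordering problem can be reproduced in a small model. This is a sketch under stated assumptions, not the code in this PR: `ReceiverOrderingSketch` and its methods `checkThenHandle` and `handleThenCheck` are hypothetical, timestamps are passed in explicitly so the behavior is deterministic, and closing is reduced to a boolean flag.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical receiver model; names echo the review comment but this is not the IoTDB code.
final class ReceiverOrderingSketch {
  private final AtomicLong lastActivityTimeMs;
  private final long timeoutMs;
  private boolean closed = false;

  ReceiverOrderingSketch(long startMs, long timeoutMs) {
    this.lastActivityTimeMs = new AtomicLong(startMs);
    this.timeoutMs = timeoutMs;
  }

  // Marks the consumer closed if it has been inactive for longer than the timeout.
  void handleTimeout(long nowMs) {
    long inactiveMs = nowMs - lastActivityTimeMs.get();
    if (inactiveMs <= timeoutMs) {
      return;
    }
    closed = true;
  }

  // Records activity and serves the request; returns false if the consumer is already closed.
  boolean handle(long nowMs) {
    if (closed) {
      return false;
    }
    lastActivityTimeMs.set(nowMs);
    return true;
  }

  // Ordering flagged in the review: the timeout check runs before activity is recorded.
  boolean checkThenHandle(long nowMs) {
    handleTimeout(nowMs);
    return handle(nowMs);
  }

  // Alternative ordering: record activity first, then evaluate the timeout.
  boolean handleThenCheck(long nowMs) {
    boolean served = handle(nowMs);
    handleTimeout(nowMs);
    return served;
  }
}
```

With a 1000 ms timeout and a request arriving at 5000 ms, `checkThenHandle` closes the consumer before the request is served, while `handleThenCheck` serves it. Whether a late heartbeat should revive a consumer at all is a separate policy question that the scheduled checker would still decide.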

Suggested change (delete this line):
receiver.handleTimeout();

Comment on lines +202 to +206
if (inactiveMs <= timeoutMs) {
return;
}
clearSharedConsumerState();
}

Copilot AI Mar 12, 2026

handleTimeout() clears the shared consumer state (clearSharedConsumerState()) before attempting closeConsumer(...). If closeConsumer fails (e.g., ConfigNode RPC issue or unsubscribe error), the receiver has already discarded the config, so subsequent timeout checks won’t retry and the consumer may remain leaked/subscribed.

Consider only clearing shared state after a successful close, or retaining a “pending close” state to retry with backoff.
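
A minimal sketch of the "clear only after a successful close" option follows. It rests on assumptions: `TimeoutCloseSketch` is a hypothetical class, the consumer config is reduced to a `String`, and the close call is modeled as a `Predicate<String>` that returns `true` on success. Backoff between retries is left out.

```java
import java.util.function.Predicate;

// Hypothetical model of a receiver that keeps its state until the close succeeds.
final class TimeoutCloseSketch {
  private String consumerConfig; // null once the shared state has been cleared
  private final Predicate<String> closeConsumer; // assumed to return true on success

  TimeoutCloseSketch(String consumerConfig, Predicate<String> closeConsumer) {
    this.consumerConfig = consumerConfig;
    this.closeConsumer = closeConsumer;
  }

  // Returns true when the consumer was closed and the local state was cleared.
  boolean handleTimeout() {
    String config = consumerConfig;
    if (config == null) {
      return false; // nothing left to close
    }
    boolean closed;
    try {
      closed = closeConsumer.test(config);
    } catch (RuntimeException e) {
      closed = false; // keep the state so the next periodic check can retry
    }
    if (closed) {
      consumerConfig = null; // clear only after the close succeeded
    }
    return closed;
  }

  boolean hasPendingConsumer() {
    return consumerConfig != null;
  }
}
```

Because the state survives a failed close, the next scheduled timeout check naturally retries without any extra bookkeeping.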

Comment on lines +208 to +212
LOGGER.info(
"Subscription: consumer {} is inactive for {} ms, exceeding timeout {} ms, close it on server side.",
consumerConfig,
inactiveMs,
timeoutMs);

Copilot AI Mar 12, 2026

Timeout-triggered closeConsumer(consumerConfig) can race with a concurrent HANDSHAKE that re-creates/activates a consumer with the same consumerGroupId/consumerId. Since closeConsumer drops the consumer by IDs via ConfigNode, it can end up closing the newly created consumer if the handshake happens while the timeout close is in progress.

Consider guarding handleTimeout with a per-consumer/receiver “epoch” (incremented on activateConsumer) and only closing if the epoch is unchanged, or serialize timeout closes vs. handshakes with a dedicated close-in-progress flag/lock.
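
The epoch idea might look like the sketch below. Everything here is hypothetical: `EpochGuardSketch`, `currentEpoch`, and `closeIfUnchanged` are illustrative names, and the actual close is passed in as a `Runnable`. The methods are `synchronized` so that the epoch comparison and the close cannot interleave with a new activation.

```java
// Hypothetical guard that ties a timeout close to the activation it observed.
final class EpochGuardSketch {
  private long epoch = 0;

  // Called on handshake; every activation starts a new epoch.
  synchronized long activateConsumer() {
    return ++epoch;
  }

  // Read by the timeout checker when it decides a consumer looks idle.
  synchronized long currentEpoch() {
    return epoch;
  }

  // Runs the close only if no activation happened since observedEpoch was read.
  synchronized boolean closeIfUnchanged(long observedEpoch, Runnable close) {
    if (epoch != observedEpoch) {
      return false; // a newer handshake took over; leave the new consumer alone
    }
    close.run();
    return true;
  }
}
```

Holding the lock while the close runs also blocks a concurrent handshake for that long, which is the serialization the second suggestion describes. If the close involves a slow RPC, a close-in-progress flag may be a better fit than a held lock.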

@codecov

codecov bot commented Mar 12, 2026

Codecov Report

❌ Patch coverage is 11.85185% with 119 lines in your changes missing coverage. Please review.
✅ Project coverage is 39.75%. Comparing base (ccece97) to head (18eef5f).
⚠️ Report is 2 commits behind head on master.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| .../subscription/receiver/SubscriptionReceiverV1.java | 18.57% | 57 Missing ⚠️ |
| .../subscription/agent/SubscriptionReceiverAgent.java | 0.00% | 14 Missing ⚠️ |
| ...on/consumer/base/AbstractSubscriptionProvider.java | 0.00% | 7 Missing ⚠️ |
| ...on/consumer/base/AbstractSubscriptionConsumer.java | 0.00% | 6 Missing ⚠️ |
| ...on/consumer/tree/SubscriptionTreePullConsumer.java | 0.00% | 4 Missing ⚠️ |
| ...on/consumer/tree/SubscriptionTreePushConsumer.java | 0.00% | 4 Missing ⚠️ |
| ...umer/base/AbstractSubscriptionConsumerBuilder.java | 0.00% | 3 Missing ⚠️ |
| ...tion/consumer/table/SubscriptionTableProvider.java | 0.00% | 3 Missing ⚠️ |
| ...iption/consumer/tree/SubscriptionTreeProvider.java | 0.00% | 3 Missing ⚠️ |
| .../subscription/SubscriptionTableSessionBuilder.java | 0.00% | 2 Missing ⚠️ |
... and 9 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17293      +/-   ##
============================================
- Coverage     39.76%   39.75%   -0.01%     
  Complexity      282      282              
============================================
  Files          5100     5100              
  Lines        342150   342233      +83     
  Branches      43596    43597       +1     
============================================
+ Hits         136044   136065      +21     
- Misses       206106   206168      +62     


Contributor

Consider adding a unit test for this.

Contributor Author

Added a lightweight UT in SubscriptionReceiverV1Test under datanode. It covers the timeout threshold calculation and the non-timeout branches for recently active consumers / in-flight requests. Verified with mvn -pl iotdb-core/datanode -am -DskipITs -DskipIntegrationTests -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dtest=SubscriptionReceiverV1Test test.

@sonarqubecloud

Quality Gate failed

Failed conditions
B Reliability Rating on New Code (required ≥ A)
