From 10c61577e14e20f11fd08eba3f23dbfc9134bb43 Mon Sep 17 00:00:00 2001 From: evgeny Date: Wed, 11 Mar 2026 16:10:00 +0000 Subject: [PATCH 1/7] [AIT-453] feat: add `MAP_CLEAR` operation support - Introduced support for the `MAP_CLEAR` operation across LiveMap components. - Updated `MsgpackSerialization` to handle `MAP_CLEAR` serialization/deserialization. - Added `clearTimeserial` tracking in `DefaultLiveMap` to skip outdated operations. - Enhanced `LiveMapManager` to process `MAP_CLEAR` and synchronize state updates. - Comprehensive unit tests added for various `MAP_CLEAR` scenarios. --- .../io/ably/lib/objects/ObjectMessage.kt | 22 ++- .../serialization/MsgpackSerialization.kt | 23 ++- .../objects/type/livemap/DefaultLiveMap.kt | 4 + .../objects/type/livemap/LiveMapManager.kt | 57 ++++++ .../unit/type/livemap/LiveMapManagerTest.kt | 180 ++++++++++++++++++ 5 files changed, 284 insertions(+), 2 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt index e3ba0649d..7f3e9b372 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt @@ -20,6 +20,7 @@ internal enum class ObjectOperationAction(val code: Int) { CounterCreate(3), CounterInc(4), ObjectDelete(5), + MapClear(6), Unknown(-1); // code for unknown value during deserialization } @@ -109,6 +110,13 @@ internal data class CounterInc( */ internal object ObjectDelete +/** + * Payload for MAP_CLEAR operation. + * Spec: MCL* + * No fields - action is sufficient + */ +internal object MapClear + /** * Payload for MAP_CREATE_WITH_OBJECT_ID operation. * Spec: MCRO* @@ -176,7 +184,13 @@ internal data class ObjectsMap( * The map entries, indexed by key. * Spec: OMP3b */ - val entries: Map? = null + val entries: Map? = null, + + /** + * The serial value of the last MAP_CLEAR operation applied to the map. + * Spec: OMP3c + */ + val clearTimeserial: String? = null, ) /** @@ -255,6 +269,12 @@ internal data class ObjectOperation( * Spec: OOP3q */ val counterCreateWithObjectId: CounterCreateWithObjectId? = null, + + /** + * Payload for MAP_CLEAR operation. + * Spec: OOP3r + */ + val mapClear: MapClear? = null, ) /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt index 1980f3c93..2eb10d0bd 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt @@ -12,6 +12,7 @@ import io.ably.lib.objects.MapCreate import io.ably.lib.objects.MapCreateWithObjectId import io.ably.lib.objects.MapRemove import io.ably.lib.objects.MapSet +import io.ably.lib.objects.MapClear import io.ably.lib.objects.ObjectDelete import io.ably.lib.objects.ObjectsMapSemantics import io.ably.lib.objects.ObjectsCounter @@ -174,6 +175,7 @@ private fun ObjectOperation.writeMsgpack(packer: MessagePacker) { if (objectDelete != null) fieldCount++ if (mapCreateWithObjectId != null) fieldCount++ if (counterCreateWithObjectId != null) fieldCount++ + if (mapClear != null) fieldCount++ packer.packMapHeader(fieldCount) @@ -224,6 +226,11 @@ private fun ObjectOperation.writeMsgpack(packer: MessagePacker) { counterCreateWithObjectId.writeMsgpack(packer) } + if (mapClear != null) { + packer.packString("mapClear") + packer.packMapHeader(0) // empty map, no fields + } + } /** @@ -242,6 +249,7 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation { var objectDelete: ObjectDelete? = null var mapCreateWithObjectId: MapCreateWithObjectId? = null var counterCreateWithObjectId: CounterCreateWithObjectId? = null + var mapClear: MapClear? = null for (i in 0 until fieldCount) { val fieldName = unpacker.unpackString().intern() @@ -271,6 +279,10 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation { } "mapCreateWithObjectId" -> mapCreateWithObjectId = readMapCreateWithObjectId(unpacker) "counterCreateWithObjectId" -> counterCreateWithObjectId = readCounterCreateWithObjectId(unpacker) + "mapClear" -> { + unpacker.skipValue() // empty map, consume it + mapClear = MapClear + } else -> unpacker.skipValue() } } @@ -290,6 +302,7 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation { objectDelete = objectDelete, mapCreateWithObjectId = mapCreateWithObjectId, counterCreateWithObjectId = counterCreateWithObjectId, + mapClear = mapClear, ) } @@ -631,6 +644,7 @@ private fun ObjectsMap.writeMsgpack(packer: MessagePacker) { if (semantics != null) fieldCount++ if (entries != null) fieldCount++ + if (clearTimeserial != null) fieldCount++ packer.packMapHeader(fieldCount) @@ -647,6 +661,11 @@ private fun ObjectsMap.writeMsgpack(packer: MessagePacker) { value.writeMsgpack(packer) } } + + if (clearTimeserial != null) { + packer.packString("clearTimeserial") + packer.packString(clearTimeserial) + } } /** @@ -657,6 +676,7 @@ private fun readObjectMap(unpacker: MessageUnpacker): ObjectsMap { var semantics: ObjectsMapSemantics? = null var entries: Map? = null + var clearTimeserial: String? = null for (i in 0 until fieldCount) { val fieldName = unpacker.unpackString().intern() @@ -684,11 +704,12 @@ private fun readObjectMap(unpacker: MessageUnpacker): ObjectsMap { } entries = tempMap } + "clearTimeserial" -> clearTimeserial = unpacker.unpackString() else -> unpacker.skipValue() } } - return ObjectsMap(semantics = semantics, entries = entries) + return ObjectsMap(semantics = semantics, entries = entries, clearTimeserial = clearTimeserial) } /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index b84c55d76..493a74f07 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -38,6 +38,9 @@ internal class DefaultLiveMap private constructor( */ internal val data = ConcurrentHashMap() + /** @spec RTLM25 */ + internal var clearTimeserial: String? = null + /** * LiveMapManager instance for managing LiveMap operations */ @@ -174,6 +177,7 @@ internal class DefaultLiveMap private constructor( } override fun clearData(): LiveMapUpdate { + clearTimeserial = null // RTLM4 return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap()) .apply { data.clear() } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt index cd1644b38..c8990f06b 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt @@ -32,6 +32,8 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang liveMap.createOperationIsMerged = false // RTLM6b liveMap.data.clear() + liveMap.clearTimeserial = objectState.map?.clearTimeserial // RTLM6i + objectState.map?.entries?.forEach { (key, entry) -> liveMap.data[key] = LiveMapEntry( isTombstoned = entry.tombstone ?: false, @@ -83,6 +85,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang liveMap.notifyUpdated(update) true // RTLM15d5b } + ObjectOperationAction.MapClear -> { + val update = applyMapClear(serial) // RTLM15d8 + liveMap.notifyUpdated(update) // RTLM15d8a + true // RTLM15d8b + } else -> { Log.w(tag, "Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4 false @@ -118,6 +125,14 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang mapSet: MapSet, // RTLM7d1 timeSerial: String?, // RTLM7d2 ): LiveMapUpdate { + // RTLM7h - skip if operation is older than the last MAP_CLEAR + val clearSerial = liveMap.clearTimeserial + if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) { + Log.v(tag, + "Skipping MAP_SET for key=\"${mapSet.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + val existingEntry = liveMap.data[mapSet.key] // RTLM7a @@ -170,6 +185,14 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang timeSerial: String?, // RTLM8c2 timeStamp: Long?, // RTLM8c3 ): LiveMapUpdate { + // RTLM8g - skip if operation is older than the last MAP_CLEAR + val clearSerial = liveMap.clearTimeserial + if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) { + Log.v(tag, + "Skipping MAP_REMOVE for key=\"${mapRemove.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + val existingEntry = liveMap.data[mapRemove.key] // RTLM8a @@ -212,6 +235,40 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang return LiveMapUpdate(mapOf(mapRemove.key to LiveMapUpdate.Change.REMOVED)) } + /** + * @spec RTLM24 - Applies MAP_CLEAR operation to LiveMap + */ + private fun applyMapClear(timeSerial: String?): LiveMapUpdate { + val clearSerial = liveMap.clearTimeserial + + // RTLM24c - skip if existing clear serial is strictly newer than incoming op serial + if (clearSerial != null && (timeSerial == null || clearSerial > timeSerial)) { + Log.v(tag, + "Skipping MAP_CLEAR: op serial $timeSerial <= current clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + + Log.v(tag, + "Updating clearTimeserial; previous=$clearSerial, new=$timeSerial; objectId=$objectId") + liveMap.clearTimeserial = timeSerial // RTLM24d + + val update = mutableMapOf() + + // RTLM24e - remove all entries whose serial is older than (or equal to missing) the clear serial + liveMap.data.entries.removeIf { + val (key, entry) = it + val entrySerial = entry.timeserial + if (entrySerial == null || (timeSerial != null && timeSerial > entrySerial)) { + update[key] = LiveMapUpdate.Change.REMOVED + true + } else { + false + } + } + + return LiveMapUpdate(update) + } + /** * For Lww CRDT semantics (the only supported LiveMap semantic) an operation * Should only be applied if incoming serial is strictly greater than existing entry's serial. diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index 5c5068c8c..044e3ee89 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -1,6 +1,7 @@ package io.ably.lib.objects.unit.type.livemap import io.ably.lib.objects.* +import io.ably.lib.objects.MapClear import io.ably.lib.objects.MapCreate import io.ably.lib.objects.MapRemove import io.ably.lib.objects.MapSet @@ -1129,4 +1130,183 @@ class LiveMapManagerTest { val expectedUpdate = mapOf("key1" to LiveMapUpdate.Change.REMOVED) assertEquals(expectedUpdate, update.update) } + + @Test + fun `(RTLM24) applyMapClear removes entries older than clear serial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial1", + data = ObjectData(string = "value1") + ) + liveMap.data["key2"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial3", + data = ObjectData(string = "value2") + ) + + val operation = ObjectOperation( + action = ObjectOperationAction.MapClear, + objectId = "map:testMap@1", + mapClear = MapClear + ) + + // Apply MAP_CLEAR with serial "serial2" — between serial1 and serial3 + liveMapManager.applyOperation(operation, "serial2", null) + + assertNull(liveMap.data["key1"], "Entry at serial1 should be removed") + assertNotNull(liveMap.data["key2"], "Entry at serial3 should be kept") + assertEquals("serial2", liveMap.clearTimeserial) + } + + @Test + fun `(RTLM24c) applyMapClear skips when existing clearTimeserial is newer`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial1", + data = ObjectData(string = "value1") + ) + liveMap.clearTimeserial = "serial3" + + val operation = ObjectOperation( + action = ObjectOperationAction.MapClear, + objectId = "map:testMap@1", + mapClear = MapClear + ) + + liveMapManager.applyOperation(operation, "serial2", null) + + // clearTimeserial should remain unchanged and data should be untouched + assertEquals("serial3", liveMap.clearTimeserial) + assertNotNull(liveMap.data["key1"], "Entry should not be removed") + } + + @Test + fun `(RTLM25) clearTimeserial is set after MAP_CLEAR`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + assertNull(liveMap.clearTimeserial) + + val operation = ObjectOperation( + action = ObjectOperationAction.MapClear, + objectId = "map:testMap@1", + mapClear = MapClear + ) + + liveMapManager.applyOperation(operation, "serial1", null) + + assertEquals("serial1", liveMap.clearTimeserial) + } + + @Test + fun `(RTLM7h) applyMapSet skips when op serial is less than or equal to clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.clearTimeserial = "serial2" + + val operation = ObjectOperation( + action = ObjectOperationAction.MapSet, + objectId = "map:testMap@1", + mapSet = MapSet(key = "key1", value = ObjectData(string = "value1")) + ) + + liveMapManager.applyOperation(operation, "serial1", null) + + assertNull(liveMap.data["key1"], "Entry should NOT be added when op serial <= clearTimeserial") + } + + @Test + fun `(RTLM7h) applyMapSet applies when op serial is greater than clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.clearTimeserial = "serial1" + + val operation = ObjectOperation( + action = ObjectOperationAction.MapSet, + objectId = "map:testMap@1", + mapSet = MapSet(key = "key1", value = ObjectData(string = "value1")) + ) + + liveMapManager.applyOperation(operation, "serial2", null) + + assertNotNull(liveMap.data["key1"], "Entry should be added when op serial > clearTimeserial") + assertEquals("value1", liveMap.data["key1"]?.data?.string) + } + + @Test + fun `(RTLM8g) applyMapRemove skips when op serial is less than or equal to clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial3", + data = ObjectData(string = "value1") + ) + liveMap.clearTimeserial = "serial2" + + val operation = ObjectOperation( + action = ObjectOperationAction.MapRemove, + objectId = "map:testMap@1", + mapRemove = MapRemove(key = "key1") + ) + + liveMapManager.applyOperation(operation, "serial1", null) + + assertFalse(liveMap.data["key1"]?.isTombstoned == true, "Entry should NOT be tombstoned when op serial <= clearTimeserial") + } + + @Test + fun `(RTLM6i) applyState sets clearTimeserial from objectState`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + val objectState = ObjectState( + objectId = "map:testMap@1", + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = emptyMap(), + clearTimeserial = "serial1" + ), + siteTimeserials = emptyMap(), + tombstone = false, + ) + + liveMapManager.applyState(objectState, null) + + assertEquals("serial1", liveMap.clearTimeserial) + } + + @Test + fun `(RTLM4) clearData resets clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + + liveMap.clearTimeserial = "serial1" + liveMap.clearData() + + assertNull(liveMap.clearTimeserial) + } + + @Test + fun `(RTLM15d8) applyOperation returns true for MAP_CLEAR`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + val operation = ObjectOperation( + action = ObjectOperationAction.MapClear, + objectId = "map:testMap@1", + mapClear = MapClear + ) + + val result = liveMapManager.applyOperation(operation, "serial1", null) + assertTrue(result, "applyOperation should return true for MAP_CLEAR") + } } From 3f8d14b7990af56742d8c73e32f5c27d35561937 Mon Sep 17 00:00:00 2001 From: evgeny Date: Mon, 16 Mar 2026 11:16:52 +0000 Subject: [PATCH 2/7] [AIT-53] test: add filtering validation for createOp entries in `applyState` - Verified that `applyState` filters out `createOp` entries with null or older-than-clear serials as per RTLM7h. - Added unit test to ensure only entries with newer serials than `clearTimeserial` persist in the LiveMap. --- .../unit/type/livemap/LiveMapManagerTest.kt | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index 044e3ee89..adaf4eb81 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -1285,6 +1285,82 @@ class LiveMapManagerTest { assertEquals("serial1", liveMap.clearTimeserial) } + @Test + fun `(RTLM6i) applyState resets clearTimeserial to null when objectState has no clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.clearTimeserial = "serial1" + + val objectState = ObjectState( + objectId = "map:testMap@1", + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = emptyMap(), + clearTimeserial = null + ), + siteTimeserials = emptyMap(), + tombstone = false, + ) + + liveMapManager.applyState(objectState, null) + + assertNull(liveMap.clearTimeserial) + } + + @Test + fun `(RTLM6i, RTLM6d, RTLM7h) applyState filters createOp entries older than or equal to clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + // createOp has three entries: + // key-null-serial — no timeserial (treated as pre-clear by RTLM7h) + // key-old-serial — serial1, strictly older than the clear serial (serial2) + // key-new-serial — serial3, strictly newer than the clear serial (serial2) + val createOp = ObjectOperation( + action = ObjectOperationAction.MapCreate, + objectId = "map:testMap@1", + mapCreate = MapCreate( + semantics = ObjectsMapSemantics.LWW, + entries = mapOf( + "key-null-serial" to ObjectsMapEntry( + data = ObjectData(string = "nullSerialValue"), + timeserial = null + ), + "key-old-serial" to ObjectsMapEntry( + data = ObjectData(string = "oldSerialValue"), + timeserial = "serial1" + ), + "key-new-serial" to ObjectsMapEntry( + data = ObjectData(string = "newSerialValue"), + timeserial = "serial3" + ) + ) + ) + ) + + val objectState = ObjectState( + objectId = "map:testMap@1", + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = emptyMap(), + clearTimeserial = "serial2" // RTLM6i: set before createOp entries are merged + ), + createOp = createOp, + siteTimeserials = mapOf("site1" to "serial1"), + tombstone = false, + ) + + liveMapManager.applyState(objectState, null) + + // RTLM7h: entries with null or older-than-clear serials must be filtered out + assertNull(liveMap.data["key-null-serial"], "Entry with null serial should be filtered by RTLM7h") + assertNull(liveMap.data["key-old-serial"], "Entry with serial1 <= clearTimeserial serial2 should be filtered by RTLM7h") + // Entry whose serial is strictly newer than clearTimeserial must survive + assertNotNull(liveMap.data["key-new-serial"], "Entry with serial3 > clearTimeserial serial2 should be present") + assertEquals("newSerialValue", liveMap.data["key-new-serial"]?.data?.string) + } + @Test fun `(RTLM4) clearData resets clearTimeserial`() { val liveMap = getDefaultLiveMapWithMockedDeps() From 6f08c8f4aab11730e9d3de8165ab88d0685903fe Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 27 Aug 2025 18:53:40 +0530 Subject: [PATCH 3/7] [ECO-5056] use server-provided GC grace period for garbage collection - Add support for server-configured objectsGCGracePeriod from connection details, with fallback to default 24-hour period. Updates ObjectsPool and all LiveObjects - Implementations to use the server-provided value for tombstone cleanup timing. --- .../java/io/ably/lib/objects/Adapter.java | 6 ++-- .../io/ably/lib/objects/ObjectsAdapter.java | 8 ++--- .../ably/lib/transport/ConnectionManager.java | 2 ++ .../io/ably/lib/types/ConnectionDetails.java | 8 +++++ .../kotlin/io/ably/lib/objects/Helpers.kt | 20 ++++++++++--- .../kotlin/io/ably/lib/objects/ObjectsPool.kt | 18 ++++++++++-- .../lib/objects/type/BaseRealtimeObject.kt | 29 +++++++++++++++---- .../type/livecounter/DefaultLiveCounter.kt | 2 +- .../objects/type/livemap/DefaultLiveMap.kt | 4 +-- .../lib/objects/type/livemap/LiveMapEntry.kt | 5 ++-- .../io/ably/lib/objects/unit/HelpersTest.kt | 20 +++++++++---- .../lib/objects/unit/ObjectMessageSizeTest.kt | 18 +++++++++--- 12 files changed, 106 insertions(+), 34 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java index e9a084ae7..76c35cc37 100644 --- a/lib/src/main/java/io/ably/lib/objects/Adapter.java +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -2,7 +2,7 @@ import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.transport.ConnectionManager; +import io.ably.lib.realtime.Connection; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; @@ -23,8 +23,8 @@ public Adapter(@NotNull AblyRealtime ably) { } @Override - public @NotNull ConnectionManager getConnectionManager() { - return ably.connection.connectionManager; + public @NotNull Connection getConnection() { + return ably.connection; } @Override diff --git a/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java index 21262942a..b6054e71a 100644 --- a/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java @@ -1,7 +1,7 @@ package io.ably.lib.objects; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.transport.ConnectionManager; +import io.ably.lib.realtime.Connection; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import org.jetbrains.annotations.Blocking; @@ -18,13 +18,13 @@ public interface ObjectsAdapter { @NotNull ClientOptions getClientOptions(); /** - * Retrieves the connection manager for handling connection state and operations. + * Retrieves the connection instance for handling connection state and operations. * Used to check connection status, obtain error information, and manage * message transmission across the Ably connection. * - * @return the connection manager instance + * @return the connection instance */ - @NotNull ConnectionManager getConnectionManager(); + @NotNull Connection getConnection(); /** * Retrieves the current time in milliseconds from the Ably server. diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 3fab010cb..94ca2b822 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -104,6 +104,7 @@ public class ConnectionManager implements ConnectListener { * This field is initialized only if the LiveObjects plugin is present in the classpath. */ private final LiveObjectsPlugin liveObjectsPlugin; + public Long objectsGCGracePeriod = null; /** * Methods on the channels map owned by the {@link AblyRealtime} instance @@ -1319,6 +1320,7 @@ private synchronized void onConnected(ProtocolMessage message) { connectionStateTtl = connectionDetails.connectionStateTtl; maxMessageSize = connectionDetails.maxMessageSize; siteCode = connectionDetails.siteCode; // CD2j + objectsGCGracePeriod = connectionDetails.objectsGCGracePeriod; /* set the clientId resolved from token, if any */ String clientId = connectionDetails.clientId; diff --git a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java index 587b9241f..6a557a12a 100644 --- a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java +++ b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java @@ -81,6 +81,11 @@ public class ConnectionDetails { */ public String siteCode; + /** + * The duration in milliseconds used to retain tombstoned objects at client side. + */ + public Long objectsGCGracePeriod; + ConnectionDetails() { maxIdleInterval = Defaults.maxIdleInterval; connectionStateTtl = Defaults.connectionStateTtl; @@ -124,6 +129,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException { case "siteCode": siteCode = unpacker.unpackString(); break; + case "objectsGCGracePeriod": + objectsGCGracePeriod = unpacker.unpackLong(); + break; default: Log.v(TAG, "Unexpected field: " + fieldName); unpacker.skipValue(); diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 6a855868c..a3a1138e1 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -3,6 +3,7 @@ package io.ably.lib.objects import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.types.Callback +import io.ably.lib.realtime.ConnectionEvent import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -17,7 +18,7 @@ import kotlin.coroutines.resumeWithException */ internal suspend fun ObjectsAdapter.sendAsync(message: ProtocolMessage): PublishResult = suspendCancellableCoroutine { continuation -> try { - connectionManager.send(message, clientOptions.queueMessages, object : Callback { + connection.connectionManager.send(message, clientOptions.queueMessages, object : Callback { override fun onSuccess(result: PublishResult) { continuation.resume(result) } @@ -47,6 +48,17 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } } +internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { + val connectionManager = connection.connectionManager + if (connectionManager.objectsGCGracePeriod != null) { + block(connectionManager.objectsGCGracePeriod) + return + } + connection.once(ConnectionEvent.connected) { + block(connectionManager.objectsGCGracePeriod) + } +} + /** * Retrieves the channel modes for a specific channel. * This method returns the modes that are set for the specified channel. @@ -78,7 +90,7 @@ internal fun ObjectsAdapter.getChannelModes(channelName: String): Array) { - val maximumAllowedSize = connectionManager.maxMessageSize + val maximumAllowedSize = connection.connectionManager.maxMessageSize val objectsTotalMessageSize = objectMessages.sumOf { it.size() } if (objectsTotalMessageSize > maximumAllowedSize) { throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes", @@ -133,8 +145,8 @@ internal fun ObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: Str } internal fun ObjectsAdapter.throwIfUnpublishableState(channelName: String) { - if (!connectionManager.isActive) { - throw ablyException(connectionManager.stateErrorInfo) + if (!connection.connectionManager.isActive) { + throw ablyException(connection.connectionManager.stateErrorInfo) } throwIfInChannelState(channelName, arrayOf(ChannelState.failed, ChannelState.suspended)) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index 28ee839e0..43cab31c2 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -14,6 +14,9 @@ import java.util.concurrent.ConcurrentHashMap internal object ObjectsPoolDefaults { const val GC_INTERVAL_MS = 1000L * 60 * 5 // 5 minutes /** + * The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails` + * object of the `CONNECTED` event. + * If the server does not provide this value, the SDK will fall back to this default value. * Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation * with an earlier serial that would not have been applied if the tombstone still existed. * @@ -49,10 +52,19 @@ internal class ObjectsPool( private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var gcJob: Job // Job for the garbage collection coroutine + @Volatile + private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + init { // RTO3b - Initialize pool with root object pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects) - // Start garbage collection coroutine + // Start garbage collection coroutine with server-provided grace period if available + realtimeObjects.adapter.retrieveObjectsGCGracePeriod { period -> + period?.let { + gcGracePeriod = it + Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms") + } ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms") + } gcJob = startGCJob() } @@ -123,9 +135,9 @@ internal class ObjectsPool( */ private fun onGCInterval() { pool.entries.removeIf { (_, obj) -> - if (obj.isEligibleForGc()) { true } // Remove from pool + if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool else { - obj.onGCInterval() + obj.onGCInterval(gcGracePeriod) false // Keep in pool } } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt index c6f602b5c..2eca29b55 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt @@ -4,7 +4,6 @@ import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.ObjectsOperationSource -import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.objectError import io.ably.lib.objects.type.livecounter.noOpCounterUpdate import io.ably.lib.objects.type.livemap.noOpMapUpdate @@ -138,10 +137,20 @@ internal abstract class BaseRealtimeObject( /** * Checks if the object is eligible for garbage collection. + * + * An object is eligible for garbage collection if it has been tombstoned and + * the time since tombstoning exceeds the specified grace period. + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned objects + * should be kept before being eligible for collection. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * @return true if the object is tombstoned and the grace period has elapsed, + * false otherwise */ - internal fun isEligibleForGc(): Boolean { + internal fun isEligibleForGc(gcGracePeriod: Long): Boolean { val currentTime = System.currentTimeMillis() - return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } /** @@ -198,12 +207,22 @@ internal abstract class BaseRealtimeObject( /** * Called during garbage collection intervals to clean up expired entries. * + * This method is invoked periodically (every 5 minutes) by the ObjectsPool + * to perform cleanup of tombstoned data that has exceeded the grace period. + * * This method should identify and remove entries that: * - Have been marked as tombstoned - * - Have a tombstone timestamp older than the configured grace period + * - Have a tombstone timestamp older than the specified grace period + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned entries + * should be kept before being eligible for removal. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * Must be greater than 2 minutes to ensure proper operation + * ordering and avoid issues with delayed operations. * * Implementations typically use single-pass removal techniques to * efficiently clean up expired data without creating temporary collections. */ - abstract fun onGCInterval() + abstract fun onGCInterval(gcGracePeriod: Long) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index d242a9bd3..4f1ef28e5 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -111,7 +111,7 @@ internal class DefaultLiveCounter private constructor( liveCounterManager.notify(update as LiveCounterUpdate) } - override fun onGCInterval() { + override fun onGCInterval(gcGracePeriod: Long) { // Nothing to GC for a counter object return } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index 493a74f07..8e9746d6e 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -190,8 +190,8 @@ internal class DefaultLiveMap private constructor( liveMapManager.notify(update as LiveMapUpdate) } - override fun onGCInterval() { - data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() } + override fun onGCInterval(gcGracePeriod: Long) { + data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod) } } companion object { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt index df2259583..f12e88d88 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt @@ -3,7 +3,6 @@ package io.ably.lib.objects.type.livemap import io.ably.lib.objects.* import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectsPool -import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.type.BaseRealtimeObject import io.ably.lib.objects.type.ObjectType import io.ably.lib.objects.type.counter.LiveCounter @@ -73,9 +72,9 @@ internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): LiveMapVal /** * Extension function to check if a LiveMapEntry is expired and ready for garbage collection */ -internal fun LiveMapEntry.isEligibleForGc(): Boolean { +internal fun LiveMapEntry.isEligibleForGc(gcGracePeriod: Long): Boolean { val currentTime = System.currentTimeMillis() - return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } private fun fromRealtimeObject(realtimeObject: BaseRealtimeObject): LiveMapValue { diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 5750046b0..222d1cb25 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -25,7 +25,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions().apply { queueMessages = false } - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -48,7 +50,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions() - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -69,7 +73,9 @@ class HelpersTest { val connManager = mockk(relaxed = true) val clientOptions = ClientOptions() - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } throws RuntimeException("send failed hard") @@ -346,7 +352,9 @@ class HelpersTest { fun testThrowIfUnpublishableStateInactiveConnection() { val adapter = mockk(relaxed = true) val connManager = mockk(relaxed = true) - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { connManager.isActive } returns false every { connManager.stateErrorInfo } returns serverError("not active").errorInfo @@ -359,7 +367,9 @@ class HelpersTest { fun testThrowIfUnpublishableStateChannelFailed() { val adapter = mockk(relaxed = true) val connManager = mockk(relaxed = true) - every { adapter.connectionManager } returns connManager + every { adapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } every { connManager.isActive } returns true val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt index 57e8f45ea..be6f6e797 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt @@ -14,8 +14,10 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ensureMessageSizeWithinLimit import io.ably.lib.objects.size +import io.ably.lib.transport.ConnectionManager import io.ably.lib.transport.Defaults import io.ably.lib.types.AblyException +import io.mockk.every import io.mockk.mockk import kotlinx.coroutines.test.runTest import org.junit.Test @@ -27,8 +29,12 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeWithinLimit() = runTest { val mockAdapter = mockk(relaxed = true) - mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) + val connManager = mockk(relaxed = true) + every { mockAdapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } + connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, connManager.maxMessageSize) // ObjectMessage with all size-contributing fields val objectMessage = ObjectMessage( @@ -158,8 +164,12 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeAboveLimit() = runTest { val mockAdapter = mockk(relaxed = true) - mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) + val connManager = mockk(relaxed = true) + every { mockAdapter.connection } returns mockk(relaxed = true) { + setPrivateField("connectionManager", connManager) + } + connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, connManager.maxMessageSize) // Create ObjectMessage with dummy data that results in size 60kb val objectMessage1 = ObjectMessage( From 8e193bccf34e19bb2259e69ed8fc276d6bf6cfd9 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 28 Aug 2025 14:44:49 +0530 Subject: [PATCH 4/7] [ECO-5056] Updated test helper getMockObjectsAdapter with static mocck 1. Encouraged use of global mocck for Adapter 2. Updated failing tests accordingly 3. Added unit tests covering all cases for retrieveObjectsGCGracePeriod --- .../kotlin/io/ably/lib/objects/Helpers.kt | 19 ++- .../io/ably/lib/objects/unit/HelpersTest.kt | 128 +++++++++++------- .../lib/objects/unit/ObjectMessageSizeTest.kt | 24 +--- .../io/ably/lib/objects/unit/TestHelpers.kt | 10 +- .../objects/unit/objects/ObjectsPoolTest.kt | 8 +- 5 files changed, 109 insertions(+), 80 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index a3a1138e1..520dfe557 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -4,6 +4,7 @@ import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.types.Callback import io.ably.lib.realtime.ConnectionEvent +import io.ably.lib.realtime.ConnectionState import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -13,12 +14,14 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException +internal val ObjectsAdapter.connectionManager get() = connection.connectionManager + /** * Spec: RTO15g */ internal suspend fun ObjectsAdapter.sendAsync(message: ProtocolMessage): PublishResult = suspendCancellableCoroutine { continuation -> try { - connection.connectionManager.send(message, clientOptions.queueMessages, object : Callback { + connectionManager.send(message, clientOptions.queueMessages, object : Callback { override fun onSuccess(result: PublishResult) { continuation.resume(result) } @@ -49,8 +52,12 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { - val connectionManager = connection.connectionManager - if (connectionManager.objectsGCGracePeriod != null) { + connectionManager.objectsGCGracePeriod?.let { + block(it) + return + } + // If already connected, no further `connected` event is guaranteed; return immediately. + if (connection.state == ConnectionState.connected) { block(connectionManager.objectsGCGracePeriod) return } @@ -90,7 +97,7 @@ internal fun ObjectsAdapter.getChannelModes(channelName: String): Array) { - val maximumAllowedSize = connection.connectionManager.maxMessageSize + val maximumAllowedSize = connectionManager.maxMessageSize val objectsTotalMessageSize = objectMessages.sumOf { it.size() } if (objectsTotalMessageSize > maximumAllowedSize) { throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes", @@ -145,8 +152,8 @@ internal fun ObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: Str } internal fun ObjectsAdapter.throwIfUnpublishableState(channelName: String) { - if (!connection.connectionManager.isActive) { - throw ablyException(connection.connectionManager.stateErrorInfo) + if (!connectionManager.isActive) { + throw ablyException(connectionManager.stateErrorInfo) } throwIfInChannelState(channelName, arrayOf(ChannelState.failed, ChannelState.suspended)) } diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 222d1cb25..b47db1a53 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -5,12 +5,10 @@ import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.ChannelStateListener import io.ably.lib.realtime.CompletionListener -import io.ably.lib.transport.ConnectionManager +import io.ably.lib.realtime.ConnectionEvent +import io.ably.lib.realtime.ConnectionStateListener import io.ably.lib.types.* -import io.mockk.every -import io.mockk.mockk -import io.mockk.slot -import io.mockk.verify +import io.mockk.* import kotlinx.coroutines.test.runTest import org.junit.Assert.* import org.junit.Test @@ -21,13 +19,10 @@ class HelpersTest { // sendAsync @Test fun testSendAsyncShouldQueueAccordingToClientOptions() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions().apply { queueMessages = false } - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -46,13 +41,10 @@ class HelpersTest { @Test fun testSendAsyncErrorPropagatesAblyException() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions() - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -67,15 +59,59 @@ class HelpersTest { assertEquals(40000, ex.errorInfo.code) } + @Test + fun testRetrieveObjectsGCGracePeriodImmediateInvokesBlock() { + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager + connManager.setPrivateField("objectsGCGracePeriod", 123L) + + var value: Long? = null + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertEquals(123L, value) + verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + + @Test + fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager + + var value: Long? = null + every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + val listener = secondArg() + connManager.setPrivateField("objectsGCGracePeriod", 456L) + listener.onConnectionStateChanged(mockk(relaxed = true)) + } + + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertEquals(456L, value) + verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + + @Test + fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { + val adapter = getMockObjectsAdapter() + + var value: Long? = null + every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + val listener = secondArg() + listener.onConnectionStateChanged(mockk(relaxed = true)) + } + + adapter.retrieveObjectsGCGracePeriod { v -> value = v } + + assertNull(value) + verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + } + @Test fun testSendAsyncThrowsWhenConnectionManagerThrows() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions() - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } throws RuntimeException("send failed hard") @@ -251,25 +287,25 @@ class HelpersTest { verify(exactly = 1) { channel.once(any()) } } - @Test - fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { - val adapter = mockk(relaxed = true) - val channel = mockk(relaxed = true) - every { adapter.getChannel("ch") } returns channel - channel.state = ChannelState.attaching - every { channel.once(any()) } answers { - val listener = firstArg() - val stateChange = mockk(relaxed = true) { - setPrivateField("current", ChannelState.suspended) - setPrivateField("reason", clientError("Not attached").errorInfo) - } - listener.onChannelStateChanged(stateChange) - } - val ex = assertFailsWith { adapter.ensureAttached("ch") } - assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) - assertTrue(ex.errorInfo.message.contains("Not attached")) - verify(exactly = 1) { channel.once(any()) } - } + @Test + fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attaching + every { channel.once(any()) } answers { + val listener = firstArg() + val stateChange = mockk(relaxed = true) { + setPrivateField("current", ChannelState.suspended) + setPrivateField("reason", clientError("Not attached").errorInfo) + } + listener.onChannelStateChanged(stateChange) + } + val ex = assertFailsWith { adapter.ensureAttached("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("Not attached")) + verify(exactly = 1) { channel.once(any()) } + } @Test fun testEnsureAttachedThrowsForInvalidState() = runTest { @@ -350,11 +386,8 @@ class HelpersTest { // throwIfUnpublishableState @Test fun testThrowIfUnpublishableStateInactiveConnection() { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager every { connManager.isActive } returns false every { connManager.stateErrorInfo } returns serverError("not active").errorInfo @@ -365,11 +398,8 @@ class HelpersTest { @Test fun testThrowIfUnpublishableStateChannelFailed() { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { adapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager every { connManager.isActive } returns true val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt index be6f6e797..4c413649e 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt @@ -14,27 +14,19 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ensureMessageSizeWithinLimit import io.ably.lib.objects.size -import io.ably.lib.transport.ConnectionManager import io.ably.lib.transport.Defaults import io.ably.lib.types.AblyException -import io.mockk.every -import io.mockk.mockk import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith class ObjectMessageSizeTest { - @Test fun testObjectMessageSizeWithinLimit() = runTest { - val mockAdapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { mockAdapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } - connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, connManager.maxMessageSize) + val mockAdapter = getMockObjectsAdapter() + mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) // ObjectMessage with all size-contributing fields val objectMessage = ObjectMessage( @@ -163,13 +155,9 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeAboveLimit() = runTest { - val mockAdapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { mockAdapter.connection } returns mockk(relaxed = true) { - setPrivateField("connectionManager", connManager) - } - connManager.maxMessageSize = Defaults.maxMessageSize // 64 kb - assertEquals(65536, connManager.maxMessageSize) + val mockAdapter = getMockObjectsAdapter() + mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb + assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) // Create ObjectMessage with dummy data that results in size 60kb val objectMessage1 = ObjectMessage( diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt index 56e4b2d20..17be76951 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt @@ -11,11 +11,13 @@ import io.ably.lib.objects.type.livemap.LiveMapManager import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState +import io.ably.lib.transport.ConnectionManager import io.ably.lib.types.ChannelMode import io.ably.lib.types.ChannelOptions import io.ably.lib.types.ClientOptions import io.mockk.every import io.mockk.mockk +import io.mockk.mockkStatic import io.mockk.spyk import kotlinx.coroutines.CompletableDeferred @@ -46,9 +48,11 @@ internal fun getMockRealtimeChannel( } internal fun getMockObjectsAdapter(): ObjectsAdapter { - val mockkAdapter = mockk(relaxed = true) - every { mockkAdapter.getChannel(any()) } returns getMockRealtimeChannel("testChannelName") - return mockkAdapter + mockkStatic("io.ably.lib.objects.HelpersKt") + return mockk(relaxed = true) { + every { getChannel(any()) } returns getMockRealtimeChannel("testChannelName") + every { connectionManager } returns mockk(relaxed = true) + } } internal fun getMockObjectsPool(): ObjectsPool { diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt index 656b1e7c1..aff4f9d1a 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt @@ -19,7 +19,7 @@ class ObjectsPoolTest { @Test fun `(RTO3, RTO3a, RTO3b) An internal ObjectsPool should be used to maintain the list of objects present on a channel`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool assertNotNull(objectsPool) @@ -44,7 +44,7 @@ class ObjectsPoolTest { @Test fun `(RTO6) ObjectsPool should create zero-value objects if not exists`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = spyk(defaultRealtimeObjects.objectsPool) assertEquals(1, objectsPool.size(), "RTO3 - Should only contain the root object initially") @@ -78,7 +78,7 @@ class ObjectsPoolTest { @Test fun `(RTO4b1, RTO4b2) ObjectsPool should reset to initial pool retaining original root map`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool assertEquals(1, objectsPool.size()) val rootMap = objectsPool.get(ROOT_OBJECT_ID) as DefaultLiveMap @@ -107,7 +107,7 @@ class ObjectsPoolTest { @Test fun `(RTO5c2, RTO5c2a) ObjectsPool should delete extra object IDs`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool // Add some objects From 551451a553bc3260b678c236567ede3d63d6c5ac Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 28 Aug 2025 16:25:01 +0530 Subject: [PATCH 5/7] [ECO-5056] Added test for getObjects when liveobjects plugin is not installed --- .../test/realtime/RealtimeChannelTest.java | 23 +++++++++++++++++++ .../io/ably/lib/objects/unit/HelpersTest.kt | 10 ++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index a64f96daa..5350f0515 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -2617,6 +2617,29 @@ public void connect_should_not_rewrite_immediate_attach() throws AblyException { } } + @Test + public void channel_get_objects_throws_exception() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + try (AblyRealtime ably = new AblyRealtime(opts)) { + + /* wait until connected */ + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); + + /* create a channel and attach */ + final Channel channel = ably.channels.get("channel"); + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + assertEquals("Verify attached state reached", channel.state, ChannelState.attached); + + AblyException exception = assertThrows(AblyException.class, channel::getObjects); + assertNotNull(exception); + assertEquals(40019, exception.errorInfo.code); + assertEquals(400, exception.errorInfo.statusCode); + assertTrue(exception.errorInfo.message.contains("LiveObjects plugin hasn't been installed")); + } + } + static class DetachingProtocolListener implements DebugOptions.RawProtocolListener { public Channel theChannel; diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index b47db1a53..95795ca0f 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -76,9 +76,10 @@ class HelpersTest { fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager + val connection = adapter.connection var value: Long? = null - every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) @@ -87,15 +88,16 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { val adapter = getMockObjectsAdapter() + val connection = adapter.connection var value: Long? = null - every { adapter.connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } @@ -103,7 +105,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertNull(value) - verify(exactly = 1) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test From 491c9e448b348892aa68ff895258ffa5f5a3ca0c Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 29 Aug 2025 16:32:43 +0530 Subject: [PATCH 6/7] [ECO-5056] Refactored HelpersTest.kt assertions with concrete types --- .../io/ably/lib/objects/unit/HelpersTest.kt | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 95795ca0f..bd05e0579 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -69,7 +69,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(123L, value) - verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } } @Test @@ -79,7 +79,7 @@ class HelpersTest { val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) @@ -88,7 +88,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test @@ -97,7 +97,7 @@ class HelpersTest { val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.once(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } @@ -105,7 +105,7 @@ class HelpersTest { adapter.retrieveObjectsGCGracePeriod { v -> value = v } assertNull(value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } } @Test @@ -277,7 +277,7 @@ class HelpersTest { every { adapter.getChannel("ch") } returns channel channel.state = ChannelState.attaching - every { channel.once(any()) } answers { + every { channel.once(any()) } answers { val listener = firstArg() val stateChange = mockk(relaxed = true) { setPrivateField("current", ChannelState.attached) @@ -286,7 +286,7 @@ class HelpersTest { } adapter.ensureAttached("ch") - verify(exactly = 1) { channel.once(any()) } + verify(exactly = 1) { channel.once(any()) } } @Test @@ -295,7 +295,7 @@ class HelpersTest { val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel channel.state = ChannelState.attaching - every { channel.once(any()) } answers { + every { channel.once(any()) } answers { val listener = firstArg() val stateChange = mockk(relaxed = true) { setPrivateField("current", ChannelState.suspended) @@ -306,7 +306,7 @@ class HelpersTest { val ex = assertFailsWith { adapter.ensureAttached("ch") } assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) assertTrue(ex.errorInfo.message.contains("Not attached")) - verify(exactly = 1) { channel.once(any()) } + verify(exactly = 1) { channel.once(any()) } } @Test From debbcd0c42894058dbdd18e0e358977ab29c95a3 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 25 Sep 2025 17:57:13 +0530 Subject: [PATCH 7/7] [ECO-5056] Updated retrieveObjectsGCGracePeriod to return gcperiod every time CONNECTED message is received, updated tests for the same --- .../kotlin/io/ably/lib/objects/Helpers.kt | 19 ++++++---------- .../kotlin/io/ably/lib/objects/ObjectsPool.kt | 7 +++--- .../io/ably/lib/objects/unit/HelpersTest.kt | 22 +++++++++---------- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 520dfe557..683971510 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -4,7 +4,7 @@ import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.types.Callback import io.ably.lib.realtime.ConnectionEvent -import io.ably.lib.realtime.ConnectionState +import io.ably.lib.realtime.ConnectionStateListener import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -51,19 +51,14 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } } -internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) { - connectionManager.objectsGCGracePeriod?.let { - block(it) - return - } - // If already connected, no further `connected` event is guaranteed; return immediately. - if (connection.state == ConnectionState.connected) { - block(connectionManager.objectsGCGracePeriod) - return - } - connection.once(ConnectionEvent.connected) { +internal fun ObjectsAdapter.onGCGracePeriodUpdated(block : (Long?) -> Unit) : ObjectsSubscription { + connectionManager.objectsGCGracePeriod?.let { block(it) } + // Return new objectsGCGracePeriod whenever connection state changes to connected + val listener: (_: ConnectionStateListener.ConnectionStateChange) -> Unit = { block(connectionManager.objectsGCGracePeriod) } + connection.on(ConnectionEvent.connected, listener) + return ObjectsSubscription { connection.off(listener) } } /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index 43cab31c2..224cd606f 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -52,14 +52,14 @@ internal class ObjectsPool( private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var gcJob: Job // Job for the garbage collection coroutine - @Volatile - private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + @Volatile private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + private var gcPeriodSubscription: ObjectsSubscription init { // RTO3b - Initialize pool with root object pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects) // Start garbage collection coroutine with server-provided grace period if available - realtimeObjects.adapter.retrieveObjectsGCGracePeriod { period -> + gcPeriodSubscription = realtimeObjects.adapter.onGCGracePeriodUpdated { period -> period?.let { gcGracePeriod = it Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms") @@ -164,6 +164,7 @@ internal class ObjectsPool( * Should be called when the pool is no longer needed. */ fun dispose() { + gcPeriodSubscription.unsubscribe() gcJob.cancel() gcScope.cancel() pool.clear() diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index bd05e0579..21f5c6792 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -60,52 +60,52 @@ class HelpersTest { } @Test - fun testRetrieveObjectsGCGracePeriodImmediateInvokesBlock() { + fun testOnGCGracePeriodImmediateInvokesBlock() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager connManager.setPrivateField("objectsGCGracePeriod", 123L) var value: Long? = null - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertEquals(123L, value) - verify(exactly = 0) { adapter.connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { adapter.connection.on(ConnectionEvent.connected, any()) } } @Test - fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithValue() { + fun testOnGCGracePeriodDeferredInvokesOnConnectedWithValue() { val adapter = getMockObjectsAdapter() val connManager = adapter.connectionManager val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.on(ConnectionEvent.connected, any()) } answers { val listener = secondArg() connManager.setPrivateField("objectsGCGracePeriod", 456L) listener.onConnectionStateChanged(mockk(relaxed = true)) } - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertEquals(456L, value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.on(ConnectionEvent.connected, any()) } } @Test - fun testRetrieveObjectsGCGracePeriodDeferredInvokesOnConnectedWithNull() { + fun testOnGCGracePeriodDeferredInvokesOnConnectedWithNull() { val adapter = getMockObjectsAdapter() val connection = adapter.connection var value: Long? = null - every { connection.once(ConnectionEvent.connected, any()) } answers { + every { connection.on(ConnectionEvent.connected, any()) } answers { val listener = secondArg() listener.onConnectionStateChanged(mockk(relaxed = true)) } - adapter.retrieveObjectsGCGracePeriod { v -> value = v } + adapter.onGCGracePeriodUpdated { v -> value = v } assertNull(value) - verify(exactly = 1) { connection.once(ConnectionEvent.connected, any()) } + verify(exactly = 1) { connection.on(ConnectionEvent.connected, any()) } } @Test