From f1653856f6e1d6fb96d21d1c0d128c8ddf778629 Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Sat, 2 Aug 2025 02:10:14 +0200 Subject: [PATCH 1/2] Improve resource cleanup on shutdown --- .../analytics/kotlin/core/Analytics.kt | 32 +++++++++++++++++-- .../kotlin/core/platform/EventPipeline.kt | 11 ------- .../kotlin/core/utilities/EventStream.kt | 10 ------ .../core/utilities/EventsFileManager.kt | 10 ------ 4 files changed, 29 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt index 80598854..df447ad2 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt @@ -11,6 +11,8 @@ import com.segment.analytics.kotlin.core.platform.plugins.StartupQueue import com.segment.analytics.kotlin.core.platform.plugins.UserInfoPlugin import com.segment.analytics.kotlin.core.platform.plugins.logger.* import com.segment.analytics.kotlin.core.utilities.JsonAnySerializer +import com.segment.analytics.kotlin.core.utilities.StorageImpl +import com.segment.analytics.kotlin.core.utilities.FileEventStream import kotlinx.coroutines.* import kotlinx.serialization.* import kotlinx.serialization.json.Json @@ -598,14 +600,38 @@ open class Analytics protected constructor( * Should only be called in containerized environments where you need to free resources like * CoroutineDispatchers and ExecutorService instances so they allow the container to shutdown * properly. + * + * @param waitForTasks if true, waits for all analyticsScope coroutines to complete before shutdown */ + @JvmOverloads @OptIn(ExperimentalCoroutinesApi::class) - fun shutdown() { + fun shutdown(waitForTasks: Boolean = false) { + timeline.applyClosure { + if (it is com.segment.analytics.kotlin.core.platform.EventPipeline) { + it.stop() + } + } + + val job = analyticsScope.coroutineContext[Job] + job?.cancel() + if (waitForTasks) { + runBlocking { + job?.join() + } + } + (analyticsDispatcher as CloseableCoroutineDispatcher).close() (networkIODispatcher as CloseableCoroutineDispatcher).close() (fileIODispatcher as CloseableCoroutineDispatcher).close() - store.shutdown(); + store.shutdown() + + if (storage is StorageImpl) { + val s = storage as StorageImpl + if (s.eventStream is FileEventStream) { + (s.eventStream as FileEventStream).close() + } + } } /** @@ -836,4 +862,4 @@ internal fun isAndroid(): Boolean { } catch (ignored: ClassNotFoundException) { false } -} +} \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt index a133ad2e..f0e3568d 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt @@ -53,8 +53,6 @@ open class EventPipeline( writeChannel = Channel(UNLIMITED) uploadChannel = Channel(UNLIMITED) - - registerShutdownHook() } fun put(event: BaseEvent) { @@ -202,13 +200,4 @@ open class EventPipeline( return shouldCleanup } - - private fun registerShutdownHook() { - // close the stream if the app shuts down - Runtime.getRuntime().addShutdownHook(object : Thread() { - override fun run() { - this@EventPipeline.stop() - } - }) - } } \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventStream.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventStream.kt index 5ee3ed00..f0f9f7f9 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventStream.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventStream.kt @@ -160,7 +160,6 @@ open class FileEventStream( init { createDirectory(directory) - registerShutdownHook() } protected open var fs: FileOutputStream? = null @@ -239,13 +238,4 @@ open class FileEventStream( val file = File(source) return if (file.exists()) FileInputStream(file) else null } - - private fun registerShutdownHook() { - // close the stream if the app shuts down - Runtime.getRuntime().addShutdownHook(object : Thread() { - override fun run() { - fs?.close() - } - }) - } } \ No newline at end of file diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventsFileManager.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventsFileManager.kt index cb778aa8..78fc98f6 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventsFileManager.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/EventsFileManager.kt @@ -41,7 +41,6 @@ class EventsFileManager( init { createDirectory(directory) - registerShutdownHook() } private val fileIndexKey = if(subject == null) "segment.events.file.index.$writeKey" else "segment.events.file.index.$writeKey.$subject" @@ -170,15 +169,6 @@ class EventsFileManager( curFile = null } - private fun registerShutdownHook() { - // close the stream if the app shuts down - Runtime.getRuntime().addShutdownHook(object : Thread() { - override fun run() { - os?.close() - } - }) - } - private suspend fun withLock(block: () -> Unit) { semaphore.acquire() block() From efd7f0db794f82ed3598c0dc5a8177254425115c Mon Sep 17 00:00:00 2001 From: Albert Puig Date: Tue, 16 Dec 2025 16:46:08 +0100 Subject: [PATCH 2/2] review comments --- .../analytics/kotlin/core/Analytics.kt | 19 +++++-------------- .../segment/analytics/kotlin/core/Storage.kt | 7 +++++++ .../analytics/kotlin/core/platform/Plugin.kt | 4 ++++ .../platform/plugins/SegmentDestination.kt | 4 ++++ .../kotlin/core/utilities/StorageImpl.kt | 6 ++++++ .../analytics/kotlin/core/AnalyticsTests.kt | 16 ++++++++++++++++ 6 files changed, 42 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt index df447ad2..9030fcb2 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt @@ -11,8 +11,6 @@ import com.segment.analytics.kotlin.core.platform.plugins.StartupQueue import com.segment.analytics.kotlin.core.platform.plugins.UserInfoPlugin import com.segment.analytics.kotlin.core.platform.plugins.logger.* import com.segment.analytics.kotlin.core.utilities.JsonAnySerializer -import com.segment.analytics.kotlin.core.utilities.StorageImpl -import com.segment.analytics.kotlin.core.utilities.FileEventStream import kotlinx.coroutines.* import kotlinx.serialization.* import kotlinx.serialization.json.Json @@ -607,9 +605,7 @@ open class Analytics protected constructor( @OptIn(ExperimentalCoroutinesApi::class) fun shutdown(waitForTasks: Boolean = false) { timeline.applyClosure { - if (it is com.segment.analytics.kotlin.core.platform.EventPipeline) { - it.stop() - } + it.shutdown() } val job = analyticsScope.coroutineContext[Job] @@ -620,18 +616,13 @@ open class Analytics protected constructor( } } - (analyticsDispatcher as CloseableCoroutineDispatcher).close() - (networkIODispatcher as CloseableCoroutineDispatcher).close() - (fileIODispatcher as CloseableCoroutineDispatcher).close() + (analyticsDispatcher as? CloseableCoroutineDispatcher)?.close() + (networkIODispatcher as? CloseableCoroutineDispatcher)?.close() + (fileIODispatcher as? CloseableCoroutineDispatcher)?.close() store.shutdown() - if (storage is StorageImpl) { - val s = storage as StorageImpl - if (s.eventStream is FileEventStream) { - (s.eventStream as FileEventStream).close() - } - } + storage.close() } /** diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Storage.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Storage.kt index 9acf2f49..a6b0bd26 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Storage.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Storage.kt @@ -99,6 +99,13 @@ interface Storage { * Close and finish the current stream and start a new one */ suspend fun rollover() + + /** + * Close and cleanup storage resources + */ + fun close() { + // empty body default + } } fun parseFilePaths(filePathStr: String?): List { diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt index afd959fa..4845654b 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt @@ -43,6 +43,10 @@ interface Plugin { fun update(settings: Settings, type: UpdateType) { // empty body default } + + fun shutdown() { + // empty body default + } } interface VersionedPlugin { diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestination.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestination.kt index 49740790..ee09727e 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestination.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestination.kt @@ -111,6 +111,10 @@ class SegmentDestination: DestinationPlugin(), VersionedPlugin, Subscriber { pipeline?.flush() } + override fun shutdown() { + pipeline?.stop() + } + override fun version(): String { return Constants.LIBRARY_VERSION } diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/StorageImpl.kt b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/StorageImpl.kt index 0642c764..eabb1b3b 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/utilities/StorageImpl.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/utilities/StorageImpl.kt @@ -195,6 +195,12 @@ open class StorageImpl( block() semaphore.release() } + + override fun close() { + if (eventStream is FileEventStream) { + (eventStream as FileEventStream).close() + } + } } object ConcreteStorageProvider : StorageProvider { diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt index 29c78763..4c37e9b3 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt @@ -947,6 +947,22 @@ class AnalyticsTests { assertEquals(2, files.size) } + @Test + fun `shutdown calls shutdown on all plugins`() { + val config = Configuration( + writeKey = "test-shutdown", + application = "TestApp" + ) + val testScope = TestScope(testDispatcher) + val testAnalytics = testAnalytics(config, testScope, testDispatcher) + val mockPlugin = spyk() + testAnalytics.add(mockPlugin) + + testAnalytics.shutdown() + + verify { mockPlugin.shutdown() } + } + @Test fun `purgeStorage clears storage`() = runTest { analytics.track("test")