Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt
Original file line number Diff line number Diff line change
Expand Up @@ -598,14 +598,31 @@ open class Analytics protected constructor(
* Should only be called in containerized environments where you need to free resources like
* CoroutineDispatchers and ExecutorService instances so they allow the container to shutdown
* properly.
*
* @param waitForTasks if true, waits for all analyticsScope coroutines to complete before shutdown
*/
@JvmOverloads
@OptIn(ExperimentalCoroutinesApi::class)
fun shutdown() {
(analyticsDispatcher as CloseableCoroutineDispatcher).close()
(networkIODispatcher as CloseableCoroutineDispatcher).close()
(fileIODispatcher as CloseableCoroutineDispatcher).close()
fun shutdown(waitForTasks: Boolean = false) {
timeline.applyClosure {
it.shutdown()
}

store.shutdown();
val job = analyticsScope.coroutineContext[Job]
job?.cancel()
if (waitForTasks) {
runBlocking {
job?.join()
}
}

(analyticsDispatcher as? CloseableCoroutineDispatcher)?.close()
(networkIODispatcher as? CloseableCoroutineDispatcher)?.close()
(fileIODispatcher as? CloseableCoroutineDispatcher)?.close()

store.shutdown()

storage.close()
}

/**
Expand Down Expand Up @@ -836,4 +853,4 @@ internal fun isAndroid(): Boolean {
} catch (ignored: ClassNotFoundException) {
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ interface Storage {
* Close and finish the current stream and start a new one
*/
suspend fun rollover()

/**
* Close and cleanup storage resources
*/
fun close() {
// empty body default
}
}

fun parseFilePaths(filePathStr: String?): List<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ open class EventPipeline(

writeChannel = Channel(UNLIMITED)
uploadChannel = Channel(UNLIMITED)

registerShutdownHook()
}

fun put(event: BaseEvent) {
Expand Down Expand Up @@ -202,13 +200,4 @@ open class EventPipeline(

return shouldCleanup
}

private fun registerShutdownHook() {
// close the stream if the app shuts down
Runtime.getRuntime().addShutdownHook(object : Thread() {
override fun run() {
this@EventPipeline.stop()
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ interface Plugin {
fun update(settings: Settings, type: UpdateType) {
// empty body default
}

fun shutdown() {
// empty body default
}
}

interface VersionedPlugin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class SegmentDestination: DestinationPlugin(), VersionedPlugin, Subscriber {
pipeline?.flush()
}

override fun shutdown() {
pipeline?.stop()
}

override fun version(): String {
return Constants.LIBRARY_VERSION
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ open class FileEventStream(

init {
createDirectory(directory)
registerShutdownHook()
}

protected open var fs: FileOutputStream? = null
Expand Down Expand Up @@ -239,13 +238,4 @@ open class FileEventStream(
val file = File(source)
return if (file.exists()) FileInputStream(file) else null
}

private fun registerShutdownHook() {
// close the stream if the app shuts down
Runtime.getRuntime().addShutdownHook(object : Thread() {
override fun run() {
fs?.close()
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class EventsFileManager(

init {
createDirectory(directory)
registerShutdownHook()
}

private val fileIndexKey = if(subject == null) "segment.events.file.index.$writeKey" else "segment.events.file.index.$writeKey.$subject"
Expand Down Expand Up @@ -170,15 +169,6 @@ class EventsFileManager(
curFile = null
}

private fun registerShutdownHook() {
// close the stream if the app shuts down
Runtime.getRuntime().addShutdownHook(object : Thread() {
override fun run() {
os?.close()
}
})
}

private suspend fun withLock(block: () -> Unit) {
semaphore.acquire()
block()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ open class StorageImpl(
block()
semaphore.release()
}

override fun close() {
if (eventStream is FileEventStream) {
(eventStream as FileEventStream).close()
}
}
}

object ConcreteStorageProvider : StorageProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,22 @@ class AnalyticsTests {
assertEquals(2, files.size)
}

@Test
fun `shutdown calls shutdown on all plugins`() {
val config = Configuration(
writeKey = "test-shutdown",
application = "TestApp"
)
val testScope = TestScope(testDispatcher)
val testAnalytics = testAnalytics(config, testScope, testDispatcher)
val mockPlugin = spyk<StubPlugin>()
testAnalytics.add(mockPlugin)

testAnalytics.shutdown()

verify { mockPlugin.shutdown() }
}

@Test
fun `purgeStorage clears storage`() = runTest {
analytics.track("test")
Expand Down
Loading