From 1b879831491be17c50d9b48b545408ae1a691f96 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Fri, 23 Jan 2026 16:51:40 +0100 Subject: [PATCH 1/5] feat(core): add custom audio effect API in the audio processor --- .../elements/processing/IEffectProcessor.kt | 62 ++++++++++++++++ .../elements/processing/IFrameProcessor.kt | 28 -------- .../elements/processing/RawFramePullPush.kt | 6 +- .../elements/processing/audio/AudioEffects.kt | 37 +++++++--- .../processing/audio/AudioFrameProcessor.kt | 72 ++++++++++++++++--- .../processing/audio/IAudioFrameProcessor.kt | 2 +- .../core/pipelines/inputs/AudioInput.kt | 19 +++-- 7 files changed, 170 insertions(+), 56 deletions(-) create mode 100644 core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IEffectProcessor.kt delete mode 100644 core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IFrameProcessor.kt diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IEffectProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IEffectProcessor.kt new file mode 100644 index 000000000..0bad99623 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IEffectProcessor.kt @@ -0,0 +1,62 @@ +/* + * Copyright 2025 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.elements.processing + +/** + * Interface to process a data and returns a result. + * + * @param T type of data to proces) + */ +interface IProcessor { + /** + * Process a data and returns a result. + * + * @param data data to process + * @return processed data + */ + fun process(data: T): T +} + +/** + * Interface to process a data and returns a result. + * + * @param T type of data to proces) + */ +interface IEffectProcessor { + /** + * Process a data and returns a result. + * + * @param isMuted whether the data contains only 0 + * @param data data to process + * @return processed data + */ + fun process(isMuted: Boolean, data: T): T +} + +/** + * Interface to process a data. + * + * @param T type of data to process + */ +interface IEffectConsumer { + /** + * Process a data. + * + * @param isMuted whether the data contains only 0 + * @param data data to process + */ + fun consume(isMuted: Boolean, data: T) +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IFrameProcessor.kt deleted file mode 100644 index dc9375392..000000000 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IFrameProcessor.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2025 Thibault B. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.github.thibaultbee.streampack.core.elements.processing - -import io.github.thibaultbee.streampack.core.elements.data.Frame -import io.github.thibaultbee.streampack.core.elements.data.RawFrame - -/** - * Interface to process a frame. - * - * @param T type of frame to process (probably [RawFrame] or [Frame]) - */ -interface IFrameProcessor { - fun processFrame(frame: T): T -} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt index f87b531ad..eca215f31 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt @@ -32,7 +32,7 @@ import kotlinx.coroutines.sync.withLock import java.util.concurrent.atomic.AtomicBoolean fun RawFramePullPush( - frameProcessor: IFrameProcessor, + frameProcessor: IProcessor, onFrame: suspend (RawFrame) -> Unit, processDispatcher: CoroutineDispatcher, isDirect: Boolean = true @@ -47,7 +47,7 @@ fun RawFramePullPush( * @param processDispatcher the dispatcher to process frames on */ class RawFramePullPush( - private val frameProcessor: IFrameProcessor, + private val frameProcessor: IProcessor, val onFrame: suspend (RawFrame) -> Unit, private val bufferPool: ByteBufferPool, private val processDispatcher: CoroutineDispatcher, @@ -99,7 +99,7 @@ class RawFramePullPush( // Process buffer with effects val processedFrame = try { - frameProcessor.processFrame(rawFrame) + frameProcessor.process(rawFrame) } catch (t: Throwable) { Logger.e(TAG, "Failed to pre-process frame: ${t.message}") continue diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioEffects.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioEffects.kt index 83af9a5a7..bde293f80 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioEffects.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioEffects.kt @@ -16,30 +16,47 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio import io.github.thibaultbee.streampack.core.elements.data.RawFrame -import io.github.thibaultbee.streampack.core.elements.processing.IFrameProcessor +import io.github.thibaultbee.streampack.core.elements.processing.IEffectConsumer +import io.github.thibaultbee.streampack.core.elements.processing.IEffectProcessor import java.io.Closeable /** * The base audio effect. */ -interface IAudioEffect : IFrameProcessor, Closeable +sealed interface IAudioEffect : Closeable /** - * Mute audio effect. + * An audio effect that can be dispatched to another thread. The result is not use by the audio pipeline. + * Example: a VU meter. */ -class MuteEffect : IAudioEffect { +interface IConsumerAudioEffect : IAudioEffect, IEffectConsumer + +/** + * An audio effect that can't be dispatched to another thread. The result is used by the audio pipeline. + * + * The [RawFrame.rawBuffer] can't be modified. + */ +interface IProcessorAudioEffect : IAudioEffect, IEffectProcessor + +/** + * An audio effect that mute the audio. + */ +class MuteEffect : IProcessorAudioEffect { private var mutedByteArray: ByteArray? = null - override fun processFrame(frame: RawFrame): RawFrame { - val remaining = frame.rawBuffer.remaining() - val position = frame.rawBuffer.position() + override fun process(isMuted: Boolean, data: RawFrame): RawFrame { + if (!isMuted) { + return data + } + val remaining = data.rawBuffer.remaining() + val position = data.rawBuffer.position() if (remaining != mutedByteArray?.size) { mutedByteArray = ByteArray(remaining) } - frame.rawBuffer.put(mutedByteArray!!) - frame.rawBuffer.position(position) + data.rawBuffer.put(mutedByteArray!!) + data.rawBuffer.position(position) - return frame + return data } override fun close() { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt index 4cd6e7b7a..99c586a22 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt @@ -16,22 +16,76 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio import io.github.thibaultbee.streampack.core.elements.data.RawFrame -import io.github.thibaultbee.streampack.core.elements.processing.IFrameProcessor +import io.github.thibaultbee.streampack.core.elements.processing.IProcessor +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import java.io.Closeable +import java.util.function.IntFunction /** * Audio frame processor. * - * Only supports mute effect for now. + * It is not thread-safe. */ -class AudioFrameProcessor : IFrameProcessor, - IAudioFrameProcessor { +class AudioFrameProcessor( + dispatcher: CoroutineDispatcher, + private val effects: MutableList = mutableListOf() +) : IProcessor, IAudioFrameProcessor, Closeable, MutableList by effects { + private val coroutineScope = CoroutineScope(dispatcher + SupervisorJob()) + + /** + * Whether the audio is muted. + * + * When the audio is muted, the audio effect are not processed. Only consumer effects are processed. + */ override var isMuted = false private val muteEffect = MuteEffect() - override fun processFrame(frame: RawFrame): RawFrame { - if (isMuted) { - return muteEffect.processFrame(frame) + private fun launchConsumerEffect( + effect: IConsumerAudioEffect, + isMuted: Boolean, + data: RawFrame + ) { + coroutineScope.launch { + val consumeFrame = + data.copy( + rawBuffer = data.rawBuffer.duplicate().asReadOnlyBuffer() + ) + effect.consume(isMuted, consumeFrame) } - return frame } -} \ No newline at end of file + + override fun process(data: RawFrame): RawFrame { + val isMuted = isMuted + + var processedFrame = muteEffect.process(isMuted, data) + + effects.forEach { + if (it is IProcessorAudioEffect) { + processedFrame = it.process(isMuted, processedFrame) + } else if (it is IConsumerAudioEffect) { + launchConsumerEffect(it, isMuted, processedFrame) + } + } + + return processedFrame + } + + override fun close() { + effects.forEach { it.close() } + effects.clear() + + muteEffect.close() + + coroutineScope.cancel() + } + + @Deprecated("'fun toArray(generator: IntFunction!>!): Array<(out) T!>!' is deprecated. This declaration is redundant in Kotlin and might be removed soon.") + @Suppress("DEPRECATION") + override fun toArray(generator: IntFunction?>): Array { + return super.toArray(generator) + } +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/IAudioFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/IAudioFrameProcessor.kt index 24c7606e1..19cc6ba9c 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/IAudioFrameProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/IAudioFrameProcessor.kt @@ -20,7 +20,7 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio */ interface IAudioFrameProcessor { /** - * Mute audio. + * Whether the processor is muted. */ var isMuted: Boolean } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt index 9560378e1..85ac22509 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt @@ -135,12 +135,12 @@ internal class AudioInput( /** * The audio processor. */ - private val frameProcessorInternal = AudioFrameProcessor() - override val processor: IAudioFrameProcessor = frameProcessorInternal + private val processorInternal = AudioFrameProcessor(dispatcherProvider.default) + override val processor: IAudioFrameProcessor = processorInternal private val port = if (config is PushConfig) { - PushAudioPort(frameProcessorInternal, config, dispatcherProvider) + PushAudioPort(processorInternal, config, dispatcherProvider) } else { - CallbackAudioPort(frameProcessorInternal) // No threading needed, called from encoder thread + CallbackAudioPort(processorInternal) // No threading needed, called from encoder thread } // CONFIG @@ -355,6 +355,15 @@ internal class AudioInput( ) } + try { + processorInternal.close() + } catch (t: Throwable) { + Logger.w( + TAG, + "release: Can't close audio processor: ${t.message}" + ) + } + isStreamingJob.cancel() } coroutineScope.coroutineContext.cancelChildren() @@ -430,7 +439,7 @@ private class CallbackAudioPort(private val audioFrameProcessor: AudioFrameProce val timestampInUs = source.fillAudioFrame(buffer) pool.get(buffer, timestampInUs) } - return audioFrameProcessor.processFrame(frame) + return audioFrameProcessor.process(frame) } } From d4bf3be63ec01369065fb92114bb3ea24103b9e4 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Mon, 2 Feb 2026 09:45:25 +0100 Subject: [PATCH 2/5] fix(core): processor: duplicate raw buffer dispatching --- .../elements/processing/audio/AudioFrameProcessor.kt | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt index 99c586a22..f6ccc4934 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt @@ -16,6 +16,7 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio import io.github.thibaultbee.streampack.core.elements.data.RawFrame +import io.github.thibaultbee.streampack.core.elements.data.copy import io.github.thibaultbee.streampack.core.elements.processing.IProcessor import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope @@ -49,11 +50,11 @@ class AudioFrameProcessor( isMuted: Boolean, data: RawFrame ) { + val consumeFrame = + data.copy( + rawBuffer = data.rawBuffer.duplicate().asReadOnlyBuffer() + ) coroutineScope.launch { - val consumeFrame = - data.copy( - rawBuffer = data.rawBuffer.duplicate().asReadOnlyBuffer() - ) effect.consume(isMuted, consumeFrame) } } From c094b214b33751430a9036e2f58612cb236a616a Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Mon, 2 Feb 2026 10:40:52 +0100 Subject: [PATCH 3/5] refactor(core): rename `clone` to `deepCopy` --- .../endpoints/composites/muxers/mp4/models/TrackChunks.kt | 4 ++-- .../elements/utils/extensions/ByteBufferExtensions.kt | 8 ++++---- .../utils/extensions/ByteBufferExtensionsKtTest.kt | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/mp4/models/TrackChunks.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/mp4/models/TrackChunks.kt index cbccabd76..28dde5d47 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/mp4/models/TrackChunks.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/mp4/models/TrackChunks.kt @@ -63,7 +63,7 @@ import io.github.thibaultbee.streampack.core.elements.utils.av.descriptors.SLCon import io.github.thibaultbee.streampack.core.elements.utils.av.video.avc.AVCDecoderConfigurationRecord import io.github.thibaultbee.streampack.core.elements.utils.av.video.hevc.HEVCDecoderConfigurationRecord import io.github.thibaultbee.streampack.core.elements.utils.av.video.vpx.VPCodecConfigurationRecord -import io.github.thibaultbee.streampack.core.elements.utils.extensions.clone +import io.github.thibaultbee.streampack.core.elements.utils.extensions.deepCopy import io.github.thibaultbee.streampack.core.elements.utils.extensions.isAnnexB import io.github.thibaultbee.streampack.core.elements.utils.extensions.isAvcc import io.github.thibaultbee.streampack.core.elements.utils.extensions.skipStartCode @@ -176,7 +176,7 @@ class TrackChunks( } val frameCopy = - frame.copy(rawBuffer = frame.rawBuffer.clone()) // Do not keep mediacodec buffer + frame.copy(rawBuffer = frame.rawBuffer.deepCopy()) // Do not keep mediacodec buffer chunks.last().add(frameId, frameCopy) frameId++ } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt index e4beee7ab..88dd17ffc 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt @@ -253,18 +253,18 @@ fun ByteBuffer.toByteArray(): ByteArray { } /** - * Clone [ByteBuffer]. + * Deep copy of [ByteBuffer]. * The position of the original [ByteBuffer] will be 0 after the clone. */ -fun ByteBuffer.clone(): ByteBuffer { +fun ByteBuffer.deepCopy(): ByteBuffer { val originalPosition = this.position() try { - val clone = if (isDirect) { + val copy = if (isDirect) { ByteBuffer.allocateDirect(this.remaining()) } else { ByteBuffer.allocate(this.remaining()) } - return clone.put(this).apply { rewind() } + return copy.put(this).apply { rewind() } } finally { this.position(originalPosition) } diff --git a/core/src/test/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensionsKtTest.kt b/core/src/test/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensionsKtTest.kt index 3ceff1035..489364031 100644 --- a/core/src/test/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensionsKtTest.kt +++ b/core/src/test/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensionsKtTest.kt @@ -117,7 +117,7 @@ class ByteBufferExtensionsKtTest { ) testBuffer.position(2) - val clonedBuffer = testBuffer.clone() + val clonedBuffer = testBuffer.deepCopy() assertArrayEquals( testBuffer.toByteArray(), clonedBuffer.toByteArray() ) From 44dffa4a42cc5f345fbf22affee822e4e6bc0733 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Mon, 2 Feb 2026 11:14:42 +0100 Subject: [PATCH 4/5] fix(core): processor: consumer audio effect must receive a deep copy of the current audio buffer. Because in callback mode, the rawBuffer belongs to the codec input buffer pool and it could be released before the effect processes the buffer --- .../streampack/core/elements/data/Frame.kt | 20 +++++++++++++++++++ .../elements/processing/RawFramePullPush.kt | 7 ------- .../processing/audio/AudioFrameProcessor.kt | 10 +++++----- .../utils/extensions/ByteBufferExtensions.kt | 20 +++++++++++++++++++ .../elements/utils/pool/ByteBufferPool.kt | 2 +- .../core/pipelines/inputs/AudioInput.kt | 9 +++++++-- 6 files changed, 53 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt index c9c08acea..ab615b1ad 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt @@ -16,7 +16,9 @@ package io.github.thibaultbee.streampack.core.elements.data import android.media.MediaFormat +import io.github.thibaultbee.streampack.core.elements.utils.extensions.deepCopy import io.github.thibaultbee.streampack.core.elements.utils.pool.FramePool +import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool import io.github.thibaultbee.streampack.core.elements.utils.pool.RawFramePool import java.io.Closeable import java.nio.ByteBuffer @@ -36,6 +38,24 @@ interface RawFrame : Closeable { val timestampInUs: Long } +/** + * Deep copy the [RawFrame.rawBuffer] into a new [RawFrame]. + * + * For better memory allocation, you should close the returned frame after usage. + */ +fun RawFrame.deepCopy( + bufferPool: IBufferPool, + timestampInUs: Long = this.timestampInUs, + onClosed: (RawFrame) -> Unit = {} +): RawFrame { + val copy = this.rawBuffer.deepCopy(bufferPool) + return copy( + rawBuffer = copy, timestampInUs = timestampInUs, onClosed = { + onClosed(it) + bufferPool.put(copy) + } + ) +} /** * Copy a [RawFrame] to a new [RawFrame]. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt index eca215f31..aa5bfedd5 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt @@ -31,13 +31,6 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import java.util.concurrent.atomic.AtomicBoolean -fun RawFramePullPush( - frameProcessor: IProcessor, - onFrame: suspend (RawFrame) -> Unit, - processDispatcher: CoroutineDispatcher, - isDirect: Boolean = true -) = RawFramePullPush(frameProcessor, onFrame, ByteBufferPool(isDirect), processDispatcher) - /** * A component that pull a frame from an input and push it to [onFrame] output. * diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt index f6ccc4934..ab249b756 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt @@ -16,14 +16,16 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio import io.github.thibaultbee.streampack.core.elements.data.RawFrame -import io.github.thibaultbee.streampack.core.elements.data.copy +import io.github.thibaultbee.streampack.core.elements.data.deepCopy import io.github.thibaultbee.streampack.core.elements.processing.IProcessor +import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.launch import java.io.Closeable +import java.nio.ByteBuffer import java.util.function.IntFunction /** @@ -32,6 +34,7 @@ import java.util.function.IntFunction * It is not thread-safe. */ class AudioFrameProcessor( + private val bufferPool: IBufferPool, dispatcher: CoroutineDispatcher, private val effects: MutableList = mutableListOf() ) : IProcessor, IAudioFrameProcessor, Closeable, MutableList by effects { @@ -50,10 +53,7 @@ class AudioFrameProcessor( isMuted: Boolean, data: RawFrame ) { - val consumeFrame = - data.copy( - rawBuffer = data.rawBuffer.duplicate().asReadOnlyBuffer() - ) + val consumeFrame = data.deepCopy(bufferPool) coroutineScope.launch { effect.consume(isMuted, consumeFrame) } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt index 88dd17ffc..09cee1ff1 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt @@ -15,6 +15,7 @@ */ package io.github.thibaultbee.streampack.core.elements.utils.extensions +import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool import java.nio.ByteBuffer import java.nio.ByteOrder import java.nio.charset.StandardCharsets @@ -256,6 +257,7 @@ fun ByteBuffer.toByteArray(): ByteArray { * Deep copy of [ByteBuffer]. * The position of the original [ByteBuffer] will be 0 after the clone. */ +@Deprecated("Use ByteBufferPool instead") fun ByteBuffer.deepCopy(): ByteBuffer { val originalPosition = this.position() try { @@ -270,6 +272,24 @@ fun ByteBuffer.deepCopy(): ByteBuffer { } } +/** + * Deep copy of [ByteBuffer] from [IBufferPool]. + * + * Don't forget to put the returned [ByteBuffer] to the buffer pool when you are done with it. + * + * @param pool [IBufferPool] to use + * @return [ByteBuffer] deep copy + */ +fun ByteBuffer.deepCopy(pool: IBufferPool): ByteBuffer { + val originalPosition = this.position() + try { + val copy = pool.get(this.remaining()) + return copy.put(this).apply { rewind() } + } finally { + this.position(originalPosition) + } +} + /** * For AVC and HEVC */ diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt index ce748def0..f3036b9b8 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt @@ -87,4 +87,4 @@ class ByteBufferPool(private val isDirect: Boolean) : IBufferPool, C buffers.clear() } } -} \ No newline at end of file +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt index 85ac22509..999bc16e8 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt @@ -28,6 +28,7 @@ import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioFrameS import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSource import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSourceInternal import io.github.thibaultbee.streampack.core.elements.utils.ConflatedJob +import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool import io.github.thibaultbee.streampack.core.elements.utils.pool.RawFramePool import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.pipelines.DispatcherProvider.Companion.THREAD_NAME_AUDIO_PREPROCESSING @@ -132,13 +133,15 @@ internal class AudioInput( } // PROCESSOR + private val bufferPool = ByteBufferPool(true) + /** * The audio processor. */ - private val processorInternal = AudioFrameProcessor(dispatcherProvider.default) + private val processorInternal = AudioFrameProcessor(bufferPool, dispatcherProvider.default) override val processor: IAudioFrameProcessor = processorInternal private val port = if (config is PushConfig) { - PushAudioPort(processorInternal, config, dispatcherProvider) + PushAudioPort(processorInternal, config, bufferPool, dispatcherProvider) } else { CallbackAudioPort(processorInternal) // No threading needed, called from encoder thread } @@ -390,11 +393,13 @@ private sealed interface IAudioPort : Streamable, Releasable { private class PushAudioPort( audioFrameProcessor: AudioFrameProcessor, config: PushConfig, + bufferPool: ByteBufferPool, dispatcherProvider: IAudioDispatcherProvider ) : IAudioPort { private val audioPullPush = RawFramePullPush( audioFrameProcessor, config.onFrame, + bufferPool, dispatcherProvider.createAudioDispatcher( 1, THREAD_NAME_AUDIO_PREPROCESSING From 05c1fe65fa56c67b15db5c5483da4749cd60908b Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Mon, 2 Feb 2026 12:08:32 +0100 Subject: [PATCH 5/5] fix(core): processor: use `CopyOnWriteArrayList` instead of MutableList to avoid `ConcurrentModificationException` --- .../core/elements/processing/audio/AudioFrameProcessor.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt index ab249b756..f10f6af43 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt @@ -26,17 +26,16 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.launch import java.io.Closeable import java.nio.ByteBuffer +import java.util.concurrent.CopyOnWriteArrayList import java.util.function.IntFunction /** * Audio frame processor. - * - * It is not thread-safe. */ class AudioFrameProcessor( private val bufferPool: IBufferPool, dispatcher: CoroutineDispatcher, - private val effects: MutableList = mutableListOf() + private val effects: CopyOnWriteArrayList = CopyOnWriteArrayList() ) : IProcessor, IAudioFrameProcessor, Closeable, MutableList by effects { private val coroutineScope = CoroutineScope(dispatcher + SupervisorJob())