diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt index c9c08acea..ab615b1ad 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt @@ -16,7 +16,9 @@ package io.github.thibaultbee.streampack.core.elements.data import android.media.MediaFormat +import io.github.thibaultbee.streampack.core.elements.utils.extensions.deepCopy import io.github.thibaultbee.streampack.core.elements.utils.pool.FramePool +import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool import io.github.thibaultbee.streampack.core.elements.utils.pool.RawFramePool import java.io.Closeable import java.nio.ByteBuffer @@ -36,6 +38,24 @@ interface RawFrame : Closeable { val timestampInUs: Long } +/** + * Deep copy the [RawFrame.rawBuffer] into a new [RawFrame]. + * + * For better memory allocation, you should close the returned frame after usage. + */ +fun RawFrame.deepCopy( + bufferPool: IBufferPool, + timestampInUs: Long = this.timestampInUs, + onClosed: (RawFrame) -> Unit = {} +): RawFrame { + val copy = this.rawBuffer.deepCopy(bufferPool) + return copy( + rawBuffer = copy, timestampInUs = timestampInUs, onClosed = { + onClosed(it) + bufferPool.put(copy) + } + ) +} /** * Copy a [RawFrame] to a new [RawFrame]. diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/mp4/models/TrackChunks.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/mp4/models/TrackChunks.kt index cbccabd76..28dde5d47 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/mp4/models/TrackChunks.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/mp4/models/TrackChunks.kt @@ -63,7 +63,7 @@ import io.github.thibaultbee.streampack.core.elements.utils.av.descriptors.SLCon import io.github.thibaultbee.streampack.core.elements.utils.av.video.avc.AVCDecoderConfigurationRecord import io.github.thibaultbee.streampack.core.elements.utils.av.video.hevc.HEVCDecoderConfigurationRecord import io.github.thibaultbee.streampack.core.elements.utils.av.video.vpx.VPCodecConfigurationRecord -import io.github.thibaultbee.streampack.core.elements.utils.extensions.clone +import io.github.thibaultbee.streampack.core.elements.utils.extensions.deepCopy import io.github.thibaultbee.streampack.core.elements.utils.extensions.isAnnexB import io.github.thibaultbee.streampack.core.elements.utils.extensions.isAvcc import io.github.thibaultbee.streampack.core.elements.utils.extensions.skipStartCode @@ -176,7 +176,7 @@ class TrackChunks( } val frameCopy = - frame.copy(rawBuffer = frame.rawBuffer.clone()) // Do not keep mediacodec buffer + frame.copy(rawBuffer = frame.rawBuffer.deepCopy()) // Do not keep mediacodec buffer chunks.last().add(frameId, frameCopy) frameId++ } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IEffectProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IEffectProcessor.kt new file mode 100644 index 000000000..0bad99623 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IEffectProcessor.kt @@ -0,0 +1,62 @@ +/* + * Copyright 2025 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.core.elements.processing + +/** + * Interface to process a data and returns a result. + * + * @param T type of data to proces) + */ +interface IProcessor { + /** + * Process a data and returns a result. + * + * @param data data to process + * @return processed data + */ + fun process(data: T): T +} + +/** + * Interface to process a data and returns a result. + * + * @param T type of data to proces) + */ +interface IEffectProcessor { + /** + * Process a data and returns a result. + * + * @param isMuted whether the data contains only 0 + * @param data data to process + * @return processed data + */ + fun process(isMuted: Boolean, data: T): T +} + +/** + * Interface to process a data. + * + * @param T type of data to process + */ +interface IEffectConsumer { + /** + * Process a data. + * + * @param isMuted whether the data contains only 0 + * @param data data to process + */ + fun consume(isMuted: Boolean, data: T) +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IFrameProcessor.kt deleted file mode 100644 index dc9375392..000000000 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/IFrameProcessor.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2025 Thibault B. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.github.thibaultbee.streampack.core.elements.processing - -import io.github.thibaultbee.streampack.core.elements.data.Frame -import io.github.thibaultbee.streampack.core.elements.data.RawFrame - -/** - * Interface to process a frame. - * - * @param T type of frame to process (probably [RawFrame] or [Frame]) - */ -interface IFrameProcessor { - fun processFrame(frame: T): T -} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt index f87b531ad..aa5bfedd5 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/RawFramePullPush.kt @@ -31,13 +31,6 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import java.util.concurrent.atomic.AtomicBoolean -fun RawFramePullPush( - frameProcessor: IFrameProcessor, - onFrame: suspend (RawFrame) -> Unit, - processDispatcher: CoroutineDispatcher, - isDirect: Boolean = true -) = RawFramePullPush(frameProcessor, onFrame, ByteBufferPool(isDirect), processDispatcher) - /** * A component that pull a frame from an input and push it to [onFrame] output. * @@ -47,7 +40,7 @@ fun RawFramePullPush( * @param processDispatcher the dispatcher to process frames on */ class RawFramePullPush( - private val frameProcessor: IFrameProcessor, + private val frameProcessor: IProcessor, val onFrame: suspend (RawFrame) -> Unit, private val bufferPool: ByteBufferPool, private val processDispatcher: CoroutineDispatcher, @@ -99,7 +92,7 @@ class RawFramePullPush( // Process buffer with effects val processedFrame = try { - frameProcessor.processFrame(rawFrame) + frameProcessor.process(rawFrame) } catch (t: Throwable) { Logger.e(TAG, "Failed to pre-process frame: ${t.message}") continue diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioEffects.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioEffects.kt index 83af9a5a7..bde293f80 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioEffects.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioEffects.kt @@ -16,30 +16,47 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio import io.github.thibaultbee.streampack.core.elements.data.RawFrame -import io.github.thibaultbee.streampack.core.elements.processing.IFrameProcessor +import io.github.thibaultbee.streampack.core.elements.processing.IEffectConsumer +import io.github.thibaultbee.streampack.core.elements.processing.IEffectProcessor import java.io.Closeable /** * The base audio effect. */ -interface IAudioEffect : IFrameProcessor, Closeable +sealed interface IAudioEffect : Closeable /** - * Mute audio effect. + * An audio effect that can be dispatched to another thread. The result is not use by the audio pipeline. + * Example: a VU meter. */ -class MuteEffect : IAudioEffect { +interface IConsumerAudioEffect : IAudioEffect, IEffectConsumer + +/** + * An audio effect that can't be dispatched to another thread. The result is used by the audio pipeline. + * + * The [RawFrame.rawBuffer] can't be modified. + */ +interface IProcessorAudioEffect : IAudioEffect, IEffectProcessor + +/** + * An audio effect that mute the audio. + */ +class MuteEffect : IProcessorAudioEffect { private var mutedByteArray: ByteArray? = null - override fun processFrame(frame: RawFrame): RawFrame { - val remaining = frame.rawBuffer.remaining() - val position = frame.rawBuffer.position() + override fun process(isMuted: Boolean, data: RawFrame): RawFrame { + if (!isMuted) { + return data + } + val remaining = data.rawBuffer.remaining() + val position = data.rawBuffer.position() if (remaining != mutedByteArray?.size) { mutedByteArray = ByteArray(remaining) } - frame.rawBuffer.put(mutedByteArray!!) - frame.rawBuffer.position(position) + data.rawBuffer.put(mutedByteArray!!) + data.rawBuffer.position(position) - return frame + return data } override fun close() { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt index 4cd6e7b7a..f10f6af43 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/AudioFrameProcessor.kt @@ -16,22 +16,76 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio import io.github.thibaultbee.streampack.core.elements.data.RawFrame -import io.github.thibaultbee.streampack.core.elements.processing.IFrameProcessor +import io.github.thibaultbee.streampack.core.elements.data.deepCopy +import io.github.thibaultbee.streampack.core.elements.processing.IProcessor +import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import java.io.Closeable +import java.nio.ByteBuffer +import java.util.concurrent.CopyOnWriteArrayList +import java.util.function.IntFunction /** * Audio frame processor. - * - * Only supports mute effect for now. */ -class AudioFrameProcessor : IFrameProcessor, - IAudioFrameProcessor { +class AudioFrameProcessor( + private val bufferPool: IBufferPool, + dispatcher: CoroutineDispatcher, + private val effects: CopyOnWriteArrayList = CopyOnWriteArrayList() +) : IProcessor, IAudioFrameProcessor, Closeable, MutableList by effects { + private val coroutineScope = CoroutineScope(dispatcher + SupervisorJob()) + + /** + * Whether the audio is muted. + * + * When the audio is muted, the audio effect are not processed. Only consumer effects are processed. + */ override var isMuted = false private val muteEffect = MuteEffect() - override fun processFrame(frame: RawFrame): RawFrame { - if (isMuted) { - return muteEffect.processFrame(frame) + private fun launchConsumerEffect( + effect: IConsumerAudioEffect, + isMuted: Boolean, + data: RawFrame + ) { + val consumeFrame = data.deepCopy(bufferPool) + coroutineScope.launch { + effect.consume(isMuted, consumeFrame) + } + } + + override fun process(data: RawFrame): RawFrame { + val isMuted = isMuted + + var processedFrame = muteEffect.process(isMuted, data) + + effects.forEach { + if (it is IProcessorAudioEffect) { + processedFrame = it.process(isMuted, processedFrame) + } else if (it is IConsumerAudioEffect) { + launchConsumerEffect(it, isMuted, processedFrame) + } } - return frame + + return processedFrame + } + + override fun close() { + effects.forEach { it.close() } + effects.clear() + + muteEffect.close() + + coroutineScope.cancel() + } + + @Deprecated("'fun toArray(generator: IntFunction!>!): Array<(out) T!>!' is deprecated. This declaration is redundant in Kotlin and might be removed soon.") + @Suppress("DEPRECATION") + override fun toArray(generator: IntFunction?>): Array { + return super.toArray(generator) } -} \ No newline at end of file +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/IAudioFrameProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/IAudioFrameProcessor.kt index 24c7606e1..19cc6ba9c 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/IAudioFrameProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/audio/IAudioFrameProcessor.kt @@ -20,7 +20,7 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio */ interface IAudioFrameProcessor { /** - * Mute audio. + * Whether the processor is muted. */ var isMuted: Boolean } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt index e4beee7ab..09cee1ff1 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensions.kt @@ -15,6 +15,7 @@ */ package io.github.thibaultbee.streampack.core.elements.utils.extensions +import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool import java.nio.ByteBuffer import java.nio.ByteOrder import java.nio.charset.StandardCharsets @@ -253,18 +254,37 @@ fun ByteBuffer.toByteArray(): ByteArray { } /** - * Clone [ByteBuffer]. + * Deep copy of [ByteBuffer]. * The position of the original [ByteBuffer] will be 0 after the clone. */ -fun ByteBuffer.clone(): ByteBuffer { +@Deprecated("Use ByteBufferPool instead") +fun ByteBuffer.deepCopy(): ByteBuffer { val originalPosition = this.position() try { - val clone = if (isDirect) { + val copy = if (isDirect) { ByteBuffer.allocateDirect(this.remaining()) } else { ByteBuffer.allocate(this.remaining()) } - return clone.put(this).apply { rewind() } + return copy.put(this).apply { rewind() } + } finally { + this.position(originalPosition) + } +} + +/** + * Deep copy of [ByteBuffer] from [IBufferPool]. + * + * Don't forget to put the returned [ByteBuffer] to the buffer pool when you are done with it. + * + * @param pool [IBufferPool] to use + * @return [ByteBuffer] deep copy + */ +fun ByteBuffer.deepCopy(pool: IBufferPool): ByteBuffer { + val originalPosition = this.position() + try { + val copy = pool.get(this.remaining()) + return copy.put(this).apply { rewind() } } finally { this.position(originalPosition) } diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt index ce748def0..f3036b9b8 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/utils/pool/ByteBufferPool.kt @@ -87,4 +87,4 @@ class ByteBufferPool(private val isDirect: Boolean) : IBufferPool, C buffers.clear() } } -} \ No newline at end of file +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt index 9560378e1..999bc16e8 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt @@ -28,6 +28,7 @@ import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioFrameS import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSource import io.github.thibaultbee.streampack.core.elements.sources.audio.IAudioSourceInternal import io.github.thibaultbee.streampack.core.elements.utils.ConflatedJob +import io.github.thibaultbee.streampack.core.elements.utils.pool.ByteBufferPool import io.github.thibaultbee.streampack.core.elements.utils.pool.RawFramePool import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.pipelines.DispatcherProvider.Companion.THREAD_NAME_AUDIO_PREPROCESSING @@ -132,15 +133,17 @@ internal class AudioInput( } // PROCESSOR + private val bufferPool = ByteBufferPool(true) + /** * The audio processor. */ - private val frameProcessorInternal = AudioFrameProcessor() - override val processor: IAudioFrameProcessor = frameProcessorInternal + private val processorInternal = AudioFrameProcessor(bufferPool, dispatcherProvider.default) + override val processor: IAudioFrameProcessor = processorInternal private val port = if (config is PushConfig) { - PushAudioPort(frameProcessorInternal, config, dispatcherProvider) + PushAudioPort(processorInternal, config, bufferPool, dispatcherProvider) } else { - CallbackAudioPort(frameProcessorInternal) // No threading needed, called from encoder thread + CallbackAudioPort(processorInternal) // No threading needed, called from encoder thread } // CONFIG @@ -355,6 +358,15 @@ internal class AudioInput( ) } + try { + processorInternal.close() + } catch (t: Throwable) { + Logger.w( + TAG, + "release: Can't close audio processor: ${t.message}" + ) + } + isStreamingJob.cancel() } coroutineScope.coroutineContext.cancelChildren() @@ -381,11 +393,13 @@ private sealed interface IAudioPort : Streamable, Releasable { private class PushAudioPort( audioFrameProcessor: AudioFrameProcessor, config: PushConfig, + bufferPool: ByteBufferPool, dispatcherProvider: IAudioDispatcherProvider ) : IAudioPort { private val audioPullPush = RawFramePullPush( audioFrameProcessor, config.onFrame, + bufferPool, dispatcherProvider.createAudioDispatcher( 1, THREAD_NAME_AUDIO_PREPROCESSING @@ -430,7 +444,7 @@ private class CallbackAudioPort(private val audioFrameProcessor: AudioFrameProce val timestampInUs = source.fillAudioFrame(buffer) pool.get(buffer, timestampInUs) } - return audioFrameProcessor.processFrame(frame) + return audioFrameProcessor.process(frame) } } diff --git a/core/src/test/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensionsKtTest.kt b/core/src/test/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensionsKtTest.kt index 3ceff1035..489364031 100644 --- a/core/src/test/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensionsKtTest.kt +++ b/core/src/test/java/io/github/thibaultbee/streampack/core/elements/utils/extensions/ByteBufferExtensionsKtTest.kt @@ -117,7 +117,7 @@ class ByteBufferExtensionsKtTest { ) testBuffer.position(2) - val clonedBuffer = testBuffer.clone() + val clonedBuffer = testBuffer.deepCopy() assertArrayEquals( testBuffer.toByteArray(), clonedBuffer.toByteArray() )