Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package io.github.thibaultbee.streampack.core.elements.data

import android.media.MediaFormat
import io.github.thibaultbee.streampack.core.elements.utils.extensions.deepCopy
import io.github.thibaultbee.streampack.core.elements.utils.pool.FramePool
import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool
import io.github.thibaultbee.streampack.core.elements.utils.pool.RawFramePool
import java.io.Closeable
import java.nio.ByteBuffer
Expand All @@ -36,6 +38,24 @@ interface RawFrame : Closeable {
val timestampInUs: Long
}

/**
* Deep copy the [RawFrame.rawBuffer] into a new [RawFrame].
*
* For better memory allocation, you should close the returned frame after usage.
*/
fun RawFrame.deepCopy(
bufferPool: IBufferPool<ByteBuffer>,
timestampInUs: Long = this.timestampInUs,
onClosed: (RawFrame) -> Unit = {}
): RawFrame {
val copy = this.rawBuffer.deepCopy(bufferPool)
return copy(
rawBuffer = copy, timestampInUs = timestampInUs, onClosed = {
onClosed(it)
bufferPool.put(copy)
}
)
}

/**
* Copy a [RawFrame] to a new [RawFrame].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import io.github.thibaultbee.streampack.core.elements.utils.av.descriptors.SLCon
import io.github.thibaultbee.streampack.core.elements.utils.av.video.avc.AVCDecoderConfigurationRecord
import io.github.thibaultbee.streampack.core.elements.utils.av.video.hevc.HEVCDecoderConfigurationRecord
import io.github.thibaultbee.streampack.core.elements.utils.av.video.vpx.VPCodecConfigurationRecord
import io.github.thibaultbee.streampack.core.elements.utils.extensions.clone
import io.github.thibaultbee.streampack.core.elements.utils.extensions.deepCopy
import io.github.thibaultbee.streampack.core.elements.utils.extensions.isAnnexB
import io.github.thibaultbee.streampack.core.elements.utils.extensions.isAvcc
import io.github.thibaultbee.streampack.core.elements.utils.extensions.skipStartCode
Expand Down Expand Up @@ -176,7 +176,7 @@ class TrackChunks(
}

val frameCopy =
frame.copy(rawBuffer = frame.rawBuffer.clone()) // Do not keep mediacodec buffer
frame.copy(rawBuffer = frame.rawBuffer.deepCopy()) // Do not keep mediacodec buffer
chunks.last().add(frameId, frameCopy)
frameId++
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2025 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.core.elements.processing

/**
* Interface to process a data and returns a result.
*
* @param T type of data to proces)
*/
interface IProcessor<T> {
/**
* Process a data and returns a result.
*
* @param data data to process
* @return processed data
*/
fun process(data: T): T
}

/**
* Interface to process a data and returns a result.
*
* @param T type of data to proces)
*/
interface IEffectProcessor<T> {
/**
* Process a data and returns a result.
*
* @param isMuted whether the data contains only 0
* @param data data to process
* @return processed data
*/
fun process(isMuted: Boolean, data: T): T
}

/**
* Interface to process a data.
*
* @param T type of data to process
*/
interface IEffectConsumer<T> {
/**
* Process a data.
*
* @param isMuted whether the data contains only 0
* @param data data to process
*/
fun consume(isMuted: Boolean, data: T)
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicBoolean

fun RawFramePullPush(
frameProcessor: IFrameProcessor<RawFrame>,
onFrame: suspend (RawFrame) -> Unit,
processDispatcher: CoroutineDispatcher,
isDirect: Boolean = true
) = RawFramePullPush(frameProcessor, onFrame, ByteBufferPool(isDirect), processDispatcher)

/**
* A component that pull a frame from an input and push it to [onFrame] output.
*
Expand All @@ -47,7 +40,7 @@ fun RawFramePullPush(
* @param processDispatcher the dispatcher to process frames on
*/
class RawFramePullPush(
private val frameProcessor: IFrameProcessor<RawFrame>,
private val frameProcessor: IProcessor<RawFrame>,
val onFrame: suspend (RawFrame) -> Unit,
private val bufferPool: ByteBufferPool,
private val processDispatcher: CoroutineDispatcher,
Expand Down Expand Up @@ -99,7 +92,7 @@ class RawFramePullPush(

// Process buffer with effects
val processedFrame = try {
frameProcessor.processFrame(rawFrame)
frameProcessor.process(rawFrame)
} catch (t: Throwable) {
Logger.e(TAG, "Failed to pre-process frame: ${t.message}")
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,47 @@
package io.github.thibaultbee.streampack.core.elements.processing.audio

import io.github.thibaultbee.streampack.core.elements.data.RawFrame
import io.github.thibaultbee.streampack.core.elements.processing.IFrameProcessor
import io.github.thibaultbee.streampack.core.elements.processing.IEffectConsumer
import io.github.thibaultbee.streampack.core.elements.processing.IEffectProcessor
import java.io.Closeable

/**
* The base audio effect.
*/
interface IAudioEffect : IFrameProcessor<RawFrame>, Closeable
sealed interface IAudioEffect : Closeable

/**
* Mute audio effect.
* An audio effect that can be dispatched to another thread. The result is not use by the audio pipeline.
* Example: a VU meter.
*/
class MuteEffect : IAudioEffect {
interface IConsumerAudioEffect : IAudioEffect, IEffectConsumer<RawFrame>

/**
* An audio effect that can't be dispatched to another thread. The result is used by the audio pipeline.
*
* The [RawFrame.rawBuffer] can't be modified.
*/
interface IProcessorAudioEffect : IAudioEffect, IEffectProcessor<RawFrame>

/**
* An audio effect that mute the audio.
*/
class MuteEffect : IProcessorAudioEffect {
private var mutedByteArray: ByteArray? = null

override fun processFrame(frame: RawFrame): RawFrame {
val remaining = frame.rawBuffer.remaining()
val position = frame.rawBuffer.position()
override fun process(isMuted: Boolean, data: RawFrame): RawFrame {
if (!isMuted) {
return data
}
val remaining = data.rawBuffer.remaining()
val position = data.rawBuffer.position()
if (remaining != mutedByteArray?.size) {
mutedByteArray = ByteArray(remaining)
}
frame.rawBuffer.put(mutedByteArray!!)
frame.rawBuffer.position(position)
data.rawBuffer.put(mutedByteArray!!)
data.rawBuffer.position(position)

return frame
return data
}

override fun close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,76 @@
package io.github.thibaultbee.streampack.core.elements.processing.audio

import io.github.thibaultbee.streampack.core.elements.data.RawFrame
import io.github.thibaultbee.streampack.core.elements.processing.IFrameProcessor
import io.github.thibaultbee.streampack.core.elements.data.deepCopy
import io.github.thibaultbee.streampack.core.elements.processing.IProcessor
import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import java.io.Closeable
import java.nio.ByteBuffer
import java.util.concurrent.CopyOnWriteArrayList
import java.util.function.IntFunction

/**
* Audio frame processor.
*
* Only supports mute effect for now.
*/
class AudioFrameProcessor : IFrameProcessor<RawFrame>,
IAudioFrameProcessor {
class AudioFrameProcessor(
private val bufferPool: IBufferPool<ByteBuffer>,
dispatcher: CoroutineDispatcher,
private val effects: CopyOnWriteArrayList<IAudioEffect> = CopyOnWriteArrayList()
) : IProcessor<RawFrame>, IAudioFrameProcessor, Closeable, MutableList<IAudioEffect> by effects {
private val coroutineScope = CoroutineScope(dispatcher + SupervisorJob())

/**
* Whether the audio is muted.
*
* When the audio is muted, the audio effect are not processed. Only consumer effects are processed.
*/
override var isMuted = false
private val muteEffect = MuteEffect()

override fun processFrame(frame: RawFrame): RawFrame {
if (isMuted) {
return muteEffect.processFrame(frame)
private fun launchConsumerEffect(
effect: IConsumerAudioEffect,
isMuted: Boolean,
data: RawFrame
) {
val consumeFrame = data.deepCopy(bufferPool)
coroutineScope.launch {
effect.consume(isMuted, consumeFrame)
}
}

override fun process(data: RawFrame): RawFrame {
val isMuted = isMuted

var processedFrame = muteEffect.process(isMuted, data)

effects.forEach {
if (it is IProcessorAudioEffect) {
processedFrame = it.process(isMuted, processedFrame)
} else if (it is IConsumerAudioEffect) {
launchConsumerEffect(it, isMuted, processedFrame)
}
}
return frame

return processedFrame
}

override fun close() {
effects.forEach { it.close() }
effects.clear()

muteEffect.close()

coroutineScope.cancel()
}

@Deprecated("'fun <T : Any!> toArray(generator: IntFunction<Array<(out) T!>!>!): Array<(out) T!>!' is deprecated. This declaration is redundant in Kotlin and might be removed soon.")
@Suppress("DEPRECATION")
override fun <T : Any?> toArray(generator: IntFunction<Array<out T?>?>): Array<out T?> {
return super.toArray(generator)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package io.github.thibaultbee.streampack.core.elements.processing.audio
*/
interface IAudioFrameProcessor {
/**
* Mute audio.
* Whether the processor is muted.
*/
var isMuted: Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.github.thibaultbee.streampack.core.elements.utils.extensions

import io.github.thibaultbee.streampack.core.elements.utils.pool.IBufferPool
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.charset.StandardCharsets
Expand Down Expand Up @@ -253,18 +254,37 @@ fun ByteBuffer.toByteArray(): ByteArray {
}

/**
* Clone [ByteBuffer].
* Deep copy of [ByteBuffer].
* The position of the original [ByteBuffer] will be 0 after the clone.
*/
fun ByteBuffer.clone(): ByteBuffer {
@Deprecated("Use ByteBufferPool instead")
fun ByteBuffer.deepCopy(): ByteBuffer {
val originalPosition = this.position()
try {
val clone = if (isDirect) {
val copy = if (isDirect) {
ByteBuffer.allocateDirect(this.remaining())
} else {
ByteBuffer.allocate(this.remaining())
}
return clone.put(this).apply { rewind() }
return copy.put(this).apply { rewind() }
} finally {
this.position(originalPosition)
}
}

/**
* Deep copy of [ByteBuffer] from [IBufferPool].
*
* Don't forget to put the returned [ByteBuffer] to the buffer pool when you are done with it.
*
* @param pool [IBufferPool] to use
* @return [ByteBuffer] deep copy
*/
fun ByteBuffer.deepCopy(pool: IBufferPool<ByteBuffer>): ByteBuffer {
val originalPosition = this.position()
try {
val copy = pool.get(this.remaining())
return copy.put(this).apply { rewind() }
} finally {
this.position(originalPosition)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@ class ByteBufferPool(private val isDirect: Boolean) : IBufferPool<ByteBuffer>, C
buffers.clear()
}
}
}
}
Loading
Loading