Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ ktor = "3.0.0" # https://github.com/ktorio/ktor
kotlinx-coroutines = "1.9.0" # https://github.com/Kotlin/kotlinx.coroutines
kotlinx-serialization = "1.7.3" # https://github.com/Kotlin/kotlinx.serialization
kotlinx-datetime = "0.6.1" # https://github.com/Kotlin/kotlinx-datetime
kotlinx-collections-immutable = "0.3.8" # https://github.com/Kotlin/kotlinx.collections.immutable
kord-cache = "0.5.4" # https://github.com/kordlib/cache

# implementation dependencies
Expand Down Expand Up @@ -54,6 +55,7 @@ kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-c
kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "kotlinx-coroutines" }
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" }
kotlinx-datetime = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version.ref = "kotlinx-datetime" }
kotlinx-collections-immutable = { module = "org.jetbrains.kotlinx:kotlinx-collections-immutable", version.ref = "kotlinx-collections-immutable"}

# other
kotlin-logging = { module = "io.github.oshai:kotlin-logging", version.ref = "kotlin-logging" }
Expand Down
14 changes: 5 additions & 9 deletions voice/api/voice.api
Original file line number Diff line number Diff line change
Expand Up @@ -1061,31 +1061,27 @@ public final class dev/kord/voice/io/ReadableByteArrayCursor {
}

public final class dev/kord/voice/streams/DefaultStreams : dev/kord/voice/streams/Streams {
public fun <init> (Ldev/kord/voice/gateway/VoiceGateway;Ldev/kord/voice/udp/VoiceUdpSocket;Ldev/kord/voice/encryption/strategies/NonceStrategy;)V
public fun <init> (Ldev/kord/voice/udp/VoiceUdpSocket;Ldev/kord/voice/encryption/strategies/NonceStrategy;)V
public fun getIncomingAudioFrames ()Lkotlinx/coroutines/flow/Flow;
public synthetic fun getIncomingAudioPackets ()Lkotlinx/coroutines/flow/Flow;
public fun getIncomingAudioPackets ()Lkotlinx/coroutines/flow/SharedFlow;
public synthetic fun getIncomingUserStreams ()Lkotlinx/coroutines/flow/Flow;
public fun getIncomingUserStreams ()Lkotlinx/coroutines/flow/SharedFlow;
public fun getSsrcToUser ()Ljava/util/Map;
public fun listen ([BLio/ktor/network/sockets/SocketAddress;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getIncomingUserStreams ()Lkotlinx/coroutines/flow/Flow;
public fun listen ([BLio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class dev/kord/voice/streams/NOPStreams : dev/kord/voice/streams/Streams {
public static final field INSTANCE Ldev/kord/voice/streams/NOPStreams;
public fun getIncomingAudioFrames ()Lkotlinx/coroutines/flow/Flow;
public fun getIncomingAudioPackets ()Lkotlinx/coroutines/flow/Flow;
public fun getIncomingUserStreams ()Lkotlinx/coroutines/flow/Flow;
public fun getSsrcToUser ()Ljava/util/Map;
public fun listen ([BLio/ktor/network/sockets/SocketAddress;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun listen ([BLio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class dev/kord/voice/streams/Streams {
public abstract fun getIncomingAudioFrames ()Lkotlinx/coroutines/flow/Flow;
public abstract fun getIncomingAudioPackets ()Lkotlinx/coroutines/flow/Flow;
public abstract fun getIncomingUserStreams ()Lkotlinx/coroutines/flow/Flow;
public abstract fun getSsrcToUser ()Ljava/util/Map;
public abstract fun listen ([BLio/ktor/network/sockets/SocketAddress;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun listen ([BLio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class dev/kord/voice/udp/AudioFrameSender {
Expand Down
1 change: 1 addition & 0 deletions voice/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
api(projects.common)
api(projects.gateway)

implementation(libs.kotlinx.collections.immutable)
implementation(libs.kotlin.logging)
implementation(libs.slf4j.api)

Expand Down
3 changes: 2 additions & 1 deletion voice/src/main/kotlin/VoiceConnectionBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ public class VoiceConnectionBuilder(
nonceStrategy
)
)

val streams =
streams ?: if (receiveVoice) DefaultStreams(voiceGateway, udpSocket, nonceStrategy) else NOPStreams
streams ?: if (receiveVoice) DefaultStreams(udpSocket, nonceStrategy) else NOPStreams

return VoiceConnection(
voiceConnectionData,
Expand Down
17 changes: 12 additions & 5 deletions voice/src/main/kotlin/handlers/StreamsHandler.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package dev.kord.voice.handlers

import dev.kord.voice.gateway.Close
import dev.kord.voice.gateway.Ready
import dev.kord.voice.gateway.SessionDescription
import dev.kord.voice.gateway.VoiceEvent
import dev.kord.common.entity.Snowflake
import dev.kord.voice.gateway.*
import dev.kord.voice.gateway.handler.GatewayEventHandler
import dev.kord.voice.streams.Streams
import io.ktor.network.sockets.*
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.collections.immutable.persistentHashMapOf
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
Expand All @@ -22,15 +22,22 @@ internal class StreamsHandler(

private var streamsJob: Job? by atomic(null)

private val s2u = atomic(persistentHashMapOf<UInt, Snowflake>())

@OptIn(ExperimentalUnsignedTypes::class)
override suspend fun start() = coroutineScope {
on<Speaking> { speaking ->
s2u.update { it.put(speaking.ssrc, speaking.userId) }
}

on<Ready> {
server.value = InetSocketAddress(it.ip, it.port)
}

on<SessionDescription> {
streamsJob?.cancel()
streamsJob = launch { streams.listen(it.secretKey.toUByteArray().toByteArray(), server.value!!) }
streamsJob =
launch { streams.listen(it.secretKey.toUByteArray().toByteArray(), server.value!!, s2u = { s2u.value[it] }) }
}

on<Close> {
Expand Down
53 changes: 8 additions & 45 deletions voice/src/main/kotlin/streams/DefaultStreams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,28 @@ import dev.kord.common.entity.Snowflake
import dev.kord.voice.AudioFrame
import dev.kord.voice.encryption.XSalsa20Poly1305Codec
import dev.kord.voice.encryption.strategies.NonceStrategy
import dev.kord.voice.gateway.Speaking
import dev.kord.voice.gateway.VoiceGateway
import dev.kord.voice.io.*
import dev.kord.voice.udp.PayloadType
import dev.kord.voice.udp.RTPPacket
import dev.kord.voice.udp.VoiceUdpSocket
import io.github.oshai.kotlinlogging.KotlinLogging
import io.ktor.network.sockets.*
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

private val defaultStreamsLogger = KotlinLogging.logger { }

@KordVoice
public class DefaultStreams(
private val voiceGateway: VoiceGateway,
private val udp: VoiceUdpSocket,
private val nonceStrategy: NonceStrategy
) : Streams {
private fun CoroutineScope.listenForIncoming(key: ByteArray, server: SocketAddress) {
private var s2u: (UInt) -> Snowflake? = { null }

override suspend fun listen(key: ByteArray, server: SocketAddress, s2u: (UInt) -> Snowflake?): Unit =
coroutineScope {
[email protected] = s2u

udp.incoming
.filter { it.address == server }
.mapNotNull { RTPPacket.fromPacket(it.packet) }
Expand All @@ -41,49 +38,15 @@ public class DefaultStreams(
.launchIn(this)
}

private fun CoroutineScope.listenForUserFrames() {
voiceGateway.events
.filterIsInstance<Speaking>()
.buffer(Channel.UNLIMITED)
.onEach { speaking ->
_ssrcToUser.update {
it.computeIfAbsent(speaking.ssrc) {
incomingAudioFrames
.filter { (ssrc, _) -> speaking.ssrc == ssrc }
.map { (_, frame) -> speaking.userId to frame }
.onEach { value -> _incomingUserAudioFrames.emit(value) }
.launchIn(this)

speaking.userId
}

it
}
}.launchIn(this)
}

override suspend fun listen(key: ByteArray, server: SocketAddress): Unit = coroutineScope {
listenForIncoming(key, server)
listenForUserFrames()
}

private val _incomingAudioPackets: MutableSharedFlow<RTPPacket> = MutableSharedFlow()

override val incomingAudioPackets: SharedFlow<RTPPacket> = _incomingAudioPackets

override val incomingAudioFrames: Flow<Pair<UInt, AudioFrame>>
get() = incomingAudioPackets.map { it.ssrc to AudioFrame(it.payload.toByteArray()) }

private val _incomingUserAudioFrames: MutableSharedFlow<Pair<Snowflake, AudioFrame>> =
MutableSharedFlow()

override val incomingUserStreams: SharedFlow<Pair<Snowflake, AudioFrame>> =
_incomingUserAudioFrames

private val _ssrcToUser: AtomicRef<MutableMap<UInt, Snowflake>> =
atomic(mutableMapOf())

override val ssrcToUser: Map<UInt, Snowflake> get() = _ssrcToUser.value
override val incomingUserStreams: Flow<Pair<Snowflake, AudioFrame>>
get() = incomingAudioFrames.mapNotNull { (ssrc, frame) -> s2u(ssrc)?.to(frame) }
}

private fun Flow<RTPPacket>.decrypt(nonceStrategy: NonceStrategy, key: ByteArray): Flow<RTPPacket> {
Expand Down
3 changes: 1 addition & 2 deletions voice/src/main/kotlin/streams/NOPStreams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import kotlinx.coroutines.flow.flow

@KordVoice
public object NOPStreams : Streams {
override suspend fun listen(key: ByteArray, server: SocketAddress) {}
override suspend fun listen(key: ByteArray, server: SocketAddress, s2u: (UInt) -> Snowflake?) {}

override val incomingAudioPackets: Flow<RTPPacket> = flow { }
override val incomingAudioFrames: Flow<Pair<UInt, AudioFrame>> = flow { }
override val incomingUserStreams: Flow<Pair<Snowflake, AudioFrame>> = flow { }
override val ssrcToUser: Map<UInt, Snowflake> = emptyMap()
}
9 changes: 3 additions & 6 deletions voice/src/main/kotlin/streams/Streams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import dev.kord.common.entity.Snowflake
import dev.kord.voice.AudioFrame
import dev.kord.voice.udp.RTPPacket
import io.ktor.network.sockets.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow

/**
Expand All @@ -14,8 +15,9 @@ import kotlinx.coroutines.flow.Flow
public interface Streams {
/**
* Starts propagating packets from [server] with the following [key] to decrypt the incoming frames.
* @param s2u a way to convert a ssrc to a snowflake, and should generally be handled by the voice connection.
*/
public suspend fun listen(key: ByteArray, server: SocketAddress)
public suspend fun listen(key: ByteArray, server: SocketAddress, s2u: (UInt) -> Snowflake?)

/**
* A flow of all incoming [dev.kord.voice.udp.RTPPacket]s through the UDP connection.
Expand All @@ -32,9 +34,4 @@ public interface Streams {
* Streams for every user should be built over time and will not be immediately available.
*/
public val incomingUserStreams: Flow<Pair<Snowflake, AudioFrame>>

/**
* A map of [ssrc][UInt]s to their corresponding [userId][Snowflake].
*/
public val ssrcToUser: Map<UInt, Snowflake>
}