diff --git a/Dockerfile b/Dockerfile index b468f15..68b5a81 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM docker:20-dind +FROM docker:dind # RUN sed 's/http:\/\/fr\./http:\/\//' /etc/apt/sources.list WORKDIR /root @@ -6,26 +6,32 @@ WORKDIR /root RUN apk update RUN apk add make \ bash \ + openjdk21-jdk \ curl \ openssl-dev \ python3-dev \ gmp-dev \ tar -RUN apk add --repository http://dl-cdn.alpinelinux.org/alpine/edge/testing criu-dev \ - openjdk20-jre-headless +RUN apk add --repository http://dl-cdn.alpinelinux.org/alpine/edge/testing/x86_64/criu-dev +RUN apk add tar -ADD *.jar Node.jar -ADD config.json config.json +COPY . . ADD vdf-cli vdf-cli -ADD Start.sh Start.sh -ADD SaveContainer.sh SaveContainer.sh -ADD RunContainer.sh RunContainer.sh +#ADD / +#ADD *.jar Node.jar +#ADD config.json config.json +#ADD vdf-cli vdf-cli +#ADD Start.sh Start.sh +#ADD SaveContainer.sh SaveContainer.sh +#ADD RunContainer.sh RunContainer.sh + # ADD stress.sh stress.sh # COPY stress.tar stress.tar +RUN ./gradlew assemble jar RUN chmod 777 Start.sh RUN chmod 777 vdf-cli -RUN mv vdf-cli /usr/bin/vdf-cli +# RUN mv vdf-cli /usr/bin/vdf-cli RUN chmod +x Start.sh ENTRYPOINT ["./Start.sh"] \ No newline at end of file diff --git a/Start.sh b/Start.sh index 8a25108..16f7a19 100644 --- a/Start.sh +++ b/Start.sh @@ -1,8 +1,8 @@ #!/bin/bash -if [ "$#" -ne 1 ]; then -echo "Sleeping..." -# sleep $(shuf -i 1-120 -n 1) -fi +#if [ "$#" -ne 1 ]; then +#echo "Sleeping..." +#sleep $(shuf -i 1-120 -n 1) +#fi dockerd --experimental & while [ ! -f /var/run/docker.pid ] @@ -11,10 +11,11 @@ echo "Waiting for docker daemon to initialize!" sleep 5 done -# docker load -i stress.tar -# if [ "$1" -eq 5005 ]; then -# echo "We're trusted. Running stress test." -# bash stress.sh & -# fi +#docker load -i stress.tar -java -jar Node.jar $1 \ No newline at end of file +#if [ "$1" -eq 5005 ]; then +# echo "We're trusted. Running stress test." +# bash stress.sh & +#fi + +java -jar build/libs/nion-core.jar $1 $2 \ No newline at end of file diff --git a/build.gradle b/build.gradle index 68897e2..afe79ec 100644 --- a/build.gradle +++ b/build.gradle @@ -1,43 +1,50 @@ plugins { id 'java' - id("org.jetbrains.kotlin.jvm") version "1.9.0" - id("org.jetbrains.kotlin.plugin.serialization") version "1.9.0" + id("org.jetbrains.kotlin.jvm") version "2.2.0" + id("org.jetbrains.kotlin.plugin.serialization") version "2.2.0" } group 'nion.network' -version '0.1' - -configurations.implementation.setCanBeResolved(true) +//version '0.1' +//configurations.implementation.setCanBeResolved(true) repositories { mavenCentral() } dependencies { - testImplementation('org.junit.jupiter:junit-jupiter-api:5.8.1') - testImplementation('org.junit.jupiter:junit-jupiter-engine:5.8.1') + testImplementation("org.junit.jupiter:junit-jupiter-api:5.10.2") + testImplementation("org.junit.jupiter:junit-jupiter-engine:5.10.2") + - implementation('org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.9.0') + implementation('org.jetbrains.kotlin:kotlin-stdlib-jdk8:2.2.0') implementation('org.jetbrains.kotlinx:kotlinx-serialization-protobuf:1.3.0') implementation('org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0') - implementation("com.influxdb:influxdb-client-kotlin:4.0.0") + implementation("com.influxdb:influxdb-client-kotlin:7.3.0") implementation('org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2-native-mt') implementation("io.javalin:javalin:5.6.1") implementation("org.slf4j:slf4j-simple:2.0.7") -} -tasks.withType(Test).configureEach { - it.scanForTestClasses = false - it.useJUnitPlatform() + implementation("org.jetbrains.exposed:exposed-core:1.0.0-rc-3") + implementation("org.jetbrains.exposed:exposed-jdbc:1.0.0-rc-3") + implementation("org.jetbrains.exposed:exposed-dao:1.0.0-rc-3") + implementation("org.xerial:sqlite-jdbc:3.49.1.0") } +tasks.test { + useJUnitPlatform() + scanForTestClasses = false + failOnNoDiscoveredTests = false +} tasks.jar { duplicatesStrategy = DuplicatesStrategy.EXCLUDE - setArchivesBaseName("Nion-Network") + archiveBaseName.set("nion-core") manifest { attributes.put("Main-Class", "LaunchKt") } - from { - configurations.implementation.collect { it.isDirectory() ? it : zipTree(it) } - } + dependsOn(configurations.runtimeClasspath) + from({ + configurations.runtimeClasspath + .collect { it.isDirectory() ? it : zipTree(it) } + }) } \ No newline at end of file diff --git a/config.json b/config.json index 8003d52..c8296b0 100644 --- a/config.json +++ b/config.json @@ -11,7 +11,7 @@ "influxUrl": "http://88.200.63.190:8086", "influxToken": "LtudFFrFu_b91XO6bCr0mzJuKV-Ffzom2phPlbn9FqNgDiYYcFRmh0mt63sHwPTo7JUv3jCe9RQNmB4AHMpKbQ==", "dashboardEnabled": false, - "loggingEnabled": false, + "loggingEnabled": true, "trustedLoggingEnabled": false, "historyMinuteClearance": 1, "historyCleaningFrequency": 1, @@ -21,4 +21,4 @@ "useCriu": true, "useTreeBasedMessageRoutingProtocol": true, "treeChildrenCount": 2 -} \ No newline at end of file +} diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 5d0f28c..5459771 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.0.0-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists # Could be upgraded to 8.3, depending on the user needs. \ No newline at end of file diff --git a/src/main/kotlin/Launch.kt b/src/main/kotlin/Launch.kt index 32ceb7f..2ca9ed0 100644 --- a/src/main/kotlin/Launch.kt +++ b/src/main/kotlin/Launch.kt @@ -13,10 +13,12 @@ import java.io.File @ExperimentalSerializationApi fun main(args: Array) { tryAndReport { - System.setProperty("kotlinx.coroutines.scheduler", "off") // Needed due to bugs with linux and docker containers. TODO: fix ASAP - val configuration = Json.decodeFromString(File("./config.json").readText()) + System.setProperty("kotlinx.coroutines.scheduler", "off") + + val passedConfiguration = args.getOrNull(0) ?: throw Exception("Configuration should be passed to the docker container.") + val configuration = Json.decodeFromString(passedConfiguration) Logger.toggleLogging(configuration.loggingEnabled) - args.getOrNull(0)?.toInt()?.apply { + args.getOrNull(1)?.toInt()?.apply { configuration.port = this } Nion(configuration).apply { diff --git a/src/main/kotlin/Nion.kt b/src/main/kotlin/Nion.kt index 9d5e83b..2aae9f4 100644 --- a/src/main/kotlin/Nion.kt +++ b/src/main/kotlin/Nion.kt @@ -1,10 +1,14 @@ import chain.ChainBuilder +import database.BlockTable import kotlinx.serialization.ExperimentalSerializationApi import logging.Dashboard import logging.Logger import network.data.Endpoint import network.data.MessageProcessing import network.data.messages.Message +import org.jetbrains.exposed.v1.jdbc.Database +import org.jetbrains.exposed.v1.jdbc.SchemaUtils +import org.jetbrains.exposed.v1.jdbc.transactions.transaction import utils.asHex import utils.launchCoroutine import utils.runAfter @@ -48,6 +52,10 @@ class Nion(configuration: Configuration) : ChainBuilder(configuration) { Dashboard.reportException(Exception("Local address: $localAddress!")) return } + Database.connect("jdbc:sqlite:nion.db", "org.sqlite.JDBC") + transaction { + SchemaUtils.create(BlockTable) + } attemptBootstrap() attemptInclusion() } diff --git a/src/main/kotlin/chain/Chain.kt b/src/main/kotlin/chain/Chain.kt index 2dc9433..0765056 100644 --- a/src/main/kotlin/chain/Chain.kt +++ b/src/main/kotlin/chain/Chain.kt @@ -1,7 +1,10 @@ package chain import chain.data.Block +import database.BlockTable import logging.Logger +import org.jetbrains.exposed.v1.jdbc.insert +import org.jetbrains.exposed.v1.jdbc.transactions.transaction import utils.CircularList import utils.asHex import utils.tryWithLock @@ -13,19 +16,23 @@ import kotlin.concurrent.withLock * on 16/11/2021 at 16:21 * using IntelliJ IDEA */ -class Chain(private val verifiableDelay: VerifiableDelay, private val initialDifficulty: Int, private val committeeSize: Int) { +class Chain( + private val verifiableDelay: VerifiableDelay, + private val initialDifficulty: Int, + private val committeeSize: Int +) { private val lock = ReentrantLock(true) private val blocks = CircularList(50) // ToDo: Remove, do not use Circular List but use persistent storage! /** Returns the last block in the chain. */ fun getLastBlock(): Block? { - return lock.withLock { blocks.lastOrNull() } + return lock.withLock { blocks.items.lastOrNull() } } /** Returns max 100 blocks [from slot][fromSlot].*/ fun getLastBlocks(fromSlot: Long): List { - return lock.withLock { blocks.takeLastWhile { it.slot > fromSlot } }.take(100) + return lock.withLock { blocks.items.takeLastWhile { it.slot > fromSlot } }.take(100) } /** Attempts to add each block one by one to the chain. */ @@ -34,7 +41,8 @@ class Chain(private val verifiableDelay: VerifiableDelay, private val initialDif val lastBlock = getLastBlock() val lastHash = lastBlock?.hash ?: "FFFF".toByteArray() val difficulty = lastBlock?.difficulty ?: initialDifficulty - val isLegitimate = nextBlock.slot == (lastBlock?.slot ?: 0) + 1 // verifiableDelay.verifyProof(lastHash, difficulty, nextBlock.vdfProof) + val isLegitimate = nextBlock.slot == (lastBlock?.slot + ?: 0) + 1 // verifiableDelay.verifyProof(lastHash, difficulty, nextBlock.vdfProof) if (!isLegitimate) { Logger.trace("Proof is not legitimate for block ${nextBlock.slot}!") Logger.chain("Last hash: ${lastHash.asHex}") @@ -45,9 +53,25 @@ class Chain(private val verifiableDelay: VerifiableDelay, private val initialDif return false } lock.tryWithLock { + blocks.add(nextBlock) // ToDo: Put chain history in some sort of storage instead of keeping in memory. + transaction { + BlockTable.insert { + it[BlockTable.slot] = nextBlock.slot + it[BlockTable.difficulty] = nextBlock.difficulty + it[BlockTable.blockProducer] = nextBlock.blockProducer + it[BlockTable.timestamp] = nextBlock.timestamp + it[BlockTable.precedentHash] = nextBlock.precedentHash.toHexString() + it[BlockTable.votes] = nextBlock.votes + it[BlockTable.committee] = nextBlock.committee.joinToString() + it[BlockTable.hash] = nextBlock.hash.toHexString() + it[BlockTable.votedMembers] = nextBlock.votedMembers.joinToString() + + } + } } + Logger.chain("Block[${nextBlock.votes}/$committeeSize] added [${nextBlock.slot}].") } return true diff --git a/src/main/kotlin/chain/VerifiableDelay.kt b/src/main/kotlin/chain/VerifiableDelay.kt index 891171d..8d7142c 100644 --- a/src/main/kotlin/chain/VerifiableDelay.kt +++ b/src/main/kotlin/chain/VerifiableDelay.kt @@ -11,6 +11,7 @@ class VerifiableDelay { /** Runs a vdf-cli command and returns the output of vdf computation. */ fun computeProof(difficulty: Int, hash: ByteArray): String { + return "0000" // Issues with computing vdf-cli val hexHash = hash.asHex val needed = hexHash.length % 2 val processBuilder = ProcessBuilder() diff --git a/src/main/kotlin/database/Tables.kt b/src/main/kotlin/database/Tables.kt new file mode 100644 index 0000000..7f89997 --- /dev/null +++ b/src/main/kotlin/database/Tables.kt @@ -0,0 +1,18 @@ +package database + +import org.jetbrains.exposed.v1.core.dao.id.IntIdTable + +/** + * @author Mihael Berčič on 9. 1. 26. + */ +object BlockTable : IntIdTable("blocks") { + val slot = long("slot") + val difficulty = integer("difficulty").default(0) + val blockProducer = text("block_producer") + val timestamp = long("timestamp") + val precedentHash = varchar("precendtHash", 255) + val votes = integer("votes").default(0) + val committee = text("committee") + val hash = text("hash") + val votedMembers = text("voted_members") +} \ No newline at end of file diff --git a/src/main/kotlin/docker/DockerContainer.kt b/src/main/kotlin/docker/DockerContainer.kt index 4748742..9d47253 100644 --- a/src/main/kotlin/docker/DockerContainer.kt +++ b/src/main/kotlin/docker/DockerContainer.kt @@ -1,5 +1,7 @@ package docker +import com.google.gson.annotations.SerializedName +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import utils.CircularList @@ -25,6 +27,28 @@ data class DockerContainer( var updated: Long = System.currentTimeMillis(), var latestSnapshot: String? = null ) { - val averageCpuUsage get(): Double = cpuUsage.elements().average().takeIf { !it.isNaN() } ?: 0.0 - val averageMemoryUsage get(): Double = memoryUsage.elements().average().takeIf { !it.isNaN() } ?: 0.0 -} \ No newline at end of file + val averageCpuUsage get(): Double = cpuUsage.items.average().takeIf { !it.isNaN() } ?: 0.0 + val averageMemoryUsage get(): Double = memoryUsage.items.average().takeIf { !it.isNaN() } ?: 0.0 +} + +@Serializable +data class DockerStatsModel( + @SerialName("BlockIO") + val blockIo: String, + @SerialName("CPUPerc") + val cpuPercentage: String, + @SerialName("Container") + val container: String, + @SerialName("ID") + val id: String, + @SerialName("MemPerc") + val memoryPercentage: String, + @SerialName("MemUsage") + val memUsage: String, + @SerialName("Name") + val name: String, + @SerialName("NetIO") + val netIo: String, + @SerialName("PIDs") + val pids: String, +) diff --git a/src/main/kotlin/docker/DockerProxy.kt b/src/main/kotlin/docker/DockerProxy.kt index 7a8fb16..532aa0f 100644 --- a/src/main/kotlin/docker/DockerProxy.kt +++ b/src/main/kotlin/docker/DockerProxy.kt @@ -3,6 +3,8 @@ package docker import Configuration import chain.data.Block import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.json.Json import logging.Dashboard import logging.Logger import network.data.Endpoint @@ -10,8 +12,6 @@ import network.data.clusters.Cluster import network.data.messages.Message import utils.CircularList import utils.runAfter -import java.nio.ByteBuffer -import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock @@ -65,7 +65,7 @@ abstract class DockerProxy(configuration: Configuration) : MigrationStrategy(con fun sendDockerStatistics(block: Block, blockProducer: String, clusters: Map>) { val slot = block.slot val currentTime = System.currentTimeMillis() - localContainers.entries.removeIf { (_, container) -> currentTime - container.updated >= 1000 } + localContainers.entries.removeIf { (_, container) -> currentTime - container.updated >= 30_000 } val mapped: List = localContainers.values.map { it.copy(id = networkMappings[it.id] ?: it.id) } val localStatistics = DockerStatistics(localNode.publicKey, mapped, slot) @@ -92,53 +92,40 @@ abstract class DockerProxy(configuration: Configuration) : MigrationStrategy(con /** Starts a process of `docker stats` and keeps the [localStatistics] up to date. */ private fun listenForDockerStatistics() { val numberOfElements = (configuration.slotDuration / 1000).toInt() + val json = Json { + ignoreUnknownKeys = true + } + while (true) { + // TODO: I believe this could be optimised + val process = ProcessBuilder() + .command("docker", "stats", "--no-stream", "--no-trunc", "--format", "{{ json . }}") + .redirectErrorStream(true) + .start() + val reader = process.inputStream.bufferedReader() + + reader.readLines().forEach { line -> + val stats = json.decodeFromString(line) + val pids = stats.pids.toIntOrNull() ?: 0 + val cpuPercentage = stats.cpuPercentage.trim('%').toDoubleOrNull() ?: 0.0 + val memoryPercentage = stats.memoryPercentage.trim('%').toDoubleOrNull() ?: 0.0 - val process = ProcessBuilder() - .command("docker", "stats", "--no-trunc", "--format", "{{.ID}} {{.CPUPerc}} {{.MemPerc}} {{.PIDs}}") - .redirectErrorStream(true) - .start() - - val buffer = ByteBuffer.allocate(100_000) - val escapeSequence = byteArrayOf(0x1B, 0x5B, 0x32, 0x4A, 0x1B, 0x5B, 0x48) // Escape sequence of CLI output. - var escapeIndex = 0 - process.inputStream.use { inputStream -> - while (true) { - try { - val byte = inputStream.read().toByte() - if (byte < 0) break - buffer.put(byte) - if (byte == escapeSequence[escapeIndex]) escapeIndex++ else escapeIndex = 0 - if (escapeIndex != escapeSequence.size) continue // If escape sequence was not detected, continue reading data. - val length = buffer.position() - escapeSequence.size - if (length > 0) String(buffer.array(), 0, length).split("\n").map { line -> - if (line.isNotEmpty()) { - val fields = line.split(" ") - val containerId = fields[0] - if (fields.none { it.contains("-") || it.isEmpty() }) { - val cpuPercentage = fields[1].trim('%').toDouble() - val memoryPercentage = fields[2].trim('%').toDouble() - val activeProcesses = fields[3].toInt() - val container = localContainers.computeIfAbsent(containerId) { - DockerContainer(containerId, activeProcesses, CircularList(numberOfElements), CircularList(numberOfElements)) - } - container.apply { - cpuUsage.add(cpuPercentage) - memoryUsage.add(memoryPercentage) - updated = System.currentTimeMillis() - processes = activeProcesses - } - } else localContainers[containerId]?.updated = System.currentTimeMillis() - } - } - buffer.clear() - escapeIndex = 0 - } catch (e: Exception) { - buffer.clear() - escapeIndex = 0 - Dashboard.reportException(e) + val container = localContainers.computeIfAbsent(stats.id) { + DockerContainer(stats.id, pids, CircularList(numberOfElements), CircularList(numberOfElements)) + } + container.apply { + cpuUsage.add(cpuPercentage) + memoryUsage.add(memoryPercentage) + processes = pids + updated = System.currentTimeMillis() } } + Thread.sleep(5000) } + + /* + + */ + } } \ No newline at end of file diff --git a/src/main/kotlin/network/data/messages/Message.kt b/src/main/kotlin/network/data/messages/Message.kt index 30ad040..6181f04 100644 --- a/src/main/kotlin/network/data/messages/Message.kt +++ b/src/main/kotlin/network/data/messages/Message.kt @@ -1,5 +1,8 @@ +@file:OptIn(ExperimentalSerializationApi::class) + package network.data.messages +import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.Serializable import kotlinx.serialization.decodeFromByteArray import kotlinx.serialization.protobuf.ProtoBuf diff --git a/src/main/kotlin/network/messaging/Server.kt b/src/main/kotlin/network/messaging/Server.kt index 635adf3..02eabff 100644 --- a/src/main/kotlin/network/messaging/Server.kt +++ b/src/main/kotlin/network/messaging/Server.kt @@ -65,6 +65,7 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati Logger.trace("Clearing message history...") messageHistory.entries.removeIf { (_, value) -> currentTime - value > maximumAge } } + } abstract fun processMessage(message: Message) @@ -91,7 +92,10 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati /** Returns [Configuration.broadcastSpreadPercentage] number of nodes. */ fun pickRandomNodes(amount: Int = 0): List { - val toTake = if (amount > 0) amount else 5 + (configuration.broadcastSpreadPercentage * Integer.max(totalKnownNodes, 1) / 100) + val toTake = if (amount > 0) amount else 5 + (configuration.broadcastSpreadPercentage * Integer.max( + totalKnownNodes, + 1 + ) / 100) return getRandomNodes(toTake).filter { it.identifier != localNode.identifier } } @@ -160,7 +164,11 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati val message = ProtoBuf.decodeFromByteArray(data) if (alreadySeen(message.uid.asHex)) return@use processingQueue.add(message) - if (message.endpoint.transmissionType == TransmissionType.Broadcast) broadcast(TransmissionLayer.TCP, message.uid.asHex, data) + if (message.endpoint.transmissionType == TransmissionType.Broadcast) broadcast( + TransmissionLayer.TCP, + message.uid.asHex, + data + ) } } } @@ -199,14 +207,21 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati broadcastNodes.add(neighbour) broadcastNodes.addAll(childrenKeys) broadcastNodes.addAll(neighbourChildrenKeys) - Logger.error("[$index] [$children] Neighbour: $neighbourIndex ... Children: ${childrenKeys.joinToString(",") { "${shuffled.indexOf(it)}" }}") + Logger.error( + "[$index] [$children] Neighbour: $neighbourIndex ... Children: ${ + childrenKeys.joinToString( + "," + ) { "${shuffled.indexOf(it)}" } + }" + ) } else -> broadcastNodes.addAll(pickRandomNodes().map { it.publicKey }) } - val knownAndNotInSet = knownNodes.values.map(Node::publicKey).filter { !validatorSet.activeValidators.contains(it) } + val knownAndNotInSet = + knownNodes.values.map(Node::publicKey).filter { !validatorSet.activeValidators.contains(it) } broadcastNodes.addAll(knownAndNotInSet) Logger.trace("We have to retransmit to [total: ${shuffled.size}] --> ${broadcastNodes.size} nodes.") @@ -214,7 +229,8 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati broadcastNodes.forEach { publicKey -> query(publicKey) { val outgoingData = OutgoingData(it, *data) - val outgoingQueue = if (transmissionLayer == TransmissionLayer.UDP) udpOutgoingQueue else tcpOutgoingQueue + val outgoingQueue = + if (transmissionLayer == TransmissionLayer.UDP) udpOutgoingQueue else tcpOutgoingQueue outgoingQueue.add(outgoingData) } } @@ -242,7 +258,11 @@ abstract class Server(val configuration: Configuration) : RPCManager(configurati if (alreadySeen(packetId) || alreadySeen(messageId)) return@tryAndReport val endpoint = Endpoint.byId(inputStream.read().toByte()) ?: return@tryAndReport - if (endpoint.transmissionType == TransmissionType.Broadcast) broadcast(TransmissionLayer.UDP, messageId, packet.data.copyOf()) + if (endpoint.transmissionType == TransmissionType.Broadcast) broadcast( + TransmissionLayer.UDP, + messageId, + packet.data.copyOf() + ) val totalSlices = inputStream.readInt() val currentSlice = inputStream.readInt() diff --git a/src/main/kotlin/network/rpc/RPCManager.kt b/src/main/kotlin/network/rpc/RPCManager.kt index 30b1dea..3135b8c 100644 --- a/src/main/kotlin/network/rpc/RPCManager.kt +++ b/src/main/kotlin/network/rpc/RPCManager.kt @@ -7,6 +7,7 @@ import io.javalin.websocket.WsConnectContext import io.javalin.websocket.WsContext import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json +import logging.Logger import network.kademlia.Kademlia import java.util.concurrent.TimeUnit @@ -30,6 +31,19 @@ open class RPCManager(configuration: Configuration) : Kademlia(configuration) { webServer.ws("/") { it.onConnect(this::onConnect) it.onClose(this::onClose) + it.onMessage { context -> + val message = context.message() + when (message) { + "dht" -> { + context.send(Json.encodeToString(knownNodes.values.toList())) + } + + else -> { + Logger.debug("A message of $message is being ignored via WS") + } + } + + } } } } @@ -47,6 +61,7 @@ open class RPCManager(configuration: Configuration) : Kademlia(configuration) { .add(webSocket) } sendToSubscribed(Topic.Logging, "Hello, this is Nion node!") + sendToSubscribed(Topic.Logging, knownNodes.values.toList()) } /** diff --git a/src/main/kotlin/utils/CircularList.kt b/src/main/kotlin/utils/CircularList.kt index 89a8ece..27401df 100644 --- a/src/main/kotlin/utils/CircularList.kt +++ b/src/main/kotlin/utils/CircularList.kt @@ -8,15 +8,12 @@ import kotlinx.serialization.Serializable * using IntelliJ IDEA */ @Serializable -class CircularList(private val maxCapacity: Int) : ArrayList(maxCapacity) { +class CircularList(private val maxCapacity: Int, val items: ArrayList = ArrayList(maxCapacity)) { /** Adds a new element to the list and removes the oldest element.*/ - override fun add(element: T): Boolean { - if (size == maxCapacity) removeFirst() - return super.add(element) + fun add(element: T): Boolean { + if (items.size == maxCapacity) items.removeFirst() + return items.add(element) } - /** Returns all elements in this circular list. */ - fun elements() = toList() - } \ No newline at end of file diff --git a/src/test/kotlin/CircularListTest.kt b/src/test/kotlin/CircularListTest.kt index ecd64bd..c075545 100644 --- a/src/test/kotlin/CircularListTest.kt +++ b/src/test/kotlin/CircularListTest.kt @@ -13,6 +13,6 @@ class CircularListTest { fun circularTest() { val circularList = CircularList(3) for (i in 0..10) circularList.add(i) - assertEquals(circularList.elements(), listOf(8, 9, 10)) + assertEquals(circularList.items, listOf(8, 9, 10)) } } \ No newline at end of file diff --git a/stress.sh b/stress.sh index f1c0092..b32e321 100644 --- a/stress.sh +++ b/stress.sh @@ -1,5 +1,5 @@ #!/bin/bash -count=200 +count=$1 echo "Running stress test of $count apps..." for i in $(seq $count); do