Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ libraryDependencies ++= Seq(
"org.scalatestplus.play" %% "scalatestplus-play" % "3.1.2" % Test,
"com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.9.2"

) ++ grpcDeps ++ akkaDeps ++ circeDependencies ++ akkaStreamsContribDeps
) ++ grpcDeps ++ akkaDeps ++ circeDependencies ++ akkaStreamsContribDeps ++ elasticSearchDeps

// Disable API Documentation
sources in (Compile, doc) := Seq.empty
Expand Down
13 changes: 13 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ object Dependencies {
val catsVersion = "0.9.0"
val grpcVersion = "1.9.0"
val scaleCubeVersion = "1.0.7"
val elasticSearchVersion = "6.3.6"

val akkaStreamsContribDeps = Seq(
"com.typesafe.akka" %% "akka-stream-contrib" % "0.9"
Expand Down Expand Up @@ -52,4 +53,16 @@ object Dependencies {
"io.scalecube" % "scalecube-cluster",
"io.scalecube" % "scalecube-transport"
).map(_ % scaleCubeVersion)

val elasticSearchDeps = Seq(
// "com.sksamuel.elastic4s" %% "elastic4s-streams",
"com.sksamuel.elastic4s" %% "elastic4s-core",
"com.sksamuel.elastic4s" %% "elastic4s-http",
"com.sksamuel.elastic4s" %% "elastic4s-circe",
"com.sksamuel.elastic4s" %% "elastic4s-embedded"
).map(_ % elasticSearchVersion) ++ Seq(
"com.sksamuel.elastic4s" %% "elastic4s-testkit" % elasticSearchVersion % "test",
"com.sksamuel.elastic4s" %% "elastic4s-embedded" % elasticSearchVersion % "test"
)
}

77 changes: 77 additions & 0 deletions test/org/tronscan/elasticsearch/ImportSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.tronscan.elasticsearch


import com.sksamuel.elastic4s.http.ElasticClient
import com.sksamuel.elastic4s.http.ElasticDsl._
import io.circe.Json
import io.circe.generic.auto._
import io.circe.syntax._
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import org.joda.time.DateTime
import org.specs2.mutable._
import play.api.Logger

class ImportSpec extends Specification {

"Elastic Search" should {

"Import" in {

val client = ElasticClient.fromRestClient(RestClient.builder(new HttpHost("localhost", 9200, "http")).build())
// val localNode = LocalNode("mycluster", "/tmp/esdata")
// val client = localNode.client(shutdownNodeOnClose = true)

Logger.info("DELETING INDEX")
client.execute { deleteIndex("blockchain") }.await

Logger.info("CREATING INDEX")
client.execute {
createIndex("blockchain").mappings(
mapping("transaction").fields(
longField("block").index(true),
textField("hash").index(true),
dateField("date_created"),
booleanField("confirmed"),
objectField("contract_data"),
intField("contract_type"),
textField("owner_address"),
textField("to_address"),
textField("data")
)
)
}.await

Logger.info("CREATING TRANSACTIONS")

val transactions = for (i <- 1 to 2500000) yield {
indexInto("blockchain" / "transaction").doc(Json.obj(
"block" -> i.asJson,
"hash" -> "f54e1d904eb1744f3309f65b2940f56917426b91fc58d51a6096cdb970154044".asJson,
"date_created" -> DateTime.now.asJson,
"contract_type" -> 4.asJson,
"confirmed" -> true.asJson,
"contract_data" -> Json.obj(
"to" -> "TXJ1F79y9NaEQ2bVy7Db4VCe4ndM5QSYvS".asJson,
"from" -> "TQREAvGre73gkS2TUyNhbxcP2dpoe2fXPD".asJson,
"amount" -> 2000000.asJson
),
"owner_address" -> "TQREAvGre73gkS2TUyNhbxcP2dpoe2fXPD".asJson,
"to_address" -> "TV3NmH1enpu4X5Hur8Z16eCyNymTqKXQDP".asJson,
"data" -> "Test Data!".asJson
).toString())
}

Logger.info("POSTING TO ES")

transactions.grouped(100000).foreach { bulks =>
Logger.info("POSTING BATCH")
val response = client.execute { bulk(bulks) }.await
}

// println(response)

ok
}
}
}