diff --git a/build.sbt b/build.sbt index 3ea261c..3ffc89f 100644 --- a/build.sbt +++ b/build.sbt @@ -80,7 +80,7 @@ libraryDependencies ++= Seq( "org.scalatestplus.play" %% "scalatestplus-play" % "3.1.2" % Test, "com.fasterxml.jackson.module" % "jackson-module-scala_2.12" % "2.9.2" -) ++ grpcDeps ++ akkaDeps ++ circeDependencies ++ akkaStreamsContribDeps +) ++ grpcDeps ++ akkaDeps ++ circeDependencies ++ akkaStreamsContribDeps ++ elasticSearchDeps // Disable API Documentation sources in (Compile, doc) := Seq.empty diff --git a/project/Dependencies.scala b/project/Dependencies.scala index faf9851..1d4fbee 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,6 +9,7 @@ object Dependencies { val catsVersion = "0.9.0" val grpcVersion = "1.9.0" val scaleCubeVersion = "1.0.7" + val elasticSearchVersion = "6.3.6" val akkaStreamsContribDeps = Seq( "com.typesafe.akka" %% "akka-stream-contrib" % "0.9" @@ -52,4 +53,16 @@ object Dependencies { "io.scalecube" % "scalecube-cluster", "io.scalecube" % "scalecube-transport" ).map(_ % scaleCubeVersion) + + val elasticSearchDeps = Seq( + // "com.sksamuel.elastic4s" %% "elastic4s-streams", + "com.sksamuel.elastic4s" %% "elastic4s-core", + "com.sksamuel.elastic4s" %% "elastic4s-http", + "com.sksamuel.elastic4s" %% "elastic4s-circe", + "com.sksamuel.elastic4s" %% "elastic4s-embedded" + ).map(_ % elasticSearchVersion) ++ Seq( + "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elasticSearchVersion % "test", + "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elasticSearchVersion % "test" + ) } + diff --git a/test/org/tronscan/elasticsearch/ImportSpec.scala b/test/org/tronscan/elasticsearch/ImportSpec.scala new file mode 100644 index 0000000..d0d4da1 --- /dev/null +++ b/test/org/tronscan/elasticsearch/ImportSpec.scala @@ -0,0 +1,77 @@ +package org.tronscan.elasticsearch + + +import com.sksamuel.elastic4s.http.ElasticClient +import com.sksamuel.elastic4s.http.ElasticDsl._ +import io.circe.Json +import io.circe.generic.auto._ +import io.circe.syntax._ +import org.apache.http.HttpHost +import org.elasticsearch.client.RestClient +import org.joda.time.DateTime +import org.specs2.mutable._ +import play.api.Logger + +class ImportSpec extends Specification { + + "Elastic Search" should { + + "Import" in { + + val client = ElasticClient.fromRestClient(RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) +// val localNode = LocalNode("mycluster", "/tmp/esdata") +// val client = localNode.client(shutdownNodeOnClose = true) + + Logger.info("DELETING INDEX") + client.execute { deleteIndex("blockchain") }.await + + Logger.info("CREATING INDEX") + client.execute { + createIndex("blockchain").mappings( + mapping("transaction").fields( + longField("block").index(true), + textField("hash").index(true), + dateField("date_created"), + booleanField("confirmed"), + objectField("contract_data"), + intField("contract_type"), + textField("owner_address"), + textField("to_address"), + textField("data") + ) + ) + }.await + + Logger.info("CREATING TRANSACTIONS") + + val transactions = for (i <- 1 to 2500000) yield { + indexInto("blockchain" / "transaction").doc(Json.obj( + "block" -> i.asJson, + "hash" -> "f54e1d904eb1744f3309f65b2940f56917426b91fc58d51a6096cdb970154044".asJson, + "date_created" -> DateTime.now.asJson, + "contract_type" -> 4.asJson, + "confirmed" -> true.asJson, + "contract_data" -> Json.obj( + "to" -> "TXJ1F79y9NaEQ2bVy7Db4VCe4ndM5QSYvS".asJson, + "from" -> "TQREAvGre73gkS2TUyNhbxcP2dpoe2fXPD".asJson, + "amount" -> 2000000.asJson + ), + "owner_address" -> "TQREAvGre73gkS2TUyNhbxcP2dpoe2fXPD".asJson, + "to_address" -> "TV3NmH1enpu4X5Hur8Z16eCyNymTqKXQDP".asJson, + "data" -> "Test Data!".asJson + ).toString()) + } + + Logger.info("POSTING TO ES") + + transactions.grouped(100000).foreach { bulks => + Logger.info("POSTING BATCH") + val response = client.execute { bulk(bulks) }.await + } + +// println(response) + + ok + } + } +}