From a729161dc3091c7bb1e5be93c9822ab6c8d0f5a2 Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Sun, 18 Aug 2019 07:51:54 +0430 Subject: [PATCH 1/9] fix saving results in hbase --- web-graph/pom.xml | 1 + web-graph/src/main/java/in/nimbo/App.java | 46 +++++++++++++++-------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/web-graph/pom.xml b/web-graph/pom.xml index 2449326..8eb4816 100644 --- a/web-graph/pom.xml +++ b/web-graph/pom.xml @@ -34,6 +34,7 @@ ${hbase.version} + diff --git a/web-graph/src/main/java/in/nimbo/App.java b/web-graph/src/main/java/in/nimbo/App.java index 63f74fe..86895a8 100644 --- a/web-graph/src/main/java/in/nimbo/App.java +++ b/web-graph/src/main/java/in/nimbo/App.java @@ -6,25 +6,34 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Job; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.util.LongAccumulator; import scala.Tuple2; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.Collections; public class App { - public static void main(String[] args) { + + public static final String DEFAULT_PROTOCOL = "http://"; + + public static void main(String[] args) throws IOException { Config config = ConfigFactory.load("config"); String sparkAppName = config.getString("spark.app.name"); String hbaseXmlHadoop = config.getString("hbase.xml.url.in.hadoop"); @@ -49,6 +58,10 @@ public static void main(String[] args) { hbaseReadConfiguration.addResource(hbaseXmlHbase); hbaseWriteConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseWriteTableName); + Job newAPIJobConfiguration = Job.getInstance(hbaseWriteConfiguration); + newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, hbaseWriteTableName); + newAPIJobConfiguration.setOutputFormatClass(TableOutputFormat.class); + SparkConf sparkConf = new SparkConf() .setAppName(sparkAppName) .set("spark.executor.cores", sparkExecutorCores) @@ -56,6 +69,7 @@ public static void main(String[] args) { JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); LongAccumulator domainToDomainPairSize = sparkContext.sc().longAccumulator(); + LongAccumulator domainToDomainPairWeightedSize = sparkContext.sc().longAccumulator(); JavaRDD hbaseRDD = sparkContext.newAPIHadoopRDD(hbaseReadConfiguration , TableInputFormat.class, ImmutableBytesWritable.class, Result.class).values(); @@ -66,8 +80,8 @@ public static void main(String[] args) { String source = new String(CellUtil.cloneRow(cell)); String destination = new String(CellUtil.cloneValue(cell)); try { - String sourceDomain = getDomain("http://" + source); - String destinationDomain = getDomain("http://" + destination); + String sourceDomain = getDomain(DEFAULT_PROTOCOL + source); + String destinationDomain = getDomain(DEFAULT_PROTOCOL + destination); Tuple2 domainPair = new Tuple2<>(sourceDomain, destinationDomain); domainToDomainPairSize.add(1); return Collections.singleton(new Tuple2<>(domainPair, 1)).iterator(); @@ -76,30 +90,30 @@ public static void main(String[] args) { } }); - // Todo : CellUtil.cloneRow() Or getValueArray() - JavaPairRDD, Integer> domainToDomainPairWeightRDD = domainToDomainPairRDD .reduceByKey((Function2) (integer, integer2) -> integer + integer2); domainToDomainPairWeightRDD.foreach((VoidFunction, Integer>>) tuple2IntegerTuple2 -> { + domainToDomainPairWeightedSize.add(1); System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2)); }); System.err.println("Domain To Domain Pair Size : " + domainToDomainPairSize.sum()); + System.err.println("Domain To Domain Pair Weighted Size : " + domainToDomainPairWeightedSize.sum()); + + JavaPairRDD hbasePuts = domainToDomainPairWeightRDD + .mapToPair((PairFunction, Integer>, ImmutableBytesWritable, Put>) t -> { + Put put = new Put(Bytes.toBytes(t._1._1 + ":" + t._1._2)); + put.addColumn(Bytes.toBytes(hbaseWriteColumnFamily), + Bytes.toBytes(hbaseWriteQualifier), + Bytes.toBytes(t._2)); -// JavaPairRDD hbasePuts = domainToDomainPairWeightRDD -// .mapToPair((PairFunction, Integer>, ImmutableBytesWritable, Put>) t -> { -// Put put = new Put(Bytes.toBytes(t._1._1 + ":" + t._1._2)); -// put.addColumn(Bytes.toBytes(hbaseWriteColumnFamily), -// Bytes.toBytes(hbaseWriteQualifier), -// Bytes.toBytes(t._2)); -// -// return new Tuple2<>(new ImmutableBytesWritable(), put); -// }); + return new Tuple2<>(new ImmutableBytesWritable(), put); + }); -// hbasePuts.saveAsNewAPIHadoopDataset(hbaseWriteConfiguration); + hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration()); - sparkContext.close(); + sparkContext.stop(); } public static String getDomain(String url) throws MalformedURLException { From 73afd0e70367a8c3a41075e0c5a0b0ff5af46f79 Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Sun, 18 Aug 2019 08:41:07 +0430 Subject: [PATCH 2/9] change value to qualifier getter in distination domain --- web-graph/src/main/java/in/nimbo/App.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/web-graph/src/main/java/in/nimbo/App.java b/web-graph/src/main/java/in/nimbo/App.java index 86895a8..cb73fb2 100644 --- a/web-graph/src/main/java/in/nimbo/App.java +++ b/web-graph/src/main/java/in/nimbo/App.java @@ -21,6 +21,8 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Tuple2; import java.io.IOException; @@ -31,6 +33,8 @@ public class App { + private static Logger logger = LoggerFactory.getLogger(App.class); + public static final String DEFAULT_PROTOCOL = "http://"; public static void main(String[] args) throws IOException { @@ -78,7 +82,7 @@ public static void main(String[] args) throws IOException { JavaPairRDD, Integer> domainToDomainPairRDD = hbaseCells.flatMapToPair(cell -> { String source = new String(CellUtil.cloneRow(cell)); - String destination = new String(CellUtil.cloneValue(cell)); + String destination = new String(CellUtil.cloneQualifier(cell)); try { String sourceDomain = getDomain(DEFAULT_PROTOCOL + source); String destinationDomain = getDomain(DEFAULT_PROTOCOL + destination); @@ -98,8 +102,8 @@ public static void main(String[] args) throws IOException { System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2)); }); - System.err.println("Domain To Domain Pair Size : " + domainToDomainPairSize.sum()); - System.err.println("Domain To Domain Pair Weighted Size : " + domainToDomainPairWeightedSize.sum()); + logger.info("Domain To Domain Pair Size : " + domainToDomainPairSize.sum()); + logger.info("Domain To Domain Pair Weighted Size : " + domainToDomainPairWeightedSize.sum()); JavaPairRDD hbasePuts = domainToDomainPairWeightRDD .mapToPair((PairFunction, Integer>, ImmutableBytesWritable, Put>) t -> { From 86a48f5e2ae96aa7c1d06f52ee36760cd49ef2bd Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Sun, 18 Aug 2019 08:44:08 +0430 Subject: [PATCH 3/9] add playbook for webgraph mapreduce --- .../ansible/web-graph-mapreduce-playbook.yml | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 deploy/ansible/web-graph-mapreduce-playbook.yml diff --git a/deploy/ansible/web-graph-mapreduce-playbook.yml b/deploy/ansible/web-graph-mapreduce-playbook.yml new file mode 100644 index 0000000..18edfbd --- /dev/null +++ b/deploy/ansible/web-graph-mapreduce-playbook.yml @@ -0,0 +1,35 @@ +- hosts: master + vars: + git_repository_url: https://github.com/nimbo3/DataPirates.git + git_repository_branch: issue-119-web-graph + project_module_name: web-graph + project_module_version: 1.0-SNAPSHOT + deploy_dir: /opt/data-pirates/ + environment: + JAVA_HOME: /usr/lib/jvm/jdk1.8.0_211 + tasks: + - name: Install needed packages with apt + apt: + name: "{{ packages }}" + vars: + packages: + - git + - maven + - name: Get latest project files from github + git: + repo: "{{ git_repository_url }}" + dest: "{{ deploy_dir }}" + force: yes + version: "{{ git_repository_branch }}" + - name: Create runnable jar file with maven + command: mvn clean package -DskipTests -DoutputDirectory=/opt + register: mvn + changed_when: "'BUILD FAILURE' not in mvn.stdout" + args: + chdir: "{{ deploy_dir }}{{ project_module_name }}" + - name: Run Map/Reduce With nohup + shell: "nohup /usr/local/spark/bin/spark-submit --class in.nimbo.App --master spark://master:7077 {{ project_module_name }}-{{ project_module_version }}.jar --jars local:{{ deploy_dir }}{{ project_module_name }}/target/{{ project_module_name }}-{{ project_module_version }}.jar &" + register: nohup + changed_when: "'failed' not in nohup.stderr" + args: + chdir: "{{ deploy_dir }}{{ project_module_name }}/target/" From 67c8daa7727c2bc1ef6bf5b41134b71a287056cc Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Sun, 18 Aug 2019 10:24:27 +0430 Subject: [PATCH 4/9] fix hbase test --- crawler/src/test/resources/config.properties | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crawler/src/test/resources/config.properties b/crawler/src/test/resources/config.properties index 04b6709..3d0a854 100644 --- a/crawler/src/test/resources/config.properties +++ b/crawler/src/test/resources/config.properties @@ -1,11 +1,11 @@ -politeness.waiting.time = 30 +politeness.waiting.time=30 metric.registry.name=data-pirates-crawler -hbase.table.name = s -hbase.table.column.family.anchors = l -Parser.numOfTests = 2 -Parser.confidence = 0.8 -Parser.link1 = http://www.jsoup.org -Parser.link2 = http://www.york.ac.uk/teaching/cws/wws +hbase.read.table.name=s +hbase.table.column.family.anchors=l +Parser.numOfTests=2 +Parser.confidence=0.8 +Parser.link1=http://www.jsoup.org +Parser.link2=http://www.york.ac.uk/teaching/cws/wws fetcher.connection.timeout.milliseconds=30000 fetcher.max.redirects=2 fetcher.client.num.of.maximum.total.connections=500 From 959fc6875c3d02c6024040334d83e105a3294fc3 Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Sun, 18 Aug 2019 10:29:36 +0430 Subject: [PATCH 5/9] add graphx dependency --- web-graph/pom.xml | 13 +++++++++++++ web-graph/src/main/java/in/nimbo/App.java | 6 ++++++ 2 files changed, 19 insertions(+) diff --git a/web-graph/pom.xml b/web-graph/pom.xml index 8eb4816..b2494bc 100644 --- a/web-graph/pom.xml +++ b/web-graph/pom.xml @@ -18,6 +18,7 @@ 2.4.3 1.2.4 6.6.2 + 0.7.0-spark2.4-s_2.11 @@ -34,6 +35,18 @@ ${hbase.version} + + graphframes + graphframes + ${graphframes.version} + + + + org.apache.spark + spark-graphx_2.12 + 2.4.3 + + diff --git a/web-graph/src/main/java/in/nimbo/App.java b/web-graph/src/main/java/in/nimbo/App.java index cb73fb2..46648f8 100644 --- a/web-graph/src/main/java/in/nimbo/App.java +++ b/web-graph/src/main/java/in/nimbo/App.java @@ -8,6 +8,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultStatsUtil; +import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; @@ -20,6 +22,8 @@ import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.internal.Logging; +import org.apache.spark.internal.Logging$; import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +82,8 @@ public static void main(String[] args) throws IOException { JavaRDD hbaseRDD = sparkContext.newAPIHadoopRDD(hbaseReadConfiguration , TableInputFormat.class, ImmutableBytesWritable.class, Result.class).values(); + JavaRDD hbaseRows = hbaseRDD.map(result -> new String(result.getRow())); + JavaRDD hbaseCells = hbaseRDD.flatMap(result -> result.listCells().iterator()); JavaPairRDD, Integer> domainToDomainPairRDD = hbaseCells.flatMapToPair(cell -> { From 28f64c1db0a9dcfc7d404ca76bcf520ecf4ff649 Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Mon, 19 Aug 2019 07:59:55 +0430 Subject: [PATCH 6/9] remove unused dependencies | print metrics --- web-graph/pom.xml | 14 -------------- web-graph/src/main/java/in/nimbo/App.java | 6 +++--- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/web-graph/pom.xml b/web-graph/pom.xml index b2494bc..398bf3d 100644 --- a/web-graph/pom.xml +++ b/web-graph/pom.xml @@ -34,20 +34,6 @@ hbase-server ${hbase.version} - - - graphframes - graphframes - ${graphframes.version} - - - - org.apache.spark - spark-graphx_2.12 - 2.4.3 - - - diff --git a/web-graph/src/main/java/in/nimbo/App.java b/web-graph/src/main/java/in/nimbo/App.java index 46648f8..0e08e13 100644 --- a/web-graph/src/main/java/in/nimbo/App.java +++ b/web-graph/src/main/java/in/nimbo/App.java @@ -82,7 +82,7 @@ public static void main(String[] args) throws IOException { JavaRDD hbaseRDD = sparkContext.newAPIHadoopRDD(hbaseReadConfiguration , TableInputFormat.class, ImmutableBytesWritable.class, Result.class).values(); - JavaRDD hbaseRows = hbaseRDD.map(result -> new String(result.getRow())); +// JavaRDD hbaseRows = hbaseRDD.map(result -> new String(result.getRow())); JavaRDD hbaseCells = hbaseRDD.flatMap(result -> result.listCells().iterator()); @@ -108,8 +108,8 @@ public static void main(String[] args) throws IOException { System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2)); }); - logger.info("Domain To Domain Pair Size : " + domainToDomainPairSize.sum()); - logger.info("Domain To Domain Pair Weighted Size : " + domainToDomainPairWeightedSize.sum()); + System.err.println("Domain To Domain Pair Size : " + domainToDomainPairSize.sum()); + System.err.println("Domain To Domain Pair Weighted Size : " + domainToDomainPairWeightedSize.sum()); JavaPairRDD hbasePuts = domainToDomainPairWeightRDD .mapToPair((PairFunction, Integer>, ImmutableBytesWritable, Put>) t -> { From fbef303c1b6cab8a14ab26d7a3c39b867d97cfc3 Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Tue, 27 Aug 2019 15:24:41 +0430 Subject: [PATCH 7/9] add hbase insertion | change configs --- web-graph/src/main/java/in/nimbo/App.java | 56 ++++++++++--------- .../src/main/resources/config.properties | 12 ++-- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/web-graph/src/main/java/in/nimbo/App.java b/web-graph/src/main/java/in/nimbo/App.java index 0e08e13..5e425e4 100644 --- a/web-graph/src/main/java/in/nimbo/App.java +++ b/web-graph/src/main/java/in/nimbo/App.java @@ -8,8 +8,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultStatsUtil; -import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; @@ -20,10 +18,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.internal.Logging; -import org.apache.spark.internal.Logging$; import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,13 +28,14 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.Collections; +import java.util.HashSet; +import java.util.Set; public class App { - private static Logger logger = LoggerFactory.getLogger(App.class); - public static final String DEFAULT_PROTOCOL = "http://"; + private static Logger logger = LoggerFactory.getLogger(App.class); public static void main(String[] args) throws IOException { Config config = ConfigFactory.load("config"); @@ -49,8 +45,8 @@ public static void main(String[] args) throws IOException { String hbaseReadTableName = config.getString("hbase.read.table.name"); String hbaseReadColumnFamily = config.getString("hbase.read.column.family"); String hbaseWriteTableName = config.getString("hbase.write.table.name"); - String hbaseWriteColumnFamily = config.getString("hbase.write.column.family"); - String hbaseWriteQualifier = config.getString("hbase.write.qualifier"); + String hbaseWriteColumnFamilyInput = config.getString("hbase.write.column.family.input.domains"); + String hbaseWriteColumnFamilyOutput = config.getString("hbase.write.column.family.output.domains"); String sparkExecutorCores = config.getString("spark.executor.cores"); String sparkExecutorMemory = config.getString("spark.executor.memory"); @@ -59,7 +55,7 @@ public static void main(String[] args) throws IOException { hbaseReadConfiguration.addResource(hbaseXmlHbase); hbaseReadConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseReadTableName); hbaseReadConfiguration.set(TableInputFormat.SCAN_COLUMN_FAMILY, hbaseReadColumnFamily); - hbaseReadConfiguration.set(TableInputFormat.SCAN_CACHEDROWS, "500"); + hbaseReadConfiguration.set(TableInputFormat.SCAN_CACHEDROWS, "1000"); Configuration hbaseWriteConfiguration = HBaseConfiguration.create(); hbaseReadConfiguration.addResource(hbaseXmlHadoop); @@ -72,18 +68,14 @@ public static void main(String[] args) throws IOException { SparkConf sparkConf = new SparkConf() .setAppName(sparkAppName) + .set("spark.cores.max", "12") .set("spark.executor.cores", sparkExecutorCores) .set("spark.executor.memory", sparkExecutorMemory); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); - LongAccumulator domainToDomainPairSize = sparkContext.sc().longAccumulator(); - LongAccumulator domainToDomainPairWeightedSize = sparkContext.sc().longAccumulator(); - JavaRDD hbaseRDD = sparkContext.newAPIHadoopRDD(hbaseReadConfiguration , TableInputFormat.class, ImmutableBytesWritable.class, Result.class).values(); -// JavaRDD hbaseRows = hbaseRDD.map(result -> new String(result.getRow())); - JavaRDD hbaseCells = hbaseRDD.flatMap(result -> result.listCells().iterator()); JavaPairRDD, Integer> domainToDomainPairRDD = hbaseCells.flatMapToPair(cell -> { @@ -93,7 +85,6 @@ public static void main(String[] args) throws IOException { String sourceDomain = getDomain(DEFAULT_PROTOCOL + source); String destinationDomain = getDomain(DEFAULT_PROTOCOL + destination); Tuple2 domainPair = new Tuple2<>(sourceDomain, destinationDomain); - domainToDomainPairSize.add(1); return Collections.singleton(new Tuple2<>(domainPair, 1)).iterator(); } catch (MalformedURLException e) { return Collections.emptyIterator(); @@ -104,21 +95,32 @@ public static void main(String[] args) throws IOException { .reduceByKey((Function2) (integer, integer2) -> integer + integer2); domainToDomainPairWeightRDD.foreach((VoidFunction, Integer>>) tuple2IntegerTuple2 -> { - domainToDomainPairWeightedSize.add(1); System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2)); }); - System.err.println("Domain To Domain Pair Size : " + domainToDomainPairSize.sum()); - System.err.println("Domain To Domain Pair Weighted Size : " + domainToDomainPairWeightedSize.sum()); - JavaPairRDD hbasePuts = domainToDomainPairWeightRDD - .mapToPair((PairFunction, Integer>, ImmutableBytesWritable, Put>) t -> { - Put put = new Put(Bytes.toBytes(t._1._1 + ":" + t._1._2)); - put.addColumn(Bytes.toBytes(hbaseWriteColumnFamily), - Bytes.toBytes(hbaseWriteQualifier), - Bytes.toBytes(t._2)); - - return new Tuple2<>(new ImmutableBytesWritable(), put); + .flatMapToPair(t -> { + if (!t._1._1.isEmpty() && !t._1._2.isEmpty() && t._2 != null) { + byte[] sourceDomainBytes = Bytes.toBytes(t._1._1); + byte[] destinationDomainBytes = Bytes.toBytes(t._1._2); + byte[] domainToDomainRefrences = Bytes.toBytes(t._2); + Set> hbasePut = new HashSet<>(); + + Put outputDomainPut = new Put(sourceDomainBytes); + outputDomainPut.addColumn(Bytes.toBytes(hbaseWriteColumnFamilyOutput), + destinationDomainBytes, + domainToDomainRefrences); + Put inputDomainPut = new Put(destinationDomainBytes); + inputDomainPut.addColumn(Bytes.toBytes(hbaseWriteColumnFamilyInput), + sourceDomainBytes, + domainToDomainRefrences); + + hbasePut.add(new Tuple2<>(new ImmutableBytesWritable(), inputDomainPut)); + hbasePut.add(new Tuple2<>(new ImmutableBytesWritable(), outputDomainPut)); + return hbasePut.iterator(); + } else { + return Collections.emptyIterator(); + } }); hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration()); diff --git a/web-graph/src/main/resources/config.properties b/web-graph/src/main/resources/config.properties index bc20db1..77417a4 100644 --- a/web-graph/src/main/resources/config.properties +++ b/web-graph/src/main/resources/config.properties @@ -1,10 +1,10 @@ spark.app.name="Web Graph" hbase.xml.url.in.hadoop="$HADOOP_HOME/etc/hadoop/core-site.xml" hbase.xml.url.in.hbase="$HBASE_HOME/conf/hbase-site.xml" -hbase.read.table.name=s +hbase.read.table.name=p hbase.read.column.family=l -hbase.write.table.name=wg -hbase.write.column.family=c -hbase.write.qualifier=c -spark.executor.cores=4 -spark.executor.memory=10G +hbase.write.table.name=wgt +hbase.write.column.family.input.domains=i +hbase.write.column.family.output.domains=o +spark.executor.cores=6 +spark.executor.memory=20G From f72a0f2e38a5fc396ec76f588568967054c8558c Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Mon, 2 Sep 2019 07:35:46 +0430 Subject: [PATCH 8/9] Change configs --- backend/src/main/resources/config.properties | 1 + web-graph/src/main/java/in/nimbo/App.java | 6 +++--- web-graph/src/main/resources/config.properties | 4 ++-- web-graph/src/main/resources/hbase-site.xml | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/backend/src/main/resources/config.properties b/backend/src/main/resources/config.properties index 3cb5414..963e276 100644 --- a/backend/src/main/resources/config.properties +++ b/backend/src/main/resources/config.properties @@ -8,3 +8,4 @@ elastic.search.text.boost=1 elastic.search.keywords.boost=2 elastic.search.title.boost=3 + diff --git a/web-graph/src/main/java/in/nimbo/App.java b/web-graph/src/main/java/in/nimbo/App.java index 5e425e4..5b2c7b0 100644 --- a/web-graph/src/main/java/in/nimbo/App.java +++ b/web-graph/src/main/java/in/nimbo/App.java @@ -94,9 +94,9 @@ public static void main(String[] args) throws IOException { JavaPairRDD, Integer> domainToDomainPairWeightRDD = domainToDomainPairRDD .reduceByKey((Function2) (integer, integer2) -> integer + integer2); - domainToDomainPairWeightRDD.foreach((VoidFunction, Integer>>) tuple2IntegerTuple2 -> { - System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2)); - }); +// domainToDomainPairWeightRDD.foreach((VoidFunction, Integer>>) tuple2IntegerTuple2 -> { +// System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2)); +// }); JavaPairRDD hbasePuts = domainToDomainPairWeightRDD .flatMapToPair(t -> { diff --git a/web-graph/src/main/resources/config.properties b/web-graph/src/main/resources/config.properties index 77417a4..41f42fc 100644 --- a/web-graph/src/main/resources/config.properties +++ b/web-graph/src/main/resources/config.properties @@ -1,9 +1,9 @@ spark.app.name="Web Graph" hbase.xml.url.in.hadoop="$HADOOP_HOME/etc/hadoop/core-site.xml" hbase.xml.url.in.hbase="$HBASE_HOME/conf/hbase-site.xml" -hbase.read.table.name=p +hbase.read.table.name=s hbase.read.column.family=l -hbase.write.table.name=wgt +hbase.write.table.name=wg hbase.write.column.family.input.domains=i hbase.write.column.family.output.domains=o spark.executor.cores=6 diff --git a/web-graph/src/main/resources/hbase-site.xml b/web-graph/src/main/resources/hbase-site.xml index 1be97d3..49706d8 100644 --- a/web-graph/src/main/resources/hbase-site.xml +++ b/web-graph/src/main/resources/hbase-site.xml @@ -1,7 +1,7 @@ hbase.zookeeper.quorum - slave1 + localhost hbase.zookeeper.property.clientPort From c8c929eb12b896fa79de73c6f2bf36ecabf6bb85 Mon Sep 17 00:00:00 2001 From: Hamidreza Sharifzadeh Date: Mon, 2 Sep 2019 16:00:20 +0430 Subject: [PATCH 9/9] Remove comments | add configs --- .../test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java | 2 +- crawler/src/test/resources/config.properties | 2 +- web-graph/src/main/java/in/nimbo/App.java | 12 ++---------- web-graph/src/main/resources/config.properties | 1 + 4 files changed, 5 insertions(+), 12 deletions(-) diff --git a/crawler/src/test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java b/crawler/src/test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java index 079e4da..9f09ab8 100644 --- a/crawler/src/test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java +++ b/crawler/src/test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java @@ -44,7 +44,7 @@ public static void init() throws IOException { Configuration hBaseConfiguration = HBaseConfiguration.create(); conn = ConnectionFactory.createConnection(hBaseConfiguration); hbaseSiteDao = new HbaseSiteDaoImpl(conn, hBaseConfiguration, config); - TABLE_NAME = config.getString("hbase.read.table.name"); + TABLE_NAME = config.getString("hbase.table.name"); FAMILY_NAME = config.getString("hbase.table.column.family.anchors"); try (final Admin admin = conn.getAdmin()) { if (!admin.tableExists(TableName.valueOf(TABLE_NAME))) { diff --git a/crawler/src/test/resources/config.properties b/crawler/src/test/resources/config.properties index 3d0a854..1e08d40 100644 --- a/crawler/src/test/resources/config.properties +++ b/crawler/src/test/resources/config.properties @@ -1,6 +1,6 @@ politeness.waiting.time=30 metric.registry.name=data-pirates-crawler -hbase.read.table.name=s +hbase.table.name=s hbase.table.column.family.anchors=l Parser.numOfTests=2 Parser.confidence=0.8 diff --git a/web-graph/src/main/java/in/nimbo/App.java b/web-graph/src/main/java/in/nimbo/App.java index 5b2c7b0..3bb7eac 100644 --- a/web-graph/src/main/java/in/nimbo/App.java +++ b/web-graph/src/main/java/in/nimbo/App.java @@ -18,10 +18,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.VoidFunction; -import org.apache.spark.util.LongAccumulator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Tuple2; import java.io.IOException; @@ -35,7 +31,6 @@ public class App { public static final String DEFAULT_PROTOCOL = "http://"; - private static Logger logger = LoggerFactory.getLogger(App.class); public static void main(String[] args) throws IOException { Config config = ConfigFactory.load("config"); @@ -48,6 +43,7 @@ public static void main(String[] args) throws IOException { String hbaseWriteColumnFamilyInput = config.getString("hbase.write.column.family.input.domains"); String hbaseWriteColumnFamilyOutput = config.getString("hbase.write.column.family.output.domains"); String sparkExecutorCores = config.getString("spark.executor.cores"); + String sparkMaxExecutorCores = config.getString("spark.max.executor.cores"); String sparkExecutorMemory = config.getString("spark.executor.memory"); Configuration hbaseReadConfiguration = HBaseConfiguration.create(); @@ -68,7 +64,7 @@ public static void main(String[] args) throws IOException { SparkConf sparkConf = new SparkConf() .setAppName(sparkAppName) - .set("spark.cores.max", "12") + .set("spark.cores.max", sparkMaxExecutorCores) .set("spark.executor.cores", sparkExecutorCores) .set("spark.executor.memory", sparkExecutorMemory); JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); @@ -94,10 +90,6 @@ public static void main(String[] args) throws IOException { JavaPairRDD, Integer> domainToDomainPairWeightRDD = domainToDomainPairRDD .reduceByKey((Function2) (integer, integer2) -> integer + integer2); -// domainToDomainPairWeightRDD.foreach((VoidFunction, Integer>>) tuple2IntegerTuple2 -> { -// System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2)); -// }); - JavaPairRDD hbasePuts = domainToDomainPairWeightRDD .flatMapToPair(t -> { if (!t._1._1.isEmpty() && !t._1._2.isEmpty() && t._2 != null) { diff --git a/web-graph/src/main/resources/config.properties b/web-graph/src/main/resources/config.properties index 41f42fc..981279c 100644 --- a/web-graph/src/main/resources/config.properties +++ b/web-graph/src/main/resources/config.properties @@ -7,4 +7,5 @@ hbase.write.table.name=wg hbase.write.column.family.input.domains=i hbase.write.column.family.output.domains=o spark.executor.cores=6 +spark.max.executor.cores=12 spark.executor.memory=20G