diff --git a/backend/src/main/resources/config.properties b/backend/src/main/resources/config.properties
index 3cb5414..963e276 100644
--- a/backend/src/main/resources/config.properties
+++ b/backend/src/main/resources/config.properties
@@ -8,3 +8,4 @@
 elastic.search.text.boost=1
 elastic.search.keywords.boost=2
 elastic.search.title.boost=3
+
diff --git a/crawler/src/test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java b/crawler/src/test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java
index 079e4da..9f09ab8 100644
--- a/crawler/src/test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java
+++ b/crawler/src/test/java/in/nimbo/dao/HbaseSiteDaoImplTest.java
@@ -44,7 +44,7 @@ public static void init() throws IOException {
         Configuration hBaseConfiguration = HBaseConfiguration.create();
         conn = ConnectionFactory.createConnection(hBaseConfiguration);
         hbaseSiteDao = new HbaseSiteDaoImpl(conn, hBaseConfiguration, config);
-        TABLE_NAME = config.getString("hbase.read.table.name");
+        TABLE_NAME = config.getString("hbase.table.name");
         FAMILY_NAME = config.getString("hbase.table.column.family.anchors");
         try (final Admin admin = conn.getAdmin()) {
             if (!admin.tableExists(TableName.valueOf(TABLE_NAME))) {
diff --git a/crawler/src/test/resources/config.properties b/crawler/src/test/resources/config.properties
index 04b6709..1e08d40 100644
--- a/crawler/src/test/resources/config.properties
+++ b/crawler/src/test/resources/config.properties
@@ -1,11 +1,11 @@
-politeness.waiting.time = 30
+politeness.waiting.time=30
 metric.registry.name=data-pirates-crawler
-hbase.table.name = s
-hbase.table.column.family.anchors = l
-Parser.numOfTests = 2
-Parser.confidence = 0.8
-Parser.link1 = http://www.jsoup.org
-Parser.link2 = http://www.york.ac.uk/teaching/cws/wws
+hbase.table.name=s
+hbase.table.column.family.anchors=l
+Parser.numOfTests=2
+Parser.confidence=0.8
+Parser.link1=http://www.jsoup.org
+Parser.link2=http://www.york.ac.uk/teaching/cws/wws
 fetcher.connection.timeout.milliseconds=30000
 fetcher.max.redirects=2
 fetcher.client.num.of.maximum.total.connections=500
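Note: the renamed test keys above ("hbase.table.name", "hbase.table.column.family.anchors") are exactly what HbaseSiteDaoImplTest.init() now reads. A minimal, self-contained sketch of that lookup and the table bootstrap, using the HBase 1.x client API the project already depends on (the class name and standalone main are illustrative only, not part of the diff):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class TestTableBootstrapSketch {
    public static void main(String[] args) throws Exception {
        // Loads config.properties from the classpath, as the test does
        Config config = ConfigFactory.load("config");
        String tableName = config.getString("hbase.table.name");                    // "s"
        String familyName = config.getString("hbase.table.column.family.anchors");  // "l"

        Configuration hbaseConfiguration = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(hbaseConfiguration);
             Admin admin = conn.getAdmin()) {
            // Create the test table with the anchors family if it does not exist yet
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
                descriptor.addFamily(new HColumnDescriptor(familyName));
                admin.createTable(descriptor);
            }
        }
    }
}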
diff --git a/deploy/ansible/web-graph-mapreduce-playbook.yml b/deploy/ansible/web-graph-mapreduce-playbook.yml
new file mode 100644
index 0000000..18edfbd
--- /dev/null
+++ b/deploy/ansible/web-graph-mapreduce-playbook.yml
@@ -0,0 +1,35 @@
+- hosts: master
+  vars:
+    git_repository_url: https://github.com/nimbo3/DataPirates.git
+    git_repository_branch: issue-119-web-graph
+    project_module_name: web-graph
+    project_module_version: 1.0-SNAPSHOT
+    deploy_dir: /opt/data-pirates/
+  environment:
+    JAVA_HOME: /usr/lib/jvm/jdk1.8.0_211
+  tasks:
+    - name: Install needed packages with apt
+      apt:
+        name: "{{ packages }}"
+      vars:
+        packages:
+          - git
+          - maven
+    - name: Get latest project files from github
+      git:
+        repo: "{{ git_repository_url }}"
+        dest: "{{ deploy_dir }}"
+        force: yes
+        version: "{{ git_repository_branch }}"
+    - name: Create runnable jar file with maven
+      command: mvn clean package -DskipTests -DoutputDirectory=/opt
+      register: mvn
+      changed_when: "'BUILD FAILURE' not in mvn.stdout"
+      args:
+        chdir: "{{ deploy_dir }}{{ project_module_name }}"
+    - name: Run Map/Reduce With nohup
+      shell: "nohup /usr/local/spark/bin/spark-submit --class in.nimbo.App --master spark://master:7077 {{ project_module_name }}-{{ project_module_version }}.jar --jars local:{{ deploy_dir }}{{ project_module_name }}/target/{{ project_module_name }}-{{ project_module_version }}.jar &"
      register: nohup
+      changed_when: "'failed' not in nohup.stderr"
+      args:
+        chdir: "{{ deploy_dir }}{{ project_module_name }}/target/"
diff --git a/web-graph/pom.xml b/web-graph/pom.xml
index 2449326..398bf3d 100644
--- a/web-graph/pom.xml
+++ b/web-graph/pom.xml
@@ -18,6 +18,7 @@
         2.4.3
         1.2.4
         6.6.2
+        0.7.0-spark2.4-s_2.11
@@ -33,7 +34,6 @@
             hbase-server
             ${hbase.version}
-
diff --git a/web-graph/src/main/java/in/nimbo/App.java b/web-graph/src/main/java/in/nimbo/App.java
index 63f74fe..3bb7eac 100644
--- a/web-graph/src/main/java/in/nimbo/App.java
+++ b/web-graph/src/main/java/in/nimbo/App.java
@@ -6,25 +6,33 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.util.LongAccumulator;
 import scala.Tuple2;
 
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 public class App {
-    public static void main(String[] args) {
+
+    public static final String DEFAULT_PROTOCOL = "http://";
+
+    public static void main(String[] args) throws IOException {
         Config config = ConfigFactory.load("config");
         String sparkAppName = config.getString("spark.app.name");
         String hbaseXmlHadoop = config.getString("hbase.xml.url.in.hadoop");
@@ -32,9 +40,10 @@
         String hbaseReadTableName = config.getString("hbase.read.table.name");
         String hbaseReadColumnFamily = config.getString("hbase.read.column.family");
         String hbaseWriteTableName = config.getString("hbase.write.table.name");
-        String hbaseWriteColumnFamily = config.getString("hbase.write.column.family");
-        String hbaseWriteQualifier = config.getString("hbase.write.qualifier");
+        String hbaseWriteColumnFamilyInput = config.getString("hbase.write.column.family.input.domains");
+        String hbaseWriteColumnFamilyOutput = config.getString("hbase.write.column.family.output.domains");
         String sparkExecutorCores = config.getString("spark.executor.cores");
+        String sparkMaxExecutorCores = config.getString("spark.max.executor.cores");
         String sparkExecutorMemory = config.getString("spark.executor.memory");
 
         Configuration hbaseReadConfiguration = HBaseConfiguration.create();
@@ -42,21 +51,24 @@
         hbaseReadConfiguration.addResource(hbaseXmlHbase);
         hbaseReadConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseReadTableName);
         hbaseReadConfiguration.set(TableInputFormat.SCAN_COLUMN_FAMILY, hbaseReadColumnFamily);
-        hbaseReadConfiguration.set(TableInputFormat.SCAN_CACHEDROWS, "500");
+        hbaseReadConfiguration.set(TableInputFormat.SCAN_CACHEDROWS, "1000");
 
         Configuration hbaseWriteConfiguration = HBaseConfiguration.create();
         hbaseReadConfiguration.addResource(hbaseXmlHadoop);
         hbaseReadConfiguration.addResource(hbaseXmlHbase);
         hbaseWriteConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseWriteTableName);
 
+        Job newAPIJobConfiguration = Job.getInstance(hbaseWriteConfiguration);
+        newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, hbaseWriteTableName);
+        newAPIJobConfiguration.setOutputFormatClass(TableOutputFormat.class);
+
         SparkConf sparkConf = new SparkConf()
                 .setAppName(sparkAppName)
+                .set("spark.cores.max", sparkMaxExecutorCores)
                 .set("spark.executor.cores", sparkExecutorCores)
                 .set("spark.executor.memory", sparkExecutorMemory);
 
         JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
 
-        LongAccumulator domainToDomainPairSize = sparkContext.sc().longAccumulator();
-
         JavaRDD<Result> hbaseRDD = sparkContext.newAPIHadoopRDD(hbaseReadConfiguration
                 , TableInputFormat.class, ImmutableBytesWritable.class, Result.class).values();
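The job reads the link table through newAPIHadoopRDD with TableInputFormat, and writes are now routed through a Hadoop Job whose only purpose is to carry the TableOutputFormat settings consumed by saveAsNewAPIHadoopDataset. A condensed, runnable sketch of that wiring (the table and family names "s", "l", "wg" come from the web-graph config below; the app name and local master are assumptions for local testing, not part of the diff):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;

public class HBaseIoWiringSketch {
    public static void main(String[] args) throws IOException {
        // Read side: scan the link table ("s", family "l") as (row key, Result) pairs
        Configuration readConf = HBaseConfiguration.create();
        readConf.set(TableInputFormat.INPUT_TABLE, "s");
        readConf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "l");
        readConf.set(TableInputFormat.SCAN_CACHEDROWS, "1000");

        // Write side: the Job instance only carries the output-format configuration
        Configuration writeConf = HBaseConfiguration.create();
        Job job = Job.getInstance(writeConf);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "wg");
        job.setOutputFormatClass(TableOutputFormat.class);

        SparkConf sparkConf = new SparkConf().setAppName("web-graph-sketch").setMaster("local[*]");
        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            JavaRDD<Result> rows = sc.newAPIHadoopRDD(readConf,
                    TableInputFormat.class, ImmutableBytesWritable.class, Result.class).values();
            System.out.println("rows in the link table: " + rows.count());
            // A JavaPairRDD<ImmutableBytesWritable, Put> would then be written with
            // pairs.saveAsNewAPIHadoopDataset(job.getConfiguration());
        }
    }
}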
@@ -64,42 +76,48 @@
         JavaPairRDD<Tuple2<String, String>, Integer> domainToDomainPairRDD = hbaseCells.flatMapToPair(cell -> {
             String source = new String(CellUtil.cloneRow(cell));
-            String destination = new String(CellUtil.cloneValue(cell));
+            String destination = new String(CellUtil.cloneQualifier(cell));
             try {
-                String sourceDomain = getDomain("http://" + source);
-                String destinationDomain = getDomain("http://" + destination);
+                String sourceDomain = getDomain(DEFAULT_PROTOCOL + source);
+                String destinationDomain = getDomain(DEFAULT_PROTOCOL + destination);
                 Tuple2<String, String> domainPair = new Tuple2<>(sourceDomain, destinationDomain);
-                domainToDomainPairSize.add(1);
                 return Collections.singleton(new Tuple2<>(domainPair, 1)).iterator();
             } catch (MalformedURLException e) {
                 return Collections.emptyIterator();
             }
         });
 
-        // Todo : CellUtil.cloneRow() Or getValueArray()
-
         JavaPairRDD<Tuple2<String, String>, Integer> domainToDomainPairWeightRDD = domainToDomainPairRDD
                 .reduceByKey((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);
 
-        domainToDomainPairWeightRDD.foreach((VoidFunction<Tuple2<Tuple2<String, String>, Integer>>) tuple2IntegerTuple2 -> {
-            System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2));
-        });
-
-        System.err.println("Domain To Domain Pair Size : " + domainToDomainPairSize.sum());
-
-//        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = domainToDomainPairWeightRDD
-//                .mapToPair((PairFunction<Tuple2<Tuple2<String, String>, Integer>, ImmutableBytesWritable, Put>) t -> {
-//                    Put put = new Put(Bytes.toBytes(t._1._1 + ":" + t._1._2));
-//                    put.addColumn(Bytes.toBytes(hbaseWriteColumnFamily),
-//                            Bytes.toBytes(hbaseWriteQualifier),
-//                            Bytes.toBytes(t._2));
-//
-//                    return new Tuple2<>(new ImmutableBytesWritable(), put);
-//                });
-
-//        hbasePuts.saveAsNewAPIHadoopDataset(hbaseWriteConfiguration);
-
-        sparkContext.close();
+        JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = domainToDomainPairWeightRDD
+                .flatMapToPair(t -> {
+                    if (!t._1._1.isEmpty() && !t._1._2.isEmpty() && t._2 != null) {
+                        byte[] sourceDomainBytes = Bytes.toBytes(t._1._1);
+                        byte[] destinationDomainBytes = Bytes.toBytes(t._1._2);
+                        byte[] domainToDomainRefrences = Bytes.toBytes(t._2);
+                        Set<Tuple2<ImmutableBytesWritable, Put>> hbasePut = new HashSet<>();
+
+                        Put outputDomainPut = new Put(sourceDomainBytes);
+                        outputDomainPut.addColumn(Bytes.toBytes(hbaseWriteColumnFamilyOutput),
+                                destinationDomainBytes,
+                                domainToDomainRefrences);
+                        Put inputDomainPut = new Put(destinationDomainBytes);
+                        inputDomainPut.addColumn(Bytes.toBytes(hbaseWriteColumnFamilyInput),
+                                sourceDomainBytes,
+                                domainToDomainRefrences);
+
+                        hbasePut.add(new Tuple2<>(new ImmutableBytesWritable(), inputDomainPut));
+                        hbasePut.add(new Tuple2<>(new ImmutableBytesWritable(), outputDomainPut));
+                        return hbasePut.iterator();
+                    } else {
+                        return Collections.emptyIterator();
+                    }
+                });
+
+        hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration());
+
+        sparkContext.stop();
     }
 
     public static String getDomain(String url) throws MalformedURLException {
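In this hunk, each anchor cell yields one (sourceDomain, destinationDomain) pair with weight 1, reduceByKey sums the weights, and every weighted edge is written twice: under the output family keyed by the source domain and under the input family keyed by the destination domain. The body of getDomain sits outside the hunk; a small sketch assuming it simply resolves the host with java.net.URL (an assumption, not confirmed by the diff), fed with the sample links from the crawler test config:

import java.net.MalformedURLException;
import java.net.URL;

public class DomainSketch {
    public static final String DEFAULT_PROTOCOL = "http://";

    // Assumed behaviour of App.getDomain: extract the host part of a URL.
    // The real implementation is not shown in this hunk.
    public static String getDomain(String url) throws MalformedURLException {
        return new URL(url).getHost();
    }

    public static void main(String[] args) throws MalformedURLException {
        // Row key = source page, qualifier = destination page (see CellUtil.cloneQualifier above)
        String source = "www.jsoup.org/cookbook";
        String destination = "www.york.ac.uk/teaching/cws/wws";
        System.out.println(getDomain(DEFAULT_PROTOCOL + source) + " -> "
                + getDomain(DEFAULT_PROTOCOL + destination));
        // Prints: www.jsoup.org -> www.york.ac.uk
    }
}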
diff --git a/web-graph/src/main/resources/config.properties b/web-graph/src/main/resources/config.properties
index bc20db1..981279c 100644
--- a/web-graph/src/main/resources/config.properties
+++ b/web-graph/src/main/resources/config.properties
@@ -4,7 +4,8 @@ hbase.xml.url.in.hbase="$HBASE_HOME/conf/hbase-site.xml"
 hbase.read.table.name=s
 hbase.read.column.family=l
 hbase.write.table.name=wg
-hbase.write.column.family=c
-hbase.write.qualifier=c
-spark.executor.cores=4
-spark.executor.memory=10G
+hbase.write.column.family.input.domains=i
+hbase.write.column.family.output.domains=o
+spark.executor.cores=6
+spark.max.executor.cores=12
+spark.executor.memory=20G
diff --git a/web-graph/src/main/resources/hbase-site.xml b/web-graph/src/main/resources/hbase-site.xml
index 1be97d3..49706d8 100644
--- a/web-graph/src/main/resources/hbase-site.xml
+++ b/web-graph/src/main/resources/hbase-site.xml
@@ -1,7 +1,7 @@
 <configuration>
   <property>
     <name>hbase.zookeeper.quorum</name>
-    <value>slave1</value>
+    <value>localhost</value>
   </property>
   <property>
    <name>hbase.zookeeper.property.clientPort</name>