Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/src/main/resources/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ elastic.search.text.boost=1
elastic.search.keywords.boost=2
elastic.search.title.boost=3


Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static void init() throws IOException {
Configuration hBaseConfiguration = HBaseConfiguration.create();
conn = ConnectionFactory.createConnection(hBaseConfiguration);
hbaseSiteDao = new HbaseSiteDaoImpl(conn, hBaseConfiguration, config);
TABLE_NAME = config.getString("hbase.read.table.name");
TABLE_NAME = config.getString("hbase.table.name");
FAMILY_NAME = config.getString("hbase.table.column.family.anchors");
try (final Admin admin = conn.getAdmin()) {
if (!admin.tableExists(TableName.valueOf(TABLE_NAME))) {
Expand Down
14 changes: 7 additions & 7 deletions crawler/src/test/resources/config.properties
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
politeness.waiting.time = 30
politeness.waiting.time=30
metric.registry.name=data-pirates-crawler
hbase.table.name = s
hbase.table.column.family.anchors = l
Parser.numOfTests = 2
Parser.confidence = 0.8
Parser.link1 = http://www.jsoup.org
Parser.link2 = http://www.york.ac.uk/teaching/cws/wws
hbase.table.name=s
hbase.table.column.family.anchors=l
Parser.numOfTests=2
Parser.confidence=0.8
Parser.link1=http://www.jsoup.org
Parser.link2=http://www.york.ac.uk/teaching/cws/wws
fetcher.connection.timeout.milliseconds=30000
fetcher.max.redirects=2
fetcher.client.num.of.maximum.total.connections=500
Expand Down
35 changes: 35 additions & 0 deletions deploy/ansible/web-graph-mapreduce-playbook.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Deploys the web-graph module: installs build tooling, checks out the
# branch, builds the jar with Maven, and launches the Spark job on master.
- hosts: master
  vars:
    git_repository_url: https://github.com/nimbo3/DataPirates.git
    git_repository_branch: issue-119-web-graph
    project_module_name: web-graph
    project_module_version: 1.0-SNAPSHOT
    deploy_dir: /opt/data-pirates/
  environment:
    JAVA_HOME: /usr/lib/jvm/jdk1.8.0_211
  tasks:
    - name: Install needed packages with apt
      apt:
        name: "{{ packages }}"
        state: present    # explicit desired state (apt default, but self-documenting)
        update_cache: yes # fresh hosts cannot resolve packages from a stale cache
      vars:
        packages:
          - git
          - maven
    - name: Get latest project files from github
      git:
        repo: "{{ git_repository_url }}"
        dest: "{{ deploy_dir }}"
        force: yes # discard local modifications so the checkout always matches the branch
        version: "{{ git_repository_branch }}"
    - name: Create runnable jar file with maven
      command: mvn clean package -DskipTests -DoutputDirectory=/opt
      register: mvn
      changed_when: "'BUILD FAILURE' not in mvn.stdout"
      args:
        chdir: "{{ deploy_dir }}{{ project_module_name }}"
    - name: Run Map/Reduce With nohup
      # FIX: spark-submit options must precede the application jar. The original
      # placed `--jars local:...` AFTER the primary jar, so it was handed to
      # in.nimbo.App as a program argument instead of being seen by spark-submit.
      shell: "nohup /usr/local/spark/bin/spark-submit --class in.nimbo.App --master spark://master:7077 --jars local:{{ deploy_dir }}{{ project_module_name }}/target/{{ project_module_name }}-{{ project_module_version }}.jar {{ project_module_name }}-{{ project_module_version }}.jar &"
      register: nohup
      changed_when: "'failed' not in nohup.stderr"
      args:
        chdir: "{{ deploy_dir }}{{ project_module_name }}/target/"
2 changes: 1 addition & 1 deletion web-graph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<spark.version>2.4.3</spark.version>
<hbase.version>1.2.4</hbase.version>
<elasticsearch.spark.version>6.6.2</elasticsearch.spark.version>
<graphframes.version>0.7.0-spark2.4-s_2.11</graphframes.version>
</properties>

<dependencies>
Expand All @@ -33,7 +34,6 @@
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
84 changes: 51 additions & 33 deletions web-graph/src/main/java/in/nimbo/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,100 +6,118 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;


public class App {
public static void main(String[] args) {

public static final String DEFAULT_PROTOCOL = "http://";

public static void main(String[] args) throws IOException {
Config config = ConfigFactory.load("config");
String sparkAppName = config.getString("spark.app.name");
String hbaseXmlHadoop = config.getString("hbase.xml.url.in.hadoop");
String hbaseXmlHbase = config.getString("hbase.xml.url.in.hbase");
String hbaseReadTableName = config.getString("hbase.read.table.name");
String hbaseReadColumnFamily = config.getString("hbase.read.column.family");
String hbaseWriteTableName = config.getString("hbase.write.table.name");
String hbaseWriteColumnFamily = config.getString("hbase.write.column.family");
String hbaseWriteQualifier = config.getString("hbase.write.qualifier");
String hbaseWriteColumnFamilyInput = config.getString("hbase.write.column.family.input.domains");
String hbaseWriteColumnFamilyOutput = config.getString("hbase.write.column.family.output.domains");
String sparkExecutorCores = config.getString("spark.executor.cores");
String sparkMaxExecutorCores = config.getString("spark.max.executor.cores");
String sparkExecutorMemory = config.getString("spark.executor.memory");

Configuration hbaseReadConfiguration = HBaseConfiguration.create();
hbaseReadConfiguration.addResource(hbaseXmlHadoop);
hbaseReadConfiguration.addResource(hbaseXmlHbase);
hbaseReadConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseReadTableName);
hbaseReadConfiguration.set(TableInputFormat.SCAN_COLUMN_FAMILY, hbaseReadColumnFamily);
hbaseReadConfiguration.set(TableInputFormat.SCAN_CACHEDROWS, "500");
hbaseReadConfiguration.set(TableInputFormat.SCAN_CACHEDROWS, "1000");

Configuration hbaseWriteConfiguration = HBaseConfiguration.create();
hbaseReadConfiguration.addResource(hbaseXmlHadoop);
hbaseReadConfiguration.addResource(hbaseXmlHbase);
hbaseWriteConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseWriteTableName);

Job newAPIJobConfiguration = Job.getInstance(hbaseWriteConfiguration);
newAPIJobConfiguration.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, hbaseWriteTableName);
newAPIJobConfiguration.setOutputFormatClass(TableOutputFormat.class);

SparkConf sparkConf = new SparkConf()
.setAppName(sparkAppName)
.set("spark.cores.max", sparkMaxExecutorCores)
.set("spark.executor.cores", sparkExecutorCores)
.set("spark.executor.memory", sparkExecutorMemory);
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

LongAccumulator domainToDomainPairSize = sparkContext.sc().longAccumulator();

JavaRDD<Result> hbaseRDD = sparkContext.newAPIHadoopRDD(hbaseReadConfiguration
, TableInputFormat.class, ImmutableBytesWritable.class, Result.class).values();

JavaRDD<Cell> hbaseCells = hbaseRDD.flatMap(result -> result.listCells().iterator());

JavaPairRDD<Tuple2<String, String>, Integer> domainToDomainPairRDD = hbaseCells.flatMapToPair(cell -> {
String source = new String(CellUtil.cloneRow(cell));
String destination = new String(CellUtil.cloneValue(cell));
String destination = new String(CellUtil.cloneQualifier(cell));
try {
String sourceDomain = getDomain("http://" + source);
String destinationDomain = getDomain("http://" + destination);
String sourceDomain = getDomain(DEFAULT_PROTOCOL + source);
String destinationDomain = getDomain(DEFAULT_PROTOCOL + destination);
Tuple2<String, String> domainPair = new Tuple2<>(sourceDomain, destinationDomain);
domainToDomainPairSize.add(1);
return Collections.singleton(new Tuple2<>(domainPair, 1)).iterator();
} catch (MalformedURLException e) {
return Collections.emptyIterator();
}
});

// Todo : CellUtil.cloneRow() Or getValueArray()

JavaPairRDD<Tuple2<String, String>, Integer> domainToDomainPairWeightRDD = domainToDomainPairRDD
.reduceByKey((Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);

domainToDomainPairWeightRDD.foreach((VoidFunction<Tuple2<Tuple2<String, String>, Integer>>) tuple2IntegerTuple2 -> {
System.out.println(String.format("%s -> %s : %d", tuple2IntegerTuple2._1._1, tuple2IntegerTuple2._1._2, tuple2IntegerTuple2._2));
});

System.err.println("Domain To Domain Pair Size : " + domainToDomainPairSize.sum());

// JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = domainToDomainPairWeightRDD
// .mapToPair((PairFunction<Tuple2<Tuple2<String, String>, Integer>, ImmutableBytesWritable, Put>) t -> {
// Put put = new Put(Bytes.toBytes(t._1._1 + ":" + t._1._2));
// put.addColumn(Bytes.toBytes(hbaseWriteColumnFamily),
// Bytes.toBytes(hbaseWriteQualifier),
// Bytes.toBytes(t._2));
//
// return new Tuple2<>(new ImmutableBytesWritable(), put);
// });

// hbasePuts.saveAsNewAPIHadoopDataset(hbaseWriteConfiguration);

sparkContext.close();
JavaPairRDD<ImmutableBytesWritable, Put> hbasePuts = domainToDomainPairWeightRDD
.flatMapToPair(t -> {
if (!t._1._1.isEmpty() && !t._1._2.isEmpty() && t._2 != null) {
byte[] sourceDomainBytes = Bytes.toBytes(t._1._1);
byte[] destinationDomainBytes = Bytes.toBytes(t._1._2);
byte[] domainToDomainRefrences = Bytes.toBytes(t._2);
Set<Tuple2<ImmutableBytesWritable, Put>> hbasePut = new HashSet<>();

Put outputDomainPut = new Put(sourceDomainBytes);
outputDomainPut.addColumn(Bytes.toBytes(hbaseWriteColumnFamilyOutput),
destinationDomainBytes,
domainToDomainRefrences);
Put inputDomainPut = new Put(destinationDomainBytes);
inputDomainPut.addColumn(Bytes.toBytes(hbaseWriteColumnFamilyInput),
sourceDomainBytes,
domainToDomainRefrences);

hbasePut.add(new Tuple2<>(new ImmutableBytesWritable(), inputDomainPut));
hbasePut.add(new Tuple2<>(new ImmutableBytesWritable(), outputDomainPut));
return hbasePut.iterator();
} else {
return Collections.emptyIterator();
}
});

hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration.getConfiguration());

sparkContext.stop();
}

public static String getDomain(String url) throws MalformedURLException {
Expand Down
9 changes: 5 additions & 4 deletions web-graph/src/main/resources/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ hbase.xml.url.in.hbase="$HBASE_HOME/conf/hbase-site.xml"
hbase.read.table.name=s
hbase.read.column.family=l
hbase.write.table.name=wg
hbase.write.column.family=c
hbase.write.qualifier=c
spark.executor.cores=4
spark.executor.memory=10G
hbase.write.column.family.input.domains=i
hbase.write.column.family.output.domains=o
spark.executor.cores=6
spark.max.executor.cores=12
spark.executor.memory=20G
2 changes: 1 addition & 1 deletion web-graph/src/main/resources/hbase-site.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<configuration>
<property>
<name>hbase.zookeeper.quorum</name>
<value>slave1</value>
<value>localhost</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
Expand Down