Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added build/classes/org/wso2/uima/types/HashTag.class
Binary file not shown.
Binary file added build/classes/org/wso2/uima/types/HashTag_Type$1.class
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added build/classes/org/wso2/uima/types/TimeStamp.class
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file removed lib/slf4j-1.5.10.wso2v1.jar
Binary file not shown.
Binary file modified pathing.jar
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void initialize(UimaContext ctx)
IOUtils.closeQuietly(nameFinderStream);
IOUtils.closeQuietly(tokenizerStream);
IOUtils.closeQuietly(sentenceStream);
logger.info(LocationIdentifier.class.getSimpleName()+" Analysis Engine initialized successfully");
}
}

Expand All @@ -97,15 +98,15 @@ public void process(JCas jcas) throws AnalysisEngineProcessException {
tokens[i] = tokSpans[i].getCoveredText(sentence).toString();
}

logger.info("Tweet Text: " + jcas.getDocumentText());
Span locationSpans[] = locationFinder.find(tokens);
LocationIdentification annotation = new LocationIdentification(jcas);
for (Span location : locationSpans) {
annotation.setBegin(start + tokSpans[location.getStart()].getStart());
annotation.setEnd(start + tokSpans[location.getEnd() - 1].getEnd());
annotation.addToIndexes(jcas);
logger.info("Location Detected : " + annotation.getCoveredText());
}
logger.debug("Tweet Text: "+jcas.getDocumentText());
Span locationSpans[] = locationFinder.find(tokens);
LocationIdentification annotation = new LocationIdentification(jcas);
for (Span location: locationSpans) {
annotation.setBegin(start + tokSpans[location.getStart()].getStart());
annotation.setEnd(start + tokSpans[location.getEnd() - 1].getEnd());
annotation.addToIndexes(jcas);
logger.info("Location Detected : "+annotation.getCoveredText());
}


if (locationSpans.length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void initialize(UimaContext ctx)
throw new ResourceInitializationException(e);
} finally {
IOUtils.closeQuietly(docStream);
logger.info(TrafficLevelAnalyser.class.getSimpleName()+" Analysis Engine initialized successfully");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,24 @@

package org.wso2.uima.collectionProccesingEngine.consumers;

import org.apache.log4j.Logger;
import org.apache.uima.cas.CAS;
import org.apache.uima.cas.CASException;
import org.apache.uima.collection.CasConsumer_ImplBase;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.ResourceProcessException;
import org.wso2.uima.types.LocationIdentification;
import org.wso2.uima.collectionProccesingEngine.consumers.util.CasConsumerUtil;
public class ConsolePrinterCasConsumer extends CasConsumer_ImplBase{

import java.util.Iterator;
@Override
public void processCas(CAS cas) throws ResourceProcessException {

public class ConsolePrinterCasConsumer extends CasConsumer_ImplBase {
String locationString = CasConsumerUtil.getLocationString(cas);
String trafficLevel = CasConsumerUtil.getTrafficLevel(cas);

private static Logger logger = Logger.getLogger(ConsolePrinterCasConsumer.class);
if(locationString.isEmpty()){
return;
}

// TODO remove this class
@Override
public void processCas(CAS cas) throws ResourceProcessException {
JCas jcas = null;

try {
jcas = cas.getJCas();
} catch (CASException e) {
logger.error("Unable to get the JCas from the cas when trying to process Cas", e);
}

Iterator iterator = jcas.getAnnotationIndex(
LocationIdentification.type).iterator();
System.out.println("\nTweet : " + jcas.getDocumentText());
while (iterator.hasNext()) {
LocationIdentification tag = (LocationIdentification) iterator
.next();
System.out.println("\nAnnotation : " + tag.getCoveredText());
}

}
System.out.println("Annotated Location : " + locationString);
System.out.println("Annotated Traffic : " + trafficLevel);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.*;
import org.wso2.uima.collectionProccesingEngine.consumers.util.KeyStoreUtil;
import org.wso2.uima.collectionProccesingEngine.consumers.util.TweetScanner;
import org.wso2.uima.collectionProccesingEngine.consumers.util.CasConsumerUtil;

import java.net.MalformedURLException;
import java.sql.Timestamp;
Expand All @@ -42,24 +42,30 @@
*/

public class DataBridgeCasConsumer extends CasConsumer_ImplBase {
private static final String STREAM_NAME = "org.wso2.uima.TwitterExtractedInputFeed";
private static final String VERSION = "1.0.0";

private static final String PARAM_SERVER_URL = "serverURL";
private static final String PARAM_USERNAME = "username";
private static final String PARAM_PASSWORD = "password";
private static String streamID = null;
private static DataPublisher dataPublisher;
private static final String PARAM_STREAM_NAME = "streamName";
private static final String PARAM_STREAM_VERSION = "streamVersion";


private String streamID = null;
private DataPublisher dataPublisher;
private static Logger logger = Logger.getLogger(DataBridgeCasConsumer.class);

private String url;
private String username;
private String password;
private String streamName;
private String streamVersion;

@Override
public void processCas(CAS cas) throws ResourceProcessException {

String tweetText = TweetScanner.getTweetText(cas);
String locationString = TweetScanner.getLocationString(cas);
String trafficLevel = TweetScanner.getTrafficLevel(cas);
String tweetText = CasConsumerUtil.getTweetText(cas);
String locationString = CasConsumerUtil.getLocationString(cas);
String trafficLevel = CasConsumerUtil.getTrafficLevel(cas);

if (locationString.isEmpty()) {
return;
Expand All @@ -77,7 +83,7 @@ public void processCas(CAS cas) throws ResourceProcessException {
tweetText
);
} catch (AgentException e) {
logger.error("Unable to publish events due to errors in the data bridge", e);
logger.error("Unable to publish events to the data bridge to "+url, e);
}
}
}
Expand All @@ -88,53 +94,53 @@ public void initialize() throws ResourceInitializationException {
url = (String) getConfigParameterValue(PARAM_SERVER_URL);
username = (String) getConfigParameterValue(PARAM_USERNAME);
password = (String) getConfigParameterValue(PARAM_PASSWORD);
streamName = (String)getConfigParameterValue(PARAM_STREAM_NAME);
streamVersion = (String)getConfigParameterValue(PARAM_STREAM_VERSION);

try {
dataPublisher = new DataPublisher(url, username, password);
logger.debug("Data Publisher Created");
} catch (MalformedURLException e) {
logger.error("Unable to create the data publisher ", e);
logger.error("Unable to create the data publisher to url: "+url, e);
} catch (AgentException e) {
logger.error("Unable to create the data publisher ", e);
logger.error("Unable to create the data publisher to url: "+url, e);
} catch (AuthenticationException e) {
logger.error("Unable to create the data publisher ", e);
logger.error("Unable to create the data publisher using username: "+username+" password: "+password+" to "+url, e);
} catch (TransportException e) {
logger.error("Unable to create the data publisher ", e);
logger.error("Unable to create the data publisher to url: "+url, e);
}

try {
streamID = dataPublisher.findStream(STREAM_NAME, VERSION);
logger.info("Stream Definition Already Exists");
} catch (NoStreamDefinitionExistException | AgentException | StreamDefinitionException e) {
try {
StreamDefinition streamDef = new StreamDefinition(VERSION);
StreamDefinition streamDef = new StreamDefinition(streamVersion);
streamDef.setNickName("TwitterCEP");
streamDef.setDescription("Extracted Data Feed from Tweets");
streamDef.addTag("UIMA");
streamDef.addTag("CEP");

streamID = dataPublisher.defineStream("{" +
" 'name':'" + STREAM_NAME + "'," +
" 'version':'" + VERSION + "'," +
" 'nickName': 'TwitterCEP'," +
" 'description': 'Some Desc'," +
" 'name':'" + streamName + "'," +
" 'version':'" + streamVersion + "'," +
" 'nickName': 'twitter Input Stream'," +
" 'description': 'Input stream to recieve the extracted details from the twitter feed into the CEP'," +
" 'tags':['UIMA', 'CEP']," +
" 'metaData':[" +
" {'name':'timeStamp','type':'STRING'}" +
" {'name':'Timestamp','type':'STRING'}" +
" ]," +
" 'payloadData':[" +
" {'name':'Location','type':'STRING'}," +
" {'name':'TrafficLevel','type':'STRING'}," +
" {'name':'TweetText','type':'STRING'}" +
" {'name':'Traffic_Location','type':'STRING'}," +
" {'name':'Traffic_Level','type':'STRING'}," +
" {'name':'Twitter_Text','type':'STRING'}" +
" ]" +
"}");

logger.debug("Stream ID : " + streamID);
logger.debug("Stream was not found and defined successfully");
} catch (AgentException | MalformedStreamDefinitionException
} catch (AgentException | MalformedStreamDefinitionException
| StreamDefinitionException
| DifferentStreamDefinitionAlreadyDefinedException e1) {

logger.debug("Stream Definition Failed");
}
}

}

/***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.wso2.uima.collectionProccesingEngine.consumers.util.KeyStoreUtil;
import org.wso2.uima.collectionProccesingEngine.consumers.util.TweetScanner;
import org.wso2.uima.collectionProccesingEngine.consumers.util.CasConsumerUtil;

import java.sql.Timestamp;
import java.util.Date;
Expand Down Expand Up @@ -70,9 +70,9 @@ public void initialize() throws ResourceInitializationException {
@Override
public void processCas(CAS cas) throws ResourceProcessException {

String tweetText = TweetScanner.getTweetText(cas);
String locationString = TweetScanner.getLocationString(cas);
String trafficLevel = TweetScanner.getTrafficLevel(cas);
String tweetText = CasConsumerUtil.getTweetText(cas);
String locationString = CasConsumerUtil.getLocationString(cas);
String trafficLevel = CasConsumerUtil.getTrafficLevel(cas);

if (locationString.isEmpty()) {
return;
Expand All @@ -83,7 +83,7 @@ public void processCas(CAS cas) throws ResourceProcessException {

if (!locationString.equals(""))
publish(tweetText, locationString, trafficLevel);
//TODO write a Util class

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.resource.ResourceProcessException;
import org.wso2.uima.collectionProccesingEngine.consumers.util.KeyStoreUtil;
import org.wso2.uima.collectionProccesingEngine.consumers.util.TweetScanner;
import org.wso2.uima.collectionProccesingEngine.consumers.util.CasConsumerUtil;

import javax.xml.soap.*;
import java.sql.Timestamp;
Expand All @@ -47,16 +47,16 @@ public class SoapCasConsumer extends CasConsumer_ImplBase {
public void processCas(CAS cas) throws ResourceProcessException {

//properties
String tweetText = TweetScanner.getTweetText(cas);
String locationString = TweetScanner.getLocationString(cas);
String trafficLevel = TweetScanner.getTrafficLevel(cas);
String tweetText = CasConsumerUtil.getTweetText(cas);
String locationString = CasConsumerUtil.getLocationString(cas);
String trafficLevel = CasConsumerUtil.getTrafficLevel(cas);

if (locationString.isEmpty()) {
return;
}

Logger.getLogger(SoapCasConsumer.class).info("Annotated Location : " + locationString.trim());
Logger.getLogger(SoapCasConsumer.class).info("Annotated Traffic Level : " + trafficLevel);
Logger.getLogger(SoapCasConsumer.class).debug("Annotated Location : " + locationString.trim());
Logger.getLogger(SoapCasConsumer.class).debug("Annotated Traffic Level : " + trafficLevel);

//creating soap message
Date date = new Date();
Expand Down Expand Up @@ -91,6 +91,8 @@ public void processCas(CAS cas) throws ResourceProcessException {

// System.out.println(soapMessage.getSOAPBody().toString());
this.publish(soapMessage);
logger.info("Event Published Successfully to "+ soapEndPoint+"\n");


} catch (Exception e) {
logger.error("Error occurs when creating the SOAP message", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
import org.wso2.uima.types.TrafficLevelIdentifier;

import java.util.Iterator;

/**
* Scan the CAS object given and return the locations, trafficLevel and tweetText indicated within the cas.
*/
public class TweetScanner {

private TweetScanner() {
//Avoids the class from creating its instances.
public class CasConsumerUtil {

private CasConsumerUtil(){
//no instances.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,15 @@ public void initialize() throws ResourceInitializationException {

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);

logger.info("Consumer Created Successfully");
consumer = session.createDurableSubscriber(topic, clientID);
logger.debug("Consumer Created Successfully");
consumer = session.createDurableSubscriber(topic,clientID);

} catch (JMSException e) {
logger.error("Error Initializing the Subscriber for ActiveMQReader", e);

logger.error("Error Initializing the Subscriber for ActiveMQReader",e);
throw new RuntimeException("Unable to initialize the CAS Reader");
}

logger.info("ActiveMQ Cas Reader Initialized Successfully");
}


Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added target/classes/org/wso2/uima/types/HashTag.class
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
5 changes: 5 additions & 0 deletions target/maven-archiver/pom.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#Generated by Maven
#Tue Jan 27 16:00:19 IST 2015
version=1.0
groupId=org.wso2.cep.uima
artifactId=org.wso2.cpe.uima.twitter-cpe
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
org/wso2/uima/types/LocationIdentification.class
org/wso2/uima/collectionProccesingEngine/consumers/util/KeyStoreUtil.class
org/wso2/uima/types/TrafficLevelIdentifier_Type$1.class
org/wso2/uima/types/HashTag_Type$1.class
org/wso2/uima/collectionProccesingEngine/consumers/HttpCasConsumer.class
org/wso2/uima/collectionProccesingEngine/reader/TwitterActiveMQReader.class
org/wso2/uima/types/LocationIdentification_Type.class
org/wso2/uima/collectionProccesingEngine/consumers/util/DefaultTrustManager.class
org/wso2/uima/types/HashTag_Type.class
org/wso2/uima/types/TimeStamp_Type$1.class
org/wso2/uima/types/TrafficLevelIdentifier.class
org/wso2/uima/types/TimeStamp.class
org/wso2/uima/types/TimeStamp_Type.class
org/wso2/uima/types/LocationIdentification_Type$1.class
org/wso2/uima/types/TrafficLevelIdentifier_Type.class
org/wso2/uima/collectionProccesingEngine/consumers/SoapCasConsumer.class
org/wso2/uima/main/CEPWithActiveMQ.class
org/wso2/uima/collectionProccesingEngine/analysisEngines/LocationIdentifier.class
org/wso2/uima/collectionProccesingEngine/consumers/ConsolePrinterCasConsumer.class
org/wso2/uima/collectionProccesingEngine/consumers/util/CasConsumerUtil.class
org/wso2/uima/main/StatusCallBackCPE.class
org/wso2/uima/collectionProccesingEngine/analysisEngines/TrafficLevelAnalyser.class
org/wso2/uima/types/HashTag.class
org/wso2/uima/collectionProccesingEngine/consumers/DataBridgeCasConsumer.class
org/wso2/uima/collectionProccesingEngine/consumers/AnnotationPrinter.class
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/analysisEngines/LocationIdentifier.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/consumers/util/KeyStoreUtil.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/analysisEngines/TrafficLevelAnalyser.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/types/LocationIdentification_Type.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/types/TimeStamp.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/consumers/DataBridgeCasConsumer.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/types/TrafficLevelIdentifier_Type.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/main/CEPwithActiveMQ.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/types/TimeStamp_Type.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/types/TrafficLevelIdentifier.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/types/HashTag.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/types/HashTag_Type.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/consumers/HttpCasConsumer.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/reader/TwitterActiveMQReader.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/types/LocationIdentification.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/consumers/AnnotationPrinter.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/consumers/SoapCasConsumer.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/consumers/ConsolePrinterCasConsumer.java
/home/supun/IdeaProjects/TwitterCPE/src/main/java/org/wso2/uima/collectionProccesingEngine/consumers/util/DefaultTrustManager.java
Binary file added target/org.wso2.cpe.uima.twitter-cpe-1.0.jar
Binary file not shown.