Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions src/main/java/com/inferlytics/druidlet/loader/Loader.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,30 @@
package com.inferlytics.druidlet.loader;

import com.inferlytics.druidlet.loader.impl.CSVLoader;
import com.inferlytics.druidlet.loader.impl.JsonLoader;
import io.druid.data.input.InputRow;

import java.io.Reader;
import java.util.List;


/**
* Abstract class which acts as an interface for loading files onto a QueryableIndex
* Must be implemented to support CSV, TSV, JSON, XML, etc.
*/
public abstract class Loader implements Iterable<InputRow> {
public abstract class Loader implements Iterable<InputRow>
{
protected List<String> dimensions;
protected String timestampDimension;

public Loader(List<String> dims, String ts) {

public Loader( List<String> dims, String ts )
{
this.dimensions = dims;
this.timestampDimension = ts;
}


/**
* CSVLoader implementation of the Loader
*
Expand All @@ -35,7 +41,14 @@ public Loader(List<String> dims, String ts) {
* @param timestampDimension Timestamp dimension
* @return A new CSVLoader to the CSV file specified by the reader
*/
public static Loader csv(Reader reader, List<String> columns, List<String> dimensions, String timestampDimension) {
return new CSVLoader(reader, columns, dimensions, timestampDimension);
public static Loader csv( Reader reader, List<String> columns, List<String> dimensions, String timestampDimension )
{
return new CSVLoader( reader, columns, dimensions, timestampDimension );
}


public static Loader json( Reader reader, List<String> dimensions, String timestampDimension )
{
return new JsonLoader(reader, dimensions, timestampDimension);
}
}
145 changes: 145 additions & 0 deletions src/main/java/com/inferlytics/druidlet/loader/impl/JsonLoader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package com.inferlytics.druidlet.loader.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.inferlytics.druidlet.loader.Loader;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import org.joda.time.DateTime;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;


/**
* Reads JSON files with the given column headers, generates InputRow
* objects with the given dimensions and timestampDimension
* Created by Samarth Bhargav on 5/11/16.
*/
public class JsonLoader extends Loader
{
private static final TypeReference<HashMap<String, Object>> TYPE_REFERENCE = new TypeReference<HashMap<String, Object>>()
{
};

private final Reader reader;


/**
* Create a new JSON reader
* @param reader the reader to read from
* @param dimensions the list of dimensions
* @param timestampDimension the time stamp dimensions
*/
public JsonLoader( Reader reader, List<String> dimensions, String timestampDimension )
{
super( dimensions, timestampDimension );
this.reader = reader;
}


@Override public Iterator<InputRow> iterator()
{
return new JsonReaderIterator( this.reader, this.dimensions, this.timestampDimension );
}


/**
* Iterator class
*/
private class JsonReaderIterator implements Iterator<InputRow>, AutoCloseable
{
private final BufferedReader reader;
private final List<String> dimensions;
private final String timestampDimension;
private final ObjectMapper objectMapper;
private String nextLine;


/**
* Initialize a new JsonReaderIterator
* @param reader the reader
* @param dimensions the list of dimensions
* @param timestampDimension the time stamp dimension
*/
public JsonReaderIterator( Reader reader, List<String> dimensions, String timestampDimension )
{
this.reader = new BufferedReader( reader );
this.dimensions = dimensions;
this.timestampDimension = timestampDimension;
this.objectMapper = new ObjectMapper();
}


/**
* Helper method to get the timestamp
* @param map the map i.e data
* @return the timestamp in the data if it is present, or the default value
*/
private Long getTimestamp( Map<String, Object> map )
{
if ( timestampDimension == null ) {
return 1L;
} else {
return Long.valueOf( map.get( timestampDimension ).toString() );
}
}


@Override public boolean hasNext()
{
try {
if ( nextLine == null && ( nextLine = reader.readLine() ) == null ) {
close();
return false;
} else {
return true;
}
} catch ( IOException e ) {
e.printStackTrace();
try {
close();
} catch ( IOException e1 ) {
e1.printStackTrace();
}
return false;
}
}


@Override public InputRow next()
{
if ( !hasNext() ) {
return null;
}
Map<String, Object> map = null;
try {
map = this.objectMapper.readValue( nextLine, TYPE_REFERENCE );
} catch ( IOException e ) {
e.printStackTrace();
}
nextLine = null;
if ( map == null ) {
return next();
}
return new MapBasedInputRow( getTimestamp( map ), dimensions, map );
}


@Override public void remove()
{
throw new UnsupportedOperationException();
}


@Override public void close() throws IOException
{
this.reader.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.inferlytics.druidlet.loader.impl;

import io.druid.data.input.InputRow;
import org.testng.annotations.Test;

import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.testng.Assert.*;


/**
* Tests for JsonLoader
* Created by Samarth Bhargav on 5/15/16.
*/
public class JsonLoaderTest
{
@Test public void testJsonLoader() throws Exception
{
JsonLoader jsonLoader = new JsonLoader(
new InputStreamReader( JsonLoaderTest.class.getClassLoader().getResourceAsStream( "testJsonLoader.json" ) ),
Arrays.asList( "dim1", "dim2", "dim3" ), "timestamp" );
List<InputRow> inputRowList = new ArrayList<>();

for ( InputRow inputRow : jsonLoader ) {
inputRowList.add( inputRow );
}

assertEquals( 3, inputRowList.size() );

for ( int i = 0; i < inputRowList.size(); i++ ) {
InputRow inputRow = inputRowList.get( i );
for ( String dim : inputRow.getDimensions() ) {
assertEquals( String.valueOf( i + 1 ), inputRow.getDimension( dim ).get( 0 ) );
}
}
}
}
3 changes: 3 additions & 0 deletions src/test/resources/testJsonLoader.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"dim1": "1", "dim2": "1", "dim3": "1", "timestamp": 10000}
{"dim1": "2", "dim2": "2", "dim3": "2", "timestamp": 20000}
{"dim1": "3", "dim2": "3", "dim3": "3", "timestamp": 30000}