diff --git a/OpenSOC-Topologies/readme.md b/OpenSOC-Topologies/readme.md index feac62d..0674d70 100644 --- a/OpenSOC-Topologies/readme.md +++ b/OpenSOC-Topologies/readme.md @@ -41,7 +41,7 @@ The sample topologies provided use a specific directory structure. The example https://github.com/OpenSOC/opensoc-streaming/tree/master/OpenSOC-Topologies/src/main/resources/OpenSOC_Configs ``` -topology.conf - settings specific to each topology -features_enabled.conf - turn on and off features for each topology and control parallelism -metrics.conf - export definitions for metrics to Graphite -topology_dentifier.conf - customer-specific tag (since we deploy to multiple data centers we need to identify where the alerts are coming from and what topologies we are looking at when we need to debug) +topology.conf - settings specific to each topology +features_enabled.conf - turn on and off features for each topology and control parallelism +metrics.conf - export definitions for metrics to Graphite +topology_dentifier.conf - customer-specific tag (since we deploy to multiple data centers we need to identify where the alerts are coming from and what topologies we are looking at when we need to debug) diff --git a/OpenSOC-Topologies/src/main/java/com/opensoc/topology/runner/TopologyRunner.java b/OpenSOC-Topologies/src/main/java/com/opensoc/topology/runner/TopologyRunner.java index 72c2240..06d4c8f 100644 --- a/OpenSOC-Topologies/src/main/java/com/opensoc/topology/runner/TopologyRunner.java +++ b/OpenSOC-Topologies/src/main/java/com/opensoc/topology/runner/TopologyRunner.java @@ -555,8 +555,8 @@ private boolean initializeKafkaSpout(String name) { SpoutConfig kafkaConfig = new SpoutConfig(zk, input_topic, "", input_topic); kafkaConfig.scheme = new SchemeAsMultiScheme(new RawScheme()); - kafkaConfig.forceFromStart = Boolean.valueOf("True"); - kafkaConfig.startOffsetTime = -1; + kafkaConfig.forceFromStart = config.getBoolean("spout.kafka.forcefromstart", Boolean.valueOf("True")); + kafkaConfig.startOffsetTime = config.getInt("spout.kafka.start.offset.time", -1); builder.setSpout(name, new KafkaSpout(kafkaConfig), config.getInt("spout.kafka.parallelism.hint")).setNumTasks(